/* this is really arbitrarily chosen */
#define DEFAULT_MODE 1
-#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
-
-#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
-#define DEFAULT_BURST_VALUE 0
#define DEFAULT_HANDLE_READ TRUE
PROP_0,
PROP_MODE,
- PROP_UNIT_FORMAT,
-
- PROP_BURST_FORMAT,
- PROP_BURST_VALUE,
-
PROP_HANDLE_READ,
PROP_NUM_FDS,
return fdset_mode_type;
}
-#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
-static GType
-gst_unit_format_get_type (void)
-{
- static GType unit_format_type = 0;
- static const GEnumValue unit_format[] = {
- {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
- {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
- {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
- {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
- {0, NULL, NULL},
- };
-
- if (!unit_format_type) {
- unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
- }
- return unit_format_type;
-}
-
static void gst_multi_fd_sink_finalize (GObject * object);
static void gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink);
GST_TYPE_FDSET_MODE, DEFAULT_MODE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
- g_param_spec_enum ("unit-format", "Units format",
- "The unit to measure the max/soft-max/queued properties",
- GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
- g_param_spec_enum ("burst-format", "Burst format",
- "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
- GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
- g_param_spec_uint64 ("burst-value", "Burst value",
- "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
- DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
/**
* GstMultiFdSink::handle-read
*
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
add_full), NULL, NULL,
gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
- G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
- G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
+ G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT,
+ G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
/**
* GstMultiFdSink::remove:
* @gstmultifdsink: the multifdsink element to emit this signal on
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
- this->unit_format = DEFAULT_UNIT_FORMAT;
-
- this->def_burst_format = DEFAULT_BURST_FORMAT;
- this->def_burst_value = DEFAULT_BURST_VALUE;
-
this->handle_read = DEFAULT_HANDLE_READ;
}
g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
client->fd.fd = fd;
- client->burst_min_format = min_format;
- client->burst_min_value = min_value;
- client->burst_max_format = max_format;
- client->burst_max_value = max_value;
+ mhclient->burst_min_format = min_format;
+ mhclient->burst_min_value = min_value;
+ mhclient->burst_max_format = max_format;
+ mhclient->burst_max_value = max_value;
CLIENTS_LOCK (sink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
- sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
- -1);
+ mhsink->def_burst_format, mhsink->def_burst_value,
+ mhsink->def_burst_format, -1);
}
/* "remove" signal implementation */
return TRUE;
}
-/* Get the number of buffers from the buffer queue needed to satisfy
- * the maximum max in the configured units.
- * If units are not BUFFERS, and there are insufficient buffers in the
- * queue to satify the limit, return len(queue) + 1 */
-static gint
-get_buffers_max (GstMultiFdSink * sink, gint64 max)
-{
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- switch (sink->unit_format) {
- case GST_TCP_UNIT_FORMAT_BUFFERS:
- return max;
- case GST_TCP_UNIT_FORMAT_TIME:
- {
- GstBuffer *buf;
- int i;
- int len;
- gint64 diff;
- GstClockTime first = GST_CLOCK_TIME_NONE;
-
- len = mhsink->bufqueue->len;
-
- for (i = 0; i < len; i++) {
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
- if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
- if (first == -1)
- first = GST_BUFFER_TIMESTAMP (buf);
-
- diff = first - GST_BUFFER_TIMESTAMP (buf);
-
- if (diff > max)
- return i + 1;
- }
- }
- return len + 1;
- }
- case GST_TCP_UNIT_FORMAT_BYTES:
- {
- GstBuffer *buf;
- int i;
- int len;
- gint acc = 0;
-
- len = mhsink->bufqueue->len;
-
- for (i = 0; i < len; i++) {
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
- acc += gst_buffer_get_size (buf);
-
- if (acc > max)
- return i + 1;
- }
- return len + 1;
- }
- default:
- return max;
- }
-}
-
-/* find the positions in the buffer queue where *_min and *_max
- * is satisfied
- */
-/* count the amount of data in the buffers and return the index
- * that satifies the given limits.
- *
- * Returns: index @idx in the buffer queue so that the given limits are
- * satisfied. TRUE if all the limits could be satisfied, FALSE if not
- * enough data was in the queue.
- *
- * FIXME, this code might now work if any of the units is in buffers...
- */
-static gboolean
-find_limits (GstMultiFdSink * sink,
- gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
- gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
-{
- GstClockTime first, time;
- gint i, len, bytes;
- gboolean result, max_hit;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- /* take length of queue */
- len = mhsink->bufqueue->len;
-
- /* this must hold */
- g_assert (len > 0);
-
- GST_LOG_OBJECT (sink,
- "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
- ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
- buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
- GST_TIME_ARGS (time_max));
-
- /* do the trivial buffer limit test */
- if (buffers_min != -1 && len < buffers_min) {
- *min_idx = len - 1;
- *max_idx = len - 1;
- return FALSE;
- }
-
- result = FALSE;
- /* else count bytes and time */
- first = -1;
- bytes = 0;
- /* unset limits */
- *min_idx = -1;
- *max_idx = -1;
- max_hit = FALSE;
-
- i = 0;
- /* loop through the buffers, when a limit is ok, mark it
- * as -1, we have at least one buffer in the queue. */
- do {
- GstBuffer *buf;
-
- /* if we checked all min limits, update result */
- if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
- /* don't go below 0 */
- *min_idx = MAX (i - 1, 0);
- }
- /* if we reached one max limit break out */
- if (max_hit) {
- /* i > 0 when we get here, we subtract one to get the position
- * of the previous buffer. */
- *max_idx = i - 1;
- /* we have valid complete result if we found a min_idx too */
- result = *min_idx != -1;
- break;
- }
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-
- bytes += gst_buffer_get_size (buf);
-
- /* take timestamp and save for the base first timestamp */
- if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
- GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
- GST_TIME_ARGS (time));
- if (first == -1)
- first = time;
-
- /* increase max usage if we did not fill enough. Note that
- * buffers are sorted from new to old, so the first timestamp is
- * bigger than the next one. */
- if (time_min != -1 && first - time >= time_min)
- time_min = -1;
- if (time_max != -1 && first - time >= time_max)
- max_hit = TRUE;
- } else {
- GST_LOG_OBJECT (sink, "No timestamp on buffer");
- }
- /* time is OK or unknown, check and increase if not enough bytes */
- if (bytes_min != -1) {
- if (bytes >= bytes_min)
- bytes_min = -1;
- }
- if (bytes_max != -1) {
- if (bytes >= bytes_max) {
- max_hit = TRUE;
- }
- }
- i++;
- }
- while (i < len);
-
- /* if we did not hit the max or min limit, set to buffer size */
- if (*max_idx == -1)
- *max_idx = len - 1;
- /* make sure min does not exceed max */
- if (*min_idx == -1)
- *min_idx = *max_idx;
-
- return result;
-}
-
-/* parse the unit/value pair and assign it to the result value of the
- * right type, leave the other values untouched
- *
- * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
- */
-static gboolean
-assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
- GstClockTime * time)
-{
- gboolean res = TRUE;
-
- /* set only the limit of the given format to the given value */
- switch (format) {
- case GST_TCP_UNIT_FORMAT_BUFFERS:
- *buffers = (gint) value;
- break;
- case GST_TCP_UNIT_FORMAT_TIME:
- *time = value;
- break;
- case GST_TCP_UNIT_FORMAT_BYTES:
- *bytes = (gint) value;
- break;
- case GST_TCP_UNIT_FORMAT_UNDEFINED:
- default:
- res = FALSE;
- break;
- }
- return res;
-}
-
-/* count the index in the buffer queue to satisfy the given unit
- * and value pair starting from buffer at index 0.
- *
- * Returns: TRUE if there was enough data in the queue to satisfy the
- * burst values. @idx contains the index in the buffer that contains enough
- * data to satisfy the limits or the last buffer in the queue when the
- * function returns FALSE.
- */
-static gboolean
-count_burst_format (GstMultiFdSink * sink, gint * min_idx,
- GstFormat min_format, guint64 min_value, gint * max_idx,
- GstFormat max_format, guint64 max_value)
-{
- gint bytes_min = -1, buffers_min = -1;
- gint bytes_max = -1, buffers_max = -1;
- GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
-
- assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
- assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
-
- return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
- max_idx, bytes_max, buffers_max, time_max);
-}
-
-/* decide where in the current buffer queue this new client should start
- * receiving buffers from.
- * This function is called whenever a client is connected and has not yet
- * received a buffer.
- * If this returns -1, it means that we haven't found a good point to
- * start streaming from yet, and this function should be called again later
- * when more buffers have arrived.
- */
-static gint
-gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
-{
- gint result;
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] new client, deciding where to start in queue", client->fd.fd);
- GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
- mhsink->bufqueue->len);
- switch (mhclient->sync_method) {
- case GST_SYNC_METHOD_LATEST:
- /* no syncing, we are happy with whatever the client is going to get */
- result = mhclient->bufpos;
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result);
- break;
- case GST_SYNC_METHOD_NEXT_KEYFRAME:
- {
- /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
- * is a sync point, we can proceed, otherwise we need to keep waiting */
- GST_LOG_OBJECT (sink,
- "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
- mhclient->bufpos);
-
- result = find_prev_syncframe (mhsink, mhclient->bufpos);
- if (result != -1) {
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d",
- client->fd.fd, result);
- break;
- }
-
- /* client is not on a syncbuffer, need to skip these buffers and
- * wait some more */
- GST_LOG_OBJECT (sink,
- "[fd %5d] new client, skipping buffer(s), no syncpoint found",
- client->fd.fd);
- mhclient->bufpos = -1;
- break;
- }
- case GST_SYNC_METHOD_LATEST_KEYFRAME:
- {
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd);
-
- /* for new clients we initially scan the complete buffer queue for
- * a sync point when a buffer is added. If we don't find a keyframe,
- * we need to wait for the next keyframe and so we change the client's
- * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
- */
- result = find_next_syncframe (mhsink, 0);
- if (result != -1) {
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd,
- result);
- break;
- }
-
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
- "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd);
- /* throw client to the waiting state */
- mhclient->bufpos = -1;
- /* and make client sync to next keyframe */
- mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
- break;
- }
- case GST_SYNC_METHOD_BURST:
- {
- gboolean ok;
- gint max;
-
- /* move to the position where we satisfy the client's burst
- * parameters. If we could not satisfy the parameters because there
- * is not enough data, we just send what we have (which is in result).
- * We use the max value to limit the search
- */
- ok = count_burst_format (sink, &result, client->burst_min_format,
- client->burst_min_value, &max, client->burst_max_format,
- client->burst_max_value);
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
- client->fd.fd, ok, result);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
-
- /* we hit the max and it is below the min, use that then */
- if (max != -1 && max <= result) {
- result = MAX (max - 1, 0);
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d",
- client->fd.fd, result);
- }
- break;
- }
- case GST_SYNC_METHOD_BURST_KEYFRAME:
- {
- gint min_idx, max_idx;
- gint next_syncframe, prev_syncframe;
-
- /* BURST_KEYFRAME:
- *
- * _always_ start sending a keyframe to the client. We first search
- * a keyframe between min/max limits. If there is none, we send it the
- * last keyframe before min. If there is none, the behaviour is like
- * NEXT_KEYFRAME.
- */
- /* gather burst limits */
- count_burst_format (sink, &min_idx, client->burst_min_format,
- client->burst_min_value, &max_idx, client->burst_max_format,
- client->burst_max_value);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
- /* first find a keyframe after min_idx */
- next_syncframe = find_next_syncframe (mhsink, min_idx);
- if (next_syncframe != -1 && next_syncframe < max_idx) {
- /* we have a valid keyframe and it's below the max */
- GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
- result = next_syncframe;
- break;
- }
-
- /* no valid keyframe, try to find one below min */
- prev_syncframe = find_prev_syncframe (mhsink, min_idx);
- if (prev_syncframe != -1) {
- GST_WARNING_OBJECT (sink,
- "using keyframe below min in BURST_KEYFRAME sync mode");
- result = prev_syncframe;
- break;
- }
-
- /* no prev keyframe or not enough data */
- GST_WARNING_OBJECT (sink,
- "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
-
- /* throw client to the waiting state */
- mhclient->bufpos = -1;
- /* and make client sync to next keyframe */
- mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
- result = -1;
- break;
- }
- case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
- {
- gint min_idx, max_idx;
- gint next_syncframe;
-
- /* BURST_WITH_KEYFRAME:
- *
- * try to start sending a keyframe to the client. We first search
- * a keyframe between min/max limits. If there is none, we send it the
- * amount of data up 'till min.
- */
- /* gather enough data to burst */
- count_burst_format (sink, &min_idx, client->burst_min_format,
- client->burst_min_value, &max_idx, client->burst_max_format,
- client->burst_max_value);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
- /* first find a keyframe after min_idx */
- next_syncframe = find_next_syncframe (mhsink, min_idx);
- if (next_syncframe != -1 && next_syncframe < max_idx) {
- /* we have a valid keyframe and it's below the max */
- GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
- result = next_syncframe;
- break;
- }
-
- /* no keyframe, send data from min_idx */
- GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
-
- /* make sure we don't go over the max limit */
- if (max_idx != -1 && max_idx <= min_idx) {
- result = MAX (max_idx - 1, 0);
- } else {
- result = min_idx;
- }
-
- break;
- }
- default:
- g_warning ("unknown sync method %d", mhclient->sync_method);
- result = mhclient->bufpos;
- break;
- }
- return result;
-}
-
/* Handle a write on a client,
* which indicates a read request from a client.
*
if (mhclient->bufpos == -1) {
/* client is too fast, remove from write queue until new buffer is
* available */
+ // FIXME: specific
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+ //
/* if we flushed out all of the client buffers, we can stop */
if (mhclient->flushcount == 0)
goto flushed;
/* for new connections, we need to find a good spot in the
* bufqueue to start streaming from */
if (mhclient->new_connection && !flushing) {
- gint position = gst_multi_fd_sink_new_client (sink, client);
+ gint position = gst_multi_handle_sink_new_client (mhsink, mhclient);
if (position >= 0) {
/* we got a valid spot in the queue */
mhclient->bufpos = position;
} else {
/* cannot send data to this client yet */
+ // FIXME: specific
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
return TRUE;
}
data = info.data;
maxsize = info.size - mhclient->bufoffset;
+ // FIXME: specific
/* try to write the complete buffer */
#ifdef MSG_NOSIGNAL
#define FLAGS MSG_NOSIGNAL
}
}
-/* calculate the new position for a client after recovery. This function
- * does not update the client position but merely returns the required
- * position.
- */
-static gint
-gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
-{
- gint newbufpos;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-
- GST_WARNING_OBJECT (sink,
- "[fd %5d] client %p is lagging at %d, recover using policy %d",
- client->fd.fd, client, mhclient->bufpos, mhsink->recover_policy);
-
- switch (mhsink->recover_policy) {
- case GST_RECOVER_POLICY_NONE:
- /* do nothing, client will catch up or get kicked out when it reaches
- * the hard max */
- newbufpos = mhclient->bufpos;
- break;
- case GST_RECOVER_POLICY_RESYNC_LATEST:
- /* move to beginning of queue */
- newbufpos = -1;
- break;
- case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
- /* move to beginning of soft max */
- newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
- break;
- case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
- /* find keyframe in buffers, we search backwards to find the
- * closest keyframe relative to what this client already received. */
- newbufpos = MIN (mhsink->bufqueue->len - 1,
- get_buffers_max (sink, mhsink->units_soft_max) - 1);
-
- while (newbufpos >= 0) {
- GstBuffer *buf;
-
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
- if (is_sync_frame (mhsink, buf)) {
- /* found a buffer that is not a delta unit */
- break;
- }
- newbufpos--;
- }
- break;
- default:
- /* unknown recovery procedure */
- newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
- break;
- }
- return newbufpos;
-}
-
/* Queue a buffer on the global queue.
*
* This function adds the buffer to the front of a GArray. It removes the
queuelen = mhsink->bufqueue->len;
if (mhsink->units_max > 0)
- max_buffers = get_buffers_max (sink, mhsink->units_max);
+ max_buffers = get_buffers_max (mhsink, mhsink->units_max);
else
max_buffers = -1;
if (mhsink->units_soft_max > 0)
- soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
+ soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
gint newpos;
- newpos = gst_multi_fd_sink_recover_client (sink, client);
+ newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient);
if (newpos != mhclient->bufpos) {
mhclient->dropped_buffers += mhclient->bufpos - newpos;
mhclient->bufpos = newpos;
/* get index where the limits are ok, we don't really care if all limits
* are ok, we just queue as much as we need. We also don't compare against
* the max limits. */
- find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
+ find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min,
mhsink->time_min, &max, -1, -1, -1);
max_buffer_usage = MAX (max_buffer_usage, usage + 1);
case PROP_MODE:
multifdsink->mode = g_value_get_enum (value);
break;
- case PROP_UNIT_FORMAT:
- multifdsink->unit_format = g_value_get_enum (value);
- break;
- case PROP_BURST_FORMAT:
- multifdsink->def_burst_format = g_value_get_enum (value);
- break;
- case PROP_BURST_VALUE:
- multifdsink->def_burst_value = g_value_get_uint64 (value);
- break;
case PROP_HANDLE_READ:
multifdsink->handle_read = g_value_get_boolean (value);
break;
case PROP_MODE:
g_value_set_enum (value, multifdsink->mode);
break;
- case PROP_UNIT_FORMAT:
- g_value_set_enum (value, multifdsink->unit_format);
- break;
- case PROP_BURST_FORMAT:
- g_value_set_enum (value, multifdsink->def_burst_format);
- break;
- case PROP_BURST_VALUE:
- g_value_set_uint64 (value, multifdsink->def_burst_value);
- break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multifdsink->handle_read);
break;
typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
-/**
- * GstTCPUnitType:
- * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined
- * @GST_TCP_UNIT_FORMAT_BUFFERS : buffers
- * @GST_TCP_UNIT_FORMAT_TIME : timeunits (in nanoseconds)
- * @GST_TCP_UNIT_FORMAT_BYTES : bytes
- *
- * The units used to specify limits.
- */
-typedef enum
-{
- GST_TCP_UNIT_FORMAT_UNDEFINED,
- GST_TCP_UNIT_FORMAT_BUFFERS,
- GST_TCP_UNIT_FORMAT_TIME,
- GST_TCP_UNIT_FORMAT_BYTES,
-} GstTCPUnitType;
-
/* structure for a client
*/
typedef struct {
GstPollFD fd;
gboolean is_socket;
-
- gboolean caps_sent;
-
- /* method to sync client when connecting */
- GstSyncMethod sync_method;
- GstFormat burst_min_format;
- guint64 burst_min_value;
- GstFormat burst_max_format;
- guint64 burst_max_value;
} GstTCPClient;
/**
gboolean handle_read;
- /* these values are used to check if a client is reading fast
- * enough and to control receovery */
- GstFormat unit_format;/* the type of the units */
-
- GstFormat def_burst_format;
- guint64 def_burst_value;
-
guint8 header_flags;
};
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
-#if 0
PROP_UNIT_FORMAT,
-#endif
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
PROP_BYTES_TO_SERVE,
PROP_BYTES_SERVED,
-#if 0
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
-#endif
PROP_QOS_DSCP,
"Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
-#if 0
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
g_param_spec_enum ("burst-format", "Burst format",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
g_param_spec_uint64 ("burst-value", "Burst value",
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-#endif
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
g_param_spec_int ("qos-dscp", "QoS diff srv code point",
this->timeout = DEFAULT_TIMEOUT;
this->def_sync_method = DEFAULT_SYNC_METHOD;
-#if 0
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
-#endif
this->qos_dscp = DEFAULT_QOS_DSCP;
return result;
}
-
-#if 0
/* Get the number of buffers from the buffer queue needed to satisfy
* the maximum max in the configured units.
* If units are not BUFFERS, and there are insufficient buffers in the
* queue to satify the limit, return len(queue) + 1 */
-static gint
+gint
get_buffers_max (GstMultiHandleSink * sink, gint64 max)
{
switch (sink->unit_format) {
return max;
}
}
-#endif
-#if 0
/* find the positions in the buffer queue where *_min and *_max
* is satisfied
*/
*
* FIXME, this code might now work if any of the units is in buffers...
*/
-static gboolean
+gboolean
find_limits (GstMultiHandleSink * sink,
gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
}
return res;
}
-#endif
-#if 0
/* count the index in the buffer queue to satisfy the given unit
* and value pair starting from buffer at index 0.
*
return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
max_idx, bytes_max, buffers_max, time_max);
}
-#endif
-#if 0
/* decide where in the current buffer queue this new client should start
* receiving buffers from.
* This function is called whenever a client is connected and has not yet
* start streaming from yet, and this function should be called again later
* when more buffers have arrived.
*/
-static gint
+gint
gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
- GstSocketClient * client)
+ GstMultiHandleClient * client)
{
gint result;
GST_DEBUG_OBJECT (sink,
- "[socket %p] new client, deciding where to start in queue",
- client->socket);
+ "%s new client, deciding where to start in queue", client->debug);
GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
sink->bufqueue->len);
switch (client->sync_method) {
/* no syncing, we are happy with whatever the client is going to get */
result = client->bufpos;
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
- result);
+ "%s SYNC_METHOD_LATEST, position %d", client->debug, result);
break;
case GST_SYNC_METHOD_NEXT_KEYFRAME:
{
/* if one of the new buffers (between client->bufpos and 0) in the queue
* is a sync point, we can proceed, otherwise we need to keep waiting */
GST_LOG_OBJECT (sink,
- "[socket %p] new client, bufpos %d, waiting for keyframe",
- client->socket, client->bufpos);
+ "%s new client, bufpos %d, waiting for keyframe",
+ client->debug, client->bufpos);
result = find_prev_syncframe (sink, client->bufpos);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
- client->socket, result);
+ "%s SYNC_METHOD_NEXT_KEYFRAME: result %d", client->debug, result);
break;
}
/* client is not on a syncbuffer, need to skip these buffers and
* wait some more */
GST_LOG_OBJECT (sink,
- "[socket %p] new client, skipping buffer(s), no syncpoint found",
- client->socket);
+ "%s new client, skipping buffer(s), no syncpoint found",
+ client->debug);
client->bufpos = -1;
break;
}
case GST_SYNC_METHOD_LATEST_KEYFRAME:
{
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
+ GST_DEBUG_OBJECT (sink, "%s SYNC_METHOD_LATEST_KEYFRAME", client->debug);
/* for new clients we initially scan the complete buffer queue for
* a sync point when a buffer is added. If we don't find a keyframe,
result = find_next_syncframe (sink, 0);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
- client->socket, result);
+ "%s SYNC_METHOD_LATEST_KEYFRAME: result %d", client->debug, result);
break;
}
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
- "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
+ "%s SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
+ "switching to SYNC_METHOD_NEXT_KEYFRAME", client->debug);
/* throw client to the waiting state */
client->bufpos = -1;
/* and make client sync to next keyframe */
client->burst_min_value, &max, client->burst_max_format,
client->burst_max_value);
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
- client->socket, ok, result);
+ "%s SYNC_METHOD_BURST: burst_unit returned %d, result %d",
+ client->debug, ok, result);
GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
if (max != -1 && max <= result) {
result = MAX (max - 1, 0);
GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
- client->socket, result);
+ "%s SYNC_METHOD_BURST: result above max, taken down to %d",
+ client->debug, result);
}
break;
}
}
return result;
}
-#endif
#if 0
/* Handle a write on a client,
}
#endif
-#if 0
/* calculate the new position for a client after recovery. This function
* does not update the client position but merely returns the required
* position.
*/
-static gint
+gint
gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
- GstSocketClient * client)
+ GstMultiHandleClient * client)
{
gint newbufpos;
GST_WARNING_OBJECT (sink,
- "[socket %p] client %p is lagging at %d, recover using policy %d",
- client->socket, client, client->bufpos, sink->recover_policy);
+ "%s client %p is lagging at %d, recover using policy %d",
+ client->debug, client, client->bufpos, sink->recover_policy);
switch (sink->recover_policy) {
case GST_RECOVER_POLICY_NONE:
}
return newbufpos;
}
-#endif
#if 0
/* Queue a buffer on the global queue.
case PROP_BUFFERS_MIN:
multihandlesink->buffers_min = g_value_get_int (value);
break;
-#if 0
case PROP_UNIT_FORMAT:
multihandlesink->unit_format = g_value_get_enum (value);
break;
case PROP_UNITS_SOFT_MAX:
multihandlesink->units_soft_max = g_value_get_int64 (value);
break;
-#endif
case PROP_RECOVER_POLICY:
multihandlesink->recover_policy = g_value_get_enum (value);
break;
case PROP_SYNC_METHOD:
multihandlesink->def_sync_method = g_value_get_enum (value);
break;
-#if 0
case PROP_BURST_FORMAT:
multihandlesink->def_burst_format = g_value_get_enum (value);
break;
case PROP_BURST_VALUE:
multihandlesink->def_burst_value = g_value_get_uint64 (value);
break;
-#endif
case PROP_QOS_DSCP:
multihandlesink->qos_dscp = g_value_get_int (value);
gst_multi_handle_sink_setup_dscp (multihandlesink);
case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multihandlesink->time_queued);
break;
-#if 0
case PROP_UNIT_FORMAT:
g_value_set_enum (value, multihandlesink->unit_format);
break;
case PROP_UNITS_SOFT_MAX:
g_value_set_int64 (value, multihandlesink->units_soft_max);
break;
-#endif
case PROP_RECOVER_POLICY:
g_value_set_enum (value, multihandlesink->recover_policy);
break;
case PROP_BYTES_SERVED:
g_value_set_uint64 (value, multihandlesink->bytes_served);
break;
-#if 0
case PROP_BURST_FORMAT:
g_value_set_enum (value, multihandlesink->def_burst_format);
break;
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multihandlesink->def_burst_value);
break;
-#endif
case PROP_QOS_DSCP:
g_value_set_int (value, multihandlesink->qos_dscp);
break;
/* method to sync client when connecting */
GstSyncMethod sync_method;
-// FIXME: refactor format vs unit
-#if 0
GstFormat burst_min_format;
guint64 burst_min_value;
GstFormat burst_max_format;
guint64 burst_max_value;
-#endif
GstCaps *caps; /* caps of last queued buffer */
gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client);
+gint get_buffers_max (GstMultiHandleSink * sink, gint64 max);
+gint
+gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
+ GstMultiHandleClient * client);
+gint
+gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
+ GstMultiHandleClient * client);
+gboolean
+find_limits (GstMultiHandleSink * sink,
+ gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
+ gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max);
+
/**
* GstMultiHandleSink:
LAST_SIGNAL
};
-
-/* this is really arbitrarily chosen */
-#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
-
-#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
-#define DEFAULT_BURST_VALUE 0
-
enum
{
PROP_0,
- PROP_UNIT_FORMAT,
-
- PROP_BURST_FORMAT,
- PROP_BURST_VALUE,
-
PROP_NUM_SOCKETS,
PROP_LAST
gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize;
- g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
- g_param_spec_enum ("unit-format", "Units format",
- "The unit to measure the max/soft-max/queued properties",
- GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
- g_param_spec_enum ("burst-format", "Burst format",
- "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
- GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
- g_param_spec_uint64 ("burst-value", "Burst value",
- "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
- DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
{
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
- this->unit_format = DEFAULT_UNIT_FORMAT;
-
- this->def_burst_format = DEFAULT_BURST_FORMAT;
- this->def_burst_value = DEFAULT_BURST_VALUE;
-
this->cancellable = g_cancellable_new ();
}
g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
client->socket = G_SOCKET (g_object_ref (socket));
- client->burst_min_format = min_format;
- client->burst_min_value = min_value;
- client->burst_max_format = max_format;
- client->burst_max_value = max_value;
+ mhclient->burst_min_format = min_format;
+ mhclient->burst_min_value = min_value;
+ mhclient->burst_max_format = max_format;
+ mhclient->burst_max_value = max_value;
CLIENTS_LOCK (sink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
- sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
- -1);
+ mhsink->def_burst_format, mhsink->def_burst_value,
+ mhsink->def_burst_format, -1);
}
/* "remove" signal implementation */
return TRUE;
}
-/* Get the number of buffers from the buffer queue needed to satisfy
- * the maximum max in the configured units.
- * If units are not BUFFERS, and there are insufficient buffers in the
- * queue to satify the limit, return len(queue) + 1 */
-static gint
-get_buffers_max (GstMultiSocketSink * sink, gint64 max)
-{
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- switch (sink->unit_format) {
- case GST_FORMAT_BUFFERS:
- return max;
- case GST_FORMAT_TIME:
- {
- GstBuffer *buf;
- int i;
- int len;
- gint64 diff;
- GstClockTime first = GST_CLOCK_TIME_NONE;
-
- len = mhsink->bufqueue->len;
-
- for (i = 0; i < len; i++) {
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
- if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
- if (first == -1)
- first = GST_BUFFER_TIMESTAMP (buf);
-
- diff = first - GST_BUFFER_TIMESTAMP (buf);
-
- if (diff > max)
- return i + 1;
- }
- }
- return len + 1;
- }
- case GST_FORMAT_BYTES:
- {
- GstBuffer *buf;
- int i;
- int len;
- gint acc = 0;
-
- len = mhsink->bufqueue->len;
-
- for (i = 0; i < len; i++) {
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
- acc += gst_buffer_get_size (buf);
-
- if (acc > max)
- return i + 1;
- }
- return len + 1;
- }
- default:
- return max;
- }
-}
-
-/* find the positions in the buffer queue where *_min and *_max
- * is satisfied
- */
-/* count the amount of data in the buffers and return the index
- * that satifies the given limits.
- *
- * Returns: index @idx in the buffer queue so that the given limits are
- * satisfied. TRUE if all the limits could be satisfied, FALSE if not
- * enough data was in the queue.
- *
- * FIXME, this code might now work if any of the units is in buffers...
- */
-static gboolean
-find_limits (GstMultiSocketSink * sink,
- gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
- gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
-{
- GstClockTime first, time;
- gint i, len, bytes;
- gboolean result, max_hit;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- /* take length of queue */
- len = mhsink->bufqueue->len;
-
- /* this must hold */
- g_assert (len > 0);
-
- GST_LOG_OBJECT (sink,
- "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
- ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
- buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
- GST_TIME_ARGS (time_max));
-
- /* do the trivial buffer limit test */
- if (buffers_min != -1 && len < buffers_min) {
- *min_idx = len - 1;
- *max_idx = len - 1;
- return FALSE;
- }
-
- result = FALSE;
- /* else count bytes and time */
- first = -1;
- bytes = 0;
- /* unset limits */
- *min_idx = -1;
- *max_idx = -1;
- max_hit = FALSE;
-
- i = 0;
- /* loop through the buffers, when a limit is ok, mark it
- * as -1, we have at least one buffer in the queue. */
- do {
- GstBuffer *buf;
-
- /* if we checked all min limits, update result */
- if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
- /* don't go below 0 */
- *min_idx = MAX (i - 1, 0);
- }
- /* if we reached one max limit break out */
- if (max_hit) {
- /* i > 0 when we get here, we subtract one to get the position
- * of the previous buffer. */
- *max_idx = i - 1;
- /* we have valid complete result if we found a min_idx too */
- result = *min_idx != -1;
- break;
- }
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
-
- bytes += gst_buffer_get_size (buf);
-
- /* take timestamp and save for the base first timestamp */
- if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
- GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
- GST_TIME_ARGS (time));
- if (first == -1)
- first = time;
-
- /* increase max usage if we did not fill enough. Note that
- * buffers are sorted from new to old, so the first timestamp is
- * bigger than the next one. */
- if (time_min != -1 && first - time >= time_min)
- time_min = -1;
- if (time_max != -1 && first - time >= time_max)
- max_hit = TRUE;
- } else {
- GST_LOG_OBJECT (sink, "No timestamp on buffer");
- }
- /* time is OK or unknown, check and increase if not enough bytes */
- if (bytes_min != -1) {
- if (bytes >= bytes_min)
- bytes_min = -1;
- }
- if (bytes_max != -1) {
- if (bytes >= bytes_max) {
- max_hit = TRUE;
- }
- }
- i++;
- }
- while (i < len);
-
- /* if we did not hit the max or min limit, set to buffer size */
- if (*max_idx == -1)
- *max_idx = len - 1;
- /* make sure min does not exceed max */
- if (*min_idx == -1)
- *min_idx = *max_idx;
-
- return result;
-}
-
-/* parse the unit/value pair and assign it to the result value of the
- * right type, leave the other values untouched
- *
- * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
- */
-static gboolean
-assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
- GstClockTime * time)
-{
- gboolean res = TRUE;
-
- /* set only the limit of the given format to the given value */
- switch (format) {
- case GST_FORMAT_BUFFERS:
- *buffers = (gint) value;
- break;
- case GST_FORMAT_TIME:
- *time = value;
- break;
- case GST_FORMAT_BYTES:
- *bytes = (gint) value;
- break;
- case GST_FORMAT_UNDEFINED:
- default:
- res = FALSE;
- break;
- }
- return res;
-}
-
-/* count the index in the buffer queue to satisfy the given unit
- * and value pair starting from buffer at index 0.
- *
- * Returns: TRUE if there was enough data in the queue to satisfy the
- * burst values. @idx contains the index in the buffer that contains enough
- * data to satisfy the limits or the last buffer in the queue when the
- * function returns FALSE.
- */
-static gboolean
-count_burst_unit (GstMultiSocketSink * sink, gint * min_idx,
- GstFormat min_format, guint64 min_value, gint * max_idx,
- GstFormat max_format, guint64 max_value)
-{
- gint bytes_min = -1, buffers_min = -1;
- gint bytes_max = -1, buffers_max = -1;
- GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
-
- assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
- assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
-
- return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
- max_idx, bytes_max, buffers_max, time_max);
-}
-
-/* decide where in the current buffer queue this new client should start
- * receiving buffers from.
- * This function is called whenever a client is connected and has not yet
- * received a buffer.
- * If this returns -1, it means that we haven't found a good point to
- * start streaming from yet, and this function should be called again later
- * when more buffers have arrived.
- */
-static gint
-gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
- GstSocketClient * client)
-{
- gint result;
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
-
- GST_DEBUG_OBJECT (sink,
- "[socket %p] new client, deciding where to start in queue",
- client->socket);
- GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
- mhsink->bufqueue->len);
- switch (mhclient->sync_method) {
- case GST_SYNC_METHOD_LATEST:
- /* no syncing, we are happy with whatever the client is going to get */
- result = mhclient->bufpos;
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
- result);
- break;
- case GST_SYNC_METHOD_NEXT_KEYFRAME:
- {
- /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
- * is a sync point, we can proceed, otherwise we need to keep waiting */
- GST_LOG_OBJECT (sink,
- "[socket %p] new client, bufpos %d, waiting for keyframe",
- client->socket, mhclient->bufpos);
-
- result = find_prev_syncframe (mhsink, mhclient->bufpos);
- if (result != -1) {
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
- client->socket, result);
- break;
- }
-
- /* client is not on a syncbuffer, need to skip these buffers and
- * wait some more */
- GST_LOG_OBJECT (sink,
- "[socket %p] new client, skipping buffer(s), no syncpoint found",
- client->socket);
- mhclient->bufpos = -1;
- break;
- }
- case GST_SYNC_METHOD_LATEST_KEYFRAME:
- {
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
-
- /* for new clients we initially scan the complete buffer queue for
- * a sync point when a buffer is added. If we don't find a keyframe,
- * we need to wait for the next keyframe and so we change the client's
- * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
- */
- result = find_next_syncframe (mhsink, 0);
- if (result != -1) {
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
- client->socket, result);
- break;
- }
-
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
- "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
- /* throw client to the waiting state */
- mhclient->bufpos = -1;
- /* and make client sync to next keyframe */
- mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
- break;
- }
- case GST_SYNC_METHOD_BURST:
- {
- gboolean ok;
- gint max;
-
- /* move to the position where we satisfy the client's burst
- * parameters. If we could not satisfy the parameters because there
- * is not enough data, we just send what we have (which is in result).
- * We use the max value to limit the search
- */
- ok = count_burst_unit (sink, &result, client->burst_min_format,
- client->burst_min_value, &max, client->burst_max_format,
- client->burst_max_value);
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
- client->socket, ok, result);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
-
- /* we hit the max and it is below the min, use that then */
- if (max != -1 && max <= result) {
- result = MAX (max - 1, 0);
- GST_DEBUG_OBJECT (sink,
- "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
- client->socket, result);
- }
- break;
- }
- case GST_SYNC_METHOD_BURST_KEYFRAME:
- {
- gint min_idx, max_idx;
- gint next_syncframe, prev_syncframe;
-
- /* BURST_KEYFRAME:
- *
- * _always_ start sending a keyframe to the client. We first search
- * a keyframe between min/max limits. If there is none, we send it the
- * last keyframe before min. If there is none, the behaviour is like
- * NEXT_KEYFRAME.
- */
- /* gather burst limits */
- count_burst_unit (sink, &min_idx, client->burst_min_format,
- client->burst_min_value, &max_idx, client->burst_max_format,
- client->burst_max_value);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
- /* first find a keyframe after min_idx */
- next_syncframe = find_next_syncframe (mhsink, min_idx);
- if (next_syncframe != -1 && next_syncframe < max_idx) {
- /* we have a valid keyframe and it's below the max */
- GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
- result = next_syncframe;
- break;
- }
-
- /* no valid keyframe, try to find one below min */
- prev_syncframe = find_prev_syncframe (mhsink, min_idx);
- if (prev_syncframe != -1) {
- GST_WARNING_OBJECT (sink,
- "using keyframe below min in BURST_KEYFRAME sync mode");
- result = prev_syncframe;
- break;
- }
-
- /* no prev keyframe or not enough data */
- GST_WARNING_OBJECT (sink,
- "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
-
- /* throw client to the waiting state */
- mhclient->bufpos = -1;
- /* and make client sync to next keyframe */
- mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
- result = -1;
- break;
- }
- case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
- {
- gint min_idx, max_idx;
- gint next_syncframe;
-
- /* BURST_WITH_KEYFRAME:
- *
- * try to start sending a keyframe to the client. We first search
- * a keyframe between min/max limits. If there is none, we send it the
- * amount of data up 'till min.
- */
- /* gather enough data to burst */
- count_burst_unit (sink, &min_idx, client->burst_min_format,
- client->burst_min_value, &max_idx, client->burst_max_format,
- client->burst_max_value);
-
- GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
-
- /* first find a keyframe after min_idx */
- next_syncframe = find_next_syncframe (mhsink, min_idx);
- if (next_syncframe != -1 && next_syncframe < max_idx) {
- /* we have a valid keyframe and it's below the max */
- GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
- result = next_syncframe;
- break;
- }
-
- /* no keyframe, send data from min_idx */
- GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
-
- /* make sure we don't go over the max limit */
- if (max_idx != -1 && max_idx <= min_idx) {
- result = MAX (max_idx - 1, 0);
- } else {
- result = min_idx;
- }
-
- break;
- }
- default:
- g_warning ("unknown sync method %d", mhclient->sync_method);
- result = mhclient->bufpos;
- break;
- }
- return result;
-}
-
/* Handle a write on a client,
* which indicates a read request from a client.
*
if (mhclient->bufpos == -1) {
/* client is too fast, remove from write queue until new buffer is
* available */
+ // FIXME: specific
if (client->source) {
g_source_destroy (client->source);
g_source_unref (client->source);
client->source = NULL;
}
+ //
/* if we flushed out all of the client buffers, we can stop */
if (mhclient->flushcount == 0)
goto flushed;
/* for new connections, we need to find a good spot in the
* bufqueue to start streaming from */
if (mhclient->new_connection && !flushing) {
- gint position = gst_multi_socket_sink_new_client (sink, client);
+ gint position = gst_multi_handle_sink_new_client (mhsink, mhclient);
if (position >= 0) {
/* we got a valid spot in the queue */
mhclient->bufpos = position;
} else {
/* cannot send data to this client yet */
+ // FIXME: specific
if (client->source) {
g_source_destroy (client->source);
g_source_unref (client->source);
client->source = NULL;
}
+ //
return TRUE;
}
}
gst_buffer_map (head, &map, GST_MAP_READ);
maxsize = map.size - mhclient->bufoffset;
+ // FIXME: specific
/* try to write the complete buffer */
wrote =
}
}
-/* calculate the new position for a client after recovery. This function
- * does not update the client position but merely returns the required
- * position.
- */
-static gint
-gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
- GstSocketClient * client)
-{
- gint newbufpos;
- GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
- GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-
- GST_WARNING_OBJECT (sink,
- "[socket %p] client %p is lagging at %d, recover using policy %d",
- client->socket, client, mhclient->bufpos, mhsink->recover_policy);
-
- switch (mhsink->recover_policy) {
- case GST_RECOVER_POLICY_NONE:
- /* do nothing, client will catch up or get kicked out when it reaches
- * the hard max */
- newbufpos = mhclient->bufpos;
- break;
- case GST_RECOVER_POLICY_RESYNC_LATEST:
- /* move to beginning of queue */
- newbufpos = -1;
- break;
- case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
- /* move to beginning of soft max */
- newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
- break;
- case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
- /* find keyframe in buffers, we search backwards to find the
- * closest keyframe relative to what this client already received. */
- newbufpos = MIN (mhsink->bufqueue->len - 1,
- get_buffers_max (sink, mhsink->units_soft_max) - 1);
-
- while (newbufpos >= 0) {
- GstBuffer *buf;
-
- buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
- if (is_sync_frame (mhsink, buf)) {
- /* found a buffer that is not a delta unit */
- break;
- }
- newbufpos--;
- }
- break;
- default:
- /* unknown recovery procedure */
- newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
- break;
- }
- return newbufpos;
-}
-
/* Queue a buffer on the global queue.
*
* This function adds the buffer to the front of a GArray. It removes the
queuelen = mhsink->bufqueue->len;
if (mhsink->units_max > 0)
- max_buffers = get_buffers_max (sink, mhsink->units_max);
+ max_buffers = get_buffers_max (mhsink, mhsink->units_max);
else
max_buffers = -1;
if (mhsink->units_soft_max > 0)
- soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
+ soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
gint newpos;
- newpos = gst_multi_socket_sink_recover_client (sink, client);
+ newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient);
if (newpos != mhclient->bufpos) {
mhclient->dropped_buffers += mhclient->bufpos - newpos;
mhclient->bufpos = newpos;
/* get index where the limits are ok, we don't really care if all limits
* are ok, we just queue as much as we need. We also don't compare against
* the max limits. */
- find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
+ find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min,
mhsink->time_min, &max, -1, -1, -1);
max_buffer_usage = MAX (max_buffer_usage, usage + 1);
gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
- GstMultiSocketSink *multisocketsink;
-
- multisocketsink = GST_MULTI_SOCKET_SINK (object);
-
switch (prop_id) {
- case PROP_UNIT_FORMAT:
- multisocketsink->unit_format = g_value_get_enum (value);
- break;
- case PROP_BURST_FORMAT:
- multisocketsink->def_burst_format = g_value_get_enum (value);
- break;
- case PROP_BURST_VALUE:
- multisocketsink->def_burst_value = g_value_get_uint64 (value);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
multisocketsink = GST_MULTI_SOCKET_SINK (object);
switch (prop_id) {
- case PROP_UNIT_FORMAT:
- g_value_set_enum (value, multisocketsink->unit_format);
- break;
- case PROP_BURST_FORMAT:
- g_value_set_enum (value, multisocketsink->def_burst_format);
- break;
- case PROP_BURST_VALUE:
- g_value_set_uint64 (value, multisocketsink->def_burst_value);
- break;
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));
GSocket *socket;
GSource *source;
-
- /* method to sync client when connecting */
- GstSyncMethod sync_method;
- GstFormat burst_min_format;
- guint64 burst_min_value;
- GstFormat burst_max_format;
- guint64 burst_max_value;
} GstSocketClient;
/**
GMainContext *main_context;
GCancellable *cancellable;
- gboolean previous_buffer_in_caps;
-
- guint mtu;
-
- /* these values are used to check if a client is reading fast
- * enough and to control receovery */
- GstFormat unit_format;/* the format of the units */
-
- GstFormat def_burst_format;
- guint64 def_burst_value;
-
guint8 header_flags;
};
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */
- g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+ g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
- g_signal_emit_by_name (sink, "add_full", pfd2[1], 3,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 200);
- g_signal_emit_by_name (sink, "add_full", pfd3[1], 3,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 50);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1], GST_SYNC_METHOD_BURST,
+ GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1], GST_SYNC_METHOD_BURST,
+ GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */
- g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+ g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
- g_signal_emit_by_name (sink, "add_full", pfd2[1], 4,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 90);
- g_signal_emit_by_name (sink, "add_full", pfd3[1], 4,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 50);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1],
+ GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1],
+ GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */
- g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
+ g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
- g_signal_emit_by_name (sink, "add_full", pfd2[1], 5,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 90);
- g_signal_emit_by_name (sink, "add_full", pfd3[1], 5,
- GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES,
- (guint64) 50);
+ g_signal_emit_by_name (sink, "add_full", pfd2[1],
+ GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES,
+ (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", pfd3[1],
+ GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES,
+ (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
#include <gio/gio.h>
#include <gst/check/gstcheck.h>
+#include "gst/tcp/gstmultisocketsink.h"
+
static GstPad *mysrcpad;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
/* now add the clients */
g_signal_emit_by_name (sink, "add", socket[0]);
- g_signal_emit_by_name (sink, "add_full", socket[2], 3,
+ g_signal_emit_by_name (sink, "add_full", socket[2], GST_SYNC_METHOD_BURST,
GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200);
- g_signal_emit_by_name (sink, "add_full", socket[4], 3,
+ g_signal_emit_by_name (sink, "add_full", socket[4], GST_SYNC_METHOD_BURST,
GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
/* now add the clients */
g_signal_emit_by_name (sink, "add", socket[0]);
- g_signal_emit_by_name (sink, "add_full", socket[2], 4,
- GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
- g_signal_emit_by_name (sink, "add_full", socket[4], 4,
- GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
+ g_signal_emit_by_name (sink, "add_full", socket[2],
+ GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", socket[4],
+ GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
/* now add the clients */
g_signal_emit_by_name (sink, "add", socket[0]);
- g_signal_emit_by_name (sink, "add_full", socket[2], 5,
- GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90);
- g_signal_emit_by_name (sink, "add_full", socket[4], 5,
- GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
+ g_signal_emit_by_name (sink, "add_full", socket[2],
+ GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 90);
+ g_signal_emit_by_name (sink, "add_full", socket[4],
+ GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
+ GST_FORMAT_BYTES, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {