From 14ac8bb5854c86452eed978d75a9a5d3891efdc2 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Fri, 27 Jan 2012 18:44:04 +0100 Subject: [PATCH] multihandlesink: rework to use GST_TYPE_FORMAT --- gst/tcp/gstmultifdsink.c | 579 +-------------------------------- gst/tcp/gstmultifdsink.h | 33 -- gst/tcp/gstmultihandlesink.c | 78 ++--- gst/tcp/gstmultihandlesink.h | 15 +- gst/tcp/gstmultisocketsink.c | 568 +------------------------------- gst/tcp/gstmultisocketsink.h | 18 - tests/check/elements/multifdsink.c | 40 ++- tests/check/elements/multisocketsink.c | 26 +- 8 files changed, 103 insertions(+), 1254 deletions(-) diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index c57ada9..136b22d 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -148,10 +148,6 @@ enum /* this is really arbitrarily chosen */ #define DEFAULT_MODE 1 -#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS - -#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED -#define DEFAULT_BURST_VALUE 0 #define DEFAULT_HANDLE_READ TRUE @@ -160,11 +156,6 @@ enum PROP_0, PROP_MODE, - PROP_UNIT_FORMAT, - - PROP_BURST_FORMAT, - PROP_BURST_VALUE, - PROP_HANDLE_READ, PROP_NUM_FDS, @@ -192,25 +183,6 @@ gst_fdset_mode_get_type (void) return fdset_mode_type; } -#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type()) -static GType -gst_unit_format_get_type (void) -{ - static GType unit_format_type = 0; - static const GEnumValue unit_format[] = { - {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"}, - {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"}, - {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"}, - {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"}, - {0, NULL, NULL}, - }; - - if (!unit_format_type) { - unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format); - } - return unit_format_type; -} - static void gst_multi_fd_sink_finalize (GObject * object); static void gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink); @@ -266,22 +238,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GST_TYPE_FDSET_MODE, DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT, - g_param_spec_enum ("unit-format", "Units format", - "The unit to measure the max/soft-max/queued properties", - GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, - g_param_spec_enum ("burst-format", "Burst format", - "The format of the burst units (when sync-method is burst[[-with]-keyframe])", - GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_VALUE, - g_param_spec_uint64 ("burst-value", "Burst value", - "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, - DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - /** * GstMultiFdSink::handle-read * @@ -331,8 +287,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, add_full), NULL, NULL, gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, - G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT, - G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64); + G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, + G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64); /** * GstMultiFdSink::remove: * @gstmultifdsink: the multifdsink element to emit this signal on @@ -478,11 +434,6 @@ gst_multi_fd_sink_init (GstMultiFdSink * this) this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal); - this->unit_format = DEFAULT_UNIT_FORMAT; - - this->def_burst_format = DEFAULT_BURST_FORMAT; - this->def_burst_value = DEFAULT_BURST_VALUE; - this->handle_read = DEFAULT_HANDLE_READ; } @@ -535,10 +486,10 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, g_snprintf (mhclient->debug, 30, "[fd %5d]", fd); client->fd.fd = fd; - client->burst_min_format = min_format; - client->burst_min_value = min_value; - client->burst_max_format = max_format; - client->burst_max_value = max_value; + mhclient->burst_min_format = min_format; + mhclient->burst_min_value = min_value; + mhclient->burst_max_format = max_format; + mhclient->burst_max_value = max_value; CLIENTS_LOCK (sink); @@ -613,8 +564,8 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) mhsink = GST_MULTI_HANDLE_SINK (sink); gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method, - sink->def_burst_format, sink->def_burst_value, sink->def_burst_format, - -1); + mhsink->def_burst_format, mhsink->def_burst_value, + mhsink->def_burst_format, -1); } /* "remove" signal implementation */ @@ -1086,434 +1037,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, return TRUE; } -/* Get the number of buffers from the buffer queue needed to satisfy - * the maximum max in the configured units. - * If units are not BUFFERS, and there are insufficient buffers in the - * queue to satify the limit, return len(queue) + 1 */ -static gint -get_buffers_max (GstMultiFdSink * sink, gint64 max) -{ - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - switch (sink->unit_format) { - case GST_TCP_UNIT_FORMAT_BUFFERS: - return max; - case GST_TCP_UNIT_FORMAT_TIME: - { - GstBuffer *buf; - int i; - int len; - gint64 diff; - GstClockTime first = GST_CLOCK_TIME_NONE; - - len = mhsink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { - if (first == -1) - first = GST_BUFFER_TIMESTAMP (buf); - - diff = first - GST_BUFFER_TIMESTAMP (buf); - - if (diff > max) - return i + 1; - } - } - return len + 1; - } - case GST_TCP_UNIT_FORMAT_BYTES: - { - GstBuffer *buf; - int i; - int len; - gint acc = 0; - - len = mhsink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - acc += gst_buffer_get_size (buf); - - if (acc > max) - return i + 1; - } - return len + 1; - } - default: - return max; - } -} - -/* find the positions in the buffer queue where *_min and *_max - * is satisfied - */ -/* count the amount of data in the buffers and return the index - * that satifies the given limits. - * - * Returns: index @idx in the buffer queue so that the given limits are - * satisfied. TRUE if all the limits could be satisfied, FALSE if not - * enough data was in the queue. - * - * FIXME, this code might now work if any of the units is in buffers... - */ -static gboolean -find_limits (GstMultiFdSink * sink, - gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, - gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) -{ - GstClockTime first, time; - gint i, len, bytes; - gboolean result, max_hit; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - /* take length of queue */ - len = mhsink->bufqueue->len; - - /* this must hold */ - g_assert (len > 0); - - GST_LOG_OBJECT (sink, - "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT - ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min, - buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max, - GST_TIME_ARGS (time_max)); - - /* do the trivial buffer limit test */ - if (buffers_min != -1 && len < buffers_min) { - *min_idx = len - 1; - *max_idx = len - 1; - return FALSE; - } - - result = FALSE; - /* else count bytes and time */ - first = -1; - bytes = 0; - /* unset limits */ - *min_idx = -1; - *max_idx = -1; - max_hit = FALSE; - - i = 0; - /* loop through the buffers, when a limit is ok, mark it - * as -1, we have at least one buffer in the queue. */ - do { - GstBuffer *buf; - - /* if we checked all min limits, update result */ - if (bytes_min == -1 && time_min == -1 && *min_idx == -1) { - /* don't go below 0 */ - *min_idx = MAX (i - 1, 0); - } - /* if we reached one max limit break out */ - if (max_hit) { - /* i > 0 when we get here, we subtract one to get the position - * of the previous buffer. */ - *max_idx = i - 1; - /* we have valid complete result if we found a min_idx too */ - result = *min_idx != -1; - break; - } - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - - bytes += gst_buffer_get_size (buf); - - /* take timestamp and save for the base first timestamp */ - if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { - GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer", - GST_TIME_ARGS (time)); - if (first == -1) - first = time; - - /* increase max usage if we did not fill enough. Note that - * buffers are sorted from new to old, so the first timestamp is - * bigger than the next one. */ - if (time_min != -1 && first - time >= time_min) - time_min = -1; - if (time_max != -1 && first - time >= time_max) - max_hit = TRUE; - } else { - GST_LOG_OBJECT (sink, "No timestamp on buffer"); - } - /* time is OK or unknown, check and increase if not enough bytes */ - if (bytes_min != -1) { - if (bytes >= bytes_min) - bytes_min = -1; - } - if (bytes_max != -1) { - if (bytes >= bytes_max) { - max_hit = TRUE; - } - } - i++; - } - while (i < len); - - /* if we did not hit the max or min limit, set to buffer size */ - if (*max_idx == -1) - *max_idx = len - 1; - /* make sure min does not exceed max */ - if (*min_idx == -1) - *min_idx = *max_idx; - - return result; -} - -/* parse the unit/value pair and assign it to the result value of the - * right type, leave the other values untouched - * - * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. - */ -static gboolean -assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, - GstClockTime * time) -{ - gboolean res = TRUE; - - /* set only the limit of the given format to the given value */ - switch (format) { - case GST_TCP_UNIT_FORMAT_BUFFERS: - *buffers = (gint) value; - break; - case GST_TCP_UNIT_FORMAT_TIME: - *time = value; - break; - case GST_TCP_UNIT_FORMAT_BYTES: - *bytes = (gint) value; - break; - case GST_TCP_UNIT_FORMAT_UNDEFINED: - default: - res = FALSE; - break; - } - return res; -} - -/* count the index in the buffer queue to satisfy the given unit - * and value pair starting from buffer at index 0. - * - * Returns: TRUE if there was enough data in the queue to satisfy the - * burst values. @idx contains the index in the buffer that contains enough - * data to satisfy the limits or the last buffer in the queue when the - * function returns FALSE. - */ -static gboolean -count_burst_format (GstMultiFdSink * sink, gint * min_idx, - GstFormat min_format, guint64 min_value, gint * max_idx, - GstFormat max_format, guint64 max_value) -{ - gint bytes_min = -1, buffers_min = -1; - gint bytes_max = -1, buffers_max = -1; - GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; - - assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min); - assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max); - - return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, - max_idx, bytes_max, buffers_max, time_max); -} - -/* decide where in the current buffer queue this new client should start - * receiving buffers from. - * This function is called whenever a client is connected and has not yet - * received a buffer. - * If this returns -1, it means that we haven't found a good point to - * start streaming from yet, and this function should be called again later - * when more buffers have arrived. - */ -static gint -gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint result; - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - GST_DEBUG_OBJECT (sink, - "[fd %5d] new client, deciding where to start in queue", client->fd.fd); - GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", - mhsink->bufqueue->len); - switch (mhclient->sync_method) { - case GST_SYNC_METHOD_LATEST: - /* no syncing, we are happy with whatever the client is going to get */ - result = mhclient->bufpos; - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result); - break; - case GST_SYNC_METHOD_NEXT_KEYFRAME: - { - /* if one of the new buffers (between mhclient->bufpos and 0) in the queue - * is a sync point, we can proceed, otherwise we need to keep waiting */ - GST_LOG_OBJECT (sink, - "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd, - mhclient->bufpos); - - result = find_prev_syncframe (mhsink, mhclient->bufpos); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d", - client->fd.fd, result); - break; - } - - /* client is not on a syncbuffer, need to skip these buffers and - * wait some more */ - GST_LOG_OBJECT (sink, - "[fd %5d] new client, skipping buffer(s), no syncpoint found", - client->fd.fd); - mhclient->bufpos = -1; - break; - } - case GST_SYNC_METHOD_LATEST_KEYFRAME: - { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd); - - /* for new clients we initially scan the complete buffer queue for - * a sync point when a buffer is added. If we don't find a keyframe, - * we need to wait for the next keyframe and so we change the client's - * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME. - */ - result = find_next_syncframe (mhsink, 0); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd, - result); - break; - } - - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " - "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd); - /* throw client to the waiting state */ - mhclient->bufpos = -1; - /* and make client sync to next keyframe */ - mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - break; - } - case GST_SYNC_METHOD_BURST: - { - gboolean ok; - gint max; - - /* move to the position where we satisfy the client's burst - * parameters. If we could not satisfy the parameters because there - * is not enough data, we just send what we have (which is in result). - * We use the max value to limit the search - */ - ok = count_burst_format (sink, &result, client->burst_min_format, - client->burst_min_value, &max, client->burst_max_format, - client->burst_max_value); - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d", - client->fd.fd, ok, result); - - GST_LOG_OBJECT (sink, "min %d, max %d", result, max); - - /* we hit the max and it is below the min, use that then */ - if (max != -1 && max <= result) { - result = MAX (max - 1, 0); - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d", - client->fd.fd, result); - } - break; - } - case GST_SYNC_METHOD_BURST_KEYFRAME: - { - gint min_idx, max_idx; - gint next_syncframe, prev_syncframe; - - /* BURST_KEYFRAME: - * - * _always_ start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * last keyframe before min. If there is none, the behaviour is like - * NEXT_KEYFRAME. - */ - /* gather burst limits */ - count_burst_format (sink, &min_idx, client->burst_min_format, - client->burst_min_value, &max_idx, client->burst_max_format, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (mhsink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no valid keyframe, try to find one below min */ - prev_syncframe = find_prev_syncframe (mhsink, min_idx); - if (prev_syncframe != -1) { - GST_WARNING_OBJECT (sink, - "using keyframe below min in BURST_KEYFRAME sync mode"); - result = prev_syncframe; - break; - } - - /* no prev keyframe or not enough data */ - GST_WARNING_OBJECT (sink, - "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); - - /* throw client to the waiting state */ - mhclient->bufpos = -1; - /* and make client sync to next keyframe */ - mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - result = -1; - break; - } - case GST_SYNC_METHOD_BURST_WITH_KEYFRAME: - { - gint min_idx, max_idx; - gint next_syncframe; - - /* BURST_WITH_KEYFRAME: - * - * try to start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * amount of data up 'till min. - */ - /* gather enough data to burst */ - count_burst_format (sink, &min_idx, client->burst_min_format, - client->burst_min_value, &max_idx, client->burst_max_format, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (mhsink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no keyframe, send data from min_idx */ - GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode"); - - /* make sure we don't go over the max limit */ - if (max_idx != -1 && max_idx <= min_idx) { - result = MAX (max_idx - 1, 0); - } else { - result = min_idx; - } - - break; - } - default: - g_warning ("unknown sync method %d", mhclient->sync_method); - result = mhclient->bufpos; - break; - } - return result; -} - /* Handle a write on a client, * which indicates a read request from a client. * @@ -1565,7 +1088,9 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, if (mhclient->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ + // FIXME: specific gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); + // /* if we flushed out all of the client buffers, we can stop */ if (mhclient->flushcount == 0) goto flushed; @@ -1579,7 +1104,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ if (mhclient->new_connection && !flushing) { - gint position = gst_multi_fd_sink_new_client (sink, client); + gint position = gst_multi_handle_sink_new_client (mhsink, mhclient); if (position >= 0) { /* we got a valid spot in the queue */ @@ -1587,6 +1112,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, mhclient->bufpos = position; } else { /* cannot send data to this client yet */ + // FIXME: specific gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); return TRUE; } @@ -1636,6 +1162,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, data = info.data; maxsize = info.size - mhclient->bufoffset; + // FIXME: specific /* try to write the complete buffer */ #ifdef MSG_NOSIGNAL #define FLAGS MSG_NOSIGNAL @@ -1707,60 +1234,6 @@ write_error: } } -/* calculate the new position for a client after recovery. This function - * does not update the client position but merely returns the required - * position. - */ -static gint -gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint newbufpos; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - - GST_WARNING_OBJECT (sink, - "[fd %5d] client %p is lagging at %d, recover using policy %d", - client->fd.fd, client, mhclient->bufpos, mhsink->recover_policy); - - switch (mhsink->recover_policy) { - case GST_RECOVER_POLICY_NONE: - /* do nothing, client will catch up or get kicked out when it reaches - * the hard max */ - newbufpos = mhclient->bufpos; - break; - case GST_RECOVER_POLICY_RESYNC_LATEST: - /* move to beginning of queue */ - newbufpos = -1; - break; - case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: - /* move to beginning of soft max */ - newbufpos = get_buffers_max (sink, mhsink->units_soft_max); - break; - case GST_RECOVER_POLICY_RESYNC_KEYFRAME: - /* find keyframe in buffers, we search backwards to find the - * closest keyframe relative to what this client already received. */ - newbufpos = MIN (mhsink->bufqueue->len - 1, - get_buffers_max (sink, mhsink->units_soft_max) - 1); - - while (newbufpos >= 0) { - GstBuffer *buf; - - buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos); - if (is_sync_frame (mhsink, buf)) { - /* found a buffer that is not a delta unit */ - break; - } - newbufpos--; - } - break; - default: - /* unknown recovery procedure */ - newbufpos = get_buffers_max (sink, mhsink->units_soft_max); - break; - } - return newbufpos; -} - /* Queue a buffer on the global queue. * * This function adds the buffer to the front of a GArray. It removes the @@ -1804,12 +1277,12 @@ gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer) queuelen = mhsink->bufqueue->len; if (mhsink->units_max > 0) - max_buffers = get_buffers_max (sink, mhsink->units_max); + max_buffers = get_buffers_max (mhsink, mhsink->units_max); else max_buffers = -1; if (mhsink->units_soft_max > 0) - soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max); + soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max); else soft_max_buffers = -1; GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, @@ -1840,7 +1313,7 @@ restart: if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; - newpos = gst_multi_fd_sink_recover_client (sink, client); + newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient); if (newpos != mhclient->bufpos) { mhclient->dropped_buffers += mhclient->bufpos - newpos; mhclient->bufpos = newpos; @@ -1893,7 +1366,7 @@ restart: /* get index where the limits are ok, we don't really care if all limits * are ok, we just queue as much as we need. We also don't compare against * the max limits. */ - find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min, + find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min, mhsink->time_min, &max, -1, -1, -1); max_buffer_usage = MAX (max_buffer_usage, usage + 1); @@ -2156,15 +1629,6 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, case PROP_MODE: multifdsink->mode = g_value_get_enum (value); break; - case PROP_UNIT_FORMAT: - multifdsink->unit_format = g_value_get_enum (value); - break; - case PROP_BURST_FORMAT: - multifdsink->def_burst_format = g_value_get_enum (value); - break; - case PROP_BURST_VALUE: - multifdsink->def_burst_value = g_value_get_uint64 (value); - break; case PROP_HANDLE_READ: multifdsink->handle_read = g_value_get_boolean (value); break; @@ -2187,15 +1651,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_MODE: g_value_set_enum (value, multifdsink->mode); break; - case PROP_UNIT_FORMAT: - g_value_set_enum (value, multifdsink->unit_format); - break; - case PROP_BURST_FORMAT: - g_value_set_enum (value, multifdsink->def_burst_format); - break; - case PROP_BURST_VALUE: - g_value_set_uint64 (value, multifdsink->def_burst_value); - break; case PROP_HANDLE_READ: g_value_set_boolean (value, multifdsink->handle_read); break; diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 89ec423..d589539 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -47,23 +47,6 @@ typedef struct _GstMultiFdSink GstMultiFdSink; typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; -/** - * GstTCPUnitType: - * @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined - * @GST_TCP_UNIT_FORMAT_BUFFERS : buffers - * @GST_TCP_UNIT_FORMAT_TIME : timeunits (in nanoseconds) - * @GST_TCP_UNIT_FORMAT_BYTES : bytes - * - * The units used to specify limits. - */ -typedef enum -{ - GST_TCP_UNIT_FORMAT_UNDEFINED, - GST_TCP_UNIT_FORMAT_BUFFERS, - GST_TCP_UNIT_FORMAT_TIME, - GST_TCP_UNIT_FORMAT_BYTES, -} GstTCPUnitType; - /* structure for a client */ typedef struct { @@ -72,15 +55,6 @@ typedef struct { GstPollFD fd; gboolean is_socket; - - gboolean caps_sent; - - /* method to sync client when connecting */ - GstSyncMethod sync_method; - GstFormat burst_min_format; - guint64 burst_min_value; - GstFormat burst_max_format; - guint64 burst_max_value; } GstTCPClient; /** @@ -99,13 +73,6 @@ struct _GstMultiFdSink { gboolean handle_read; - /* these values are used to check if a client is reading fast - * enough and to control receovery */ - GstFormat unit_format;/* the type of the units */ - - GstFormat def_burst_format; - guint64 def_burst_value; - guint8 header_flags; }; diff --git a/gst/tcp/gstmultihandlesink.c b/gst/tcp/gstmultihandlesink.c index bb0ffcb..700f8fa 100644 --- a/gst/tcp/gstmultihandlesink.c +++ b/gst/tcp/gstmultihandlesink.c @@ -170,9 +170,7 @@ enum PROP_BYTES_QUEUED, PROP_TIME_QUEUED, -#if 0 PROP_UNIT_FORMAT, -#endif PROP_UNITS_MAX, PROP_UNITS_SOFT_MAX, @@ -189,10 +187,8 @@ enum PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, -#if 0 PROP_BURST_FORMAT, PROP_BURST_VALUE, -#endif PROP_QOS_DSCP, @@ -420,7 +416,6 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); -#if 0 g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, g_param_spec_enum ("burst-format", "Burst format", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", @@ -430,7 +425,6 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_param_spec_uint64 ("burst-value", "Burst value", "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); -#endif g_object_class_install_property (gobject_class, PROP_QOS_DSCP, g_param_spec_int ("qos-dscp", "QoS diff srv code point", @@ -678,10 +672,8 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this) this->timeout = DEFAULT_TIMEOUT; this->def_sync_method = DEFAULT_SYNC_METHOD; -#if 0 this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; -#endif this->qos_dscp = DEFAULT_QOS_DSCP; @@ -1407,13 +1399,11 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction) return result; } - -#if 0 /* Get the number of buffers from the buffer queue needed to satisfy * the maximum max in the configured units. * If units are not BUFFERS, and there are insufficient buffers in the * queue to satify the limit, return len(queue) + 1 */ -static gint +gint get_buffers_max (GstMultiHandleSink * sink, gint64 max) { switch (sink->unit_format) { @@ -1465,9 +1455,7 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max) return max; } } -#endif -#if 0 /* find the positions in the buffer queue where *_min and *_max * is satisfied */ @@ -1480,7 +1468,7 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max) * * FIXME, this code might now work if any of the units is in buffers... */ -static gboolean +gboolean find_limits (GstMultiHandleSink * sink, gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) @@ -1611,9 +1599,7 @@ assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, } return res; } -#endif -#if 0 /* count the index in the buffer queue to satisfy the given unit * and value pair starting from buffer at index 0. * @@ -1637,9 +1623,7 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx, return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, max_idx, bytes_max, buffers_max, time_max); } -#endif -#if 0 /* decide where in the current buffer queue this new client should start * receiving buffers from. * This function is called whenever a client is connected and has not yet @@ -1648,15 +1632,14 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx, * start streaming from yet, and this function should be called again later * when more buffers have arrived. */ -static gint +gint gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, - GstSocketClient * client) + GstMultiHandleClient * client) { gint result; GST_DEBUG_OBJECT (sink, - "[socket %p] new client, deciding where to start in queue", - client->socket); + "%s new client, deciding where to start in queue", client->debug); GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", sink->bufqueue->len); switch (client->sync_method) { @@ -1664,37 +1647,34 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, /* no syncing, we are happy with whatever the client is going to get */ result = client->bufpos; GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket, - result); + "%s SYNC_METHOD_LATEST, position %d", client->debug, result); break; case GST_SYNC_METHOD_NEXT_KEYFRAME: { /* if one of the new buffers (between client->bufpos and 0) in the queue * is a sync point, we can proceed, otherwise we need to keep waiting */ GST_LOG_OBJECT (sink, - "[socket %p] new client, bufpos %d, waiting for keyframe", - client->socket, client->bufpos); + "%s new client, bufpos %d, waiting for keyframe", + client->debug, client->bufpos); result = find_prev_syncframe (sink, client->bufpos); if (result != -1) { GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d", - client->socket, result); + "%s SYNC_METHOD_NEXT_KEYFRAME: result %d", client->debug, result); break; } /* client is not on a syncbuffer, need to skip these buffers and * wait some more */ GST_LOG_OBJECT (sink, - "[socket %p] new client, skipping buffer(s), no syncpoint found", - client->socket); + "%s new client, skipping buffer(s), no syncpoint found", + client->debug); client->bufpos = -1; break; } case GST_SYNC_METHOD_LATEST_KEYFRAME: { - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket); + GST_DEBUG_OBJECT (sink, "%s SYNC_METHOD_LATEST_KEYFRAME", client->debug); /* for new clients we initially scan the complete buffer queue for * a sync point when a buffer is added. If we don't find a keyframe, @@ -1704,14 +1684,13 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, result = find_next_syncframe (sink, 0); if (result != -1) { GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d", - client->socket, result); + "%s SYNC_METHOD_LATEST_KEYFRAME: result %d", client->debug, result); break; } GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " - "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket); + "%s SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " + "switching to SYNC_METHOD_NEXT_KEYFRAME", client->debug); /* throw client to the waiting state */ client->bufpos = -1; /* and make client sync to next keyframe */ @@ -1732,8 +1711,8 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, client->burst_min_value, &max, client->burst_max_format, client->burst_max_value); GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d", - client->socket, ok, result); + "%s SYNC_METHOD_BURST: burst_unit returned %d, result %d", + client->debug, ok, result); GST_LOG_OBJECT (sink, "min %d, max %d", result, max); @@ -1741,8 +1720,8 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, if (max != -1 && max <= result) { result = MAX (max - 1, 0); GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d", - client->socket, result); + "%s SYNC_METHOD_BURST: result above max, taken down to %d", + client->debug, result); } break; } @@ -1840,7 +1819,6 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, } return result; } -#endif #if 0 /* Handle a write on a client, @@ -2034,20 +2012,19 @@ write_error: } #endif -#if 0 /* calculate the new position for a client after recovery. This function * does not update the client position but merely returns the required * position. */ -static gint +gint gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink, - GstSocketClient * client) + GstMultiHandleClient * client) { gint newbufpos; GST_WARNING_OBJECT (sink, - "[socket %p] client %p is lagging at %d, recover using policy %d", - client->socket, client, client->bufpos, sink->recover_policy); + "%s client %p is lagging at %d, recover using policy %d", + client->debug, client, client->bufpos, sink->recover_policy); switch (sink->recover_policy) { case GST_RECOVER_POLICY_NONE: @@ -2087,7 +2064,6 @@ gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink, } return newbufpos; } -#endif #if 0 /* Queue a buffer on the global queue. @@ -2536,7 +2512,6 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, case PROP_BUFFERS_MIN: multihandlesink->buffers_min = g_value_get_int (value); break; -#if 0 case PROP_UNIT_FORMAT: multihandlesink->unit_format = g_value_get_enum (value); break; @@ -2546,7 +2521,6 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, case PROP_UNITS_SOFT_MAX: multihandlesink->units_soft_max = g_value_get_int64 (value); break; -#endif case PROP_RECOVER_POLICY: multihandlesink->recover_policy = g_value_get_enum (value); break; @@ -2556,14 +2530,12 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id, case PROP_SYNC_METHOD: multihandlesink->def_sync_method = g_value_get_enum (value); break; -#if 0 case PROP_BURST_FORMAT: multihandlesink->def_burst_format = g_value_get_enum (value); break; case PROP_BURST_VALUE: multihandlesink->def_burst_value = g_value_get_uint64 (value); break; -#endif case PROP_QOS_DSCP: multihandlesink->qos_dscp = g_value_get_int (value); gst_multi_handle_sink_setup_dscp (multihandlesink); @@ -2612,7 +2584,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, case PROP_TIME_QUEUED: g_value_set_uint64 (value, multihandlesink->time_queued); break; -#if 0 case PROP_UNIT_FORMAT: g_value_set_enum (value, multihandlesink->unit_format); break; @@ -2622,7 +2593,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, case PROP_UNITS_SOFT_MAX: g_value_set_int64 (value, multihandlesink->units_soft_max); break; -#endif case PROP_RECOVER_POLICY: g_value_set_enum (value, multihandlesink->recover_policy); break; @@ -2638,14 +2608,12 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, case PROP_BYTES_SERVED: g_value_set_uint64 (value, multihandlesink->bytes_served); break; -#if 0 case PROP_BURST_FORMAT: g_value_set_enum (value, multihandlesink->def_burst_format); break; case PROP_BURST_VALUE: g_value_set_uint64 (value, multihandlesink->def_burst_value); break; -#endif case PROP_QOS_DSCP: g_value_set_int (value, multihandlesink->qos_dscp); break; diff --git a/gst/tcp/gstmultihandlesink.h b/gst/tcp/gstmultihandlesink.h index cbd86c7..ba4e23b 100644 --- a/gst/tcp/gstmultihandlesink.h +++ b/gst/tcp/gstmultihandlesink.h @@ -142,13 +142,10 @@ typedef struct { /* method to sync client when connecting */ GstSyncMethod sync_method; -// FIXME: refactor format vs unit -#if 0 GstFormat burst_min_format; guint64 burst_min_value; GstFormat burst_max_format; guint64 burst_max_value; -#endif GstCaps *caps; /* caps of last queued buffer */ @@ -179,6 +176,18 @@ gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink); gboolean gst_multi_handle_sink_start (GstBaseSink * bsink); void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink); gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client); +gint get_buffers_max (GstMultiHandleSink * sink, gint64 max); +gint +gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink, + GstMultiHandleClient * client); +gint +gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, + GstMultiHandleClient * client); +gboolean +find_limits (GstMultiHandleSink * sink, + gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, + gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max); + /** * GstMultiHandleSink: diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index f7b1840..831ffdf 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -136,22 +136,10 @@ enum LAST_SIGNAL }; - -/* this is really arbitrarily chosen */ -#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS - -#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED -#define DEFAULT_BURST_VALUE 0 - enum { PROP_0, - PROP_UNIT_FORMAT, - - PROP_BURST_FORMAT, - PROP_BURST_VALUE, - PROP_NUM_SOCKETS, PROP_LAST @@ -206,22 +194,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) gobject_class->get_property = gst_multi_socket_sink_get_property; gobject_class->finalize = gst_multi_socket_sink_finalize; - g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT, - g_param_spec_enum ("unit-format", "Units format", - "The unit to measure the max/soft-max/queued properties", - GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, - g_param_spec_enum ("burst-format", "Burst format", - "The format of the burst units (when sync-method is burst[[-with]-keyframe])", - GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_VALUE, - g_param_spec_uint64 ("burst-value", "Burst value", - "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, - DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS, g_param_spec_uint ("num-sockets", "Number of sockets", "The current number of client sockets", @@ -405,11 +377,6 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this) { this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal); - this->unit_format = DEFAULT_UNIT_FORMAT; - - this->def_burst_format = DEFAULT_BURST_FORMAT; - this->def_burst_value = DEFAULT_BURST_VALUE; - this->cancellable = g_cancellable_new (); } @@ -464,10 +431,10 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, g_snprintf (mhclient->debug, 30, "[socket %p]", socket); client->socket = G_SOCKET (g_object_ref (socket)); - client->burst_min_format = min_format; - client->burst_min_value = min_value; - client->burst_max_format = max_format; - client->burst_max_value = max_value; + mhclient->burst_min_format = min_format; + mhclient->burst_min_value = min_value; + mhclient->burst_max_format = max_format; + mhclient->burst_max_value = max_value; CLIENTS_LOCK (sink); @@ -535,8 +502,8 @@ gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket) mhsink = GST_MULTI_HANDLE_SINK (sink); gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method, - sink->def_burst_format, sink->def_burst_value, sink->def_burst_format, - -1); + mhsink->def_burst_format, mhsink->def_burst_value, + mhsink->def_burst_format, -1); } /* "remove" signal implementation */ @@ -957,437 +924,6 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, return TRUE; } -/* Get the number of buffers from the buffer queue needed to satisfy - * the maximum max in the configured units. - * If units are not BUFFERS, and there are insufficient buffers in the - * queue to satify the limit, return len(queue) + 1 */ -static gint -get_buffers_max (GstMultiSocketSink * sink, gint64 max) -{ - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - switch (sink->unit_format) { - case GST_FORMAT_BUFFERS: - return max; - case GST_FORMAT_TIME: - { - GstBuffer *buf; - int i; - int len; - gint64 diff; - GstClockTime first = GST_CLOCK_TIME_NONE; - - len = mhsink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { - if (first == -1) - first = GST_BUFFER_TIMESTAMP (buf); - - diff = first - GST_BUFFER_TIMESTAMP (buf); - - if (diff > max) - return i + 1; - } - } - return len + 1; - } - case GST_FORMAT_BYTES: - { - GstBuffer *buf; - int i; - int len; - gint acc = 0; - - len = mhsink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - acc += gst_buffer_get_size (buf); - - if (acc > max) - return i + 1; - } - return len + 1; - } - default: - return max; - } -} - -/* find the positions in the buffer queue where *_min and *_max - * is satisfied - */ -/* count the amount of data in the buffers and return the index - * that satifies the given limits. - * - * Returns: index @idx in the buffer queue so that the given limits are - * satisfied. TRUE if all the limits could be satisfied, FALSE if not - * enough data was in the queue. - * - * FIXME, this code might now work if any of the units is in buffers... - */ -static gboolean -find_limits (GstMultiSocketSink * sink, - gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, - gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) -{ - GstClockTime first, time; - gint i, len, bytes; - gboolean result, max_hit; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - /* take length of queue */ - len = mhsink->bufqueue->len; - - /* this must hold */ - g_assert (len > 0); - - GST_LOG_OBJECT (sink, - "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT - ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min, - buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max, - GST_TIME_ARGS (time_max)); - - /* do the trivial buffer limit test */ - if (buffers_min != -1 && len < buffers_min) { - *min_idx = len - 1; - *max_idx = len - 1; - return FALSE; - } - - result = FALSE; - /* else count bytes and time */ - first = -1; - bytes = 0; - /* unset limits */ - *min_idx = -1; - *max_idx = -1; - max_hit = FALSE; - - i = 0; - /* loop through the buffers, when a limit is ok, mark it - * as -1, we have at least one buffer in the queue. */ - do { - GstBuffer *buf; - - /* if we checked all min limits, update result */ - if (bytes_min == -1 && time_min == -1 && *min_idx == -1) { - /* don't go below 0 */ - *min_idx = MAX (i - 1, 0); - } - /* if we reached one max limit break out */ - if (max_hit) { - /* i > 0 when we get here, we subtract one to get the position - * of the previous buffer. */ - *max_idx = i - 1; - /* we have valid complete result if we found a min_idx too */ - result = *min_idx != -1; - break; - } - buf = g_array_index (mhsink->bufqueue, GstBuffer *, i); - - bytes += gst_buffer_get_size (buf); - - /* take timestamp and save for the base first timestamp */ - if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { - GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer", - GST_TIME_ARGS (time)); - if (first == -1) - first = time; - - /* increase max usage if we did not fill enough. Note that - * buffers are sorted from new to old, so the first timestamp is - * bigger than the next one. */ - if (time_min != -1 && first - time >= time_min) - time_min = -1; - if (time_max != -1 && first - time >= time_max) - max_hit = TRUE; - } else { - GST_LOG_OBJECT (sink, "No timestamp on buffer"); - } - /* time is OK or unknown, check and increase if not enough bytes */ - if (bytes_min != -1) { - if (bytes >= bytes_min) - bytes_min = -1; - } - if (bytes_max != -1) { - if (bytes >= bytes_max) { - max_hit = TRUE; - } - } - i++; - } - while (i < len); - - /* if we did not hit the max or min limit, set to buffer size */ - if (*max_idx == -1) - *max_idx = len - 1; - /* make sure min does not exceed max */ - if (*min_idx == -1) - *min_idx = *max_idx; - - return result; -} - -/* parse the unit/value pair and assign it to the result value of the - * right type, leave the other values untouched - * - * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. - */ -static gboolean -assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, - GstClockTime * time) -{ - gboolean res = TRUE; - - /* set only the limit of the given format to the given value */ - switch (format) { - case GST_FORMAT_BUFFERS: - *buffers = (gint) value; - break; - case GST_FORMAT_TIME: - *time = value; - break; - case GST_FORMAT_BYTES: - *bytes = (gint) value; - break; - case GST_FORMAT_UNDEFINED: - default: - res = FALSE; - break; - } - return res; -} - -/* count the index in the buffer queue to satisfy the given unit - * and value pair starting from buffer at index 0. - * - * Returns: TRUE if there was enough data in the queue to satisfy the - * burst values. @idx contains the index in the buffer that contains enough - * data to satisfy the limits or the last buffer in the queue when the - * function returns FALSE. - */ -static gboolean -count_burst_unit (GstMultiSocketSink * sink, gint * min_idx, - GstFormat min_format, guint64 min_value, gint * max_idx, - GstFormat max_format, guint64 max_value) -{ - gint bytes_min = -1, buffers_min = -1; - gint bytes_max = -1, buffers_max = -1; - GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; - - assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min); - assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max); - - return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, - max_idx, bytes_max, buffers_max, time_max); -} - -/* decide where in the current buffer queue this new client should start - * receiving buffers from. - * This function is called whenever a client is connected and has not yet - * received a buffer. - * If this returns -1, it means that we haven't found a good point to - * start streaming from yet, and this function should be called again later - * when more buffers have arrived. - */ -static gint -gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, - GstSocketClient * client) -{ - gint result; - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - - GST_DEBUG_OBJECT (sink, - "[socket %p] new client, deciding where to start in queue", - client->socket); - GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", - mhsink->bufqueue->len); - switch (mhclient->sync_method) { - case GST_SYNC_METHOD_LATEST: - /* no syncing, we are happy with whatever the client is going to get */ - result = mhclient->bufpos; - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket, - result); - break; - case GST_SYNC_METHOD_NEXT_KEYFRAME: - { - /* if one of the new buffers (between mhclient->bufpos and 0) in the queue - * is a sync point, we can proceed, otherwise we need to keep waiting */ - GST_LOG_OBJECT (sink, - "[socket %p] new client, bufpos %d, waiting for keyframe", - client->socket, mhclient->bufpos); - - result = find_prev_syncframe (mhsink, mhclient->bufpos); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d", - client->socket, result); - break; - } - - /* client is not on a syncbuffer, need to skip these buffers and - * wait some more */ - GST_LOG_OBJECT (sink, - "[socket %p] new client, skipping buffer(s), no syncpoint found", - client->socket); - mhclient->bufpos = -1; - break; - } - case GST_SYNC_METHOD_LATEST_KEYFRAME: - { - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket); - - /* for new clients we initially scan the complete buffer queue for - * a sync point when a buffer is added. If we don't find a keyframe, - * we need to wait for the next keyframe and so we change the client's - * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME. - */ - result = find_next_syncframe (mhsink, 0); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d", - client->socket, result); - break; - } - - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " - "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket); - /* throw client to the waiting state */ - mhclient->bufpos = -1; - /* and make client sync to next keyframe */ - mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - break; - } - case GST_SYNC_METHOD_BURST: - { - gboolean ok; - gint max; - - /* move to the position where we satisfy the client's burst - * parameters. If we could not satisfy the parameters because there - * is not enough data, we just send what we have (which is in result). - * We use the max value to limit the search - */ - ok = count_burst_unit (sink, &result, client->burst_min_format, - client->burst_min_value, &max, client->burst_max_format, - client->burst_max_value); - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d", - client->socket, ok, result); - - GST_LOG_OBJECT (sink, "min %d, max %d", result, max); - - /* we hit the max and it is below the min, use that then */ - if (max != -1 && max <= result) { - result = MAX (max - 1, 0); - GST_DEBUG_OBJECT (sink, - "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d", - client->socket, result); - } - break; - } - case GST_SYNC_METHOD_BURST_KEYFRAME: - { - gint min_idx, max_idx; - gint next_syncframe, prev_syncframe; - - /* BURST_KEYFRAME: - * - * _always_ start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * last keyframe before min. If there is none, the behaviour is like - * NEXT_KEYFRAME. - */ - /* gather burst limits */ - count_burst_unit (sink, &min_idx, client->burst_min_format, - client->burst_min_value, &max_idx, client->burst_max_format, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (mhsink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no valid keyframe, try to find one below min */ - prev_syncframe = find_prev_syncframe (mhsink, min_idx); - if (prev_syncframe != -1) { - GST_WARNING_OBJECT (sink, - "using keyframe below min in BURST_KEYFRAME sync mode"); - result = prev_syncframe; - break; - } - - /* no prev keyframe or not enough data */ - GST_WARNING_OBJECT (sink, - "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); - - /* throw client to the waiting state */ - mhclient->bufpos = -1; - /* and make client sync to next keyframe */ - mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - result = -1; - break; - } - case GST_SYNC_METHOD_BURST_WITH_KEYFRAME: - { - gint min_idx, max_idx; - gint next_syncframe; - - /* BURST_WITH_KEYFRAME: - * - * try to start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * amount of data up 'till min. - */ - /* gather enough data to burst */ - count_burst_unit (sink, &min_idx, client->burst_min_format, - client->burst_min_value, &max_idx, client->burst_max_format, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (mhsink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no keyframe, send data from min_idx */ - GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode"); - - /* make sure we don't go over the max limit */ - if (max_idx != -1 && max_idx <= min_idx) { - result = MAX (max_idx - 1, 0); - } else { - result = min_idx; - } - - break; - } - default: - g_warning ("unknown sync method %d", mhclient->sync_method); - result = mhclient->bufpos; - break; - } - return result; -} - /* Handle a write on a client, * which indicates a read request from a client. * @@ -1440,11 +976,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, if (mhclient->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ + // FIXME: specific if (client->source) { g_source_destroy (client->source); g_source_unref (client->source); client->source = NULL; } + // /* if we flushed out all of the client buffers, we can stop */ if (mhclient->flushcount == 0) goto flushed; @@ -1458,7 +996,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ if (mhclient->new_connection && !flushing) { - gint position = gst_multi_socket_sink_new_client (sink, client); + gint position = gst_multi_handle_sink_new_client (mhsink, mhclient); if (position >= 0) { /* we got a valid spot in the queue */ @@ -1466,11 +1004,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, mhclient->bufpos = position; } else { /* cannot send data to this client yet */ + // FIXME: specific if (client->source) { g_source_destroy (client->source); g_source_unref (client->source); client->source = NULL; } + // return TRUE; } } @@ -1517,6 +1057,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, gst_buffer_map (head, &map, GST_MAP_READ); maxsize = map.size - mhclient->bufoffset; + // FIXME: specific /* try to write the complete buffer */ wrote = @@ -1583,61 +1124,6 @@ write_error: } } -/* calculate the new position for a client after recovery. This function - * does not update the client position but merely returns the required - * position. - */ -static gint -gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, - GstSocketClient * client) -{ - gint newbufpos; - GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - - GST_WARNING_OBJECT (sink, - "[socket %p] client %p is lagging at %d, recover using policy %d", - client->socket, client, mhclient->bufpos, mhsink->recover_policy); - - switch (mhsink->recover_policy) { - case GST_RECOVER_POLICY_NONE: - /* do nothing, client will catch up or get kicked out when it reaches - * the hard max */ - newbufpos = mhclient->bufpos; - break; - case GST_RECOVER_POLICY_RESYNC_LATEST: - /* move to beginning of queue */ - newbufpos = -1; - break; - case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: - /* move to beginning of soft max */ - newbufpos = get_buffers_max (sink, mhsink->units_soft_max); - break; - case GST_RECOVER_POLICY_RESYNC_KEYFRAME: - /* find keyframe in buffers, we search backwards to find the - * closest keyframe relative to what this client already received. */ - newbufpos = MIN (mhsink->bufqueue->len - 1, - get_buffers_max (sink, mhsink->units_soft_max) - 1); - - while (newbufpos >= 0) { - GstBuffer *buf; - - buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos); - if (is_sync_frame (mhsink, buf)) { - /* found a buffer that is not a delta unit */ - break; - } - newbufpos--; - } - break; - default: - /* unknown recovery procedure */ - newbufpos = get_buffers_max (sink, mhsink->units_soft_max); - break; - } - return newbufpos; -} - /* Queue a buffer on the global queue. * * This function adds the buffer to the front of a GArray. It removes the @@ -1682,12 +1168,12 @@ gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink, queuelen = mhsink->bufqueue->len; if (mhsink->units_max > 0) - max_buffers = get_buffers_max (sink, mhsink->units_max); + max_buffers = get_buffers_max (mhsink, mhsink->units_max); else max_buffers = -1; if (mhsink->units_soft_max > 0) - soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max); + soft_max_buffers = get_buffers_max (mhsink, mhsink->units_soft_max); else soft_max_buffers = -1; GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, @@ -1718,7 +1204,7 @@ restart: if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; - newpos = gst_multi_socket_sink_recover_client (sink, client); + newpos = gst_multi_handle_sink_recover_client (mhsink, mhclient); if (newpos != mhclient->bufpos) { mhclient->dropped_buffers += mhclient->bufpos - newpos; mhclient->bufpos = newpos; @@ -1778,7 +1264,7 @@ restart: /* get index where the limits are ok, we don't really care if all limits * are ok, we just queue as much as we need. We also don't compare against * the max limits. */ - find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min, + find_limits (mhsink, &usage, mhsink->bytes_min, mhsink->buffers_min, mhsink->time_min, &max, -1, -1, -1); max_buffer_usage = MAX (max_buffer_usage, usage + 1); @@ -1968,20 +1454,7 @@ static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstMultiSocketSink *multisocketsink; - - multisocketsink = GST_MULTI_SOCKET_SINK (object); - switch (prop_id) { - case PROP_UNIT_FORMAT: - multisocketsink->unit_format = g_value_get_enum (value); - break; - case PROP_BURST_FORMAT: - multisocketsink->def_burst_format = g_value_get_enum (value); - break; - case PROP_BURST_VALUE: - multisocketsink->def_burst_value = g_value_get_uint64 (value); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1998,15 +1471,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, multisocketsink = GST_MULTI_SOCKET_SINK (object); switch (prop_id) { - case PROP_UNIT_FORMAT: - g_value_set_enum (value, multisocketsink->unit_format); - break; - case PROP_BURST_FORMAT: - g_value_set_enum (value, multisocketsink->def_burst_format); - break; - case PROP_BURST_VALUE: - g_value_set_uint64 (value, multisocketsink->def_burst_value); - break; case PROP_NUM_SOCKETS: g_value_set_uint (value, g_hash_table_size (multisocketsink->socket_hash)); diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index d3e40d8..1a0222b 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -57,13 +57,6 @@ typedef struct { GSocket *socket; GSource *source; - - /* method to sync client when connecting */ - GstSyncMethod sync_method; - GstFormat burst_min_format; - guint64 burst_min_value; - GstFormat burst_max_format; - guint64 burst_max_value; } GstSocketClient; /** @@ -80,17 +73,6 @@ struct _GstMultiSocketSink { GMainContext *main_context; GCancellable *cancellable; - gboolean previous_buffer_in_caps; - - guint mtu; - - /* these values are used to check if a client is reading fast - * enough and to control receovery */ - GstFormat unit_format;/* the format of the units */ - - GstFormat def_burst_format; - guint64 def_burst_value; - guint8 header_flags; }; diff --git a/tests/check/elements/multifdsink.c b/tests/check/elements/multifdsink.c index 3e8f5d6..66afed2 100644 --- a/tests/check/elements/multifdsink.c +++ b/tests/check/elements/multifdsink.c @@ -467,7 +467,7 @@ GST_START_TEST (test_burst_client_bytes) /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */ - g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); + g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -493,12 +493,10 @@ GST_START_TEST (test_burst_client_bytes) /* now add the clients */ g_signal_emit_by_name (sink, "add", pfd1[1]); - g_signal_emit_by_name (sink, "add_full", pfd2[1], 3, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 200); - g_signal_emit_by_name (sink, "add_full", pfd3[1], 3, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 50); + g_signal_emit_by_name (sink, "add_full", pfd2[1], GST_SYNC_METHOD_BURST, + GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200); + g_signal_emit_by_name (sink, "add_full", pfd3[1], GST_SYNC_METHOD_BURST, + GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ for (i = 9; i < 10; i++) { @@ -555,7 +553,7 @@ GST_START_TEST (test_burst_client_bytes_keyframe) /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */ - g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); + g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -585,12 +583,12 @@ GST_START_TEST (test_burst_client_bytes_keyframe) /* now add the clients */ g_signal_emit_by_name (sink, "add", pfd1[1]); - g_signal_emit_by_name (sink, "add_full", pfd2[1], 4, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 90); - g_signal_emit_by_name (sink, "add_full", pfd3[1], 4, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 50); + g_signal_emit_by_name (sink, "add_full", pfd2[1], + GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", pfd3[1], + GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ for (i = 9; i < 10; i++) { @@ -648,7 +646,7 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) /* make sure we keep at least 100 bytes at all times */ g_object_set (sink, "bytes-min", 100, NULL); g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */ - g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL); + g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL); g_object_set (sink, "burst-value", (guint64) 80, NULL); fail_if (pipe (pfd1) == -1); @@ -678,12 +676,12 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) /* now add the clients */ g_signal_emit_by_name (sink, "add", pfd1[1]); - g_signal_emit_by_name (sink, "add_full", pfd2[1], 5, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 90); - g_signal_emit_by_name (sink, "add_full", pfd3[1], 5, - GST_TCP_UNIT_FORMAT_BYTES, (guint64) 50, GST_TCP_UNIT_FORMAT_BYTES, - (guint64) 50); + g_signal_emit_by_name (sink, "add_full", pfd2[1], + GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, + (guint64) 50, GST_FORMAT_BYTES, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", pfd3[1], + GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, + (guint64) 50, GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ for (i = 9; i < 10; i++) { diff --git a/tests/check/elements/multisocketsink.c b/tests/check/elements/multisocketsink.c index 591a98a..8e41600 100644 --- a/tests/check/elements/multisocketsink.c +++ b/tests/check/elements/multisocketsink.c @@ -28,6 +28,8 @@ #include #include +#include "gst/tcp/gstmultisocketsink.h" + static GstPad *mysrcpad; static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -528,9 +530,9 @@ GST_START_TEST (test_burst_client_bytes) /* now add the clients */ g_signal_emit_by_name (sink, "add", socket[0]); - g_signal_emit_by_name (sink, "add_full", socket[2], 3, + g_signal_emit_by_name (sink, "add_full", socket[2], GST_SYNC_METHOD_BURST, GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200); - g_signal_emit_by_name (sink, "add_full", socket[4], 3, + g_signal_emit_by_name (sink, "add_full", socket[4], GST_SYNC_METHOD_BURST, GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ @@ -616,10 +618,12 @@ GST_START_TEST (test_burst_client_bytes_keyframe) /* now add the clients */ g_signal_emit_by_name (sink, "add", socket[0]); - g_signal_emit_by_name (sink, "add_full", socket[2], 4, - GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90); - g_signal_emit_by_name (sink, "add_full", socket[4], 4, - GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50); + g_signal_emit_by_name (sink, "add_full", socket[2], + GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", socket[4], + GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ for (i = 9; i < 10; i++) { @@ -708,10 +712,12 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe) /* now add the clients */ g_signal_emit_by_name (sink, "add", socket[0]); - g_signal_emit_by_name (sink, "add_full", socket[2], 5, - GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 90); - g_signal_emit_by_name (sink, "add_full", socket[4], 5, - GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50); + g_signal_emit_by_name (sink, "add_full", socket[2], + GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", socket[4], + GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50, + GST_FORMAT_BYTES, (guint64) 50); /* push last buffer to make client fds ready for reading */ for (i = 9; i < 10; i++) { -- 2.7.4