From 9f38ed64f5e62c0ac575f1475401f4074976ea4f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 29 Oct 2004 11:10:38 +0000 Subject: [PATCH] gst/tcp/: Added burst on connect sync_method, deprecated sync_clients, streamlined the sync code some more. Original commit message from CVS: * gst/tcp/.cvsignore: * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type), (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_remove_client_link), (is_sync_frame), (gst_multifdsink_new_client), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), (gst_multifdsink_get_property): * gst/tcp/gstmultifdsink.h: Added burst on connect sync_method, deprecated sync_clients, streamlined the sync code some more. --- ChangeLog | 16 +++++ gst/tcp/.gitignore | 1 + gst/tcp/gstmultifdsink.c | 184 +++++++++++++++++++++++++++++++++++++++-------- gst/tcp/gstmultifdsink.h | 12 ++-- 4 files changed, 179 insertions(+), 34 deletions(-) diff --git a/ChangeLog b/ChangeLog index 359da6d..7a9ac16 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,19 @@ +2004-10-29 Wim Taymans + + * gst/tcp/.cvsignore: + * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type), + (gst_multifdsink_class_init), (gst_multifdsink_init), + (gst_multifdsink_add), (gst_multifdsink_remove), + (gst_multifdsink_remove_client_link), (is_sync_frame), + (gst_multifdsink_new_client), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), + (gst_multifdsink_get_property): + * gst/tcp/gstmultifdsink.h: + Added burst on connect sync_method, deprecated sync_clients, + streamlined the sync code some more. + 2004-10-29 Ronald S. Bultje * gst/playback/gstplaybasebin.c: (thread_error), (setup_source), diff --git a/gst/tcp/.gitignore b/gst/tcp/.gitignore index ffb7fc3..45ededd 100644 --- a/gst/tcp/.gitignore +++ b/gst/tcp/.gitignore @@ -2,3 +2,4 @@ gsttcp-enumtypes.c gsttcp-enumtypes.h gsttcp-marshal.c gsttcp-marshal.h +fdsetstress diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 87dc253..999e508 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -95,7 +95,7 @@ enum #define DEFAULT_UNITS_SOFT_MAX -1 #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE #define DEFAULT_TIMEOUT 0 -#define DEFAULT_SYNC_CLIENTS FALSE +#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_NONE enum { @@ -115,7 +115,8 @@ enum ARG_RECOVER_POLICY, ARG_TIMEOUT, - ARG_SYNC_CLIENTS, + ARG_SYNC_CLIENTS, /* deprecated */ + ARG_SYNC_METHOD, ARG_BYTES_TO_SERVE, ARG_BYTES_SERVED, }; @@ -144,6 +145,27 @@ gst_recover_policy_get_type (void) return recover_policy_type; } +#define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type()) +static GType +gst_sync_method_get_type (void) +{ + static GType sync_method_type = 0; + static GEnumValue sync_method[] = { + {GST_SYNC_METHOD_NONE, "GST_SYNC_METHOD_NONE", + "Serve new client the latest buffer"}, + {GST_SYNC_METHOD_WAIT, "GST_SYNC_METHOD_WAIT", + "Make the new client wait for the next keyframe"}, + {GST_SYNC_METHOD_BURST, "GST_SYNC_METHOD_BURST", + "Serve the new client the last keyframe, aka burst"}, + {0, NULL, NULL}, + }; + + if (!sync_method_type) { + sync_method_type = g_enum_register_static ("GstTCPSyncMethod", sync_method); + } + return sync_method_type; +} + #if NOT_IMPLEMENTED #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) static GType @@ -310,8 +332,12 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_CLIENTS, g_param_spec_boolean ("sync-clients", "Sync clients", - "Sync clients to a keyframe", - DEFAULT_SYNC_CLIENTS, G_PARAM_READWRITE)); + "(DEPRECATED) Sync clients to a keyframe", + DEFAULT_SYNC_METHOD == GST_SYNC_METHOD_WAIT, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD, + g_param_spec_enum ("sync-method", "Sync Method", + "How to sync new clients to the stream", + GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE, g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, @@ -386,7 +412,7 @@ gst_multifdsink_init (GstMultiFdSink * this) this->recover_policy = DEFAULT_RECOVER_POLICY; this->timeout = DEFAULT_TIMEOUT; - this->sync_clients = DEFAULT_SYNC_CLIENTS; + this->sync_method = DEFAULT_SYNC_METHOD; } void @@ -788,27 +814,84 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, static gint gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client) { - if (sink->sync_clients) { - GstBuffer *buf; - - GST_LOG_OBJECT (sink, "New client on fd %d, bufpos %d", - client->fd.fd, client->bufpos); - - if (client->bufpos < 0) - return -1; + gint result; + + switch (sink->sync_method) { + case GST_SYNC_METHOD_WAIT: + { + /* if the buffer at the head of the queue is a sync point we can proceed, + * else we need to skip the buffer and wait for a new one */ + GST_LOG_OBJECT (sink, + "New client on fd %d, bufpos %d, waiting for keyframe", client->fd.fd, + client->bufpos); + + /* the client is not yet alligned to a buffer */ + if (client->bufpos < 0) { + result = -1; + } else { + GstBuffer *buf; + gint i; + + for (i = client->bufpos; i >= 0; i--) { + /* get the buffer for the client */ + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + if (is_sync_frame (sink, buf)) { + GST_LOG_OBJECT (sink, "New client on fd %d found sync", + client->fd.fd); + result = i; + goto done; + } else { + /* client is not on a buffer, need to skip this buffer and + * wait some more */ + GST_LOG_OBJECT (sink, "New client on fd %d skipping buffer", + client->fd.fd); + client->bufpos--; + } + } + result = -1; + } + break; + } + case GST_SYNC_METHOD_BURST: + { + /* FIXME for new clients we constantly scan the complete + * buffer queue for sync point whenever a buffer is added. This is + * suboptimal because if we cannot find a sync point the first time, + * the algorithm should behave as GST_SYNC_METHOD_WAIT */ + gint i, len; + + GST_LOG_OBJECT (sink, "New client on fd %d, bufpos %d, bursting keyframe", + client->fd.fd, client->bufpos); + + /* take length of queued buffers */ + len = sink->bufqueue->len; + /* assume we don't find a keyframe */ + result = -1; + /* then loop over all buffers to find the first keyframe */ + for (i = 0; i < len; i++) { + GstBuffer *buf; - buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); - if (is_sync_frame (sink, buf)) { - GST_LOG_OBJECT (sink, "New client on fd %d found sync", client->fd.fd); - return client->bufpos; - } else { - GST_LOG_OBJECT (sink, "New client on fd %d skipping buffer", - client->fd.fd); - client->bufpos--; - return -1; + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + if (is_sync_frame (sink, buf)) { + /* found a keyframe, return its position */ + GST_LOG_OBJECT (sink, "found keyframe at %d", i); + result = i; + goto done; + } + } + GST_LOG_OBJECT (sink, "no keyframe found"); + /* throw client to the waiting state */ + client->bufpos = -1; + break; } + default: + /* no syncing, we are happy with whatever the client is going to get */ + GST_LOG_OBJECT (sink, "no client syn needed"); + result = client->bufpos; + break; } - return client->bufpos; +done: + return result; } /* handle a write on a client, @@ -904,7 +987,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, if (client->new_connection) { gint position = gst_multifdsink_new_client (sink, client); - if (position > 0) { + if (position >= 0) { /* we got a valid spot in the queue */ client->new_connection = FALSE; client->bufpos = position; @@ -1017,14 +1100,15 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) newbufpos = sink->units_soft_max; break; case GST_RECOVER_POLICY_RESYNC_KEYFRAME: - /* find keyframe in buffers */ + /* find keyframe in buffers, we search backwards to find the + * closest keyframe relative to what this client already received. */ newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1); - while (newbufpos > 0) { + while (newbufpos >= 0) { GstBuffer *buf; buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos); - if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_DELTA_UNIT)) { + if (is_sync_frame (sink, buf)) { /* found a buffer that is not a delta unit */ break; } @@ -1129,8 +1213,36 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) max_buffer_usage = client->bufpos; } } + + /* now look for sync points and make sure there is at least one + * sync point in the queue. We only do this if the burst mode + * is enabled. */ + if (sink->sync_method == GST_SYNC_METHOD_BURST) { + /* no point in searching beyond the queue length */ + gint limit = queuelen; + GstBuffer *buf; + + /* no point in searching beyond the soft-max if any. */ + if (sink->units_soft_max > 0) { + limit = MIN (limit, sink->units_soft_max); + } + GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d", + max_buffer_usage); + for (i = 0; i < limit; i++) { + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + if (is_sync_frame (sink, buf)) { + /* found a sync frame, now extend the buffer usage to + * include at least this frame. */ + max_buffer_usage = MAX (max_buffer_usage, i); + break; + } + } + GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage); + } + /* nobody is referencing units after max_buffer_usage so we can - * remove them from the queue */ + * remove them from the queue. We remove them in reverse order as + * this is the most optimal for GArray. */ for (i = queuelen - 1; i > max_buffer_usage; i--) { GstBuffer *old; @@ -1142,6 +1254,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) /* unref tail buffer */ gst_buffer_unref (old); } + /* save for stats */ sink->buffers_queued = max_buffer_usage; g_mutex_unlock (sink->clientslock); @@ -1396,7 +1509,14 @@ gst_multifdsink_set_property (GObject * object, guint prop_id, multifdsink->timeout = g_value_get_uint64 (value); break; case ARG_SYNC_CLIENTS: - multifdsink->sync_clients = g_value_get_boolean (value); + if (g_value_get_boolean (value) == TRUE) { + multifdsink->sync_method = GST_SYNC_METHOD_WAIT; + } else { + multifdsink->sync_method = GST_SYNC_METHOD_NONE; + } + break; + case ARG_SYNC_METHOD: + multifdsink->sync_method = g_value_get_enum (value); break; default: @@ -1452,7 +1572,11 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value, g_value_set_uint64 (value, multifdsink->timeout); break; case ARG_SYNC_CLIENTS: - g_value_set_boolean (value, multifdsink->sync_clients); + g_value_set_boolean (value, + multifdsink->sync_method == GST_SYNC_METHOD_WAIT); + break; + case ARG_SYNC_METHOD: + g_value_set_enum (value, multifdsink->sync_method); break; case ARG_BYTES_TO_SERVE: g_value_set_uint64 (value, multifdsink->bytes_to_serve); diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 0c4bf0a..aef3048 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -65,6 +65,13 @@ typedef enum typedef enum { + GST_SYNC_METHOD_NONE, + GST_SYNC_METHOD_WAIT, + GST_SYNC_METHOD_BURST, +} GstSyncMethod; + +typedef enum +{ GST_UNIT_TYPE_BUFFERS, GST_UNIT_TYPE_TIME, GST_UNIT_TYPE_BYTES, @@ -127,9 +134,6 @@ struct _GstMultiFdSink { GstFDSetMode mode; GstFDSet *fdset; - //fd_set readfds; /* all the client file descriptors that we can read from */ - //fd_set writefds; /* all the client file descriptors that we can write to */ - GstFD control_sock[2];/* sockets for controlling the select call */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ @@ -146,7 +150,7 @@ struct _GstMultiFdSink { gint units_soft_max; /* max units a client can lag before recovery starts */ GstRecoverPolicy recover_policy; GstClockTime timeout; /* max amount of nanoseconds to remain idle */ - gboolean sync_clients;/* sync clients to keyframe */ + GstSyncMethod sync_method; /* what method to use for connecting clients */ /* stats */ gint buffers_queued; /* number of queued buffers */ -- 2.7.4