test: use more literal enums
[platform/upstream/gstreamer.git] / gst / tcp / gstmultifdsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:element-multifdsink
24  * @see_also: tcpserversink
25  *
26  * This plugin writes incoming data to a set of file descriptors. The
27  * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. 
28  * For each descriptor added, the #GstMultiFdSink::client-added signal will be called.
29  *
30  * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal
31  * that allows for more control over what and how much data a client 
32  * initially receives.
33  *
34  * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For
35  * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The
36  * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a
37  * client is not active anymore or, depending on the value of the
38  * #GstMultiFdSink:recover-policy property, if the client is reading too slowly.
39  * In all cases, multifdsink will never close a file descriptor itself.
40  * The user of multifdsink is responsible for closing all file descriptors.
41  * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal.
42  * Note that multifdsink still has a reference to the file descriptor when the
43  * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on
44  * the descriptor; it is therefore not safe to close the file descriptor in
45  * the #GstMultiFdSink::client-removed signal handler, and you should use the 
46  * #GstMultiFdSink::client-fd-removed signal to safely close the fd.
47  *
48  * Multifdsink internally keeps a queue of the incoming buffers and uses a
49  * separate thread to send the buffers to the clients. This ensures that no
50  * client write can block the pipeline and that clients can read with different
51  * speeds.
52  *
53  * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define
54  * which buffer in the queued buffers will be sent first to the client. Clients 
55  * can be sent the most recent buffer (which might not be decodable by the 
56  * client if it is not a keyframe), the next keyframe received in 
57  * multifdsink (which can take some time depending on the keyframe rate), or the
58  * last received keyframe (which will cause a simple burst-on-connect). 
59  * Multifdsink will always keep at least one keyframe in its internal buffers
60  * when the sync-mode is set to latest-keyframe.
61  *
62  * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method 
63  * property to allow finer control over burst-on-connect behaviour. By selecting
64  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
65  * additionally requires that the burst begin with a keyframe, and 
66  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
67  * prefer a minimum burst size even if it requires not starting with a keyframe.
68  *
69  * Multifdsink can be instructed to keep at least a minimum amount of data
70  * expressed in time or byte units in its internal queues with the 
71  * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively.
72  * These properties are useful if the application adds clients with the 
73  * #GstMultiFdSink::add-full signal to make sure that a burst connect can
74  * actually be honored. 
75  *
76  * When streaming data, clients are allowed to read at a different rate than
77  * the rate at which multifdsink receives data. If the client is reading too
78  * fast, no data will be send to the client until multifdsink receives more
79  * data. If the client, however, reads too slowly, data for that client will be 
80  * queued up in multifdsink. Two properties control the amount of data 
81  * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and 
82  * #GstMultiFdSink:buffers-soft-max. A client that falls behind by
83  * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly.
84  *
85  * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery
86  * procedure which is controlled with the #GstMultiFdSink:recover-policy property.
87  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
88  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
89  * positions the client to the soft limit in the buffer queue and
90  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
91  * buffer queue.
92  *
93  * multifdsink will by default synchronize on the clock before serving the 
94  * buffers to the clients. This behaviour can be disabled by setting the sync 
95  * property to FALSE. Multifdsink will by default not do QoS and will never
96  * drop late buffers.
97  *
98  * Last reviewed on 2006-09-12 (0.10.10)
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <gst/gst-i18n-plugin.h>
106
107 #include <sys/ioctl.h>
108
109 #ifdef HAVE_UNISTD_H
110 #include <unistd.h>
111 #endif
112
113 #include <fcntl.h>
114 #include <sys/types.h>
115 #include <sys/socket.h>
116 #include <sys/stat.h>
117 #include <netinet/in.h>
118
119 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
120 #include <sys/filio.h>
121 #endif
122
123 #include "gstmultifdsink.h"
124 #include "gsttcp-marshal.h"
125
126 #define NOT_IMPLEMENTED 0
127
128 GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
129 #define GST_CAT_DEFAULT (multifdsink_debug)
130
131 /* MultiFdSink signals and args */
132 enum
133 {
134   /* methods */
135   SIGNAL_ADD,
136   SIGNAL_ADD_BURST,
137   SIGNAL_REMOVE,
138   SIGNAL_REMOVE_FLUSH,
139   SIGNAL_GET_STATS,
140
141   /* signals */
142   SIGNAL_CLIENT_ADDED,
143   SIGNAL_CLIENT_REMOVED,
144   SIGNAL_CLIENT_FD_REMOVED,
145
146   LAST_SIGNAL
147 };
148
149 /* this is really arbitrarily chosen */
150 #define DEFAULT_MODE                    1
151 #define DEFAULT_UNIT_FORMAT               GST_FORMAT_BUFFERS
152
153 #define DEFAULT_BURST_FORMAT              GST_FORMAT_UNDEFINED
154 #define DEFAULT_BURST_VALUE             0
155
156 #define DEFAULT_HANDLE_READ             TRUE
157
158 enum
159 {
160   PROP_0,
161   PROP_MODE,
162
163   PROP_UNIT_FORMAT,
164
165   PROP_BURST_FORMAT,
166   PROP_BURST_VALUE,
167
168   PROP_HANDLE_READ,
169
170   PROP_NUM_FDS,
171
172   PROP_LAST
173 };
174
175 /* For backward compat, we can't really select the poll mode anymore with
176  * GstPoll. */
177 #define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
178 static GType
179 gst_fdset_mode_get_type (void)
180 {
181   static GType fdset_mode_type = 0;
182   static const GEnumValue fdset_mode[] = {
183     {0, "Select", "select"},
184     {1, "Poll", "poll"},
185     {2, "EPoll", "epoll"},
186     {0, NULL, NULL},
187   };
188
189   if (!fdset_mode_type) {
190     fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
191   }
192   return fdset_mode_type;
193 }
194
195 #define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
196 static GType
197 gst_unit_format_get_type (void)
198 {
199   static GType unit_format_type = 0;
200   static const GEnumValue unit_format[] = {
201     {GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
202     {GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
203     {GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
204     {GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
205     {0, NULL, NULL},
206   };
207
208   if (!unit_format_type) {
209     unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
210   }
211   return unit_format_type;
212 }
213
214 static void gst_multi_fd_sink_finalize (GObject * object);
215
216 static void gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink);
217 static void gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink);
218 static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink);
219 static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink);
220 static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink);
221 static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink,
222     GstBuffer * buffer);
223 static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink *
224     mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
225 static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client);
226
227 static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
228     GList * link);
229
230 static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
231     const GValue * value, GParamSpec * pspec);
232 static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
233     GValue * value, GParamSpec * pspec);
234
235 #define gst_multi_fd_sink_parent_class parent_class
236 G_DEFINE_TYPE (GstMultiFdSink, gst_multi_fd_sink, GST_TYPE_MULTI_HANDLE_SINK);
237
238 static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 };
239
240 static void
241 gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
242 {
243   GObjectClass *gobject_class;
244   GstElementClass *gstelement_class;
245   GstMultiHandleSinkClass *gstmultihandlesink_class;
246
247   gobject_class = (GObjectClass *) klass;
248   gstelement_class = (GstElementClass *) klass;
249   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
250
251   gobject_class->set_property = gst_multi_fd_sink_set_property;
252   gobject_class->get_property = gst_multi_fd_sink_get_property;
253   gobject_class->finalize = gst_multi_fd_sink_finalize;
254
255   /**
256    * GstMultiFdSink::mode
257    *
258    * The mode for selecting activity on the fds. 
259    *
260    * This property is deprecated since 0.10.18, if will now automatically
261    * select and use the most optimal method.
262    */
263   g_object_class_install_property (gobject_class, PROP_MODE,
264       g_param_spec_enum ("mode", "Mode",
265           "The mode for selecting activity on the fds (deprecated)",
266           GST_TYPE_FDSET_MODE, DEFAULT_MODE,
267           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
268
269   g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
270       g_param_spec_enum ("unit-format", "Units format",
271           "The unit to measure the max/soft-max/queued properties",
272           GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
273           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274
275   g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
276       g_param_spec_enum ("burst-format", "Burst format",
277           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
278           GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
279           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
280   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
281       g_param_spec_uint64 ("burst-value", "Burst value",
282           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
283           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
284
285   /**
286    * GstMultiFdSink::handle-read
287    *
288    * Handle read requests from clients and discard the data.
289    *
290    * Since: 0.10.23
291    */
292   g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
293       g_param_spec_boolean ("handle-read", "Handle Read",
294           "Handle client reads and discard the data",
295           DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296
297   g_object_class_install_property (gobject_class, PROP_NUM_FDS,
298       g_param_spec_uint ("num-fds", "Number of fds",
299           "The current number of client file descriptors.",
300           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
301
302   /**
303    * GstMultiFdSink::add:
304    * @gstmultifdsink: the multifdsink element to emit this signal on
305    * @fd:             the file descriptor to add to multifdsink
306    *
307    * Hand the given open file descriptor to multifdsink to write to.
308    */
309   gst_multi_fd_sink_signals[SIGNAL_ADD] =
310       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
311       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
312           add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1,
313       G_TYPE_INT);
314   /**
315    * GstMultiFdSink::add-full:
316    * @gstmultifdsink: the multifdsink element to emit this signal on
317    * @fd:             the file descriptor to add to multifdsink
318    * @sync:           the sync method to use
319    * @unit_format_min:  the unit-format of @value_min
320    * @value_min:      the minimum amount of data to burst expressed in
321    *                  @unit_format_min units.
322    * @unit_format_max:  the unit-format of @value_max
323    * @value_max:      the maximum amount of data to burst expressed in
324    *                  @unit_format_max units.
325    *
326    * Hand the given open file descriptor to multifdsink to write to and
327    * specify the burst parameters for the new connection.
328    */
329   gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
330       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
331       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
332           add_full), NULL, NULL,
333       gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
334       G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
335       G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
336   /**
337    * GstMultiFdSink::remove:
338    * @gstmultifdsink: the multifdsink element to emit this signal on
339    * @fd:             the file descriptor to remove from multifdsink
340    *
341    * Remove the given open file descriptor from multifdsink.
342    */
343   gst_multi_fd_sink_signals[SIGNAL_REMOVE] =
344       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
345       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
346           remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE,
347       1, G_TYPE_INT);
348   /**
349    * GstMultiFdSink::remove-flush:
350    * @gstmultifdsink: the multifdsink element to emit this signal on
351    * @fd:             the file descriptor to remove from multifdsink
352    *
353    * Remove the given open file descriptor from multifdsink after flushing all
354    * the pending data to the fd.
355    */
356   gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
357       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
358       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
359           remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT,
360       G_TYPE_NONE, 1, G_TYPE_INT);
361
362   /**
363    * GstMultiFdSink::get-stats:
364    * @gstmultifdsink: the multifdsink element to emit this signal on
365    * @fd:             the file descriptor to get stats of from multifdsink
366    *
367    * Get statistics about @fd. This function returns a GValueArray to ease
368    * automatic wrapping for bindings.
369    *
370    * Returns: a GValueArray with the statistics. The array contains guint64
371    *     values that represent respectively: total number of bytes sent, time
372    *     when the client was added, time when the client was
373    *     disconnected/removed, time the client is/was active, last activity
374    *     time (in epoch seconds), number of buffers dropped.
375    *     All times are expressed in nanoseconds (GstClockTime).
376    *     The array can be 0-length if the client was not found.
377    */
378   gst_multi_fd_sink_signals[SIGNAL_GET_STATS] =
379       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
380       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
381           get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT,
382       G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT);
383
384   /**
385    * GstMultiFdSink::client-added:
386    * @gstmultifdsink: the multifdsink element that emitted this signal
387    * @fd:             the file descriptor that was added to multifdsink
388    *
389    * The given file descriptor was added to multifdsink. This signal will
390    * be emitted from the streaming thread so application should be prepared
391    * for that.
392    */
393   gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] =
394       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
395       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
396       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
397   /**
398    * GstMultiFdSink::client-removed:
399    * @gstmultifdsink: the multifdsink element that emitted this signal
400    * @fd:             the file descriptor that is to be removed from multifdsink
401    * @status:         the reason why the client was removed
402    *
403    * The given file descriptor is about to be removed from multifdsink. This
404    * signal will be emitted from the streaming thread so applications should
405    * be prepared for that.
406    *
407    * @gstmultifdsink still holds a handle to @fd so it is possible to call
408    * the get-stats signal from this callback. For the same reason it is
409    * not safe to close() and reuse @fd in this callback.
410    */
411   gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] =
412       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
413       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
414           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_ENUM,
415       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
416   /**
417    * GstMultiFdSink::client-fd-removed:
418    * @gstmultifdsink: the multifdsink element that emitted this signal
419    * @fd:             the file descriptor that was removed from multifdsink
420    *
421    * The given file descriptor was removed from multifdsink. This signal will
422    * be emitted from the streaming thread so applications should be prepared
423    * for that.
424    *
425    * In this callback, @gstmultifdsink has removed all the information
426    * associated with @fd and it is therefore not possible to call get-stats
427    * with @fd. It is however safe to close() and reuse @fd in the callback.
428    *
429    * Since: 0.10.7
430    */
431   gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] =
432       g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass),
433       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
434           client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
435       G_TYPE_NONE, 1, G_TYPE_INT);
436
437   gst_element_class_set_details_simple (gstelement_class,
438       "Multi filedescriptor sink", "Sink/Network",
439       "Send data to multiple filedescriptors",
440       "Thomas Vander Stichele <thomas at apestaart dot org>, "
441       "Wim Taymans <wim@fluendo.com>");
442
443   gstmultihandlesink_class->clear_post =
444       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post);
445
446   gstmultihandlesink_class->stop_pre =
447       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_stop_pre);
448   gstmultihandlesink_class->stop_post =
449       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_stop_post);
450   gstmultihandlesink_class->start_pre =
451       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre);
452   gstmultihandlesink_class->thread =
453       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread);
454   gstmultihandlesink_class->queue_buffer =
455       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_queue_buffer);
456   gstmultihandlesink_class->client_queue_buffer =
457       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer);
458   gstmultihandlesink_class->client_get_fd =
459       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd);
460
461   gstmultihandlesink_class->remove_client_link =
462       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
463
464
465   klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
466   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
467   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
468   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush);
469   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
470
471   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
472 }
473
474 static void
475 gst_multi_fd_sink_init (GstMultiFdSink * this)
476 {
477   this->mode = DEFAULT_MODE;
478
479   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
480
481   this->unit_format = DEFAULT_UNIT_FORMAT;
482
483   this->def_burst_format = DEFAULT_BURST_FORMAT;
484   this->def_burst_value = DEFAULT_BURST_VALUE;
485
486   this->handle_read = DEFAULT_HANDLE_READ;
487 }
488
489 static void
490 gst_multi_fd_sink_finalize (GObject * object)
491 {
492   GstMultiFdSink *this = GST_MULTI_FD_SINK (object);
493
494   g_hash_table_destroy (this->fd_hash);
495
496   G_OBJECT_CLASS (parent_class)->finalize (object);
497 }
498
499 static int
500 gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
501 {
502   GstTCPClient *tclient = (GstTCPClient *) client;
503
504   return tclient->fd.fd;
505 }
506
507 /* "add-full" signal implementation */
508 void
509 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
510     GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
511     GstFormat max_format, guint64 max_value)
512 {
513   GstTCPClient *client;
514   GstMultiHandleClient *mhclient;
515   GList *clink;
516   gint flags;
517   struct stat statbuf;
518   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
519
520   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
521       "min_format %d, min_value %" G_GUINT64_FORMAT
522       ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
523       min_format, min_value, max_format, max_value);
524
525   /* do limits check if we can */
526   if (min_format == max_format) {
527     if (max_value != -1 && min_value != -1 && max_value < min_value)
528       goto wrong_limits;
529   }
530
531   /* create client datastructure */
532   client = g_new0 (GstTCPClient, 1);
533   mhclient = (GstMultiHandleClient *) client;
534   gst_multi_handle_sink_client_init (mhclient, sync_method);
535   g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
536
537   client->fd.fd = fd;
538   client->burst_min_format = min_format;
539   client->burst_min_value = min_value;
540   client->burst_max_format = max_format;
541   client->burst_max_value = max_value;
542
543   CLIENTS_LOCK (sink);
544
545   /* check the hash to find a duplicate fd */
546   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
547   if (clink != NULL)
548     goto duplicate;
549
550   /* we can add the fd now */
551   clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
552   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
553   mhsink->clients_cookie++;
554
555   /* set the socket to non blocking */
556   if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) {
557     GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd,
558         g_strerror (errno));
559   }
560
561   /* we always read from a client */
562   gst_poll_add_fd (sink->fdset, &client->fd);
563
564   /* we don't try to read from write only fds */
565   if (sink->handle_read) {
566     flags = fcntl (fd, F_GETFL, 0);
567     if ((flags & O_ACCMODE) != O_WRONLY) {
568       gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
569     }
570   }
571   /* figure out the mode, can't use send() for non sockets */
572   if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) {
573     client->is_socket = TRUE;
574     gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
575   }
576
577   gst_poll_restart (sink->fdset);
578
579   CLIENTS_UNLOCK (sink);
580
581   g_signal_emit (G_OBJECT (sink),
582       gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
583
584   return;
585
586   /* errors */
587 wrong_limits:
588   {
589     GST_WARNING_OBJECT (sink,
590         "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
591         G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
592         min_value, max_value, min_format);
593     return;
594   }
595 duplicate:
596   {
597     mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
598     CLIENTS_UNLOCK (sink);
599     GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
600     g_signal_emit (G_OBJECT (sink),
601         gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
602         mhclient->status);
603     g_free (client);
604     return;
605   }
606 }
607
608 /* "add" signal implementation */
609 void
610 gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
611 {
612   GstMultiHandleSink *mhsink;
613
614   mhsink = GST_MULTI_HANDLE_SINK (sink);
615   gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
616       sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
617       -1);
618 }
619
620 /* "remove" signal implementation */
621 void
622 gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
623 {
624   GList *clink;
625   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
626   GstMultiHandleSinkClass *mhsinkclass =
627       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
628
629   GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
630
631   CLIENTS_LOCK (sink);
632   clink = g_hash_table_lookup (sink->fd_hash, &fd);
633   if (clink != NULL) {
634     GstTCPClient *client = (GstTCPClient *) clink->data;
635     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
636
637     if (mhclient->status != GST_CLIENT_STATUS_OK) {
638       GST_INFO_OBJECT (sink,
639           "[fd %5d] Client already disconnecting with status %d",
640           fd, mhclient->status);
641       goto done;
642     }
643
644     mhclient->status = GST_CLIENT_STATUS_REMOVED;
645     mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
646     // FIXME: specific poll
647     gst_poll_restart (sink->fdset);
648   } else {
649     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
650   }
651
652 done:
653   CLIENTS_UNLOCK (sink);
654 }
655
656 /* "remove-flush" signal implementation */
657 void
658 gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
659 {
660   GList *clink;
661
662   GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
663
664   CLIENTS_LOCK (sink);
665   clink = g_hash_table_lookup (sink->fd_hash, &fd);
666   if (clink != NULL) {
667     GstTCPClient *client = (GstTCPClient *) clink->data;
668     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
669
670     if (mhclient->status != GST_CLIENT_STATUS_OK) {
671       GST_INFO_OBJECT (sink,
672           "[fd %5d] Client already disconnecting with status %d",
673           fd, mhclient->status);
674       goto done;
675     }
676
677     /* take the position of the client as the number of buffers left to flush.
678      * If the client was at position -1, we flush 0 buffers, 0 == flush 1
679      * buffer, etc... */
680     mhclient->flushcount = mhclient->bufpos + 1;
681     /* mark client as flushing. We can not remove the client right away because
682      * it might have some buffers to flush in the ->sending queue. */
683     mhclient->status = GST_CLIENT_STATUS_FLUSHING;
684   } else {
685     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
686   }
687 done:
688   CLIENTS_UNLOCK (sink);
689 }
690
691 /* called with the CLIENTS_LOCK held */
692 static void
693 gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink)
694 {
695   GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
696
697   gst_poll_restart (sink->fdset);
698 }
699
700 /* "get-stats" signal implementation
701  * the array returned contains:
702  *
703  * guint64 : bytes_sent
704  * guint64 : connect time (in nanoseconds, since Epoch)
705  * guint64 : disconnect time (in nanoseconds, since Epoch)
706  * guint64 : time the client is/was connected (in nanoseconds)
707  * guint64 : last activity time (in nanoseconds, since Epoch)
708  * guint64 : buffers dropped due to recovery
709  * guint64 : timestamp of the first buffer sent (in nanoseconds)
710  * guint64 : timestamp of the last buffer sent (in nanoseconds)
711  */
712 GValueArray *
713 gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
714 {
715   GstTCPClient *client;
716   GValueArray *result = NULL;
717   GList *clink;
718
719   CLIENTS_LOCK (sink);
720   clink = g_hash_table_lookup (sink->fd_hash, &fd);
721   if (clink == NULL)
722     goto noclient;
723
724   client = (GstTCPClient *) clink->data;
725   if (client != NULL) {
726     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
727     GValue value = { 0 };
728     guint64 interval;
729
730     result = g_value_array_new (7);
731
732     g_value_init (&value, G_TYPE_UINT64);
733     g_value_set_uint64 (&value, mhclient->bytes_sent);
734     result = g_value_array_append (result, &value);
735     g_value_unset (&value);
736     g_value_init (&value, G_TYPE_UINT64);
737     g_value_set_uint64 (&value, mhclient->connect_time);
738     result = g_value_array_append (result, &value);
739     g_value_unset (&value);
740     if (mhclient->disconnect_time == 0) {
741       GTimeVal nowtv;
742
743       g_get_current_time (&nowtv);
744
745       interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time;
746     } else {
747       interval = mhclient->disconnect_time - mhclient->connect_time;
748     }
749     g_value_init (&value, G_TYPE_UINT64);
750     g_value_set_uint64 (&value, mhclient->disconnect_time);
751     result = g_value_array_append (result, &value);
752     g_value_unset (&value);
753     g_value_init (&value, G_TYPE_UINT64);
754     g_value_set_uint64 (&value, interval);
755     result = g_value_array_append (result, &value);
756     g_value_unset (&value);
757     g_value_init (&value, G_TYPE_UINT64);
758     g_value_set_uint64 (&value, mhclient->last_activity_time);
759     result = g_value_array_append (result, &value);
760     g_value_unset (&value);
761     g_value_init (&value, G_TYPE_UINT64);
762     g_value_set_uint64 (&value, mhclient->dropped_buffers);
763     result = g_value_array_append (result, &value);
764     g_value_unset (&value);
765     g_value_init (&value, G_TYPE_UINT64);
766     g_value_set_uint64 (&value, mhclient->first_buffer_ts);
767     result = g_value_array_append (result, &value);
768     g_value_unset (&value);
769     g_value_init (&value, G_TYPE_UINT64);
770     g_value_set_uint64 (&value, mhclient->last_buffer_ts);
771     result = g_value_array_append (result, &value);
772   }
773
774 noclient:
775   CLIENTS_UNLOCK (sink);
776
777   /* python doesn't like a NULL pointer yet */
778   if (result == NULL) {
779     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
780     result = g_value_array_new (0);
781   }
782
783   return result;
784 }
785
786 /* should be called with the clientslock helt.
787  * Note that we don't close the fd as we didn't open it in the first
788  * place. An application should connect to the client-fd-removed signal and
789  * close the fd itself.
790  */
791 static void
792 gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
793 {
794   int fd;
795   GTimeVal now;
796   GstTCPClient *client = (GstTCPClient *) link->data;
797   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
798   GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink);
799   GstMultiFdSinkClass *fclass;
800
801   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
802
803   fd = client->fd.fd;
804
805   if (mhclient->currently_removing) {
806     GST_WARNING_OBJECT (sink, "%s client is already being removed",
807         mhclient->debug);
808     return;
809   } else {
810     mhclient->currently_removing = TRUE;
811   }
812
813   /* FIXME: if we keep track of ip we can log it here and signal */
814   switch (mhclient->status) {
815     case GST_CLIENT_STATUS_OK:
816       GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
817           mhclient->debug, client);
818       break;
819     case GST_CLIENT_STATUS_CLOSED:
820       GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
821           mhclient->debug, client);
822       break;
823     case GST_CLIENT_STATUS_REMOVED:
824       GST_DEBUG_OBJECT (sink,
825           "%s removing client %p because the app removed it", mhclient->debug,
826           client);
827       break;
828     case GST_CLIENT_STATUS_SLOW:
829       GST_INFO_OBJECT (sink,
830           "%s removing client %p because it was too slow", mhclient->debug,
831           client);
832       break;
833     case GST_CLIENT_STATUS_ERROR:
834       GST_WARNING_OBJECT (sink,
835           "%s removing client %p because of error", mhclient->debug, client);
836       break;
837     case GST_CLIENT_STATUS_FLUSHING:
838     default:
839       GST_WARNING_OBJECT (sink,
840           "%s removing client %p with invalid reason %d", mhclient->debug,
841           client, mhclient->status);
842       break;
843   }
844
845   gst_poll_remove_fd (mfsink->fdset, &client->fd);
846
847   g_get_current_time (&now);
848   mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
849
850   /* free client buffers */
851   g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL);
852   g_slist_free (mhclient->sending);
853   mhclient->sending = NULL;
854
855   if (mhclient->caps)
856     gst_caps_unref (mhclient->caps);
857   mhclient->caps = NULL;
858
859   /* unlock the mutex before signaling because the signal handler
860    * might query some properties */
861   CLIENTS_UNLOCK (sink);
862
863   g_signal_emit (G_OBJECT (sink),
864       gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
865       mhclient->status);
866
867   /* lock again before we remove the client completely */
868   CLIENTS_LOCK (sink);
869
870   /* fd cannot be reused in the above signal callback so we can safely
871    * remove it from the hashtable here */
872   if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) {
873     GST_WARNING_OBJECT (sink,
874         "[fd %5d] error removing client %p from hash", client->fd.fd, client);
875   }
876   /* after releasing the lock above, the link could be invalid, more
877    * precisely, the next and prev pointers could point to invalid list
878    * links. One optimisation could be to add a cookie to the linked list
879    * and take a shortcut when it did not change between unlocking and locking
880    * our mutex. For now we just walk the list again. */
881   sink->clients = g_list_remove (sink->clients, client);
882   sink->clients_cookie++;
883
884   if (fclass->removed)
885     fclass->removed (mfsink, client->fd.fd);
886
887   g_free (client);
888   CLIENTS_UNLOCK (sink);
889
890   /* and the fd is really gone now */
891   g_signal_emit (G_OBJECT (sink),
892       gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED], 0, fd);
893
894   CLIENTS_LOCK (sink);
895 }
896
897 /* handle a read on a client fd,
898  * which either indicates a close or should be ignored
899  * returns FALSE if some error occured or the client closed. */
900 static gboolean
901 gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
902     GstTCPClient * client)
903 {
904   int avail, fd;
905   gboolean ret;
906   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
907
908   fd = client->fd.fd;
909
910   if (ioctl (fd, FIONREAD, &avail) < 0)
911     goto ioctl_failed;
912
913   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
914       fd, avail);
915
916   ret = TRUE;
917
918   if (avail == 0) {
919     /* client sent close, so remove it */
920     GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
921     mhclient->status = GST_CLIENT_STATUS_CLOSED;
922     ret = FALSE;
923   } else if (avail < 0) {
924     GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
925     mhclient->status = GST_CLIENT_STATUS_ERROR;
926     ret = FALSE;
927   } else {
928     guint8 dummy[512];
929     gint nread;
930
931     /* just Read 'n' Drop, could also just drop the client as it's not supposed
932      * to write to us except for closing the socket, I guess it's because we
933      * like to listen to our customers. */
934     do {
935       /* this is the maximum we can read */
936       gint to_read = MIN (avail, 512);
937
938       GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
939           fd, to_read);
940
941       nread = read (fd, dummy, to_read);
942       if (nread < -1) {
943         GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
944             fd, to_read, g_strerror (errno), errno);
945         mhclient->status = GST_CLIENT_STATUS_ERROR;
946         ret = FALSE;
947         break;
948       } else if (nread == 0) {
949         GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
950         mhclient->status = GST_CLIENT_STATUS_ERROR;
951         ret = FALSE;
952         break;
953       }
954       avail -= nread;
955     }
956     while (avail > 0);
957   }
958   return ret;
959
960   /* ERRORS */
961 ioctl_failed:
962   {
963     GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
964         fd, g_strerror (errno), errno);
965     mhclient->status = GST_CLIENT_STATUS_ERROR;
966     return FALSE;
967   }
968 }
969
970 /* queue the given buffer for the given client */
971 static gboolean
972 gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
973     GstMultiHandleClient * mhclient, GstBuffer * buffer)
974 {
975   GstCaps *caps;
976
977   /* TRUE: send them if the new caps have them */
978   gboolean send_streamheader = FALSE;
979   GstStructure *s;
980   GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
981   GstTCPClient *client = (GstTCPClient *) mhclient;
982
983   /* before we queue the buffer, we check if we need to queue streamheader
984    * buffers (because it's a new client, or because they changed) */
985   caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));
986
987   if (!mhclient->caps) {
988     GST_DEBUG_OBJECT (sink,
989         "[fd %5d] no previous caps for this client, send streamheader",
990         client->fd.fd);
991     send_streamheader = TRUE;
992     mhclient->caps = gst_caps_ref (caps);
993   } else {
994     /* there were previous caps recorded, so compare */
995     if (!gst_caps_is_equal (caps, mhclient->caps)) {
996       const GValue *sh1, *sh2;
997
998       /* caps are not equal, but could still have the same streamheader */
999       s = gst_caps_get_structure (caps, 0);
1000       if (!gst_structure_has_field (s, "streamheader")) {
1001         /* no new streamheader, so nothing new to send */
1002         GST_DEBUG_OBJECT (sink,
1003             "[fd %5d] new caps do not have streamheader, not sending",
1004             client->fd.fd);
1005       } else {
1006         /* there is a new streamheader */
1007         s = gst_caps_get_structure (mhclient->caps, 0);
1008         if (!gst_structure_has_field (s, "streamheader")) {
1009           /* no previous streamheader, so send the new one */
1010           GST_DEBUG_OBJECT (sink,
1011               "[fd %5d] previous caps did not have streamheader, sending",
1012               client->fd.fd);
1013           send_streamheader = TRUE;
1014         } else {
1015           /* both old and new caps have streamheader set */
1016           if (!mhsink->resend_streamheader) {
1017             GST_DEBUG_OBJECT (sink,
1018                 "[fd %5d] asked to not resend the streamheader, not sending",
1019                 client->fd.fd);
1020             send_streamheader = FALSE;
1021           } else {
1022             sh1 = gst_structure_get_value (s, "streamheader");
1023             s = gst_caps_get_structure (caps, 0);
1024             sh2 = gst_structure_get_value (s, "streamheader");
1025             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
1026               GST_DEBUG_OBJECT (sink,
1027                   "[fd %5d] new streamheader different from old, sending",
1028                   client->fd.fd);
1029               send_streamheader = TRUE;
1030             }
1031           }
1032         }
1033       }
1034     }
1035     /* Replace the old caps */
1036     gst_caps_unref (mhclient->caps);
1037     mhclient->caps = gst_caps_ref (caps);
1038   }
1039
1040   if (G_UNLIKELY (send_streamheader)) {
1041     const GValue *sh;
1042     GArray *buffers;
1043     int i;
1044
1045     GST_LOG_OBJECT (sink,
1046         "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
1047         client->fd.fd, caps);
1048     s = gst_caps_get_structure (caps, 0);
1049     if (!gst_structure_has_field (s, "streamheader")) {
1050       GST_DEBUG_OBJECT (sink,
1051           "[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
1052     } else {
1053       GST_LOG_OBJECT (sink,
1054           "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
1055           client->fd.fd, caps);
1056       sh = gst_structure_get_value (s, "streamheader");
1057       g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
1058       buffers = g_value_peek_pointer (sh);
1059       GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
1060       for (i = 0; i < buffers->len; ++i) {
1061         GValue *bufval;
1062         GstBuffer *buffer;
1063
1064         bufval = &g_array_index (buffers, GValue, i);
1065         g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
1066         buffer = g_value_peek_pointer (bufval);
1067         GST_DEBUG_OBJECT (sink,
1068             "[fd %5d] queueing streamheader buffer of length %" G_GSIZE_FORMAT,
1069             client->fd.fd, gst_buffer_get_size (buffer));
1070         gst_buffer_ref (buffer);
1071
1072         mhclient->sending = g_slist_append (mhclient->sending, buffer);
1073       }
1074     }
1075   }
1076
1077   gst_caps_unref (caps);
1078   caps = NULL;
1079
1080   GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %" G_GSIZE_FORMAT,
1081       client->fd.fd, gst_buffer_get_size (buffer));
1082
1083   gst_buffer_ref (buffer);
1084   mhclient->sending = g_slist_append (mhclient->sending, buffer);
1085
1086   return TRUE;
1087 }
1088
1089 /* Get the number of buffers from the buffer queue needed to satisfy
1090  * the maximum max in the configured units.
1091  * If units are not BUFFERS, and there are insufficient buffers in the
1092  * queue to satify the limit, return len(queue) + 1 */
1093 static gint
1094 get_buffers_max (GstMultiFdSink * sink, gint64 max)
1095 {
1096   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1097
1098   switch (sink->unit_format) {
1099     case GST_TCP_UNIT_FORMAT_BUFFERS:
1100       return max;
1101     case GST_TCP_UNIT_FORMAT_TIME:
1102     {
1103       GstBuffer *buf;
1104       int i;
1105       int len;
1106       gint64 diff;
1107       GstClockTime first = GST_CLOCK_TIME_NONE;
1108
1109       len = mhsink->bufqueue->len;
1110
1111       for (i = 0; i < len; i++) {
1112         buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1113         if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
1114           if (first == -1)
1115             first = GST_BUFFER_TIMESTAMP (buf);
1116
1117           diff = first - GST_BUFFER_TIMESTAMP (buf);
1118
1119           if (diff > max)
1120             return i + 1;
1121         }
1122       }
1123       return len + 1;
1124     }
1125     case GST_TCP_UNIT_FORMAT_BYTES:
1126     {
1127       GstBuffer *buf;
1128       int i;
1129       int len;
1130       gint acc = 0;
1131
1132       len = mhsink->bufqueue->len;
1133
1134       for (i = 0; i < len; i++) {
1135         buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1136         acc += gst_buffer_get_size (buf);
1137
1138         if (acc > max)
1139           return i + 1;
1140       }
1141       return len + 1;
1142     }
1143     default:
1144       return max;
1145   }
1146 }
1147
1148 /* find the positions in the buffer queue where *_min and *_max
1149  * is satisfied
1150  */
1151 /* count the amount of data in the buffers and return the index
1152  * that satifies the given limits.
1153  *
1154  * Returns: index @idx in the buffer queue so that the given limits are
1155  * satisfied. TRUE if all the limits could be satisfied, FALSE if not
1156  * enough data was in the queue.
1157  *
1158  * FIXME, this code might now work if any of the units is in buffers...
1159  */
1160 static gboolean
1161 find_limits (GstMultiFdSink * sink,
1162     gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
1163     gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
1164 {
1165   GstClockTime first, time;
1166   gint i, len, bytes;
1167   gboolean result, max_hit;
1168   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1169
1170   /* take length of queue */
1171   len = mhsink->bufqueue->len;
1172
1173   /* this must hold */
1174   g_assert (len > 0);
1175
1176   GST_LOG_OBJECT (sink,
1177       "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
1178       ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
1179       buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
1180       GST_TIME_ARGS (time_max));
1181
1182   /* do the trivial buffer limit test */
1183   if (buffers_min != -1 && len < buffers_min) {
1184     *min_idx = len - 1;
1185     *max_idx = len - 1;
1186     return FALSE;
1187   }
1188
1189   result = FALSE;
1190   /* else count bytes and time */
1191   first = -1;
1192   bytes = 0;
1193   /* unset limits */
1194   *min_idx = -1;
1195   *max_idx = -1;
1196   max_hit = FALSE;
1197
1198   i = 0;
1199   /* loop through the buffers, when a limit is ok, mark it 
1200    * as -1, we have at least one buffer in the queue. */
1201   do {
1202     GstBuffer *buf;
1203
1204     /* if we checked all min limits, update result */
1205     if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
1206       /* don't go below 0 */
1207       *min_idx = MAX (i - 1, 0);
1208     }
1209     /* if we reached one max limit break out */
1210     if (max_hit) {
1211       /* i > 0 when we get here, we subtract one to get the position
1212        * of the previous buffer. */
1213       *max_idx = i - 1;
1214       /* we have valid complete result if we found a min_idx too */
1215       result = *min_idx != -1;
1216       break;
1217     }
1218     buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1219
1220     bytes += gst_buffer_get_size (buf);
1221
1222     /* take timestamp and save for the base first timestamp */
1223     if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
1224       GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
1225           GST_TIME_ARGS (time));
1226       if (first == -1)
1227         first = time;
1228
1229       /* increase max usage if we did not fill enough. Note that
1230        * buffers are sorted from new to old, so the first timestamp is
1231        * bigger than the next one. */
1232       if (time_min != -1 && first - time >= time_min)
1233         time_min = -1;
1234       if (time_max != -1 && first - time >= time_max)
1235         max_hit = TRUE;
1236     } else {
1237       GST_LOG_OBJECT (sink, "No timestamp on buffer");
1238     }
1239     /* time is OK or unknown, check and increase if not enough bytes */
1240     if (bytes_min != -1) {
1241       if (bytes >= bytes_min)
1242         bytes_min = -1;
1243     }
1244     if (bytes_max != -1) {
1245       if (bytes >= bytes_max) {
1246         max_hit = TRUE;
1247       }
1248     }
1249     i++;
1250   }
1251   while (i < len);
1252
1253   /* if we did not hit the max or min limit, set to buffer size */
1254   if (*max_idx == -1)
1255     *max_idx = len - 1;
1256   /* make sure min does not exceed max */
1257   if (*min_idx == -1)
1258     *min_idx = *max_idx;
1259
1260   return result;
1261 }
1262
1263 /* parse the unit/value pair and assign it to the result value of the
1264  * right type, leave the other values untouched 
1265  *
1266  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
1267  */
1268 static gboolean
1269 assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
1270     GstClockTime * time)
1271 {
1272   gboolean res = TRUE;
1273
1274   /* set only the limit of the given format to the given value */
1275   switch (format) {
1276     case GST_TCP_UNIT_FORMAT_BUFFERS:
1277       *buffers = (gint) value;
1278       break;
1279     case GST_TCP_UNIT_FORMAT_TIME:
1280       *time = value;
1281       break;
1282     case GST_TCP_UNIT_FORMAT_BYTES:
1283       *bytes = (gint) value;
1284       break;
1285     case GST_TCP_UNIT_FORMAT_UNDEFINED:
1286     default:
1287       res = FALSE;
1288       break;
1289   }
1290   return res;
1291 }
1292
1293 /* count the index in the buffer queue to satisfy the given unit
1294  * and value pair starting from buffer at index 0.
1295  *
1296  * Returns: TRUE if there was enough data in the queue to satisfy the
1297  * burst values. @idx contains the index in the buffer that contains enough
1298  * data to satisfy the limits or the last buffer in the queue when the
1299  * function returns FALSE.
1300  */
1301 static gboolean
1302 count_burst_format (GstMultiFdSink * sink, gint * min_idx,
1303     GstFormat min_format, guint64 min_value, gint * max_idx,
1304     GstFormat max_format, guint64 max_value)
1305 {
1306   gint bytes_min = -1, buffers_min = -1;
1307   gint bytes_max = -1, buffers_max = -1;
1308   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
1309
1310   assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
1311   assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
1312
1313   return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
1314       max_idx, bytes_max, buffers_max, time_max);
1315 }
1316
1317 /* decide where in the current buffer queue this new client should start
1318  * receiving buffers from.
1319  * This function is called whenever a client is connected and has not yet
1320  * received a buffer.
1321  * If this returns -1, it means that we haven't found a good point to
1322  * start streaming from yet, and this function should be called again later
1323  * when more buffers have arrived.
1324  */
1325 static gint
1326 gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
1327 {
1328   gint result;
1329   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1330   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1331
1332   GST_DEBUG_OBJECT (sink,
1333       "[fd %5d] new client, deciding where to start in queue", client->fd.fd);
1334   GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
1335       mhsink->bufqueue->len);
1336   switch (mhclient->sync_method) {
1337     case GST_SYNC_METHOD_LATEST:
1338       /* no syncing, we are happy with whatever the client is going to get */
1339       result = mhclient->bufpos;
1340       GST_DEBUG_OBJECT (sink,
1341           "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result);
1342       break;
1343     case GST_SYNC_METHOD_NEXT_KEYFRAME:
1344     {
1345       /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
1346        * is a sync point, we can proceed, otherwise we need to keep waiting */
1347       GST_LOG_OBJECT (sink,
1348           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
1349           mhclient->bufpos);
1350
1351       result = find_prev_syncframe (mhsink, mhclient->bufpos);
1352       if (result != -1) {
1353         GST_DEBUG_OBJECT (sink,
1354             "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d",
1355             client->fd.fd, result);
1356         break;
1357       }
1358
1359       /* client is not on a syncbuffer, need to skip these buffers and
1360        * wait some more */
1361       GST_LOG_OBJECT (sink,
1362           "[fd %5d] new client, skipping buffer(s), no syncpoint found",
1363           client->fd.fd);
1364       mhclient->bufpos = -1;
1365       break;
1366     }
1367     case GST_SYNC_METHOD_LATEST_KEYFRAME:
1368     {
1369       GST_DEBUG_OBJECT (sink,
1370           "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd);
1371
1372       /* for new clients we initially scan the complete buffer queue for
1373        * a sync point when a buffer is added. If we don't find a keyframe,
1374        * we need to wait for the next keyframe and so we change the client's
1375        * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
1376        */
1377       result = find_next_syncframe (mhsink, 0);
1378       if (result != -1) {
1379         GST_DEBUG_OBJECT (sink,
1380             "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd,
1381             result);
1382         break;
1383       }
1384
1385       GST_DEBUG_OBJECT (sink,
1386           "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
1387           "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd);
1388       /* throw client to the waiting state */
1389       mhclient->bufpos = -1;
1390       /* and make client sync to next keyframe */
1391       mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1392       break;
1393     }
1394     case GST_SYNC_METHOD_BURST:
1395     {
1396       gboolean ok;
1397       gint max;
1398
1399       /* move to the position where we satisfy the client's burst
1400        * parameters. If we could not satisfy the parameters because there
1401        * is not enough data, we just send what we have (which is in result).
1402        * We use the max value to limit the search
1403        */
1404       ok = count_burst_format (sink, &result, client->burst_min_format,
1405           client->burst_min_value, &max, client->burst_max_format,
1406           client->burst_max_value);
1407       GST_DEBUG_OBJECT (sink,
1408           "[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
1409           client->fd.fd, ok, result);
1410
1411       GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
1412
1413       /* we hit the max and it is below the min, use that then */
1414       if (max != -1 && max <= result) {
1415         result = MAX (max - 1, 0);
1416         GST_DEBUG_OBJECT (sink,
1417             "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d",
1418             client->fd.fd, result);
1419       }
1420       break;
1421     }
1422     case GST_SYNC_METHOD_BURST_KEYFRAME:
1423     {
1424       gint min_idx, max_idx;
1425       gint next_syncframe, prev_syncframe;
1426
1427       /* BURST_KEYFRAME:
1428        *
1429        * _always_ start sending a keyframe to the client. We first search
1430        * a keyframe between min/max limits. If there is none, we send it the
1431        * last keyframe before min. If there is none, the behaviour is like
1432        * NEXT_KEYFRAME.
1433        */
1434       /* gather burst limits */
1435       count_burst_format (sink, &min_idx, client->burst_min_format,
1436           client->burst_min_value, &max_idx, client->burst_max_format,
1437           client->burst_max_value);
1438
1439       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1440
1441       /* first find a keyframe after min_idx */
1442       next_syncframe = find_next_syncframe (mhsink, min_idx);
1443       if (next_syncframe != -1 && next_syncframe < max_idx) {
1444         /* we have a valid keyframe and it's below the max */
1445         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1446         result = next_syncframe;
1447         break;
1448       }
1449
1450       /* no valid keyframe, try to find one below min */
1451       prev_syncframe = find_prev_syncframe (mhsink, min_idx);
1452       if (prev_syncframe != -1) {
1453         GST_WARNING_OBJECT (sink,
1454             "using keyframe below min in BURST_KEYFRAME sync mode");
1455         result = prev_syncframe;
1456         break;
1457       }
1458
1459       /* no prev keyframe or not enough data  */
1460       GST_WARNING_OBJECT (sink,
1461           "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
1462
1463       /* throw client to the waiting state */
1464       mhclient->bufpos = -1;
1465       /* and make client sync to next keyframe */
1466       mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1467       result = -1;
1468       break;
1469     }
1470     case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
1471     {
1472       gint min_idx, max_idx;
1473       gint next_syncframe;
1474
1475       /* BURST_WITH_KEYFRAME:
1476        *
1477        * try to start sending a keyframe to the client. We first search
1478        * a keyframe between min/max limits. If there is none, we send it the
1479        * amount of data up 'till min.
1480        */
1481       /* gather enough data to burst */
1482       count_burst_format (sink, &min_idx, client->burst_min_format,
1483           client->burst_min_value, &max_idx, client->burst_max_format,
1484           client->burst_max_value);
1485
1486       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1487
1488       /* first find a keyframe after min_idx */
1489       next_syncframe = find_next_syncframe (mhsink, min_idx);
1490       if (next_syncframe != -1 && next_syncframe < max_idx) {
1491         /* we have a valid keyframe and it's below the max */
1492         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1493         result = next_syncframe;
1494         break;
1495       }
1496
1497       /* no keyframe, send data from min_idx */
1498       GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
1499
1500       /* make sure we don't go over the max limit */
1501       if (max_idx != -1 && max_idx <= min_idx) {
1502         result = MAX (max_idx - 1, 0);
1503       } else {
1504         result = min_idx;
1505       }
1506
1507       break;
1508     }
1509     default:
1510       g_warning ("unknown sync method %d", mhclient->sync_method);
1511       result = mhclient->bufpos;
1512       break;
1513   }
1514   return result;
1515 }
1516
1517 /* Handle a write on a client,
1518  * which indicates a read request from a client.
1519  *
1520  * For each client we maintain a queue of GstBuffers that contain the raw bytes
1521  * we need to send to the client.
1522  *
1523  * We first check to see if we need to send streamheaders. If so, we queue them.
1524  *
1525  * Then we run into the main loop that tries to send as many buffers as
1526  * possible. It will first exhaust the mhclient->sending queue and if the queue
1527  * is empty, it will pick a buffer from the global queue.
1528  *
1529  * Sending the buffers from the mhclient->sending queue is basically writing
1530  * the bytes to the socket and maintaining a count of the bytes that were
1531  * sent. When the buffer is completely sent, it is removed from the
1532  * mhclient->sending queue and we try to pick a new buffer for sending.
1533  *
1534  * When the sending returns a partial buffer we stop sending more data as
1535  * the next send operation could block.
1536  *
1537  * This functions returns FALSE if some error occured.
1538  */
1539 static gboolean
1540 gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
1541     GstTCPClient * client)
1542 {
1543   int fd = client->fd.fd;
1544   gboolean more;
1545   gboolean flushing;
1546   GstClockTime now;
1547   GTimeVal nowtv;
1548   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1549   GstMultiHandleSinkClass *mhsinkclass =
1550       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1551   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1552
1553
1554   g_get_current_time (&nowtv);
1555   now = GST_TIMEVAL_TO_TIME (nowtv);
1556
1557   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
1558
1559   more = TRUE;
1560   do {
1561     gint maxsize;
1562
1563     if (!mhclient->sending) {
1564       /* client is not working on a buffer */
1565       if (mhclient->bufpos == -1) {
1566         /* client is too fast, remove from write queue until new buffer is
1567          * available */
1568         gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1569         /* if we flushed out all of the client buffers, we can stop */
1570         if (mhclient->flushcount == 0)
1571           goto flushed;
1572
1573         return TRUE;
1574       } else {
1575         /* client can pick a buffer from the global queue */
1576         GstBuffer *buf;
1577         GstClockTime timestamp;
1578
1579         /* for new connections, we need to find a good spot in the
1580          * bufqueue to start streaming from */
1581         if (mhclient->new_connection && !flushing) {
1582           gint position = gst_multi_fd_sink_new_client (sink, client);
1583
1584           if (position >= 0) {
1585             /* we got a valid spot in the queue */
1586             mhclient->new_connection = FALSE;
1587             mhclient->bufpos = position;
1588           } else {
1589             /* cannot send data to this client yet */
1590             gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1591             return TRUE;
1592           }
1593         }
1594
1595         /* we flushed all remaining buffers, no need to get a new one */
1596         if (mhclient->flushcount == 0)
1597           goto flushed;
1598
1599         /* grab buffer */
1600         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
1601         mhclient->bufpos--;
1602
1603         /* update stats */
1604         timestamp = GST_BUFFER_TIMESTAMP (buf);
1605         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
1606           mhclient->first_buffer_ts = timestamp;
1607         if (timestamp != -1)
1608           mhclient->last_buffer_ts = timestamp;
1609
1610         /* decrease flushcount */
1611         if (mhclient->flushcount != -1)
1612           mhclient->flushcount--;
1613
1614         GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1615             fd, client, mhclient->bufpos);
1616
1617         /* queueing a buffer will ref it */
1618         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
1619
1620         /* need to start from the first byte for this new buffer */
1621         mhclient->bufoffset = 0;
1622       }
1623     }
1624
1625     /* see if we need to send something */
1626     if (mhclient->sending) {
1627       ssize_t wrote;
1628       GstBuffer *head;
1629       GstMapInfo info;
1630       guint8 *data;
1631
1632       /* pick first buffer from list */
1633       head = GST_BUFFER (mhclient->sending->data);
1634
1635       g_assert (gst_buffer_map (head, &info, GST_MAP_READ));
1636       data = info.data;
1637       maxsize = info.size - mhclient->bufoffset;
1638
1639       /* try to write the complete buffer */
1640 #ifdef MSG_NOSIGNAL
1641 #define FLAGS MSG_NOSIGNAL
1642 #else
1643 #define FLAGS 0
1644 #endif
1645       if (client->is_socket) {
1646         wrote = send (fd, data + mhclient->bufoffset, maxsize, FLAGS);
1647       } else {
1648         wrote = write (fd, data + mhclient->bufoffset, maxsize);
1649       }
1650       gst_buffer_unmap (head, &info);
1651
1652       if (wrote < 0) {
1653         /* hmm error.. */
1654         if (errno == EAGAIN) {
1655           /* nothing serious, resource was unavailable, try again later */
1656           more = FALSE;
1657         } else if (errno == ECONNRESET) {
1658           goto connection_reset;
1659         } else {
1660           goto write_error;
1661         }
1662       } else {
1663         if (wrote < maxsize) {
1664           /* partial write means that the client cannot read more and we should
1665            * stop sending more */
1666           GST_LOG_OBJECT (sink,
1667               "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote);
1668           mhclient->bufoffset += wrote;
1669           more = FALSE;
1670         } else {
1671           /* complete buffer was written, we can proceed to the next one */
1672           mhclient->sending = g_slist_remove (mhclient->sending, head);
1673           gst_buffer_unref (head);
1674           /* make sure we start from byte 0 for the next buffer */
1675           mhclient->bufoffset = 0;
1676         }
1677         /* update stats */
1678         mhclient->bytes_sent += wrote;
1679         mhclient->last_activity_time = now;
1680         mhsink->bytes_served += wrote;
1681       }
1682     }
1683   } while (more);
1684
1685   return TRUE;
1686
1687   /* ERRORS */
1688 flushed:
1689   {
1690     GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
1691     mhclient->status = GST_CLIENT_STATUS_REMOVED;
1692     return FALSE;
1693   }
1694 connection_reset:
1695   {
1696     GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
1697     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1698     return FALSE;
1699   }
1700 write_error:
1701   {
1702     GST_WARNING_OBJECT (sink,
1703         "[fd %5d] could not write, removing client: %s (%d)", fd,
1704         g_strerror (errno), errno);
1705     mhclient->status = GST_CLIENT_STATUS_ERROR;
1706     return FALSE;
1707   }
1708 }
1709
1710 /* calculate the new position for a client after recovery. This function
1711  * does not update the client position but merely returns the required
1712  * position.
1713  */
1714 static gint
1715 gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
1716 {
1717   gint newbufpos;
1718   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1719   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1720
1721   GST_WARNING_OBJECT (sink,
1722       "[fd %5d] client %p is lagging at %d, recover using policy %d",
1723       client->fd.fd, client, mhclient->bufpos, mhsink->recover_policy);
1724
1725   switch (mhsink->recover_policy) {
1726     case GST_RECOVER_POLICY_NONE:
1727       /* do nothing, client will catch up or get kicked out when it reaches
1728        * the hard max */
1729       newbufpos = mhclient->bufpos;
1730       break;
1731     case GST_RECOVER_POLICY_RESYNC_LATEST:
1732       /* move to beginning of queue */
1733       newbufpos = -1;
1734       break;
1735     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
1736       /* move to beginning of soft max */
1737       newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
1738       break;
1739     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1740       /* find keyframe in buffers, we search backwards to find the
1741        * closest keyframe relative to what this client already received. */
1742       newbufpos = MIN (mhsink->bufqueue->len - 1,
1743           get_buffers_max (sink, mhsink->units_soft_max) - 1);
1744
1745       while (newbufpos >= 0) {
1746         GstBuffer *buf;
1747
1748         buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
1749         if (is_sync_frame (mhsink, buf)) {
1750           /* found a buffer that is not a delta unit */
1751           break;
1752         }
1753         newbufpos--;
1754       }
1755       break;
1756     default:
1757       /* unknown recovery procedure */
1758       newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
1759       break;
1760   }
1761   return newbufpos;
1762 }
1763
1764 /* Queue a buffer on the global queue.
1765  *
1766  * This function adds the buffer to the front of a GArray. It removes the
1767  * tail buffer if the max queue size is exceeded, unreffing the queued buffer.
1768  * Note that unreffing the buffer is not a problem as clients who
1769  * started writing out this buffer will still have a reference to it in the
1770  * mhclient->sending queue.
1771  *
1772  * After adding the buffer, we update all client positions in the queue. If
1773  * a client moves over the soft max, we start the recovery procedure for this
1774  * slow client. If it goes over the hard max, it is put into the slow list
1775  * and removed.
1776  *
1777  * Special care is taken of clients that were waiting for a new buffer (they
1778  * had a position of -1) because they can proceed after adding this new buffer.
1779  * This is done by adding the client back into the write fd_set and signaling
1780  * the select thread that the fd_set changed.
1781  */
1782 static void
1783 gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer)
1784 {
1785   GList *clients, *next;
1786   gint queuelen;
1787   gboolean need_signal = FALSE;
1788   gint max_buffer_usage;
1789   gint i;
1790   GTimeVal nowtv;
1791   GstClockTime now;
1792   gint max_buffers, soft_max_buffers;
1793   guint cookie;
1794   GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
1795   GstMultiHandleSinkClass *mhsinkclass =
1796       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1797
1798   g_get_current_time (&nowtv);
1799   now = GST_TIMEVAL_TO_TIME (nowtv);
1800
1801   CLIENTS_LOCK (mhsink);
1802   /* add buffer to queue */
1803   g_array_prepend_val (mhsink->bufqueue, buffer);
1804   queuelen = mhsink->bufqueue->len;
1805
1806   if (mhsink->units_max > 0)
1807     max_buffers = get_buffers_max (sink, mhsink->units_max);
1808   else
1809     max_buffers = -1;
1810
1811   if (mhsink->units_soft_max > 0)
1812     soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
1813   else
1814     soft_max_buffers = -1;
1815   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
1816       soft_max_buffers);
1817
1818   /* then loop over the clients and update the positions */
1819   max_buffer_usage = 0;
1820
1821 restart:
1822   cookie = mhsink->clients_cookie;
1823   for (clients = mhsink->clients; clients; clients = next) {
1824     GstTCPClient *client;
1825     GstMultiHandleClient *mhclient;
1826
1827     if (cookie != mhsink->clients_cookie) {
1828       GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
1829       goto restart;
1830     }
1831
1832     client = (GstTCPClient *) clients->data;
1833     mhclient = (GstMultiHandleClient *) client;
1834     next = g_list_next (clients);
1835
1836     mhclient->bufpos++;
1837     GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1838         client->fd.fd, client, mhclient->bufpos);
1839     /* check soft max if needed, recover client */
1840     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
1841       gint newpos;
1842
1843       newpos = gst_multi_fd_sink_recover_client (sink, client);
1844       if (newpos != mhclient->bufpos) {
1845         mhclient->dropped_buffers += mhclient->bufpos - newpos;
1846         mhclient->bufpos = newpos;
1847         mhclient->discont = TRUE;
1848         GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
1849             client->fd.fd, client, mhclient->bufpos);
1850       } else {
1851         GST_INFO_OBJECT (sink,
1852             "[fd %5d] client %p not recovering position",
1853             client->fd.fd, client);
1854       }
1855     }
1856     /* check hard max and timeout, remove client */
1857     if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) ||
1858         (mhsink->timeout > 0
1859             && now - mhclient->last_activity_time > mhsink->timeout)) {
1860       /* remove client */
1861       GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
1862           client->fd.fd, client);
1863       /* remove the client, the fd set will be cleared and the select thread
1864        * will be signaled */
1865       mhclient->status = GST_CLIENT_STATUS_SLOW;
1866       /* set client to invalid position while being removed */
1867       mhclient->bufpos = -1;
1868       mhsinkclass->remove_client_link (mhsink, clients);
1869       need_signal = TRUE;
1870       continue;
1871     } else if (mhclient->bufpos == 0 || mhclient->new_connection) {
1872       /* can send data to this client now. need to signal the select thread that
1873        * the fd_set changed */
1874       gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
1875       need_signal = TRUE;
1876     }
1877     /* keep track of maximum buffer usage */
1878     if (mhclient->bufpos > max_buffer_usage) {
1879       max_buffer_usage = mhclient->bufpos;
1880     }
1881   }
1882
1883   /* make sure we respect bytes-min, buffers-min and time-min when they are set */
1884   {
1885     gint usage, max;
1886
1887     GST_LOG_OBJECT (sink,
1888         "extending queue %d to respect time_min %" GST_TIME_FORMAT
1889         ", bytes_min %d, buffers_min %d", max_buffer_usage,
1890         GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min,
1891         mhsink->buffers_min);
1892
1893     /* get index where the limits are ok, we don't really care if all limits
1894      * are ok, we just queue as much as we need. We also don't compare against
1895      * the max limits. */
1896     find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
1897         mhsink->time_min, &max, -1, -1, -1);
1898
1899     max_buffer_usage = MAX (max_buffer_usage, usage + 1);
1900     GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
1901   }
1902
1903   /* now look for sync points and make sure there is at least one
1904    * sync point in the queue. We only do this if the LATEST_KEYFRAME or 
1905    * BURST_KEYFRAME mode is selected */
1906   if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
1907       mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
1908     /* no point in searching beyond the queue length */
1909     gint limit = queuelen;
1910     GstBuffer *buf;
1911
1912     /* no point in searching beyond the soft-max if any. */
1913     if (soft_max_buffers > 0) {
1914       limit = MIN (limit, soft_max_buffers);
1915     }
1916     GST_LOG_OBJECT (sink,
1917         "extending queue to include sync point, now at %d, limit is %d",
1918         max_buffer_usage, limit);
1919     for (i = 0; i < limit; i++) {
1920       buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1921       if (is_sync_frame (mhsink, buf)) {
1922         /* found a sync frame, now extend the buffer usage to
1923          * include at least this frame. */
1924         max_buffer_usage = MAX (max_buffer_usage, i);
1925         break;
1926       }
1927     }
1928     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1929   }
1930
1931   GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
1932
1933   /* nobody is referencing units after max_buffer_usage so we can
1934    * remove them from the queue. We remove them in reverse order as
1935    * this is the most optimal for GArray. */
1936   for (i = queuelen - 1; i > max_buffer_usage; i--) {
1937     GstBuffer *old;
1938
1939     /* queue exceeded max size */
1940     queuelen--;
1941     old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1942     mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
1943
1944     /* unref tail buffer */
1945     gst_buffer_unref (old);
1946   }
1947   /* save for stats */
1948   mhsink->buffers_queued = max_buffer_usage;
1949   CLIENTS_UNLOCK (sink);
1950
1951   /* and send a signal to thread if fd_set changed */
1952   if (need_signal) {
1953     gst_poll_restart (sink->fdset);
1954   }
1955 }
1956
1957 /* Handle the clients. Basically does a blocking select for one
1958  * of the client fds to become read or writable. We also have a
1959  * filedescriptor to receive commands on that we need to check.
1960  *
1961  * After going out of the select call, we read and write to all
1962  * clients that can do so. Badly behaving clients are put on a
1963  * garbage list and removed.
1964  */
1965 static void
1966 gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
1967 {
1968   int result;
1969   GList *clients, *next;
1970   gboolean try_again;
1971   GstMultiFdSinkClass *fclass;
1972   guint cookie;
1973   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1974   GstMultiHandleSinkClass *mhsinkclass =
1975       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1976
1977
1978   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
1979
1980   do {
1981     try_again = FALSE;
1982
1983     /* check for:
1984      * - server socket input (ie, new client connections)
1985      * - client socket input (ie, clients saying goodbye)
1986      * - client socket output (ie, client reads)          */
1987     GST_LOG_OBJECT (sink, "waiting on action on fdset");
1988
1989     result =
1990         gst_poll_wait (sink->fdset,
1991         mhsink->timeout != 0 ? mhsink->timeout : GST_CLOCK_TIME_NONE);
1992
1993     /* Handle the special case in which the sink is not receiving more buffers
1994      * and will not disconnect inactive client in the streaming thread. */
1995     if (G_UNLIKELY (result == 0)) {
1996       GstClockTime now;
1997       GTimeVal nowtv;
1998
1999       g_get_current_time (&nowtv);
2000       now = GST_TIMEVAL_TO_TIME (nowtv);
2001
2002       CLIENTS_LOCK (sink);
2003       for (clients = mhsink->clients; clients; clients = next) {
2004         GstTCPClient *client;
2005         GstMultiHandleClient *mhclient;
2006
2007         client = (GstTCPClient *) clients->data;
2008         mhclient = (GstMultiHandleClient *) client;
2009         next = g_list_next (clients);
2010         if (mhsink->timeout > 0
2011             && now - mhclient->last_activity_time > mhsink->timeout) {
2012           mhclient->status = GST_CLIENT_STATUS_SLOW;
2013           mhsinkclass->remove_client_link (mhsink, clients);
2014         }
2015       }
2016       CLIENTS_UNLOCK (sink);
2017       return;
2018     } else if (result < 0) {
2019       GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
2020           errno);
2021       if (errno == EBADF) {
2022         /* ok, so one or more of the fds is invalid. We loop over them to find
2023          * the ones that give an error to the F_GETFL fcntl. */
2024         CLIENTS_LOCK (sink);
2025       restart:
2026         cookie = mhsink->clients_cookie;
2027         for (clients = mhsink->clients; clients; clients = next) {
2028           GstTCPClient *client;
2029           GstMultiHandleClient *mhclient;
2030           int fd;
2031           long flags;
2032           int res;
2033
2034           if (cookie != mhsink->clients_cookie) {
2035             GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd");
2036             goto restart;
2037           }
2038
2039           client = (GstTCPClient *) clients->data;
2040           mhclient = (GstMultiHandleClient *) client;
2041           next = g_list_next (clients);
2042
2043           fd = client->fd.fd;
2044
2045           res = fcntl (fd, F_GETFL, &flags);
2046           if (res == -1) {
2047             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
2048                 fd, g_strerror (errno), errno);
2049             if (errno == EBADF) {
2050               mhclient->status = GST_CLIENT_STATUS_ERROR;
2051               /* releases the CLIENTS lock */
2052               mhsinkclass->remove_client_link (mhsink, clients);
2053             }
2054           }
2055         }
2056         CLIENTS_UNLOCK (sink);
2057         /* after this, go back in the select loop as the read/writefds
2058          * are not valid */
2059         try_again = TRUE;
2060       } else if (errno == EINTR) {
2061         /* interrupted system call, just redo the wait */
2062         try_again = TRUE;
2063       } else if (errno == EBUSY) {
2064         /* the call to gst_poll_wait() was flushed */
2065         return;
2066       } else {
2067         /* this is quite bad... */
2068         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2069             ("select failed: %s (%d)", g_strerror (errno), errno));
2070         return;
2071       }
2072     } else {
2073       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
2074     }
2075   } while (try_again);
2076
2077   /* subclasses can check fdset with this virtual function */
2078   if (fclass->wait)
2079     fclass->wait (sink, sink->fdset);
2080
2081   /* Check the clients */
2082   CLIENTS_LOCK (sink);
2083
2084 restart2:
2085   cookie = mhsink->clients_cookie;
2086   for (clients = mhsink->clients; clients; clients = next) {
2087     GstTCPClient *client;
2088     GstMultiHandleClient *mhclient;
2089
2090     if (mhsink->clients_cookie != cookie) {
2091       GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date");
2092       goto restart2;
2093     }
2094
2095     client = (GstTCPClient *) clients->data;
2096     mhclient = (GstMultiHandleClient *) client;
2097     next = g_list_next (clients);
2098
2099     if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
2100         && mhclient->status != GST_CLIENT_STATUS_OK) {
2101       mhsinkclass->remove_client_link (mhsink, clients);
2102       continue;
2103     }
2104
2105     if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
2106       mhclient->status = GST_CLIENT_STATUS_CLOSED;
2107       mhsinkclass->remove_client_link (mhsink, clients);
2108       continue;
2109     }
2110     if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
2111       GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
2112       mhclient->status = GST_CLIENT_STATUS_ERROR;
2113       mhsinkclass->remove_client_link (mhsink, clients);
2114       continue;
2115     }
2116     if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
2117       /* handle client read */
2118       if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
2119         mhsinkclass->remove_client_link (mhsink, clients);
2120         continue;
2121       }
2122     }
2123     if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
2124       /* handle client write */
2125       if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
2126         mhsinkclass->remove_client_link (mhsink, clients);
2127         continue;
2128       }
2129     }
2130   }
2131   CLIENTS_UNLOCK (sink);
2132 }
2133
2134 /* we handle the client communication in another thread so that we do not block
2135  * the gstreamer thread while we select() on the client fds */
2136 static gpointer
2137 gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink)
2138 {
2139   GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
2140
2141   while (mhsink->running) {
2142     gst_multi_fd_sink_handle_clients (sink);
2143   }
2144   return NULL;
2145 }
2146
2147 static void
2148 gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
2149     const GValue * value, GParamSpec * pspec)
2150 {
2151   GstMultiFdSink *multifdsink;
2152
2153   multifdsink = GST_MULTI_FD_SINK (object);
2154
2155   switch (prop_id) {
2156     case PROP_MODE:
2157       multifdsink->mode = g_value_get_enum (value);
2158       break;
2159     case PROP_UNIT_FORMAT:
2160       multifdsink->unit_format = g_value_get_enum (value);
2161       break;
2162     case PROP_BURST_FORMAT:
2163       multifdsink->def_burst_format = g_value_get_enum (value);
2164       break;
2165     case PROP_BURST_VALUE:
2166       multifdsink->def_burst_value = g_value_get_uint64 (value);
2167       break;
2168     case PROP_HANDLE_READ:
2169       multifdsink->handle_read = g_value_get_boolean (value);
2170       break;
2171
2172     default:
2173       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2174       break;
2175   }
2176 }
2177
2178 static void
2179 gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
2180     GParamSpec * pspec)
2181 {
2182   GstMultiFdSink *multifdsink;
2183
2184   multifdsink = GST_MULTI_FD_SINK (object);
2185
2186   switch (prop_id) {
2187     case PROP_MODE:
2188       g_value_set_enum (value, multifdsink->mode);
2189       break;
2190     case PROP_UNIT_FORMAT:
2191       g_value_set_enum (value, multifdsink->unit_format);
2192       break;
2193     case PROP_BURST_FORMAT:
2194       g_value_set_enum (value, multifdsink->def_burst_format);
2195       break;
2196     case PROP_BURST_VALUE:
2197       g_value_set_uint64 (value, multifdsink->def_burst_value);
2198       break;
2199     case PROP_HANDLE_READ:
2200       g_value_set_boolean (value, multifdsink->handle_read);
2201       break;
2202     case PROP_NUM_FDS:
2203       g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
2204       break;
2205
2206     default:
2207       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2208       break;
2209   }
2210 }
2211
2212 static gboolean
2213 gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink)
2214 {
2215   GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
2216
2217   GST_INFO_OBJECT (mfsink, "starting in mode %d", mfsink->mode);
2218   if ((mfsink->fdset = gst_poll_new (TRUE)) == NULL)
2219     goto socket_pair;
2220
2221   return TRUE;
2222
2223   /* ERRORS */
2224 socket_pair:
2225   {
2226     GST_ELEMENT_ERROR (mfsink, RESOURCE, OPEN_READ_WRITE, (NULL),
2227         GST_ERROR_SYSTEM);
2228     return FALSE;
2229   }
2230 }
2231
2232 static gboolean
2233 multifdsink_hash_remove (gpointer key, gpointer value, gpointer data)
2234 {
2235   return TRUE;
2236 }
2237
2238 static void
2239 gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink)
2240 {
2241   GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
2242
2243   gst_poll_set_flushing (mfsink->fdset, TRUE);
2244
2245 }
2246
2247 static void
2248 gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink)
2249 {
2250   GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
2251
2252   if (mfsink->fdset) {
2253     gst_poll_free (mfsink->fdset);
2254     mfsink->fdset = NULL;
2255   }
2256   g_hash_table_foreach_remove (mfsink->fd_hash, multifdsink_hash_remove,
2257       mfsink);
2258 }