tizen 2.0 init
[framework/multimedia/gst-plugins-base0.10.git] / gst / tcp / gstmultifdsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:element-multifdsink
24  * @see_also: tcpserversink
25  *
26  * This plugin writes incoming data to a set of file descriptors. The
27  * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. 
28  * For each descriptor added, the #GstMultiFdSink::client-added signal will be called.
29  *
30  * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal
31  * that allows for more control over what and how much data a client 
32  * initially receives.
33  *
34  * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For
35  * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The
36  * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a
37  * client is not active anymore or, depending on the value of the
38  * #GstMultiFdSink:recover-policy property, if the client is reading too slowly.
39  * In all cases, multifdsink will never close a file descriptor itself.
40  * The user of multifdsink is responsible for closing all file descriptors.
41  * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal.
42  * Note that multifdsink still has a reference to the file descriptor when the
43  * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on
44  * the descriptor; it is therefore not safe to close the file descriptor in
45  * the #GstMultiFdSink::client-removed signal handler, and you should use the 
46  * #GstMultiFdSink::client-fd-removed signal to safely close the fd.
47  *
48  * Multifdsink internally keeps a queue of the incoming buffers and uses a
49  * separate thread to send the buffers to the clients. This ensures that no
50  * client write can block the pipeline and that clients can read with different
51  * speeds.
52  *
53  * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define
54  * which buffer in the queued buffers will be sent first to the client. Clients 
55  * can be sent the most recent buffer (which might not be decodable by the 
56  * client if it is not a keyframe), the next keyframe received in 
57  * multifdsink (which can take some time depending on the keyframe rate), or the
58  * last received keyframe (which will cause a simple burst-on-connect). 
59  * Multifdsink will always keep at least one keyframe in its internal buffers
60  * when the sync-mode is set to latest-keyframe.
61  *
62  * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method 
63  * property to allow finer control over burst-on-connect behaviour. By selecting
64  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
65  * additionally requires that the burst begin with a keyframe, and 
66  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
67  * prefer a minimum burst size even if it requires not starting with a keyframe.
68  *
69  * Multifdsink can be instructed to keep at least a minimum amount of data
70  * expressed in time or byte units in its internal queues with the 
71  * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively.
72  * These properties are useful if the application adds clients with the 
73  * #GstMultiFdSink::add-full signal to make sure that a burst connect can
74  * actually be honored. 
75  *
76  * When streaming data, clients are allowed to read at a different rate than
77  * the rate at which multifdsink receives data. If the client is reading too
78  * fast, no data will be send to the client until multifdsink receives more
79  * data. If the client, however, reads too slowly, data for that client will be 
80  * queued up in multifdsink. Two properties control the amount of data 
81  * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and 
82  * #GstMultiFdSink:buffers-soft-max. A client that falls behind by
83  * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly.
84  *
85  * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery
86  * procedure which is controlled with the #GstMultiFdSink:recover-policy property.
87  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
88  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
89  * positions the client to the soft limit in the buffer queue and
90  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
91  * buffer queue.
92  *
93  * multifdsink will by default synchronize on the clock before serving the 
94  * buffers to the clients. This behaviour can be disabled by setting the sync 
95  * property to FALSE. Multifdsink will by default not do QoS and will never
96  * drop late buffers.
97  *
98  * Last reviewed on 2006-09-12 (0.10.10)
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
106  * with newer GLib versions (>= 2.31.0) */
107 #define GLIB_DISABLE_DEPRECATION_WARNINGS
108
109 #include <gst/gst-i18n-plugin.h>
110
111 #include <sys/ioctl.h>
112
113 #ifdef HAVE_UNISTD_H
114 #include <unistd.h>
115 #endif
116
117 #include <fcntl.h>
118 #include <sys/types.h>
119 #include <sys/socket.h>
120 #include <sys/stat.h>
121 #include <netinet/in.h>
122
123 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
124 #include <sys/filio.h>
125 #endif
126
127 #include "gstmultifdsink.h"
128 #include "gsttcp-marshal.h"
129
130 #define NOT_IMPLEMENTED 0
131
132 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
133     GST_PAD_SINK,
134     GST_PAD_ALWAYS,
135     GST_STATIC_CAPS_ANY);
136
137 GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
138 #define GST_CAT_DEFAULT (multifdsink_debug)
139
140 /* MultiFdSink signals and args */
141 enum
142 {
143   /* methods */
144   SIGNAL_ADD,
145   SIGNAL_ADD_BURST,
146   SIGNAL_REMOVE,
147   SIGNAL_REMOVE_FLUSH,
148   SIGNAL_CLEAR,
149   SIGNAL_GET_STATS,
150
151   /* signals */
152   SIGNAL_CLIENT_ADDED,
153   SIGNAL_CLIENT_REMOVED,
154   SIGNAL_CLIENT_FD_REMOVED,
155
156   LAST_SIGNAL
157 };
158
159
160 /* this is really arbitrarily chosen */
161 #define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_NONE
162 #define DEFAULT_MODE                    1
163 #define DEFAULT_BUFFERS_MAX             -1
164 #define DEFAULT_BUFFERS_SOFT_MAX        -1
165 #define DEFAULT_TIME_MIN                -1
166 #define DEFAULT_BYTES_MIN               -1
167 #define DEFAULT_BUFFERS_MIN             -1
168 #define DEFAULT_UNIT_TYPE               GST_TCP_UNIT_TYPE_BUFFERS
169 #define DEFAULT_UNITS_MAX               -1
170 #define DEFAULT_UNITS_SOFT_MAX          -1
171 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
172 #define DEFAULT_TIMEOUT                 0
173 #define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST
174
175 #define DEFAULT_BURST_UNIT              GST_TCP_UNIT_TYPE_UNDEFINED
176 #define DEFAULT_BURST_VALUE             0
177
178 #define DEFAULT_QOS_DSCP                -1
179 #define DEFAULT_HANDLE_READ             TRUE
180
181 #define DEFAULT_RESEND_STREAMHEADER      TRUE
182
183 enum
184 {
185   PROP_0,
186   PROP_PROTOCOL,
187   PROP_MODE,
188   PROP_BUFFERS_QUEUED,
189   PROP_BYTES_QUEUED,
190   PROP_TIME_QUEUED,
191
192   PROP_UNIT_TYPE,
193   PROP_UNITS_MAX,
194   PROP_UNITS_SOFT_MAX,
195
196   PROP_BUFFERS_MAX,
197   PROP_BUFFERS_SOFT_MAX,
198
199   PROP_TIME_MIN,
200   PROP_BYTES_MIN,
201   PROP_BUFFERS_MIN,
202
203   PROP_RECOVER_POLICY,
204   PROP_TIMEOUT,
205   PROP_SYNC_METHOD,
206   PROP_BYTES_TO_SERVE,
207   PROP_BYTES_SERVED,
208
209   PROP_BURST_UNIT,
210   PROP_BURST_VALUE,
211
212   PROP_QOS_DSCP,
213
214   PROP_HANDLE_READ,
215
216   PROP_RESEND_STREAMHEADER,
217
218   PROP_NUM_FDS,
219
220   PROP_LAST
221 };
222
223 /* For backward compat, we can't really select the poll mode anymore with
224  * GstPoll. */
225 #define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
226 static GType
227 gst_fdset_mode_get_type (void)
228 {
229   static GType fdset_mode_type = 0;
230   static const GEnumValue fdset_mode[] = {
231     {0, "Select", "select"},
232     {1, "Poll", "poll"},
233     {2, "EPoll", "epoll"},
234     {0, NULL, NULL},
235   };
236
237   if (!fdset_mode_type) {
238     fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
239   }
240   return fdset_mode_type;
241 }
242
243 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
244 static GType
245 gst_recover_policy_get_type (void)
246 {
247   static GType recover_policy_type = 0;
248   static const GEnumValue recover_policy[] = {
249     {GST_RECOVER_POLICY_NONE,
250         "Do not try to recover", "none"},
251     {GST_RECOVER_POLICY_RESYNC_LATEST,
252         "Resync client to latest buffer", "latest"},
253     {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
254         "Resync client to soft limit", "soft-limit"},
255     {GST_RECOVER_POLICY_RESYNC_KEYFRAME,
256         "Resync client to most recent keyframe", "keyframe"},
257     {0, NULL, NULL},
258   };
259
260   if (!recover_policy_type) {
261     recover_policy_type =
262         g_enum_register_static ("GstRecoverPolicy", recover_policy);
263   }
264   return recover_policy_type;
265 }
266
267 #define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
268 static GType
269 gst_sync_method_get_type (void)
270 {
271   static GType sync_method_type = 0;
272   static const GEnumValue sync_method[] = {
273     {GST_SYNC_METHOD_LATEST,
274         "Serve starting from the latest buffer", "latest"},
275     {GST_SYNC_METHOD_NEXT_KEYFRAME,
276         "Serve starting from the next keyframe", "next-keyframe"},
277     {GST_SYNC_METHOD_LATEST_KEYFRAME,
278           "Serve everything since the latest keyframe (burst)",
279         "latest-keyframe"},
280     {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"},
281     {GST_SYNC_METHOD_BURST_KEYFRAME,
282           "Serve burst-value data starting on a keyframe",
283         "burst-keyframe"},
284     {GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
285           "Serve burst-value data preferably starting on a keyframe",
286         "burst-with-keyframe"},
287     {0, NULL, NULL},
288   };
289
290   if (!sync_method_type) {
291     sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method);
292   }
293   return sync_method_type;
294 }
295
296 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
297 static GType
298 gst_unit_type_get_type (void)
299 {
300   static GType unit_type_type = 0;
301   static const GEnumValue unit_type[] = {
302     {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
303     {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
304     {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
305     {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
306     {0, NULL, NULL},
307   };
308
309   if (!unit_type_type) {
310     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
311   }
312   return unit_type_type;
313 }
314
315 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
316 static GType
317 gst_client_status_get_type (void)
318 {
319   static GType client_status_type = 0;
320   static const GEnumValue client_status[] = {
321     {GST_CLIENT_STATUS_OK, "ok", "ok"},
322     {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"},
323     {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"},
324     {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
325     {GST_CLIENT_STATUS_ERROR, "Error", "error"},
326     {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
327     {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
328     {0, NULL, NULL},
329   };
330
331   if (!client_status_type) {
332     client_status_type =
333         g_enum_register_static ("GstClientStatus", client_status);
334   }
335   return client_status_type;
336 }
337
338 static void gst_multi_fd_sink_finalize (GObject * object);
339
340 static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink,
341     GList * link);
342
343 static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
344     GstBuffer * buf);
345 static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement *
346     element, GstStateChange transition);
347
348 static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
349     const GValue * value, GParamSpec * pspec);
350 static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
351     GValue * value, GParamSpec * pspec);
352
353 GST_BOILERPLATE (GstMultiFdSink, gst_multi_fd_sink, GstBaseSink,
354     GST_TYPE_BASE_SINK);
355
356 static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 };
357
358 static void
359 gst_multi_fd_sink_base_init (gpointer g_class)
360 {
361   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
362
363   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
364
365   gst_element_class_set_details_simple (element_class,
366       "Multi filedescriptor sink", "Sink/Network",
367       "Send data to multiple filedescriptors",
368       "Thomas Vander Stichele <thomas at apestaart dot org>, "
369       "Wim Taymans <wim@fluendo.com>");
370 }
371
372 static void
373 gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
374 {
375   GObjectClass *gobject_class;
376   GstElementClass *gstelement_class;
377   GstBaseSinkClass *gstbasesink_class;
378
379   gobject_class = (GObjectClass *) klass;
380   gstelement_class = (GstElementClass *) klass;
381   gstbasesink_class = (GstBaseSinkClass *) klass;
382
383   gobject_class->set_property = gst_multi_fd_sink_set_property;
384   gobject_class->get_property = gst_multi_fd_sink_get_property;
385   gobject_class->finalize = gst_multi_fd_sink_finalize;
386
387   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
388       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in"
389           ". GDP protocol here is deprecated. Please use gdppay element.",
390           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   /**
394    * GstMultiFdSink::mode
395    *
396    * The mode for selecting activity on the fds. 
397    *
398    * This property is deprecated since 0.10.18, if will now automatically
399    * select and use the most optimal method.
400    */
401   g_object_class_install_property (gobject_class, PROP_MODE,
402       g_param_spec_enum ("mode", "Mode",
403           "The mode for selecting activity on the fds (deprecated)",
404           GST_TYPE_FDSET_MODE, DEFAULT_MODE,
405           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
406
407   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
408       g_param_spec_int ("buffers-max", "Buffers max",
409           "max number of buffers to queue for a client (-1 = no limit)", -1,
410           G_MAXINT, DEFAULT_BUFFERS_MAX,
411           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412   g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
413       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
414           "Recover client when going over this limit (-1 = no limit)", -1,
415           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
416           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417
418   g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
419       g_param_spec_int ("bytes-min", "Bytes min",
420           "min number of bytes to queue (-1 = as little as possible)", -1,
421           G_MAXINT, DEFAULT_BYTES_MIN,
422           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423   g_object_class_install_property (gobject_class, PROP_TIME_MIN,
424       g_param_spec_int64 ("time-min", "Time min",
425           "min number of time to queue (-1 = as little as possible)", -1,
426           G_MAXINT64, DEFAULT_TIME_MIN,
427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
429       g_param_spec_int ("buffers-min", "Buffers min",
430           "min number of buffers to queue (-1 = as few as possible)", -1,
431           G_MAXINT, DEFAULT_BUFFERS_MIN,
432           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433
434   g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
435       g_param_spec_enum ("unit-type", "Units type",
436           "The unit to measure the max/soft-max/queued properties",
437           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
440       g_param_spec_int64 ("units-max", "Units max",
441           "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
442           DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443   g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
444       g_param_spec_int64 ("units-soft-max", "Units soft max",
445           "Recover client when going over this limit (-1 = no limit)", -1,
446           G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
447           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
448
449   g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
450       g_param_spec_uint ("buffers-queued", "Buffers queued",
451           "Number of buffers currently queued", 0, G_MAXUINT, 0,
452           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
453 #if NOT_IMPLEMENTED
454   g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
455       g_param_spec_uint ("bytes-queued", "Bytes queued",
456           "Number of bytes currently queued", 0, G_MAXUINT, 0,
457           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
458   g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
459       g_param_spec_uint64 ("time-queued", "Time queued",
460           "Number of time currently queued", 0, G_MAXUINT64, 0,
461           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462 #endif
463
464   g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
465       g_param_spec_enum ("recover-policy", "Recover Policy",
466           "How to recover when client reaches the soft max",
467           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
470       g_param_spec_uint64 ("timeout", "Timeout",
471           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
472           0, G_MAXUINT64, DEFAULT_TIMEOUT,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474   g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
475       g_param_spec_enum ("sync-method", "Sync Method",
476           "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
477           DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478   g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
479       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
480           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
481           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
482   g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
483       g_param_spec_uint64 ("bytes-served", "Bytes served",
484           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
485           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
486
487   g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
488       g_param_spec_enum ("burst-unit", "Burst unit",
489           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
490           GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
493       g_param_spec_uint64 ("burst-value", "Burst value",
494           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
495           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
498       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
499           "Quality of Service, differentiated services code point (-1 default)",
500           -1, 63, DEFAULT_QOS_DSCP,
501           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
502   /**
503    * GstMultiFdSink::handle-read
504    *
505    * Handle read requests from clients and discard the data.
506    *
507    * Since: 0.10.23
508    */
509   g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
510       g_param_spec_boolean ("handle-read", "Handle Read",
511           "Handle client reads and discard the data",
512           DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
513   /**
514    * GstMultiFdSink::resend-streamheader
515    *
516    * Resend the streamheaders to existing clients when they change.
517    *
518    * Since: 0.10.23
519    */
520   g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
521       g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
522           "Resend the streamheader if it changes in the caps",
523           DEFAULT_RESEND_STREAMHEADER,
524           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
525
526   g_object_class_install_property (gobject_class, PROP_NUM_FDS,
527       g_param_spec_uint ("num-fds", "Number of fds",
528           "The current number of client file descriptors.",
529           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
530
531   /**
532    * GstMultiFdSink::add:
533    * @gstmultifdsink: the multifdsink element to emit this signal on
534    * @fd:             the file descriptor to add to multifdsink
535    *
536    * Hand the given open file descriptor to multifdsink to write to.
537    */
538   gst_multi_fd_sink_signals[SIGNAL_ADD] =
539       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
540       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
541           add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1,
542       G_TYPE_INT);
543   /**
544    * GstMultiFdSink::add-full:
545    * @gstmultifdsink: the multifdsink element to emit this signal on
546    * @fd:             the file descriptor to add to multifdsink
547    * @sync:           the sync method to use
548    * @unit_type_min:  the unit-type of @value_min
549    * @value_min:      the minimum amount of data to burst expressed in
550    *                  @unit_type_min units.
551    * @unit_type_max:  the unit-type of @value_max
552    * @value_max:      the maximum amount of data to burst expressed in
553    *                  @unit_type_max units.
554    *
555    * Hand the given open file descriptor to multifdsink to write to and
556    * specify the burst parameters for the new connection.
557    */
558   gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
559       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
560       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
561           add_full), NULL, NULL,
562       gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
563       G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
564       GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
565   /**
566    * GstMultiFdSink::remove:
567    * @gstmultifdsink: the multifdsink element to emit this signal on
568    * @fd:             the file descriptor to remove from multifdsink
569    *
570    * Remove the given open file descriptor from multifdsink.
571    */
572   gst_multi_fd_sink_signals[SIGNAL_REMOVE] =
573       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
574       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
575           remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
576       G_TYPE_INT);
577   /**
578    * GstMultiFdSink::remove-flush:
579    * @gstmultifdsink: the multifdsink element to emit this signal on
580    * @fd:             the file descriptor to remove from multifdsink
581    *
582    * Remove the given open file descriptor from multifdsink after flushing all
583    * the pending data to the fd.
584    */
585   gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
586       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
588           remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
589       G_TYPE_INT);
590   /**
591    * GstMultiFdSink::clear:
592    * @gstmultifdsink: the multifdsink element to emit this signal on
593    *
594    * Remove all file descriptors from multifdsink.  Since multifdsink did not
595    * open fd's itself, it does not explicitly close the fd.  The application
596    * should do so by connecting to the client-fd-removed callback.
597    */
598   gst_multi_fd_sink_signals[SIGNAL_CLEAR] =
599       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
600       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
601           clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
602
603   /**
604    * GstMultiFdSink::get-stats:
605    * @gstmultifdsink: the multifdsink element to emit this signal on
606    * @fd:             the file descriptor to get stats of from multifdsink
607    *
608    * Get statistics about @fd. This function returns a GValueArray to ease
609    * automatic wrapping for bindings.
610    *
611    * Returns: a GValueArray with the statistics. The array contains guint64
612    *     values that represent respectively: total number of bytes sent, time
613    *     when the client was added, time when the client was
614    *     disconnected/removed, time the client is/was active, last activity
615    *     time (in epoch seconds), number of buffers dropped.
616    *     All times are expressed in nanoseconds (GstClockTime).
617    *     The array can be 0-length if the client was not found.
618    */
619   gst_multi_fd_sink_signals[SIGNAL_GET_STATS] =
620       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
621       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
622           get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT,
623       G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT);
624
625   /**
626    * GstMultiFdSink::client-added:
627    * @gstmultifdsink: the multifdsink element that emitted this signal
628    * @fd:             the file descriptor that was added to multifdsink
629    *
630    * The given file descriptor was added to multifdsink. This signal will
631    * be emitted from the streaming thread so application should be prepared
632    * for that.
633    */
634   gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] =
635       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
636       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
637       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
638   /**
639    * GstMultiFdSink::client-removed:
640    * @gstmultifdsink: the multifdsink element that emitted this signal
641    * @fd:             the file descriptor that is to be removed from multifdsink
642    * @status:         the reason why the client was removed
643    *
644    * The given file descriptor is about to be removed from multifdsink. This
645    * signal will be emitted from the streaming thread so applications should
646    * be prepared for that.
647    *
648    * @gstmultifdsink still holds a handle to @fd so it is possible to call
649    * the get-stats signal from this callback. For the same reason it is
650    * not safe to close() and reuse @fd in this callback.
651    */
652   gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] =
653       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
654       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
655           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
656       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
657   /**
658    * GstMultiFdSink::client-fd-removed:
659    * @gstmultifdsink: the multifdsink element that emitted this signal
660    * @fd:             the file descriptor that was removed from multifdsink
661    *
662    * The given file descriptor was removed from multifdsink. This signal will
663    * be emitted from the streaming thread so applications should be prepared
664    * for that.
665    *
666    * In this callback, @gstmultifdsink has removed all the information
667    * associated with @fd and it is therefore not possible to call get-stats
668    * with @fd. It is however safe to close() and reuse @fd in the callback.
669    *
670    * Since: 0.10.7
671    */
672   gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] =
673       g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass),
674       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
675           client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
676       G_TYPE_NONE, 1, G_TYPE_INT);
677
678   gstelement_class->change_state =
679       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state);
680
681   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
682
683   klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
684   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
685   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
686   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush);
687   klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
688   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
689
690   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
691 }
692
693 static void
694 gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
695 {
696   GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
697
698   this->protocol = DEFAULT_PROTOCOL;
699   this->mode = DEFAULT_MODE;
700
701   CLIENTS_LOCK_INIT (this);
702   this->clients = NULL;
703   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
704
705   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
706   this->unit_type = DEFAULT_UNIT_TYPE;
707   this->units_max = DEFAULT_UNITS_MAX;
708   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
709   this->time_min = DEFAULT_TIME_MIN;
710   this->bytes_min = DEFAULT_BYTES_MIN;
711   this->buffers_min = DEFAULT_BUFFERS_MIN;
712   this->recover_policy = DEFAULT_RECOVER_POLICY;
713
714   this->timeout = DEFAULT_TIMEOUT;
715   this->def_sync_method = DEFAULT_SYNC_METHOD;
716   this->def_burst_unit = DEFAULT_BURST_UNIT;
717   this->def_burst_value = DEFAULT_BURST_VALUE;
718
719   this->qos_dscp = DEFAULT_QOS_DSCP;
720   this->handle_read = DEFAULT_HANDLE_READ;
721
722   this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
723
724   this->header_flags = 0;
725 }
726
727 static void
728 gst_multi_fd_sink_finalize (GObject * object)
729 {
730   GstMultiFdSink *this;
731
732   this = GST_MULTI_FD_SINK (object);
733
734   CLIENTS_LOCK_FREE (this);
735   g_hash_table_destroy (this->fd_hash);
736   g_array_free (this->bufqueue, TRUE);
737
738   G_OBJECT_CLASS (parent_class)->finalize (object);
739 }
740
741 static gint
742 setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
743 {
744   gint tos;
745   gint ret;
746   union gst_sockaddr
747   {
748     struct sockaddr sa;
749     struct sockaddr_in6 sa_in6;
750     struct sockaddr_storage sa_stor;
751   } sa;
752   socklen_t slen = sizeof (sa);
753   gint af;
754
755   /* don't touch */
756   if (sink->qos_dscp < 0)
757     return 0;
758
759   if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
760     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
761     return ret;
762   }
763
764   af = sa.sa.sa_family;
765
766   /* if this is an IPv4-mapped address then do IPv4 QoS */
767   if (af == AF_INET6) {
768
769     GST_DEBUG_OBJECT (sink, "check IP6 socket");
770     if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
771       GST_DEBUG_OBJECT (sink, "mapped to IPV4");
772       af = AF_INET;
773     }
774   }
775
776   /* extract and shift 6 bits of the DSCP */
777   tos = (sink->qos_dscp & 0x3f) << 2;
778
779   switch (af) {
780     case AF_INET:
781       ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
782       break;
783     case AF_INET6:
784 #ifdef IPV6_TCLASS
785       ret =
786           setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
787           sizeof (tos));
788       break;
789 #endif
790     default:
791       ret = 0;
792       GST_ERROR_OBJECT (sink, "unsupported AF");
793       break;
794   }
795   if (ret)
796     GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
797
798   return ret;
799 }
800
801
802 static void
803 setup_dscp (GstMultiFdSink * sink)
804 {
805   GList *clients, *next;
806
807   CLIENTS_LOCK (sink);
808   for (clients = sink->clients; clients; clients = next) {
809     GstTCPClient *client;
810
811     client = (GstTCPClient *) clients->data;
812     next = g_list_next (clients);
813
814     setup_dscp_client (sink, client);
815   }
816   CLIENTS_UNLOCK (sink);
817 }
818
819 /* "add-full" signal implementation */
820 void
821 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
822     GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
823     GstTCPUnitType max_unit, guint64 max_value)
824 {
825   GstTCPClient *client;
826   GList *clink;
827   GTimeVal now;
828   gint flags;
829   struct stat statbuf;
830
831   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
832       "min_unit %d, min_value %" G_GUINT64_FORMAT
833       ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
834       min_unit, min_value, max_unit, max_value);
835
836   /* do limits check if we can */
837   if (min_unit == max_unit) {
838     if (max_value != -1 && min_value != -1 && max_value < min_value)
839       goto wrong_limits;
840   }
841
842   /* create client datastructure */
843   client = g_new0 (GstTCPClient, 1);
844   client->fd.fd = fd;
845   client->status = GST_CLIENT_STATUS_OK;
846   client->bufpos = -1;
847   client->flushcount = -1;
848   client->bufoffset = 0;
849   client->sending = NULL;
850   client->bytes_sent = 0;
851   client->dropped_buffers = 0;
852   client->avg_queue_size = 0;
853   client->first_buffer_ts = GST_CLOCK_TIME_NONE;
854   client->last_buffer_ts = GST_CLOCK_TIME_NONE;
855   client->new_connection = TRUE;
856   client->burst_min_unit = min_unit;
857   client->burst_min_value = min_value;
858   client->burst_max_unit = max_unit;
859   client->burst_max_value = max_value;
860   client->sync_method = sync_method;
861   client->currently_removing = FALSE;
862
863   /* update start time */
864   g_get_current_time (&now);
865   client->connect_time = GST_TIMEVAL_TO_TIME (now);
866   client->disconnect_time = 0;
867   /* set last activity time to connect time */
868   client->last_activity_time = client->connect_time;
869
870   CLIENTS_LOCK (sink);
871
872   /* check the hash to find a duplicate fd */
873   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
874   if (clink != NULL)
875     goto duplicate;
876
877   /* we can add the fd now */
878   clink = sink->clients = g_list_prepend (sink->clients, client);
879   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
880   sink->clients_cookie++;
881
882   /* set the socket to non blocking */
883   if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) {
884     GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd,
885         g_strerror (errno));
886   }
887
888   /* we always read from a client */
889   gst_poll_add_fd (sink->fdset, &client->fd);
890
891   /* we don't try to read from write only fds */
892   if (sink->handle_read) {
893     flags = fcntl (fd, F_GETFL, 0);
894     if ((flags & O_ACCMODE) != O_WRONLY) {
895       gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
896     }
897   }
898   /* figure out the mode, can't use send() for non sockets */
899   if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) {
900     client->is_socket = TRUE;
901     setup_dscp_client (sink, client);
902   }
903
904   gst_poll_restart (sink->fdset);
905
906   CLIENTS_UNLOCK (sink);
907
908   g_signal_emit (G_OBJECT (sink),
909       gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
910
911   return;
912
913   /* errors */
914 wrong_limits:
915   {
916     GST_WARNING_OBJECT (sink,
917         "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
918         G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
919         min_value, max_value, min_unit);
920     return;
921   }
922 duplicate:
923   {
924     client->status = GST_CLIENT_STATUS_DUPLICATE;
925     CLIENTS_UNLOCK (sink);
926     GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
927     g_signal_emit (G_OBJECT (sink),
928         gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
929         client->status);
930     g_free (client);
931     return;
932   }
933 }
934
935 /* "add" signal implementation */
936 void
937 gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
938 {
939   gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method,
940       sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
941 }
942
943 /* "remove" signal implementation */
944 void
945 gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
946 {
947   GList *clink;
948
949   GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
950
951   CLIENTS_LOCK (sink);
952   clink = g_hash_table_lookup (sink->fd_hash, &fd);
953   if (clink != NULL) {
954     GstTCPClient *client = (GstTCPClient *) clink->data;
955
956     if (client->status != GST_CLIENT_STATUS_OK) {
957       GST_INFO_OBJECT (sink,
958           "[fd %5d] Client already disconnecting with status %d",
959           fd, client->status);
960       goto done;
961     }
962
963     client->status = GST_CLIENT_STATUS_REMOVED;
964     gst_multi_fd_sink_remove_client_link (sink, clink);
965     gst_poll_restart (sink->fdset);
966   } else {
967     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
968   }
969
970 done:
971   CLIENTS_UNLOCK (sink);
972 }
973
974 /* "remove-flush" signal implementation */
975 void
976 gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
977 {
978   GList *clink;
979
980   GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
981
982   CLIENTS_LOCK (sink);
983   clink = g_hash_table_lookup (sink->fd_hash, &fd);
984   if (clink != NULL) {
985     GstTCPClient *client = (GstTCPClient *) clink->data;
986
987     if (client->status != GST_CLIENT_STATUS_OK) {
988       GST_INFO_OBJECT (sink,
989           "[fd %5d] Client already disconnecting with status %d",
990           fd, client->status);
991       goto done;
992     }
993
994     /* take the position of the client as the number of buffers left to flush.
995      * If the client was at position -1, we flush 0 buffers, 0 == flush 1
996      * buffer, etc... */
997     client->flushcount = client->bufpos + 1;
998     /* mark client as flushing. We can not remove the client right away because
999      * it might have some buffers to flush in the ->sending queue. */
1000     client->status = GST_CLIENT_STATUS_FLUSHING;
1001   } else {
1002     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
1003   }
1004 done:
1005   CLIENTS_UNLOCK (sink);
1006 }
1007
1008 /* can be called both through the signal (i.e. from any thread) or when 
1009  * stopping, after the writing thread has shut down */
1010 void
1011 gst_multi_fd_sink_clear (GstMultiFdSink * sink)
1012 {
1013   GList *clients, *next;
1014   guint32 cookie;
1015
1016   GST_DEBUG_OBJECT (sink, "clearing all clients");
1017
1018   CLIENTS_LOCK (sink);
1019 restart:
1020   cookie = sink->clients_cookie;
1021   for (clients = sink->clients; clients; clients = next) {
1022     GstTCPClient *client;
1023
1024     if (cookie != sink->clients_cookie) {
1025       GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
1026       goto restart;
1027     }
1028
1029     client = (GstTCPClient *) clients->data;
1030     next = g_list_next (clients);
1031
1032     client->status = GST_CLIENT_STATUS_REMOVED;
1033     gst_multi_fd_sink_remove_client_link (sink, clients);
1034   }
1035   gst_poll_restart (sink->fdset);
1036   CLIENTS_UNLOCK (sink);
1037 }
1038
1039 /* "get-stats" signal implementation
1040  * the array returned contains:
1041  *
1042  * guint64 : bytes_sent
1043  * guint64 : connect time (in nanoseconds, since Epoch)
1044  * guint64 : disconnect time (in nanoseconds, since Epoch)
1045  * guint64 : time the client is/was connected (in nanoseconds)
1046  * guint64 : last activity time (in nanoseconds, since Epoch)
1047  * guint64 : buffers dropped due to recovery
1048  * guint64 : timestamp of the first buffer sent (in nanoseconds)
1049  * guint64 : timestamp of the last buffer sent (in nanoseconds)
1050  */
1051 GValueArray *
1052 gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
1053 {
1054   GstTCPClient *client;
1055   GValueArray *result = NULL;
1056   GList *clink;
1057
1058   CLIENTS_LOCK (sink);
1059   clink = g_hash_table_lookup (sink->fd_hash, &fd);
1060   if (clink == NULL)
1061     goto noclient;
1062
1063   client = (GstTCPClient *) clink->data;
1064   if (client != NULL) {
1065     GValue value = { 0 };
1066     guint64 interval;
1067
1068     result = g_value_array_new (7);
1069
1070     g_value_init (&value, G_TYPE_UINT64);
1071     g_value_set_uint64 (&value, client->bytes_sent);
1072     result = g_value_array_append (result, &value);
1073     g_value_unset (&value);
1074     g_value_init (&value, G_TYPE_UINT64);
1075     g_value_set_uint64 (&value, client->connect_time);
1076     result = g_value_array_append (result, &value);
1077     g_value_unset (&value);
1078     if (client->disconnect_time == 0) {
1079       GTimeVal nowtv;
1080
1081       g_get_current_time (&nowtv);
1082
1083       interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
1084     } else {
1085       interval = client->disconnect_time - client->connect_time;
1086     }
1087     g_value_init (&value, G_TYPE_UINT64);
1088     g_value_set_uint64 (&value, client->disconnect_time);
1089     result = g_value_array_append (result, &value);
1090     g_value_unset (&value);
1091     g_value_init (&value, G_TYPE_UINT64);
1092     g_value_set_uint64 (&value, interval);
1093     result = g_value_array_append (result, &value);
1094     g_value_unset (&value);
1095     g_value_init (&value, G_TYPE_UINT64);
1096     g_value_set_uint64 (&value, client->last_activity_time);
1097     result = g_value_array_append (result, &value);
1098     g_value_unset (&value);
1099     g_value_init (&value, G_TYPE_UINT64);
1100     g_value_set_uint64 (&value, client->dropped_buffers);
1101     result = g_value_array_append (result, &value);
1102     g_value_unset (&value);
1103     g_value_init (&value, G_TYPE_UINT64);
1104     g_value_set_uint64 (&value, client->first_buffer_ts);
1105     result = g_value_array_append (result, &value);
1106     g_value_unset (&value);
1107     g_value_init (&value, G_TYPE_UINT64);
1108     g_value_set_uint64 (&value, client->last_buffer_ts);
1109     result = g_value_array_append (result, &value);
1110   }
1111
1112 noclient:
1113   CLIENTS_UNLOCK (sink);
1114
1115   /* python doesn't like a NULL pointer yet */
1116   if (result == NULL) {
1117     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
1118     result = g_value_array_new (0);
1119   }
1120
1121   return result;
1122 }
1123
1124 /* should be called with the clientslock helt.
1125  * Note that we don't close the fd as we didn't open it in the first
1126  * place. An application should connect to the client-fd-removed signal and
1127  * close the fd itself.
1128  */
1129 static void
1130 gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
1131 {
1132   int fd;
1133   GTimeVal now;
1134   GstTCPClient *client = (GstTCPClient *) link->data;
1135   GstMultiFdSinkClass *fclass;
1136
1137   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
1138
1139   fd = client->fd.fd;
1140
1141   if (client->currently_removing) {
1142     GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd);
1143     return;
1144   } else {
1145     client->currently_removing = TRUE;
1146   }
1147
1148   /* FIXME: if we keep track of ip we can log it here and signal */
1149   switch (client->status) {
1150     case GST_CLIENT_STATUS_OK:
1151       GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
1152           fd, client);
1153       break;
1154     case GST_CLIENT_STATUS_CLOSED:
1155       GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
1156           fd, client);
1157       break;
1158     case GST_CLIENT_STATUS_REMOVED:
1159       GST_DEBUG_OBJECT (sink,
1160           "[fd %5d] removing client %p because the app removed it", fd, client);
1161       break;
1162     case GST_CLIENT_STATUS_SLOW:
1163       GST_INFO_OBJECT (sink,
1164           "[fd %5d] removing client %p because it was too slow", fd, client);
1165       break;
1166     case GST_CLIENT_STATUS_ERROR:
1167       GST_WARNING_OBJECT (sink,
1168           "[fd %5d] removing client %p because of error", fd, client);
1169       break;
1170     case GST_CLIENT_STATUS_FLUSHING:
1171     default:
1172       GST_WARNING_OBJECT (sink,
1173           "[fd %5d] removing client %p with invalid reason %d", fd, client,
1174           client->status);
1175       break;
1176   }
1177
1178   gst_poll_remove_fd (sink->fdset, &client->fd);
1179
1180   g_get_current_time (&now);
1181   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1182
1183   /* free client buffers */
1184   g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
1185   g_slist_free (client->sending);
1186   client->sending = NULL;
1187
1188   if (client->caps)
1189     gst_caps_unref (client->caps);
1190   client->caps = NULL;
1191
1192   /* unlock the mutex before signaling because the signal handler
1193    * might query some properties */
1194   CLIENTS_UNLOCK (sink);
1195
1196   g_signal_emit (G_OBJECT (sink),
1197       gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
1198
1199   /* lock again before we remove the client completely */
1200   CLIENTS_LOCK (sink);
1201
1202   /* fd cannot be reused in the above signal callback so we can safely
1203    * remove it from the hashtable here */
1204   if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
1205     GST_WARNING_OBJECT (sink,
1206         "[fd %5d] error removing client %p from hash", client->fd.fd, client);
1207   }
1208   /* after releasing the lock above, the link could be invalid, more
1209    * precisely, the next and prev pointers could point to invalid list
1210    * links. One optimisation could be to add a cookie to the linked list
1211    * and take a shortcut when it did not change between unlocking and locking
1212    * our mutex. For now we just walk the list again. */
1213   sink->clients = g_list_remove (sink->clients, client);
1214   sink->clients_cookie++;
1215
1216   if (fclass->removed)
1217     fclass->removed (sink, client->fd.fd);
1218
1219   g_free (client);
1220   CLIENTS_UNLOCK (sink);
1221
1222   /* and the fd is really gone now */
1223   g_signal_emit (G_OBJECT (sink),
1224       gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED], 0, fd);
1225
1226   CLIENTS_LOCK (sink);
1227 }
1228
1229 /* handle a read on a client fd,
1230  * which either indicates a close or should be ignored
1231  * returns FALSE if some error occured or the client closed. */
1232 static gboolean
1233 gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
1234     GstTCPClient * client)
1235 {
1236   int avail, fd;
1237   gboolean ret;
1238
1239   fd = client->fd.fd;
1240
1241   if (ioctl (fd, FIONREAD, &avail) < 0)
1242     goto ioctl_failed;
1243
1244   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
1245       fd, avail);
1246
1247   ret = TRUE;
1248
1249   if (avail == 0) {
1250     /* client sent close, so remove it */
1251     GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
1252     client->status = GST_CLIENT_STATUS_CLOSED;
1253     ret = FALSE;
1254   } else if (avail < 0) {
1255     GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
1256     client->status = GST_CLIENT_STATUS_ERROR;
1257     ret = FALSE;
1258   } else {
1259     guint8 dummy[512];
1260     gint nread;
1261
1262     /* just Read 'n' Drop, could also just drop the client as it's not supposed
1263      * to write to us except for closing the socket, I guess it's because we
1264      * like to listen to our customers. */
1265     do {
1266       /* this is the maximum we can read */
1267       gint to_read = MIN (avail, 512);
1268
1269       GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
1270           fd, to_read);
1271
1272       nread = read (fd, dummy, to_read);
1273       if (nread < -1) {
1274         GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
1275             fd, to_read, g_strerror (errno), errno);
1276         client->status = GST_CLIENT_STATUS_ERROR;
1277         ret = FALSE;
1278         break;
1279       } else if (nread == 0) {
1280         GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
1281         client->status = GST_CLIENT_STATUS_ERROR;
1282         ret = FALSE;
1283         break;
1284       }
1285       avail -= nread;
1286     }
1287     while (avail > 0);
1288   }
1289   return ret;
1290
1291   /* ERRORS */
1292 ioctl_failed:
1293   {
1294     GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
1295         fd, g_strerror (errno), errno);
1296     client->status = GST_CLIENT_STATUS_ERROR;
1297     return FALSE;
1298   }
1299 }
1300
1301 /* Queue raw data for this client, creating a new buffer.
1302  * This takes ownership of the data by
1303  * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so
1304  * be sure to pass g_free()-able @data.
1305  */
1306 static gboolean
1307 gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink,
1308     GstTCPClient * client, gchar * data, gint len)
1309 {
1310   GstBuffer *buf;
1311
1312   buf = gst_buffer_new ();
1313   GST_BUFFER_DATA (buf) = (guint8 *) data;
1314   GST_BUFFER_MALLOCDATA (buf) = (guint8 *) data;
1315   GST_BUFFER_SIZE (buf) = len;
1316
1317   GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
1318       client->fd.fd, len);
1319
1320   client->sending = g_slist_append (client->sending, buf);
1321
1322   return TRUE;
1323 }
1324
1325 /* GDP-encode given caps and queue them for sending */
1326 static gboolean
1327 gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink,
1328     GstTCPClient * client, const GstCaps * caps)
1329 {
1330   guint8 *header;
1331   guint8 *payload;
1332   guint length;
1333   gchar *string;
1334
1335   g_return_val_if_fail (caps != NULL, FALSE);
1336
1337   string = gst_caps_to_string (caps);
1338   GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
1339       client->fd.fd, string);
1340   g_free (string);
1341
1342   if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header,
1343           &payload)) {
1344     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
1345     return FALSE;
1346   }
1347   gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, length);
1348
1349   length = gst_dp_header_payload_length (header);
1350   gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) payload, length);
1351
1352   return TRUE;
1353 }
1354
1355 static gboolean
1356 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
1357 {
1358   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
1359     return FALSE;
1360   } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
1361     return TRUE;
1362   }
1363
1364   return FALSE;
1365 }
1366
1367 /* queue the given buffer for the given client, possibly adding the GDP
1368  * header if GDP is being used */
1369 static gboolean
1370 gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
1371     GstTCPClient * client, GstBuffer * buffer)
1372 {
1373   GstCaps *caps;
1374
1375   /* TRUE: send them if the new caps have them */
1376   gboolean send_streamheader = FALSE;
1377   GstStructure *s;
1378
1379   /* before we queue the buffer, we check if we need to queue streamheader
1380    * buffers (because it's a new client, or because they changed) */
1381   caps = gst_buffer_get_caps (buffer);  /* cleaned up after streamheader */
1382   if (!client->caps) {
1383     GST_DEBUG_OBJECT (sink,
1384         "[fd %5d] no previous caps for this client, send streamheader",
1385         client->fd.fd);
1386     send_streamheader = TRUE;
1387     client->caps = gst_caps_ref (caps);
1388   } else {
1389     /* there were previous caps recorded, so compare */
1390     if (!gst_caps_is_equal (caps, client->caps)) {
1391       const GValue *sh1, *sh2;
1392
1393       /* caps are not equal, but could still have the same streamheader */
1394       s = gst_caps_get_structure (caps, 0);
1395       if (!gst_structure_has_field (s, "streamheader")) {
1396         /* no new streamheader, so nothing new to send */
1397         GST_DEBUG_OBJECT (sink,
1398             "[fd %5d] new caps do not have streamheader, not sending",
1399             client->fd.fd);
1400       } else {
1401         /* there is a new streamheader */
1402         s = gst_caps_get_structure (client->caps, 0);
1403         if (!gst_structure_has_field (s, "streamheader")) {
1404           /* no previous streamheader, so send the new one */
1405           GST_DEBUG_OBJECT (sink,
1406               "[fd %5d] previous caps did not have streamheader, sending",
1407               client->fd.fd);
1408           send_streamheader = TRUE;
1409         } else {
1410           /* both old and new caps have streamheader set */
1411           if (!sink->resend_streamheader) {
1412             GST_DEBUG_OBJECT (sink,
1413                 "[fd %5d] asked to not resend the streamheader, not sending",
1414                 client->fd.fd);
1415             send_streamheader = FALSE;
1416           } else {
1417             sh1 = gst_structure_get_value (s, "streamheader");
1418             s = gst_caps_get_structure (caps, 0);
1419             sh2 = gst_structure_get_value (s, "streamheader");
1420             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
1421               GST_DEBUG_OBJECT (sink,
1422                   "[fd %5d] new streamheader different from old, sending",
1423                   client->fd.fd);
1424               send_streamheader = TRUE;
1425             }
1426           }
1427         }
1428       }
1429     }
1430     /* Replace the old caps */
1431     gst_caps_unref (client->caps);
1432     client->caps = gst_caps_ref (caps);
1433   }
1434
1435   if (G_UNLIKELY (send_streamheader)) {
1436     const GValue *sh;
1437     GArray *buffers;
1438     int i;
1439
1440     GST_LOG_OBJECT (sink,
1441         "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
1442         client->fd.fd, caps);
1443     s = gst_caps_get_structure (caps, 0);
1444     if (!gst_structure_has_field (s, "streamheader")) {
1445       GST_DEBUG_OBJECT (sink,
1446           "[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
1447     } else {
1448       GST_LOG_OBJECT (sink,
1449           "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
1450           client->fd.fd, caps);
1451       sh = gst_structure_get_value (s, "streamheader");
1452       g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
1453       buffers = g_value_peek_pointer (sh);
1454       GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
1455       for (i = 0; i < buffers->len; ++i) {
1456         GValue *bufval;
1457         GstBuffer *buffer;
1458
1459         bufval = &g_array_index (buffers, GValue, i);
1460         g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
1461         buffer = g_value_peek_pointer (bufval);
1462         GST_DEBUG_OBJECT (sink,
1463             "[fd %5d] queueing streamheader buffer of length %d",
1464             client->fd.fd, GST_BUFFER_SIZE (buffer));
1465         gst_buffer_ref (buffer);
1466
1467         if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
1468           guint8 *header;
1469           guint len;
1470
1471           if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len,
1472                   &header)) {
1473             GST_DEBUG_OBJECT (sink,
1474                 "[fd %5d] could not create header, removing client",
1475                 client->fd.fd);
1476             return FALSE;
1477           }
1478           gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header,
1479               len);
1480         }
1481
1482         client->sending = g_slist_append (client->sending, buffer);
1483       }
1484     }
1485   }
1486
1487   gst_caps_unref (caps);
1488   caps = NULL;
1489   /* now we can send the buffer, possibly sending a GDP header first */
1490   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
1491     guint8 *header;
1492     guint len;
1493
1494     if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) {
1495       GST_DEBUG_OBJECT (sink,
1496           "[fd %5d] could not create header, removing client", client->fd.fd);
1497       return FALSE;
1498     }
1499     gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, len);
1500   }
1501
1502   GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
1503       client->fd.fd, GST_BUFFER_SIZE (buffer));
1504
1505   gst_buffer_ref (buffer);
1506   client->sending = g_slist_append (client->sending, buffer);
1507
1508   return TRUE;
1509 }
1510
1511 /* find the keyframe in the list of buffers starting the
1512  * search from @idx. @direction as -1 will search backwards, 
1513  * 1 will search forwards.
1514  * Returns: the index or -1 if there is no keyframe after idx.
1515  */
1516 static gint
1517 find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
1518 {
1519   gint i, len, result;
1520
1521   /* take length of queued buffers */
1522   len = sink->bufqueue->len;
1523
1524   /* assume we don't find a keyframe */
1525   result = -1;
1526
1527   /* then loop over all buffers to find the first keyframe */
1528   for (i = idx; i >= 0 && i < len; i += direction) {
1529     GstBuffer *buf;
1530
1531     buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1532     if (is_sync_frame (sink, buf)) {
1533       GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
1534           i, idx, direction);
1535       result = i;
1536       break;
1537     }
1538   }
1539   return result;
1540 }
1541
1542 #define find_next_syncframe(s,i)        find_syncframe(s,i,1)
1543 #define find_prev_syncframe(s,i)        find_syncframe(s,i,-1)
1544
1545 /* Get the number of buffers from the buffer queue needed to satisfy
1546  * the maximum max in the configured units.
1547  * If units are not BUFFERS, and there are insufficient buffers in the
1548  * queue to satify the limit, return len(queue) + 1 */
1549 static gint
1550 get_buffers_max (GstMultiFdSink * sink, gint64 max)
1551 {
1552   switch (sink->unit_type) {
1553     case GST_TCP_UNIT_TYPE_BUFFERS:
1554       return max;
1555     case GST_TCP_UNIT_TYPE_TIME:
1556     {
1557       GstBuffer *buf;
1558       int i;
1559       int len;
1560       gint64 diff;
1561       GstClockTime first = GST_CLOCK_TIME_NONE;
1562
1563       len = sink->bufqueue->len;
1564
1565       for (i = 0; i < len; i++) {
1566         buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1567         if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
1568           if (first == -1)
1569             first = GST_BUFFER_TIMESTAMP (buf);
1570
1571           diff = first - GST_BUFFER_TIMESTAMP (buf);
1572
1573           if (diff > max)
1574             return i + 1;
1575         }
1576       }
1577       return len + 1;
1578     }
1579     case GST_TCP_UNIT_TYPE_BYTES:
1580     {
1581       GstBuffer *buf;
1582       int i;
1583       int len;
1584       gint acc = 0;
1585
1586       len = sink->bufqueue->len;
1587
1588       for (i = 0; i < len; i++) {
1589         buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1590         acc += GST_BUFFER_SIZE (buf);
1591
1592         if (acc > max)
1593           return i + 1;
1594       }
1595       return len + 1;
1596     }
1597     default:
1598       return max;
1599   }
1600 }
1601
1602 /* find the positions in the buffer queue where *_min and *_max
1603  * is satisfied
1604  */
1605 /* count the amount of data in the buffers and return the index
1606  * that satifies the given limits.
1607  *
1608  * Returns: index @idx in the buffer queue so that the given limits are
1609  * satisfied. TRUE if all the limits could be satisfied, FALSE if not
1610  * enough data was in the queue.
1611  *
1612  * FIXME, this code might now work if any of the units is in buffers...
1613  */
1614 static gboolean
1615 find_limits (GstMultiFdSink * sink,
1616     gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
1617     gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
1618 {
1619   GstClockTime first, time;
1620   gint i, len, bytes;
1621   gboolean result, max_hit;
1622
1623   /* take length of queue */
1624   len = sink->bufqueue->len;
1625
1626   /* this must hold */
1627   g_assert (len > 0);
1628
1629   GST_LOG_OBJECT (sink,
1630       "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
1631       ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
1632       buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
1633       GST_TIME_ARGS (time_max));
1634
1635   /* do the trivial buffer limit test */
1636   if (buffers_min != -1 && len < buffers_min) {
1637     *min_idx = len - 1;
1638     *max_idx = len - 1;
1639     return FALSE;
1640   }
1641
1642   result = FALSE;
1643   /* else count bytes and time */
1644   first = -1;
1645   bytes = 0;
1646   /* unset limits */
1647   *min_idx = -1;
1648   *max_idx = -1;
1649   max_hit = FALSE;
1650
1651   i = 0;
1652   /* loop through the buffers, when a limit is ok, mark it 
1653    * as -1, we have at least one buffer in the queue. */
1654   do {
1655     GstBuffer *buf;
1656
1657     /* if we checked all min limits, update result */
1658     if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
1659       /* don't go below 0 */
1660       *min_idx = MAX (i - 1, 0);
1661     }
1662     /* if we reached one max limit break out */
1663     if (max_hit) {
1664       /* i > 0 when we get here, we subtract one to get the position
1665        * of the previous buffer. */
1666       *max_idx = i - 1;
1667       /* we have valid complete result if we found a min_idx too */
1668       result = *min_idx != -1;
1669       break;
1670     }
1671     buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1672
1673     bytes += GST_BUFFER_SIZE (buf);
1674
1675     /* take timestamp and save for the base first timestamp */
1676     if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
1677       GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
1678           GST_TIME_ARGS (time));
1679       if (first == -1)
1680         first = time;
1681
1682       /* increase max usage if we did not fill enough. Note that
1683        * buffers are sorted from new to old, so the first timestamp is
1684        * bigger than the next one. */
1685       if (time_min != -1 && first - time >= time_min)
1686         time_min = -1;
1687       if (time_max != -1 && first - time >= time_max)
1688         max_hit = TRUE;
1689     } else {
1690       GST_LOG_OBJECT (sink, "No timestamp on buffer");
1691     }
1692     /* time is OK or unknown, check and increase if not enough bytes */
1693     if (bytes_min != -1) {
1694       if (bytes >= bytes_min)
1695         bytes_min = -1;
1696     }
1697     if (bytes_max != -1) {
1698       if (bytes >= bytes_max) {
1699         max_hit = TRUE;
1700       }
1701     }
1702     i++;
1703   }
1704   while (i < len);
1705
1706   /* if we did not hit the max or min limit, set to buffer size */
1707   if (*max_idx == -1)
1708     *max_idx = len - 1;
1709   /* make sure min does not exceed max */
1710   if (*min_idx == -1)
1711     *min_idx = *max_idx;
1712
1713   return result;
1714 }
1715
1716 /* parse the unit/value pair and assign it to the result value of the
1717  * right type, leave the other values untouched 
1718  *
1719  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
1720  */
1721 static gboolean
1722 assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
1723     GstClockTime * time)
1724 {
1725   gboolean res = TRUE;
1726
1727   /* set only the limit of the given format to the given value */
1728   switch (unit) {
1729     case GST_TCP_UNIT_TYPE_BUFFERS:
1730       *buffers = (gint) value;
1731       break;
1732     case GST_TCP_UNIT_TYPE_TIME:
1733       *time = value;
1734       break;
1735     case GST_TCP_UNIT_TYPE_BYTES:
1736       *bytes = (gint) value;
1737       break;
1738     case GST_TCP_UNIT_TYPE_UNDEFINED:
1739     default:
1740       res = FALSE;
1741       break;
1742   }
1743   return res;
1744 }
1745
1746 /* count the index in the buffer queue to satisfy the given unit
1747  * and value pair starting from buffer at index 0.
1748  *
1749  * Returns: TRUE if there was enough data in the queue to satisfy the
1750  * burst values. @idx contains the index in the buffer that contains enough
1751  * data to satisfy the limits or the last buffer in the queue when the
1752  * function returns FALSE.
1753  */
1754 static gboolean
1755 count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
1756     GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
1757     GstTCPUnitType max_unit, guint64 max_value)
1758 {
1759   gint bytes_min = -1, buffers_min = -1;
1760   gint bytes_max = -1, buffers_max = -1;
1761   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
1762
1763   assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
1764   assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
1765
1766   return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
1767       max_idx, bytes_max, buffers_max, time_max);
1768 }
1769
1770 /* decide where in the current buffer queue this new client should start
1771  * receiving buffers from.
1772  * This function is called whenever a client is connected and has not yet
1773  * received a buffer.
1774  * If this returns -1, it means that we haven't found a good point to
1775  * start streaming from yet, and this function should be called again later
1776  * when more buffers have arrived.
1777  */
1778 static gint
1779 gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
1780 {
1781   gint result;
1782
1783   GST_DEBUG_OBJECT (sink,
1784       "[fd %5d] new client, deciding where to start in queue", client->fd.fd);
1785   GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
1786       sink->bufqueue->len);
1787   switch (client->sync_method) {
1788     case GST_SYNC_METHOD_LATEST:
1789       /* no syncing, we are happy with whatever the client is going to get */
1790       result = client->bufpos;
1791       GST_DEBUG_OBJECT (sink,
1792           "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result);
1793       break;
1794     case GST_SYNC_METHOD_NEXT_KEYFRAME:
1795     {
1796       /* if one of the new buffers (between client->bufpos and 0) in the queue
1797        * is a sync point, we can proceed, otherwise we need to keep waiting */
1798       GST_LOG_OBJECT (sink,
1799           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
1800           client->bufpos);
1801
1802       result = find_prev_syncframe (sink, client->bufpos);
1803       if (result != -1) {
1804         GST_DEBUG_OBJECT (sink,
1805             "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d",
1806             client->fd.fd, result);
1807         break;
1808       }
1809
1810       /* client is not on a syncbuffer, need to skip these buffers and
1811        * wait some more */
1812       GST_LOG_OBJECT (sink,
1813           "[fd %5d] new client, skipping buffer(s), no syncpoint found",
1814           client->fd.fd);
1815       client->bufpos = -1;
1816       break;
1817     }
1818     case GST_SYNC_METHOD_LATEST_KEYFRAME:
1819     {
1820       GST_DEBUG_OBJECT (sink,
1821           "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd);
1822
1823       /* for new clients we initially scan the complete buffer queue for
1824        * a sync point when a buffer is added. If we don't find a keyframe,
1825        * we need to wait for the next keyframe and so we change the client's
1826        * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
1827        */
1828       result = find_next_syncframe (sink, 0);
1829       if (result != -1) {
1830         GST_DEBUG_OBJECT (sink,
1831             "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd,
1832             result);
1833         break;
1834       }
1835
1836       GST_DEBUG_OBJECT (sink,
1837           "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
1838           "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd);
1839       /* throw client to the waiting state */
1840       client->bufpos = -1;
1841       /* and make client sync to next keyframe */
1842       client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1843       break;
1844     }
1845     case GST_SYNC_METHOD_BURST:
1846     {
1847       gboolean ok;
1848       gint max;
1849
1850       /* move to the position where we satisfy the client's burst
1851        * parameters. If we could not satisfy the parameters because there
1852        * is not enough data, we just send what we have (which is in result).
1853        * We use the max value to limit the search
1854        */
1855       ok = count_burst_unit (sink, &result, client->burst_min_unit,
1856           client->burst_min_value, &max, client->burst_max_unit,
1857           client->burst_max_value);
1858       GST_DEBUG_OBJECT (sink,
1859           "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
1860           client->fd.fd, ok, result);
1861
1862       GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
1863
1864       /* we hit the max and it is below the min, use that then */
1865       if (max != -1 && max <= result) {
1866         result = MAX (max - 1, 0);
1867         GST_DEBUG_OBJECT (sink,
1868             "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d",
1869             client->fd.fd, result);
1870       }
1871       break;
1872     }
1873     case GST_SYNC_METHOD_BURST_KEYFRAME:
1874     {
1875       gint min_idx, max_idx;
1876       gint next_syncframe, prev_syncframe;
1877
1878       /* BURST_KEYFRAME:
1879        *
1880        * _always_ start sending a keyframe to the client. We first search
1881        * a keyframe between min/max limits. If there is none, we send it the
1882        * last keyframe before min. If there is none, the behaviour is like
1883        * NEXT_KEYFRAME.
1884        */
1885       /* gather burst limits */
1886       count_burst_unit (sink, &min_idx, client->burst_min_unit,
1887           client->burst_min_value, &max_idx, client->burst_max_unit,
1888           client->burst_max_value);
1889
1890       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1891
1892       /* first find a keyframe after min_idx */
1893       next_syncframe = find_next_syncframe (sink, min_idx);
1894       if (next_syncframe != -1 && next_syncframe < max_idx) {
1895         /* we have a valid keyframe and it's below the max */
1896         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1897         result = next_syncframe;
1898         break;
1899       }
1900
1901       /* no valid keyframe, try to find one below min */
1902       prev_syncframe = find_prev_syncframe (sink, min_idx);
1903       if (prev_syncframe != -1) {
1904         GST_WARNING_OBJECT (sink,
1905             "using keyframe below min in BURST_KEYFRAME sync mode");
1906         result = prev_syncframe;
1907         break;
1908       }
1909
1910       /* no prev keyframe or not enough data  */
1911       GST_WARNING_OBJECT (sink,
1912           "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
1913
1914       /* throw client to the waiting state */
1915       client->bufpos = -1;
1916       /* and make client sync to next keyframe */
1917       client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1918       result = -1;
1919       break;
1920     }
1921     case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
1922     {
1923       gint min_idx, max_idx;
1924       gint next_syncframe;
1925
1926       /* BURST_WITH_KEYFRAME:
1927        *
1928        * try to start sending a keyframe to the client. We first search
1929        * a keyframe between min/max limits. If there is none, we send it the
1930        * amount of data up 'till min.
1931        */
1932       /* gather enough data to burst */
1933       count_burst_unit (sink, &min_idx, client->burst_min_unit,
1934           client->burst_min_value, &max_idx, client->burst_max_unit,
1935           client->burst_max_value);
1936
1937       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1938
1939       /* first find a keyframe after min_idx */
1940       next_syncframe = find_next_syncframe (sink, min_idx);
1941       if (next_syncframe != -1 && next_syncframe < max_idx) {
1942         /* we have a valid keyframe and it's below the max */
1943         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1944         result = next_syncframe;
1945         break;
1946       }
1947
1948       /* no keyframe, send data from min_idx */
1949       GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
1950
1951       /* make sure we don't go over the max limit */
1952       if (max_idx != -1 && max_idx <= min_idx) {
1953         result = MAX (max_idx - 1, 0);
1954       } else {
1955         result = min_idx;
1956       }
1957
1958       break;
1959     }
1960     default:
1961       g_warning ("unknown sync method %d", client->sync_method);
1962       result = client->bufpos;
1963       break;
1964   }
1965   return result;
1966 }
1967
1968 /* Handle a write on a client,
1969  * which indicates a read request from a client.
1970  *
1971  * For each client we maintain a queue of GstBuffers that contain the raw bytes
1972  * we need to send to the client. In the case of the GDP protocol, we create
1973  * buffers out of the header bytes so that we can focus only on sending
1974  * buffers.
1975  *
1976  * We first check to see if we need to send caps (in GDP) and streamheaders.
1977  * If so, we queue them.
1978  *
1979  * Then we run into the main loop that tries to send as many buffers as
1980  * possible. It will first exhaust the client->sending queue and if the queue
1981  * is empty, it will pick a buffer from the global queue.
1982  *
1983  * Sending the buffers from the client->sending queue is basically writing
1984  * the bytes to the socket and maintaining a count of the bytes that were
1985  * sent. When the buffer is completely sent, it is removed from the
1986  * client->sending queue and we try to pick a new buffer for sending.
1987  *
1988  * When the sending returns a partial buffer we stop sending more data as
1989  * the next send operation could block.
1990  *
1991  * This functions returns FALSE if some error occured.
1992  */
1993 static gboolean
1994 gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
1995     GstTCPClient * client)
1996 {
1997   int fd = client->fd.fd;
1998   gboolean more;
1999   gboolean res;
2000   gboolean flushing;
2001   GstClockTime now;
2002   GTimeVal nowtv;
2003
2004   g_get_current_time (&nowtv);
2005   now = GST_TIMEVAL_TO_TIME (nowtv);
2006
2007   flushing = client->status == GST_CLIENT_STATUS_FLUSHING;
2008
2009   /* when using GDP, first check if we have queued caps yet */
2010   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
2011     /* don't need to do anything when the client is flushing */
2012     if (!client->caps_sent && !flushing) {
2013       GstPad *peer;
2014       GstCaps *caps;
2015
2016       peer = gst_pad_get_peer (GST_BASE_SINK_PAD (sink));
2017       if (!peer) {
2018         GST_WARNING_OBJECT (sink, "pad has no peer");
2019         return FALSE;
2020       }
2021       gst_object_unref (peer);
2022
2023       caps = gst_pad_get_negotiated_caps (GST_BASE_SINK_PAD (sink));
2024       if (!caps) {
2025         GST_WARNING_OBJECT (sink, "pad caps not yet negotiated");
2026         return FALSE;
2027       }
2028
2029       /* queue caps for sending */
2030       res = gst_multi_fd_sink_client_queue_caps (sink, client, caps);
2031
2032       gst_caps_unref (caps);
2033
2034       if (!res) {
2035         GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
2036         return FALSE;
2037       }
2038       client->caps_sent = TRUE;
2039     }
2040   }
2041
2042   more = TRUE;
2043   do {
2044     gint maxsize;
2045
2046     if (!client->sending) {
2047       /* client is not working on a buffer */
2048       if (client->bufpos == -1) {
2049         /* client is too fast, remove from write queue until new buffer is
2050          * available */
2051         gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
2052         /* if we flushed out all of the client buffers, we can stop */
2053         if (client->flushcount == 0)
2054           goto flushed;
2055
2056         return TRUE;
2057       } else {
2058         /* client can pick a buffer from the global queue */
2059         GstBuffer *buf;
2060         GstClockTime timestamp;
2061
2062         /* for new connections, we need to find a good spot in the
2063          * bufqueue to start streaming from */
2064         if (client->new_connection && !flushing) {
2065           gint position = gst_multi_fd_sink_new_client (sink, client);
2066
2067           if (position >= 0) {
2068             /* we got a valid spot in the queue */
2069             client->new_connection = FALSE;
2070             client->bufpos = position;
2071           } else {
2072             /* cannot send data to this client yet */
2073             gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
2074             return TRUE;
2075           }
2076         }
2077
2078         /* we flushed all remaining buffers, no need to get a new one */
2079         if (client->flushcount == 0)
2080           goto flushed;
2081
2082         /* grab buffer */
2083         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
2084         client->bufpos--;
2085
2086         /* update stats */
2087         timestamp = GST_BUFFER_TIMESTAMP (buf);
2088         if (client->first_buffer_ts == GST_CLOCK_TIME_NONE)
2089           client->first_buffer_ts = timestamp;
2090         if (timestamp != -1)
2091           client->last_buffer_ts = timestamp;
2092
2093         /* decrease flushcount */
2094         if (client->flushcount != -1)
2095           client->flushcount--;
2096
2097         GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
2098             fd, client, client->bufpos);
2099
2100         /* queueing a buffer will ref it */
2101         gst_multi_fd_sink_client_queue_buffer (sink, client, buf);
2102
2103         /* need to start from the first byte for this new buffer */
2104         client->bufoffset = 0;
2105       }
2106     }
2107
2108     /* see if we need to send something */
2109     if (client->sending) {
2110       ssize_t wrote;
2111       GstBuffer *head;
2112
2113       /* pick first buffer from list */
2114       head = GST_BUFFER (client->sending->data);
2115       maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
2116
2117       /* try to write the complete buffer */
2118 #ifdef MSG_NOSIGNAL
2119 #define FLAGS MSG_NOSIGNAL
2120 #else
2121 #define FLAGS 0
2122 #endif
2123       if (client->is_socket) {
2124         wrote =
2125             send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
2126             FLAGS);
2127       } else {
2128         wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
2129       }
2130
2131       if (wrote < 0) {
2132         /* hmm error.. */
2133         if (errno == EAGAIN) {
2134           /* nothing serious, resource was unavailable, try again later */
2135           more = FALSE;
2136         } else if (errno == ECONNRESET) {
2137           goto connection_reset;
2138         } else {
2139           goto write_error;
2140         }
2141       } else {
2142         if (wrote < maxsize) {
2143           /* partial write means that the client cannot read more and we should
2144            * stop sending more */
2145           GST_LOG_OBJECT (sink,
2146               "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote);
2147           client->bufoffset += wrote;
2148           more = FALSE;
2149         } else {
2150           /* complete buffer was written, we can proceed to the next one */
2151           client->sending = g_slist_remove (client->sending, head);
2152           gst_buffer_unref (head);
2153           /* make sure we start from byte 0 for the next buffer */
2154           client->bufoffset = 0;
2155         }
2156         /* update stats */
2157         client->bytes_sent += wrote;
2158         client->last_activity_time = now;
2159         sink->bytes_served += wrote;
2160       }
2161     }
2162   } while (more);
2163
2164   return TRUE;
2165
2166   /* ERRORS */
2167 flushed:
2168   {
2169     GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
2170     client->status = GST_CLIENT_STATUS_REMOVED;
2171     return FALSE;
2172   }
2173 connection_reset:
2174   {
2175     GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
2176     client->status = GST_CLIENT_STATUS_CLOSED;
2177     return FALSE;
2178   }
2179 write_error:
2180   {
2181     GST_WARNING_OBJECT (sink,
2182         "[fd %5d] could not write, removing client: %s (%d)", fd,
2183         g_strerror (errno), errno);
2184     client->status = GST_CLIENT_STATUS_ERROR;
2185     return FALSE;
2186   }
2187 }
2188
2189 /* calculate the new position for a client after recovery. This function
2190  * does not update the client position but merely returns the required
2191  * position.
2192  */
2193 static gint
2194 gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
2195 {
2196   gint newbufpos;
2197
2198   GST_WARNING_OBJECT (sink,
2199       "[fd %5d] client %p is lagging at %d, recover using policy %d",
2200       client->fd.fd, client, client->bufpos, sink->recover_policy);
2201
2202   switch (sink->recover_policy) {
2203     case GST_RECOVER_POLICY_NONE:
2204       /* do nothing, client will catch up or get kicked out when it reaches
2205        * the hard max */
2206       newbufpos = client->bufpos;
2207       break;
2208     case GST_RECOVER_POLICY_RESYNC_LATEST:
2209       /* move to beginning of queue */
2210       newbufpos = -1;
2211       break;
2212     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
2213       /* move to beginning of soft max */
2214       newbufpos = get_buffers_max (sink, sink->units_soft_max);
2215       break;
2216     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
2217       /* find keyframe in buffers, we search backwards to find the
2218        * closest keyframe relative to what this client already received. */
2219       newbufpos = MIN (sink->bufqueue->len - 1,
2220           get_buffers_max (sink, sink->units_soft_max) - 1);
2221
2222       while (newbufpos >= 0) {
2223         GstBuffer *buf;
2224
2225         buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
2226         if (is_sync_frame (sink, buf)) {
2227           /* found a buffer that is not a delta unit */
2228           break;
2229         }
2230         newbufpos--;
2231       }
2232       break;
2233     default:
2234       /* unknown recovery procedure */
2235       newbufpos = get_buffers_max (sink, sink->units_soft_max);
2236       break;
2237   }
2238   return newbufpos;
2239 }
2240
2241 /* Queue a buffer on the global queue.
2242  *
2243  * This function adds the buffer to the front of a GArray. It removes the
2244  * tail buffer if the max queue size is exceeded, unreffing the queued buffer.
2245  * Note that unreffing the buffer is not a problem as clients who
2246  * started writing out this buffer will still have a reference to it in the
2247  * client->sending queue.
2248  *
2249  * After adding the buffer, we update all client positions in the queue. If
2250  * a client moves over the soft max, we start the recovery procedure for this
2251  * slow client. If it goes over the hard max, it is put into the slow list
2252  * and removed.
2253  *
2254  * Special care is taken of clients that were waiting for a new buffer (they
2255  * had a position of -1) because they can proceed after adding this new buffer.
2256  * This is done by adding the client back into the write fd_set and signaling
2257  * the select thread that the fd_set changed.
2258  */
2259 static void
2260 gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
2261 {
2262   GList *clients, *next;
2263   gint queuelen;
2264   gboolean need_signal = FALSE;
2265   gint max_buffer_usage;
2266   gint i;
2267   GTimeVal nowtv;
2268   GstClockTime now;
2269   gint max_buffers, soft_max_buffers;
2270   guint cookie;
2271
2272   g_get_current_time (&nowtv);
2273   now = GST_TIMEVAL_TO_TIME (nowtv);
2274
2275   CLIENTS_LOCK (sink);
2276   /* add buffer to queue */
2277   g_array_prepend_val (sink->bufqueue, buf);
2278   queuelen = sink->bufqueue->len;
2279
2280   if (sink->units_max > 0)
2281     max_buffers = get_buffers_max (sink, sink->units_max);
2282   else
2283     max_buffers = -1;
2284
2285   if (sink->units_soft_max > 0)
2286     soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
2287   else
2288     soft_max_buffers = -1;
2289   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
2290       soft_max_buffers);
2291
2292   /* then loop over the clients and update the positions */
2293   max_buffer_usage = 0;
2294
2295 restart:
2296   cookie = sink->clients_cookie;
2297   for (clients = sink->clients; clients; clients = next) {
2298     GstTCPClient *client;
2299
2300     if (cookie != sink->clients_cookie) {
2301       GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
2302       goto restart;
2303     }
2304
2305     client = (GstTCPClient *) clients->data;
2306     next = g_list_next (clients);
2307
2308     client->bufpos++;
2309     GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
2310         client->fd.fd, client, client->bufpos);
2311     /* check soft max if needed, recover client */
2312     if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) {
2313       gint newpos;
2314
2315       newpos = gst_multi_fd_sink_recover_client (sink, client);
2316       if (newpos != client->bufpos) {
2317         client->dropped_buffers += client->bufpos - newpos;
2318         client->bufpos = newpos;
2319         client->discont = TRUE;
2320         GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
2321             client->fd.fd, client, client->bufpos);
2322       } else {
2323         GST_INFO_OBJECT (sink,
2324             "[fd %5d] client %p not recovering position",
2325             client->fd.fd, client);
2326       }
2327     }
2328     /* check hard max and timeout, remove client */
2329     if ((max_buffers > 0 && client->bufpos >= max_buffers) ||
2330         (sink->timeout > 0
2331             && now - client->last_activity_time > sink->timeout)) {
2332       /* remove client */
2333       GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
2334           client->fd.fd, client);
2335       /* remove the client, the fd set will be cleared and the select thread
2336        * will be signaled */
2337       client->status = GST_CLIENT_STATUS_SLOW;
2338       /* set client to invalid position while being removed */
2339       client->bufpos = -1;
2340       gst_multi_fd_sink_remove_client_link (sink, clients);
2341       need_signal = TRUE;
2342       continue;
2343     } else if (client->bufpos == 0 || client->new_connection) {
2344       /* can send data to this client now. need to signal the select thread that
2345        * the fd_set changed */
2346       gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
2347       need_signal = TRUE;
2348     }
2349     /* keep track of maximum buffer usage */
2350     if (client->bufpos > max_buffer_usage) {
2351       max_buffer_usage = client->bufpos;
2352     }
2353   }
2354
2355   /* make sure we respect bytes-min, buffers-min and time-min when they are set */
2356   {
2357     gint usage, max;
2358
2359     GST_LOG_OBJECT (sink,
2360         "extending queue %d to respect time_min %" GST_TIME_FORMAT
2361         ", bytes_min %d, buffers_min %d", max_buffer_usage,
2362         GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min);
2363
2364     /* get index where the limits are ok, we don't really care if all limits
2365      * are ok, we just queue as much as we need. We also don't compare against
2366      * the max limits. */
2367     find_limits (sink, &usage, sink->bytes_min, sink->buffers_min,
2368         sink->time_min, &max, -1, -1, -1);
2369
2370     max_buffer_usage = MAX (max_buffer_usage, usage + 1);
2371     GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
2372   }
2373
2374   /* now look for sync points and make sure there is at least one
2375    * sync point in the queue. We only do this if the LATEST_KEYFRAME or 
2376    * BURST_KEYFRAME mode is selected */
2377   if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
2378       sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
2379     /* no point in searching beyond the queue length */
2380     gint limit = queuelen;
2381     GstBuffer *buf;
2382
2383     /* no point in searching beyond the soft-max if any. */
2384     if (soft_max_buffers > 0) {
2385       limit = MIN (limit, soft_max_buffers);
2386     }
2387     GST_LOG_OBJECT (sink,
2388         "extending queue to include sync point, now at %d, limit is %d",
2389         max_buffer_usage, limit);
2390     for (i = 0; i < limit; i++) {
2391       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
2392       if (is_sync_frame (sink, buf)) {
2393         /* found a sync frame, now extend the buffer usage to
2394          * include at least this frame. */
2395         max_buffer_usage = MAX (max_buffer_usage, i);
2396         break;
2397       }
2398     }
2399     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
2400   }
2401
2402   GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
2403
2404   /* nobody is referencing units after max_buffer_usage so we can
2405    * remove them from the queue. We remove them in reverse order as
2406    * this is the most optimal for GArray. */
2407   for (i = queuelen - 1; i > max_buffer_usage; i--) {
2408     GstBuffer *old;
2409
2410     /* queue exceeded max size */
2411     queuelen--;
2412     old = g_array_index (sink->bufqueue, GstBuffer *, i);
2413     sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
2414
2415     /* unref tail buffer */
2416     gst_buffer_unref (old);
2417   }
2418   /* save for stats */
2419   sink->buffers_queued = max_buffer_usage;
2420   CLIENTS_UNLOCK (sink);
2421
2422   /* and send a signal to thread if fd_set changed */
2423   if (need_signal) {
2424     gst_poll_restart (sink->fdset);
2425   }
2426 }
2427
2428 /* Handle the clients. Basically does a blocking select for one
2429  * of the client fds to become read or writable. We also have a
2430  * filedescriptor to receive commands on that we need to check.
2431  *
2432  * After going out of the select call, we read and write to all
2433  * clients that can do so. Badly behaving clients are put on a
2434  * garbage list and removed.
2435  */
2436 static void
2437 gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
2438 {
2439   int result;
2440   GList *clients, *next;
2441   gboolean try_again;
2442   GstMultiFdSinkClass *fclass;
2443   guint cookie;
2444
2445   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
2446
2447   do {
2448     try_again = FALSE;
2449
2450     /* check for:
2451      * - server socket input (ie, new client connections)
2452      * - client socket input (ie, clients saying goodbye)
2453      * - client socket output (ie, client reads)          */
2454     GST_LOG_OBJECT (sink, "waiting on action on fdset");
2455
2456     result = gst_poll_wait (sink->fdset, sink->timeout != 0 ? sink->timeout :
2457         GST_CLOCK_TIME_NONE);
2458
2459     /* Handle the special case in which the sink is not receiving more buffers
2460      * and will not disconnect inactive client in the streaming thread. */
2461     if (G_UNLIKELY (result == 0)) {
2462       GstClockTime now;
2463       GTimeVal nowtv;
2464
2465       g_get_current_time (&nowtv);
2466       now = GST_TIMEVAL_TO_TIME (nowtv);
2467
2468       CLIENTS_LOCK (sink);
2469       for (clients = sink->clients; clients; clients = next) {
2470         GstTCPClient *client;
2471
2472         client = (GstTCPClient *) clients->data;
2473         next = g_list_next (clients);
2474         if (sink->timeout > 0
2475             && now - client->last_activity_time > sink->timeout) {
2476           client->status = GST_CLIENT_STATUS_SLOW;
2477           gst_multi_fd_sink_remove_client_link (sink, clients);
2478         }
2479       }
2480       CLIENTS_UNLOCK (sink);
2481       return;
2482     } else if (result < 0) {
2483       GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
2484           errno);
2485       if (errno == EBADF) {
2486         /* ok, so one or more of the fds is invalid. We loop over them to find
2487          * the ones that give an error to the F_GETFL fcntl. */
2488         CLIENTS_LOCK (sink);
2489       restart:
2490         cookie = sink->clients_cookie;
2491         for (clients = sink->clients; clients; clients = next) {
2492           GstTCPClient *client;
2493           int fd;
2494           long flags;
2495           int res;
2496
2497           if (cookie != sink->clients_cookie) {
2498             GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd");
2499             goto restart;
2500           }
2501
2502           client = (GstTCPClient *) clients->data;
2503           next = g_list_next (clients);
2504
2505           fd = client->fd.fd;
2506
2507           res = fcntl (fd, F_GETFL, &flags);
2508           if (res == -1) {
2509             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
2510                 fd, g_strerror (errno), errno);
2511             if (errno == EBADF) {
2512               client->status = GST_CLIENT_STATUS_ERROR;
2513               /* releases the CLIENTS lock */
2514               gst_multi_fd_sink_remove_client_link (sink, clients);
2515             }
2516           }
2517         }
2518         CLIENTS_UNLOCK (sink);
2519         /* after this, go back in the select loop as the read/writefds
2520          * are not valid */
2521         try_again = TRUE;
2522       } else if (errno == EINTR) {
2523         /* interrupted system call, just redo the wait */
2524         try_again = TRUE;
2525       } else if (errno == EBUSY) {
2526         /* the call to gst_poll_wait() was flushed */
2527         return;
2528       } else {
2529         /* this is quite bad... */
2530         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2531             ("select failed: %s (%d)", g_strerror (errno), errno));
2532         return;
2533       }
2534     } else {
2535       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
2536     }
2537   } while (try_again);
2538
2539   /* subclasses can check fdset with this virtual function */
2540   if (fclass->wait)
2541     fclass->wait (sink, sink->fdset);
2542
2543   /* Check the clients */
2544   CLIENTS_LOCK (sink);
2545
2546 restart2:
2547   cookie = sink->clients_cookie;
2548   for (clients = sink->clients; clients; clients = next) {
2549     GstTCPClient *client;
2550
2551     if (sink->clients_cookie != cookie) {
2552       GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date");
2553       goto restart2;
2554     }
2555
2556     client = (GstTCPClient *) clients->data;
2557     next = g_list_next (clients);
2558
2559     if (client->status != GST_CLIENT_STATUS_FLUSHING
2560         && client->status != GST_CLIENT_STATUS_OK) {
2561       gst_multi_fd_sink_remove_client_link (sink, clients);
2562       continue;
2563     }
2564
2565     if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
2566       client->status = GST_CLIENT_STATUS_CLOSED;
2567       gst_multi_fd_sink_remove_client_link (sink, clients);
2568       continue;
2569     }
2570     if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
2571       GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
2572       client->status = GST_CLIENT_STATUS_ERROR;
2573       gst_multi_fd_sink_remove_client_link (sink, clients);
2574       continue;
2575     }
2576     if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
2577       /* handle client read */
2578       if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
2579         gst_multi_fd_sink_remove_client_link (sink, clients);
2580         continue;
2581       }
2582     }
2583     if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
2584       /* handle client write */
2585       if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
2586         gst_multi_fd_sink_remove_client_link (sink, clients);
2587         continue;
2588       }
2589     }
2590   }
2591   CLIENTS_UNLOCK (sink);
2592 }
2593
2594 /* we handle the client communication in another thread so that we do not block
2595  * the gstreamer thread while we select() on the client fds */
2596 static gpointer
2597 gst_multi_fd_sink_thread (GstMultiFdSink * sink)
2598 {
2599   while (sink->running) {
2600     gst_multi_fd_sink_handle_clients (sink);
2601   }
2602   return NULL;
2603 }
2604
2605 static GstFlowReturn
2606 gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
2607 {
2608   GstMultiFdSink *sink;
2609   gboolean in_caps;
2610   GstCaps *bufcaps, *padcaps;
2611
2612   sink = GST_MULTI_FD_SINK (bsink);
2613
2614   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN),
2615       GST_FLOW_WRONG_STATE);
2616
2617   /* since we check every buffer for streamheader caps, we need to make
2618    * sure every buffer has caps set */
2619   bufcaps = gst_buffer_get_caps (buf);
2620   padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
2621
2622   /* make sure we have caps on the pad */
2623   if (!padcaps && !bufcaps)
2624     goto no_caps;
2625
2626   /* get IN_CAPS first, code below might mess with the flags */
2627   in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
2628
2629   /* stamp the buffer with previous caps if no caps set */
2630   if (!bufcaps) {
2631     if (!gst_buffer_is_metadata_writable (buf)) {
2632       /* metadata is not writable, copy will be made and original buffer
2633        * will be unreffed so we need to ref so that we don't lose the
2634        * buffer in the render method. */
2635       gst_buffer_ref (buf);
2636       /* the new buffer is ours only, we keep it out of the scope of this
2637        * function */
2638       buf = gst_buffer_make_metadata_writable (buf);
2639     } else {
2640       /* else the metadata is writable, we ref because we keep the buffer
2641        * out of the scope of this method */
2642       gst_buffer_ref (buf);
2643     }
2644     /* buffer metadata is writable now, set the caps */
2645     gst_buffer_set_caps (buf, padcaps);
2646   } else {
2647     gst_caps_unref (bufcaps);
2648
2649     /* since we keep this buffer out of the scope of this method */
2650     gst_buffer_ref (buf);
2651   }
2652
2653   GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
2654       G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
2655       ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
2656       buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
2657       GST_BUFFER_OFFSET_END (buf),
2658       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
2659       GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
2660
2661   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
2662    * it means we're getting new streamheader buffers, and we should clear
2663    * the old ones */
2664   if (in_caps && sink->previous_buffer_in_caps == FALSE) {
2665     GST_DEBUG_OBJECT (sink,
2666         "receiving new IN_CAPS buffers, clearing old streamheader");
2667     g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
2668     g_slist_free (sink->streamheader);
2669     sink->streamheader = NULL;
2670   }
2671
2672   /* save the current in_caps */
2673   sink->previous_buffer_in_caps = in_caps;
2674
2675   /* if the incoming buffer is marked as IN CAPS, then we assume for now
2676    * it's a streamheader that needs to be sent to each new client, so we
2677    * put it on our internal list of streamheader buffers.
2678    * FIXME: we could check if the buffer's contents are in fact part of the
2679    * current streamheader.
2680    *
2681    * We don't send the buffer to the client, since streamheaders are sent
2682    * separately when necessary. */
2683   if (in_caps) {
2684     GST_DEBUG_OBJECT (sink,
2685         "appending IN_CAPS buffer with length %d to streamheader",
2686         GST_BUFFER_SIZE (buf));
2687     sink->streamheader = g_slist_append (sink->streamheader, buf);
2688   } else {
2689     /* queue the buffer, this is a regular data buffer. */
2690     gst_multi_fd_sink_queue_buffer (sink, buf);
2691
2692     sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
2693   }
2694   return GST_FLOW_OK;
2695
2696   /* ERRORS */
2697 no_caps:
2698   {
2699     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
2700         ("Received first buffer without caps set"));
2701     return GST_FLOW_NOT_NEGOTIATED;
2702   }
2703 }
2704
2705 static void
2706 gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
2707     const GValue * value, GParamSpec * pspec)
2708 {
2709   GstMultiFdSink *multifdsink;
2710
2711   multifdsink = GST_MULTI_FD_SINK (object);
2712
2713   switch (prop_id) {
2714     case PROP_PROTOCOL:
2715       multifdsink->protocol = g_value_get_enum (value);
2716       break;
2717     case PROP_MODE:
2718       multifdsink->mode = g_value_get_enum (value);
2719       break;
2720     case PROP_BUFFERS_MAX:
2721       multifdsink->units_max = g_value_get_int (value);
2722       break;
2723     case PROP_BUFFERS_SOFT_MAX:
2724       multifdsink->units_soft_max = g_value_get_int (value);
2725       break;
2726     case PROP_TIME_MIN:
2727       multifdsink->time_min = g_value_get_int64 (value);
2728       break;
2729     case PROP_BYTES_MIN:
2730       multifdsink->bytes_min = g_value_get_int (value);
2731       break;
2732     case PROP_BUFFERS_MIN:
2733       multifdsink->buffers_min = g_value_get_int (value);
2734       break;
2735     case PROP_UNIT_TYPE:
2736       multifdsink->unit_type = g_value_get_enum (value);
2737       break;
2738     case PROP_UNITS_MAX:
2739       multifdsink->units_max = g_value_get_int64 (value);
2740       break;
2741     case PROP_UNITS_SOFT_MAX:
2742       multifdsink->units_soft_max = g_value_get_int64 (value);
2743       break;
2744     case PROP_RECOVER_POLICY:
2745       multifdsink->recover_policy = g_value_get_enum (value);
2746       break;
2747     case PROP_TIMEOUT:
2748       multifdsink->timeout = g_value_get_uint64 (value);
2749       break;
2750     case PROP_SYNC_METHOD:
2751       multifdsink->def_sync_method = g_value_get_enum (value);
2752       break;
2753     case PROP_BURST_UNIT:
2754       multifdsink->def_burst_unit = g_value_get_enum (value);
2755       break;
2756     case PROP_BURST_VALUE:
2757       multifdsink->def_burst_value = g_value_get_uint64 (value);
2758       break;
2759     case PROP_QOS_DSCP:
2760       multifdsink->qos_dscp = g_value_get_int (value);
2761       setup_dscp (multifdsink);
2762       break;
2763     case PROP_HANDLE_READ:
2764       multifdsink->handle_read = g_value_get_boolean (value);
2765       break;
2766     case PROP_RESEND_STREAMHEADER:
2767       multifdsink->resend_streamheader = g_value_get_boolean (value);
2768       break;
2769
2770     default:
2771       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2772       break;
2773   }
2774 }
2775
2776 static void
2777 gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
2778     GParamSpec * pspec)
2779 {
2780   GstMultiFdSink *multifdsink;
2781
2782   multifdsink = GST_MULTI_FD_SINK (object);
2783
2784   switch (prop_id) {
2785     case PROP_PROTOCOL:
2786       g_value_set_enum (value, multifdsink->protocol);
2787       break;
2788     case PROP_MODE:
2789       g_value_set_enum (value, multifdsink->mode);
2790       break;
2791     case PROP_BUFFERS_MAX:
2792       g_value_set_int (value, multifdsink->units_max);
2793       break;
2794     case PROP_BUFFERS_SOFT_MAX:
2795       g_value_set_int (value, multifdsink->units_soft_max);
2796       break;
2797     case PROP_TIME_MIN:
2798       g_value_set_int64 (value, multifdsink->time_min);
2799       break;
2800     case PROP_BYTES_MIN:
2801       g_value_set_int (value, multifdsink->bytes_min);
2802       break;
2803     case PROP_BUFFERS_MIN:
2804       g_value_set_int (value, multifdsink->buffers_min);
2805       break;
2806     case PROP_BUFFERS_QUEUED:
2807       g_value_set_uint (value, multifdsink->buffers_queued);
2808       break;
2809     case PROP_BYTES_QUEUED:
2810       g_value_set_uint (value, multifdsink->bytes_queued);
2811       break;
2812     case PROP_TIME_QUEUED:
2813       g_value_set_uint64 (value, multifdsink->time_queued);
2814       break;
2815     case PROP_UNIT_TYPE:
2816       g_value_set_enum (value, multifdsink->unit_type);
2817       break;
2818     case PROP_UNITS_MAX:
2819       g_value_set_int64 (value, multifdsink->units_max);
2820       break;
2821     case PROP_UNITS_SOFT_MAX:
2822       g_value_set_int64 (value, multifdsink->units_soft_max);
2823       break;
2824     case PROP_RECOVER_POLICY:
2825       g_value_set_enum (value, multifdsink->recover_policy);
2826       break;
2827     case PROP_TIMEOUT:
2828       g_value_set_uint64 (value, multifdsink->timeout);
2829       break;
2830     case PROP_SYNC_METHOD:
2831       g_value_set_enum (value, multifdsink->def_sync_method);
2832       break;
2833     case PROP_BYTES_TO_SERVE:
2834       g_value_set_uint64 (value, multifdsink->bytes_to_serve);
2835       break;
2836     case PROP_BYTES_SERVED:
2837       g_value_set_uint64 (value, multifdsink->bytes_served);
2838       break;
2839     case PROP_BURST_UNIT:
2840       g_value_set_enum (value, multifdsink->def_burst_unit);
2841       break;
2842     case PROP_BURST_VALUE:
2843       g_value_set_uint64 (value, multifdsink->def_burst_value);
2844       break;
2845     case PROP_QOS_DSCP:
2846       g_value_set_int (value, multifdsink->qos_dscp);
2847       break;
2848     case PROP_HANDLE_READ:
2849       g_value_set_boolean (value, multifdsink->handle_read);
2850       break;
2851     case PROP_RESEND_STREAMHEADER:
2852       g_value_set_boolean (value, multifdsink->resend_streamheader);
2853       break;
2854     case PROP_NUM_FDS:
2855       g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
2856       break;
2857
2858     default:
2859       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2860       break;
2861   }
2862 }
2863
2864
2865 /* create a socket for sending to remote machine */
2866 static gboolean
2867 gst_multi_fd_sink_start (GstBaseSink * bsink)
2868 {
2869   GstMultiFdSinkClass *fclass;
2870   GstMultiFdSink *this;
2871
2872   if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
2873     return TRUE;
2874
2875   this = GST_MULTI_FD_SINK (bsink);
2876   fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
2877
2878   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
2879   if ((this->fdset = gst_poll_new (TRUE)) == NULL)
2880     goto socket_pair;
2881
2882   this->streamheader = NULL;
2883   this->bytes_to_serve = 0;
2884   this->bytes_served = 0;
2885
2886   if (fclass->init) {
2887     fclass->init (this);
2888   }
2889
2890   this->running = TRUE;
2891
2892 #if !GLIB_CHECK_VERSION (2, 31, 0)
2893   this->thread = g_thread_create ((GThreadFunc) gst_multi_fd_sink_thread,
2894       this, TRUE, NULL);
2895 #else
2896   this->thread = g_thread_new ("multifdsink",
2897       (GThreadFunc) gst_multi_fd_sink_thread, this);
2898 #endif
2899
2900   GST_OBJECT_FLAG_SET (this, GST_MULTI_FD_SINK_OPEN);
2901
2902   return TRUE;
2903
2904   /* ERRORS */
2905 socket_pair:
2906   {
2907     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
2908         GST_ERROR_SYSTEM);
2909     return FALSE;
2910   }
2911 }
2912
2913 static gboolean
2914 multifdsink_hash_remove (gpointer key, gpointer value, gpointer data)
2915 {
2916   return TRUE;
2917 }
2918
2919 static gboolean
2920 gst_multi_fd_sink_stop (GstBaseSink * bsink)
2921 {
2922   GstMultiFdSinkClass *fclass;
2923   GstMultiFdSink *this;
2924   GstBuffer *buf;
2925   int i;
2926
2927   this = GST_MULTI_FD_SINK (bsink);
2928   fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
2929
2930   if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
2931     return TRUE;
2932
2933   this->running = FALSE;
2934
2935   gst_poll_set_flushing (this->fdset, TRUE);
2936   if (this->thread) {
2937     GST_DEBUG_OBJECT (this, "joining thread");
2938     g_thread_join (this->thread);
2939     GST_DEBUG_OBJECT (this, "joined thread");
2940     this->thread = NULL;
2941   }
2942
2943   /* free the clients */
2944   gst_multi_fd_sink_clear (this);
2945
2946   if (this->streamheader) {
2947     g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
2948     g_slist_free (this->streamheader);
2949     this->streamheader = NULL;
2950   }
2951
2952   if (fclass->close)
2953     fclass->close (this);
2954
2955   if (this->fdset) {
2956     gst_poll_free (this->fdset);
2957     this->fdset = NULL;
2958   }
2959   g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);
2960
2961   /* remove all queued buffers */
2962   if (this->bufqueue) {
2963     GST_DEBUG_OBJECT (this, "Emptying bufqueue with %d buffers",
2964         this->bufqueue->len);
2965     for (i = this->bufqueue->len - 1; i >= 0; --i) {
2966       buf = g_array_index (this->bufqueue, GstBuffer *, i);
2967       GST_LOG_OBJECT (this, "Removing buffer %p (%d) with refcount %d", buf, i,
2968           GST_MINI_OBJECT_REFCOUNT (buf));
2969       gst_buffer_unref (buf);
2970       this->bufqueue = g_array_remove_index (this->bufqueue, i);
2971     }
2972     /* freeing the array is done in _finalize */
2973   }
2974   GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
2975
2976   return TRUE;
2977 }
2978
2979 static GstStateChangeReturn
2980 gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition)
2981 {
2982   GstMultiFdSink *sink;
2983   GstStateChangeReturn ret;
2984
2985   sink = GST_MULTI_FD_SINK (element);
2986
2987   /* we disallow changing the state from the streaming thread */
2988   if (g_thread_self () == sink->thread)
2989     return GST_STATE_CHANGE_FAILURE;
2990
2991
2992   switch (transition) {
2993     case GST_STATE_CHANGE_NULL_TO_READY:
2994       if (!gst_multi_fd_sink_start (GST_BASE_SINK (sink)))
2995         goto start_failed;
2996       break;
2997     case GST_STATE_CHANGE_READY_TO_PAUSED:
2998       break;
2999     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3000       break;
3001     default:
3002       break;
3003   }
3004
3005   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3006
3007   switch (transition) {
3008     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3009       break;
3010     case GST_STATE_CHANGE_PAUSED_TO_READY:
3011       break;
3012     case GST_STATE_CHANGE_READY_TO_NULL:
3013       gst_multi_fd_sink_stop (GST_BASE_SINK (sink));
3014       break;
3015     default:
3016       break;
3017   }
3018   return ret;
3019
3020   /* ERRORS */
3021 start_failed:
3022   {
3023     /* error message was posted */
3024     return GST_STATE_CHANGE_FAILURE;
3025   }
3026 }