503abe284475ceb48667e114270b0f6d29f25e76
[platform/upstream/gstreamer.git] / gst / tcp / gstmultisocketsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  * Copyright (C) <2011> Collabora Ltd.
6  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 /**
25  * SECTION:element-multisocketsink
26  * @see_also: tcpserversink
27  *
28  * This plugin writes incoming data to a set of file descriptors. The
29  * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal. 
30  * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
31  *
32  * As of version 0.10.8, a client can also be added with the #GstMultiSocketSink::add-full signal
33  * that allows for more control over what and how much data a client 
34  * initially receives.
35  *
36  * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
37  * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
38  * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
39  * client is not active anymore or, depending on the value of the
40  * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
41  * In all cases, multisocketsink will never close a file descriptor itself.
42  * The user of multisocketsink is responsible for closing all file descriptors.
43  * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
44  * Note that multisocketsink still has a reference to the file descriptor when the
45  * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
46  * the descriptor; it is therefore not safe to close the file descriptor in
47  * the #GstMultiSocketSink::client-removed signal handler, and you should use the 
48  * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
49  *
50  * Multisocketsink internally keeps a queue of the incoming buffers and uses a
51  * separate thread to send the buffers to the clients. This ensures that no
52  * client write can block the pipeline and that clients can read with different
53  * speeds.
54  *
55  * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
56  * which buffer in the queued buffers will be sent first to the client. Clients 
57  * can be sent the most recent buffer (which might not be decodable by the 
58  * client if it is not a keyframe), the next keyframe received in 
59  * multisocketsink (which can take some time depending on the keyframe rate), or the
60  * last received keyframe (which will cause a simple burst-on-connect). 
61  * Multisocketsink will always keep at least one keyframe in its internal buffers
62  * when the sync-mode is set to latest-keyframe.
63  *
64  * As of version 0.10.8, there are additional values for the #GstMultiSocketSink:sync-method 
65  * property to allow finer control over burst-on-connect behaviour. By selecting
66  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
67  * additionally requires that the burst begin with a keyframe, and 
68  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
69  * prefer a minimum burst size even if it requires not starting with a keyframe.
70  *
71  * Multisocketsink can be instructed to keep at least a minimum amount of data
72  * expressed in time or byte units in its internal queues with the 
73  * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
74  * These properties are useful if the application adds clients with the 
75  * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
76  * actually be honored. 
77  *
78  * When streaming data, clients are allowed to read at a different rate than
79  * the rate at which multisocketsink receives data. If the client is reading too
80  * fast, no data will be send to the client until multisocketsink receives more
81  * data. If the client, however, reads too slowly, data for that client will be 
82  * queued up in multisocketsink. Two properties control the amount of data 
83  * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and 
84  * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
85  * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
86  *
87  * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
88  * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
89  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
90  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
91  * positions the client to the soft limit in the buffer queue and
92  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
93  * buffer queue.
94  *
95  * multisocketsink will by default synchronize on the clock before serving the 
96  * buffers to the clients. This behaviour can be disabled by setting the sync 
97  * property to FALSE. Multisocketsink will by default not do QoS and will never
98  * drop late buffers.
99  *
100  * Last reviewed on 2006-09-12 (0.10.10)
101  */
102
103 #ifdef HAVE_CONFIG_H
104 #include "config.h"
105 #endif
106
107 #include <gst/gst-i18n-plugin.h>
108
109 #include "gstmultisocketsink.h"
110 #include "gsttcp-marshal.h"
111
112 #ifndef G_OS_WIN32
113 #include <netinet/in.h>
114 #endif
115
116 #define NOT_IMPLEMENTED 0
117
118 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
119     GST_PAD_SINK,
120     GST_PAD_ALWAYS,
121     GST_STATIC_CAPS_ANY);
122
123 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
124 #define GST_CAT_DEFAULT (multisocketsink_debug)
125
126 /* MultiSocketSink signals and args */
127 enum
128 {
129   /* methods */
130   SIGNAL_ADD,
131   SIGNAL_ADD_BURST,
132   SIGNAL_REMOVE,
133   SIGNAL_REMOVE_FLUSH,
134   SIGNAL_GET_STATS,
135
136   /* signals */
137   SIGNAL_CLIENT_ADDED,
138   SIGNAL_CLIENT_REMOVED,
139   SIGNAL_CLIENT_SOCKET_REMOVED,
140
141   LAST_SIGNAL
142 };
143
144
145 /* this is really arbitrarily chosen */
146 #define DEFAULT_MODE                    1
147 #define DEFAULT_BUFFERS_MAX             -1
148 #define DEFAULT_BUFFERS_SOFT_MAX        -1
149 #define DEFAULT_UNIT_TYPE               GST_FORMAT_BUFFERS
150 #define DEFAULT_UNITS_MAX               -1
151 #define DEFAULT_UNITS_SOFT_MAX          -1
152
153 #define DEFAULT_BURST_FORMAT            GST_FORMAT_UNDEFINED
154 #define DEFAULT_BURST_VALUE             0
155
156 #define DEFAULT_QOS_DSCP                -1
157
158 enum
159 {
160   PROP_0,
161   PROP_MODE,
162
163   PROP_UNIT_TYPE,
164   PROP_UNITS_MAX,
165   PROP_UNITS_SOFT_MAX,
166
167   PROP_BUFFERS_MAX,
168   PROP_BUFFERS_SOFT_MAX,
169
170   PROP_BURST_FORMAT,
171   PROP_BURST_VALUE,
172
173   PROP_QOS_DSCP,
174
175   PROP_NUM_SOCKETS,
176
177   PROP_LAST
178 };
179
180 static void gst_multi_socket_sink_finalize (GObject * object);
181
182 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
183 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
184 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
185 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
186
187 static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
188     GList * link);
189 static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
190     GIOCondition condition, GstMultiSocketSink * sink);
191
192 static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
193     GstBuffer * buf);
194 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
195 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
196
197 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
198     const GValue * value, GParamSpec * pspec);
199 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
200     GValue * value, GParamSpec * pspec);
201
202 #define gst_multi_socket_sink_parent_class parent_class
203 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
204     GST_TYPE_MULTI_HANDLE_SINK);
205
206 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
207
208 static void
209 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
210 {
211   GObjectClass *gobject_class;
212   GstElementClass *gstelement_class;
213   GstBaseSinkClass *gstbasesink_class;
214   GstMultiHandleSinkClass *gstmultihandlesink_class;
215
216   gobject_class = (GObjectClass *) klass;
217   gstelement_class = (GstElementClass *) klass;
218   gstbasesink_class = (GstBaseSinkClass *) klass;
219   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
220
221   gobject_class->set_property = gst_multi_socket_sink_set_property;
222   gobject_class->get_property = gst_multi_socket_sink_get_property;
223   gobject_class->finalize = gst_multi_socket_sink_finalize;
224
225   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
226       g_param_spec_int ("buffers-max", "Buffers max",
227           "max number of buffers to queue for a client (-1 = no limit)", -1,
228           G_MAXINT, DEFAULT_BUFFERS_MAX,
229           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230   g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
231       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
232           "Recover client when going over this limit (-1 = no limit)", -1,
233           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
234           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235
236   g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
237       g_param_spec_enum ("unit-type", "Units type",
238           "The unit to measure the max/soft-max/queued properties",
239           GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
240           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
242       g_param_spec_int64 ("units-max", "Units max",
243           "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
244           DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
245   g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
246       g_param_spec_int64 ("units-soft-max", "Units soft max",
247           "Recover client when going over this limit (-1 = no limit)", -1,
248           G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
249           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250
251   g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
252       g_param_spec_enum ("burst-format", "Burst format",
253           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
254           GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
255           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
257       g_param_spec_uint64 ("burst-value", "Burst value",
258           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
259           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
260
261   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
262       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
263           "Quality of Service, differentiated services code point (-1 default)",
264           -1, 63, DEFAULT_QOS_DSCP,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266
267   g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
268       g_param_spec_uint ("num-sockets", "Number of sockets",
269           "The current number of client sockets",
270           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
271
272   /**
273    * GstMultiSocketSink::add:
274    * @gstmultisocketsink: the multisocketsink element to emit this signal on
275    * @socket:             the socket to add to multisocketsink
276    *
277    * Hand the given open socket to multisocketsink to write to.
278    */
279   gst_multi_socket_sink_signals[SIGNAL_ADD] =
280       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
281       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
282       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
283       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
284   /**
285    * GstMultiSocketSink::add-full:
286    * @gstmultisocketsink: the multisocketsink element to emit this signal on
287    * @socket:         the socket to add to multisocketsink
288    * @sync:           the sync method to use
289    * @format_min:     the format of @value_min
290    * @value_min:      the minimum amount of data to burst expressed in
291    *                  @format_min units.
292    * @format_max:     the format of @value_max
293    * @value_max:      the maximum amount of data to burst expressed in
294    *                  @format_max units.
295    *
296    * Hand the given open socket to multisocketsink to write to and
297    * specify the burst parameters for the new connection.
298    */
299   gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
300       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
301       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
302       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
303       gst_tcp_marshal_VOID__OBJECT_ENUM_ENUM_UINT64_ENUM_UINT64, G_TYPE_NONE, 6,
304       G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
305       GST_TYPE_FORMAT, G_TYPE_UINT64);
306   /**
307    * GstMultiSocketSink::remove:
308    * @gstmultisocketsink: the multisocketsink element to emit this signal on
309    * @socket:             the socket to remove from multisocketsink
310    *
311    * Remove the given open socket from multisocketsink.
312    */
313   gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
314       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
315       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
316       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
317       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
318   /**
319    * GstMultiSocketSink::remove-flush:
320    * @gstmultisocketsink: the multisocketsink element to emit this signal on
321    * @socket:             the socket to remove from multisocketsink
322    *
323    * Remove the given open socket from multisocketsink after flushing all
324    * the pending data to the socket.
325    */
326   gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
327       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
328       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
329       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
330       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
331
332   /**
333    * GstMultiSocketSink::get-stats:
334    * @gstmultisocketsink: the multisocketsink element to emit this signal on
335    * @socket:             the socket to get stats of from multisocketsink
336    *
337    * Get statistics about @socket. This function returns a GstStructure.
338    *
339    * Returns: a GstStructure with the statistics. The structure contains
340    *     values that represent: total number of bytes sent, time
341    *     when the client was added, time when the client was
342    *     disconnected/removed, time the client is/was active, last activity
343    *     time (in epoch seconds), number of buffers dropped.
344    *     All times are expressed in nanoseconds (GstClockTime).
345    */
346   gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
347       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
348       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
349       G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
350       gst_tcp_marshal_BOXED__OBJECT, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
351
352   /**
353    * GstMultiSocketSink::client-added:
354    * @gstmultisocketsink: the multisocketsink element that emitted this signal
355    * @socket:             the socket that was added to multisocketsink
356    *
357    * The given socket was added to multisocketsink. This signal will
358    * be emitted from the streaming thread so application should be prepared
359    * for that.
360    */
361   gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
362       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
363       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
364           client_added), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
365       G_TYPE_NONE, 1, G_TYPE_OBJECT);
366   /**
367    * GstMultiSocketSink::client-removed:
368    * @gstmultisocketsink: the multisocketsink element that emitted this signal
369    * @socket:             the socket that is to be removed from multisocketsink
370    * @status:             the reason why the client was removed
371    *
372    * The given socket is about to be removed from multisocketsink. This
373    * signal will be emitted from the streaming thread so applications should
374    * be prepared for that.
375    *
376    * @gstmultisocketsink still holds a handle to @socket so it is possible to call
377    * the get-stats signal from this callback. For the same reason it is
378    * not safe to close() and reuse @socket in this callback.
379    */
380   gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
381       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
382       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
383           client_removed), NULL, NULL, gst_tcp_marshal_VOID__OBJECT_ENUM,
384       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
385   /**
386    * GstMultiSocketSink::client-socket-removed:
387    * @gstmultisocketsink: the multisocketsink element that emitted this signal
388    * @socket:             the socket that was removed from multisocketsink
389    *
390    * The given socket was removed from multisocketsink. This signal will
391    * be emitted from the streaming thread so applications should be prepared
392    * for that.
393    *
394    * In this callback, @gstmultisocketsink has removed all the information
395    * associated with @socket and it is therefore not possible to call get-stats
396    * with @socket. It is however safe to close() and reuse @fd in the callback.
397    *
398    * Since: 0.10.7
399    */
400   gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
401       g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
402       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
403           client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
404       G_TYPE_NONE, 1, G_TYPE_SOCKET);
405
406   gst_element_class_add_pad_template (gstelement_class,
407       gst_static_pad_template_get (&sinktemplate));
408
409   gst_element_class_set_details_simple (gstelement_class,
410       "Multi socket sink", "Sink/Network",
411       "Send data to multiple sockets",
412       "Thomas Vander Stichele <thomas at apestaart dot org>, "
413       "Wim Taymans <wim@fluendo.com>, "
414       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
415
416   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
417
418   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
419   gstbasesink_class->unlock_stop =
420       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
421
422   gstmultihandlesink_class->stop_pre =
423       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
424   gstmultihandlesink_class->stop_post =
425       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
426   gstmultihandlesink_class->start_pre =
427       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
428   gstmultihandlesink_class->thread =
429       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
430
431   gstmultihandlesink_class->remove_client_link =
432       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
433
434   klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
435   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
436   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
437   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
438   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
439
440   GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441       "Multi socket sink");
442 }
443
444 static void
445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
446 {
447   this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
448
449   this->unit_type = DEFAULT_UNIT_TYPE;
450   this->units_max = DEFAULT_UNITS_MAX;
451   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
452
453   this->def_burst_format = DEFAULT_BURST_FORMAT;
454   this->def_burst_value = DEFAULT_BURST_VALUE;
455
456   this->qos_dscp = DEFAULT_QOS_DSCP;
457
458   this->header_flags = 0;
459   this->cancellable = g_cancellable_new ();
460 }
461
462 static void
463 gst_multi_socket_sink_finalize (GObject * object)
464 {
465   GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
466
467   g_hash_table_destroy (this->socket_hash);
468   if (this->cancellable) {
469     g_object_unref (this->cancellable);
470     this->cancellable = NULL;
471   }
472
473   G_OBJECT_CLASS (parent_class)->finalize (object);
474 }
475
476 static gint
477 setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client)
478 {
479 #ifndef IP_TOS
480   return 0;
481 #else
482   gint tos;
483   gint ret;
484   int fd;
485   union gst_sockaddr
486   {
487     struct sockaddr sa;
488     struct sockaddr_in6 sa_in6;
489     struct sockaddr_storage sa_stor;
490   } sa;
491   socklen_t slen = sizeof (sa);
492   gint af;
493
494   /* don't touch */
495   if (sink->qos_dscp < 0)
496     return 0;
497
498   fd = g_socket_get_fd (client->socket);
499
500   if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
501     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
502     return ret;
503   }
504
505   af = sa.sa.sa_family;
506
507   /* if this is an IPv4-mapped address then do IPv4 QoS */
508   if (af == AF_INET6) {
509
510     GST_DEBUG_OBJECT (sink, "check IP6 socket");
511     if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
512       GST_DEBUG_OBJECT (sink, "mapped to IPV4");
513       af = AF_INET;
514     }
515   }
516
517   /* extract and shift 6 bits of the DSCP */
518   tos = (sink->qos_dscp & 0x3f) << 2;
519
520   switch (af) {
521     case AF_INET:
522       ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
523       break;
524     case AF_INET6:
525 #ifdef IPV6_TCLASS
526       ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
527       break;
528 #endif
529     default:
530       ret = 0;
531       GST_ERROR_OBJECT (sink, "unsupported AF");
532       break;
533   }
534   if (ret)
535     GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
536
537   return ret;
538 #endif
539 }
540
541 static void
542 setup_dscp (GstMultiSocketSink * sink)
543 {
544   GList *clients;
545   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
546
547   CLIENTS_LOCK (sink);
548   for (clients = mhsink->clients; clients; clients = clients->next) {
549     GstSocketClient *client;
550
551     client = clients->data;
552
553     setup_dscp_client (sink, client);
554   }
555   CLIENTS_UNLOCK (sink);
556 }
557
558 /* "add-full" signal implementation */
559 void
560 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
561     GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
562     GstFormat max_format, guint64 max_value)
563 {
564   GstSocketClient *client;
565   GstMultiHandleClient *mhclient;
566   GList *clink;
567   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
568
569   GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, "
570       "min_format %d, min_value %" G_GUINT64_FORMAT
571       ", max_format %d, max_value %" G_GUINT64_FORMAT, socket,
572       sync_method, min_format, min_value, max_format, max_value);
573
574   /* do limits check if we can */
575   if (min_format == max_format) {
576     if (max_value != -1 && min_value != -1 && max_value < min_value)
577       goto wrong_limits;
578   }
579
580   /* create client datastructure */
581   client = g_new0 (GstSocketClient, 1);
582   mhclient = (GstMultiHandleClient *) client;
583   gst_multi_handle_sink_client_init (mhclient, sync_method);
584   g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
585   client->socket = G_SOCKET (g_object_ref (socket));
586
587   client->burst_min_format = min_format;
588   client->burst_min_value = min_value;
589   client->burst_max_format = max_format;
590   client->burst_max_value = max_value;
591
592   CLIENTS_LOCK (sink);
593
594   /* check the hash to find a duplicate fd */
595   clink = g_hash_table_lookup (sink->socket_hash, socket);
596   if (clink != NULL)
597     goto duplicate;
598
599   /* we can add the fd now */
600   clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
601   g_hash_table_insert (sink->socket_hash, socket, clink);
602   mhsink->clients_cookie++;
603
604   /* set the socket to non blocking */
605   g_socket_set_blocking (socket, FALSE);
606
607   /* we always read from a client */
608   if (sink->main_context) {
609     client->source =
610         g_socket_create_source (client->socket,
611         G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
612     g_source_set_callback (client->source,
613         (GSourceFunc) gst_multi_socket_sink_socket_condition,
614         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
615     g_source_attach (client->source, sink->main_context);
616   }
617
618   setup_dscp_client (sink, client);
619
620   CLIENTS_UNLOCK (sink);
621
622   g_signal_emit (G_OBJECT (sink),
623       gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket);
624
625   return;
626
627   /* errors */
628 wrong_limits:
629   {
630     GST_WARNING_OBJECT (sink,
631         "[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%"
632         G_GUINT64_FORMAT ", format %d specified when adding client", socket,
633         min_value, max_value, min_format);
634     return;
635   }
636 duplicate:
637   {
638     mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
639     CLIENTS_UNLOCK (sink);
640     GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing",
641         socket);
642     g_signal_emit (G_OBJECT (sink),
643         gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
644         mhclient->status);
645     g_free (client);
646     return;
647   }
648 }
649
650 /* "add" signal implementation */
651 void
652 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
653 {
654   GstMultiHandleSink *mhsink;
655
656   mhsink = GST_MULTI_HANDLE_SINK (sink);
657   gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
658       sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
659       -1);
660 }
661
662 /* "remove" signal implementation */
663 void
664 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
665 {
666   GList *clink;
667   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
668   GstMultiHandleSinkClass *mhsinkclass =
669       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
670
671   GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket);
672
673   CLIENTS_LOCK (sink);
674   clink = g_hash_table_lookup (sink->socket_hash, socket);
675   if (clink != NULL) {
676     GstSocketClient *client = clink->data;
677     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
678
679     if (mhclient->status != GST_CLIENT_STATUS_OK) {
680       GST_INFO_OBJECT (sink,
681           "[socket %p] Client already disconnecting with status %d",
682           socket, mhclient->status);
683       goto done;
684     }
685
686     mhclient->status = GST_CLIENT_STATUS_REMOVED;
687     mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
688   } else {
689     GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
690         socket);
691   }
692
693 done:
694   CLIENTS_UNLOCK (sink);
695 }
696
697 /* "remove-flush" signal implementation */
698 void
699 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
700 {
701   GList *clink;
702
703   GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket);
704
705   CLIENTS_LOCK (sink);
706   clink = g_hash_table_lookup (sink->socket_hash, socket);
707   if (clink != NULL) {
708     GstSocketClient *client = clink->data;
709     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
710
711     if (mhclient->status != GST_CLIENT_STATUS_OK) {
712       GST_INFO_OBJECT (sink,
713           "[socket %p] Client already disconnecting with status %d",
714           socket, mhclient->status);
715       goto done;
716     }
717
718     /* take the position of the client as the number of buffers left to flush.
719      * If the client was at position -1, we flush 0 buffers, 0 == flush 1
720      * buffer, etc... */
721     mhclient->flushcount = mhclient->bufpos + 1;
722     /* mark client as flushing. We can not remove the client right away because
723      * it might have some buffers to flush in the ->sending queue. */
724     mhclient->status = GST_CLIENT_STATUS_FLUSHING;
725   } else {
726     GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!",
727         socket);
728   }
729 done:
730   CLIENTS_UNLOCK (sink);
731 }
732
733 /* "get-stats" signal implementation
734  */
735 GstStructure *
736 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
737 {
738   GstSocketClient *client;
739   GstStructure *result = NULL;
740   GList *clink;
741
742   CLIENTS_LOCK (sink);
743   clink = g_hash_table_lookup (sink->socket_hash, socket);
744   if (clink == NULL)
745     goto noclient;
746
747   client = clink->data;
748   if (client != NULL) {
749     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
750     guint64 interval;
751
752     result = gst_structure_new_empty ("multisocketsink-stats");
753
754     if (mhclient->disconnect_time == 0) {
755       GTimeVal nowtv;
756
757       g_get_current_time (&nowtv);
758
759       interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time;
760     } else {
761       interval = mhclient->disconnect_time - mhclient->connect_time;
762     }
763
764     gst_structure_set (result,
765         "bytes-sent", G_TYPE_UINT64, mhclient->bytes_sent,
766         "connect-time", G_TYPE_UINT64, mhclient->connect_time,
767         "disconnect-time", G_TYPE_UINT64, mhclient->disconnect_time,
768         "connected-duration", G_TYPE_UINT64, interval,
769         "last-activatity-time", G_TYPE_UINT64, mhclient->last_activity_time,
770         "dropped-buffers", G_TYPE_UINT64, mhclient->dropped_buffers,
771         "first-buffer-ts", G_TYPE_UINT64, mhclient->first_buffer_ts,
772         "last-buffer-ts", G_TYPE_UINT64, mhclient->last_buffer_ts, NULL);
773   }
774
775 noclient:
776   CLIENTS_UNLOCK (sink);
777
778   /* python doesn't like a NULL pointer yet */
779   if (result == NULL) {
780     GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket);
781     result = gst_structure_new_empty ("multisocketsink-stats");
782   }
783
784   return result;
785 }
786
787 /* should be called with the clientslock helt.
788  * Note that we don't close the fd as we didn't open it in the first
789  * place. An application should connect to the client-fd-removed signal and
790  * close the fd itself.
791  */
792 static void
793 gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
794     GList * link)
795 {
796   GSocket *socket;
797   GTimeVal now;
798   GstSocketClient *client = link->data;
799   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
800   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (sink);
801   GstMultiSocketSinkClass *fclass;
802
803   fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink);
804
805   socket = client->socket;
806
807   if (mhclient->currently_removing) {
808     GST_WARNING_OBJECT (sink, "%s client is already being removed",
809         mhclient->debug);
810     return;
811   } else {
812     mhclient->currently_removing = TRUE;
813   }
814
815   /* FIXME: if we keep track of ip we can log it here and signal */
816   switch (mhclient->status) {
817     case GST_CLIENT_STATUS_OK:
818       GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
819           mhclient->debug, client);
820       break;
821     case GST_CLIENT_STATUS_CLOSED:
822       GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
823           mhclient->debug, client);
824       break;
825     case GST_CLIENT_STATUS_REMOVED:
826       GST_DEBUG_OBJECT (sink,
827           "%s removing client %p because the app removed it", mhclient->debug,
828           client);
829       break;
830     case GST_CLIENT_STATUS_SLOW:
831       GST_INFO_OBJECT (sink,
832           "%s removing client %p because it was too slow", mhclient->debug,
833           client);
834       break;
835     case GST_CLIENT_STATUS_ERROR:
836       GST_WARNING_OBJECT (sink,
837           "%s removing client %p because of error", mhclient->debug, client);
838       break;
839     case GST_CLIENT_STATUS_FLUSHING:
840     default:
841       GST_WARNING_OBJECT (sink,
842           "%s removing client %p with invalid reason %d", mhclient->debug,
843           client, mhclient->status);
844       break;
845   }
846
847   if (client->source) {
848     g_source_destroy (client->source);
849     g_source_unref (client->source);
850     client->source = NULL;
851   }
852
853   g_get_current_time (&now);
854   mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
855
856   /* free client buffers */
857   g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL);
858   g_slist_free (mhclient->sending);
859   mhclient->sending = NULL;
860
861   if (mhclient->caps)
862     gst_caps_unref (mhclient->caps);
863   mhclient->caps = NULL;
864
865   /* unlock the mutex before signaling because the signal handler
866    * might query some properties */
867   CLIENTS_UNLOCK (sink);
868
869   g_signal_emit (G_OBJECT (sink),
870       gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
871       mhclient->status);
872
873   /* lock again before we remove the client completely */
874   CLIENTS_LOCK (sink);
875
876   /* fd cannot be reused in the above signal callback so we can safely
877    * remove it from the hashtable here */
878   if (!g_hash_table_remove (mssink->socket_hash, socket)) {
879     GST_WARNING_OBJECT (sink,
880         "[socket %p] error removing client %p from hash", socket, client);
881   }
882   /* after releasing the lock above, the link could be invalid, more
883    * precisely, the next and prev pointers could point to invalid list
884    * links. One optimisation could be to add a cookie to the linked list
885    * and take a shortcut when it did not change between unlocking and locking
886    * our mutex. For now we just walk the list again. */
887   sink->clients = g_list_remove (sink->clients, client);
888   sink->clients_cookie++;
889
890   if (fclass->removed)
891     fclass->removed (mssink, socket);
892
893   g_free (client);
894   CLIENTS_UNLOCK (sink);
895
896   /* and the fd is really gone now */
897   g_signal_emit (G_OBJECT (sink),
898       gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket);
899   g_object_unref (socket);
900
901   CLIENTS_LOCK (sink);
902 }
903
904 /* handle a read on a client socket,
905  * which either indicates a close or should be ignored
906  * returns FALSE if some error occured or the client closed. */
907 static gboolean
908 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
909     GstSocketClient * client)
910 {
911   gboolean ret;
912   gchar dummy[256];
913   gssize nread;
914   GError *err = NULL;
915   gboolean first = TRUE;
916   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
917
918   GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read",
919       client->socket);
920
921   ret = TRUE;
922
923   /* just Read 'n' Drop, could also just drop the client as it's not supposed
924    * to write to us except for closing the socket, I guess it's because we
925    * like to listen to our customers. */
926   do {
927     gssize navail;
928
929     GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
930         client->socket);
931
932     navail = g_socket_get_available_bytes (client->socket);
933     if (navail < 0)
934       break;
935
936     nread =
937         g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
938         sink->cancellable, &err);
939     if (first && nread == 0) {
940       /* client sent close, so remove it */
941       GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing",
942           client->socket);
943       mhclient->status = GST_CLIENT_STATUS_CLOSED;
944       ret = FALSE;
945     } else if (nread < 0) {
946       GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s",
947           client->socket, err->message);
948       mhclient->status = GST_CLIENT_STATUS_ERROR;
949       ret = FALSE;
950       break;
951     }
952     first = FALSE;
953   } while (nread > 0);
954   g_clear_error (&err);
955
956   return ret;
957 }
958
959 /* queue the given buffer for the given client */
960 static gboolean
961 gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
962     GstSocketClient * client, GstBuffer * buffer)
963 {
964   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
965   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
966   GstCaps *caps;
967
968   /* TRUE: send them if the new caps have them */
969   gboolean send_streamheader = FALSE;
970   GstStructure *s;
971
972   /* before we queue the buffer, we check if we need to queue streamheader
973    * buffers (because it's a new client, or because they changed) */
974   caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));
975
976   if (!mhclient->caps) {
977     GST_DEBUG_OBJECT (sink,
978         "[socket %p] no previous caps for this client, send streamheader",
979         client->socket);
980     send_streamheader = TRUE;
981     mhclient->caps = gst_caps_ref (caps);
982   } else {
983     /* there were previous caps recorded, so compare */
984     if (!gst_caps_is_equal (caps, mhclient->caps)) {
985       const GValue *sh1, *sh2;
986
987       /* caps are not equal, but could still have the same streamheader */
988       s = gst_caps_get_structure (caps, 0);
989       if (!gst_structure_has_field (s, "streamheader")) {
990         /* no new streamheader, so nothing new to send */
991         GST_DEBUG_OBJECT (sink,
992             "[socket %p] new caps do not have streamheader, not sending",
993             client->socket);
994       } else {
995         /* there is a new streamheader */
996         s = gst_caps_get_structure (mhclient->caps, 0);
997         if (!gst_structure_has_field (s, "streamheader")) {
998           /* no previous streamheader, so send the new one */
999           GST_DEBUG_OBJECT (sink,
1000               "[socket %p] previous caps did not have streamheader, sending",
1001               client->socket);
1002           send_streamheader = TRUE;
1003         } else {
1004           /* both old and new caps have streamheader set */
1005           if (!mhsink->resend_streamheader) {
1006             GST_DEBUG_OBJECT (sink,
1007                 "[socket %p] asked to not resend the streamheader, not sending",
1008                 client->socket);
1009             send_streamheader = FALSE;
1010           } else {
1011             sh1 = gst_structure_get_value (s, "streamheader");
1012             s = gst_caps_get_structure (caps, 0);
1013             sh2 = gst_structure_get_value (s, "streamheader");
1014             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
1015               GST_DEBUG_OBJECT (sink,
1016                   "[socket %p] new streamheader different from old, sending",
1017                   client->socket);
1018               send_streamheader = TRUE;
1019             }
1020           }
1021         }
1022       }
1023     }
1024     /* Replace the old caps */
1025     gst_caps_unref (mhclient->caps);
1026     mhclient->caps = gst_caps_ref (caps);
1027   }
1028
1029   if (G_UNLIKELY (send_streamheader)) {
1030     const GValue *sh;
1031     GArray *buffers;
1032     int i;
1033
1034     GST_LOG_OBJECT (sink,
1035         "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
1036         client->socket, caps);
1037     s = gst_caps_get_structure (caps, 0);
1038     if (!gst_structure_has_field (s, "streamheader")) {
1039       GST_DEBUG_OBJECT (sink,
1040           "[socket %p] no new streamheader, so nothing to send",
1041           client->socket);
1042     } else {
1043       GST_LOG_OBJECT (sink,
1044           "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
1045           client->socket, caps);
1046       sh = gst_structure_get_value (s, "streamheader");
1047       g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
1048       buffers = g_value_peek_pointer (sh);
1049       GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
1050       for (i = 0; i < buffers->len; ++i) {
1051         GValue *bufval;
1052         GstBuffer *buffer;
1053
1054         bufval = &g_array_index (buffers, GValue, i);
1055         g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
1056         buffer = g_value_peek_pointer (bufval);
1057         GST_DEBUG_OBJECT (sink,
1058             "[socket %p] queueing streamheader buffer of length %"
1059             G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer));
1060         gst_buffer_ref (buffer);
1061
1062         mhclient->sending = g_slist_append (mhclient->sending, buffer);
1063       }
1064     }
1065   }
1066
1067   gst_caps_unref (caps);
1068   caps = NULL;
1069
1070   GST_LOG_OBJECT (sink,
1071       "[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket,
1072       gst_buffer_get_size (buffer));
1073
1074   gst_buffer_ref (buffer);
1075   mhclient->sending = g_slist_append (mhclient->sending, buffer);
1076
1077   return TRUE;
1078 }
1079
1080 /* Get the number of buffers from the buffer queue needed to satisfy
1081  * the maximum max in the configured units.
1082  * If units are not BUFFERS, and there are insufficient buffers in the
1083  * queue to satify the limit, return len(queue) + 1 */
1084 static gint
1085 get_buffers_max (GstMultiSocketSink * sink, gint64 max)
1086 {
1087   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1088
1089   switch (sink->unit_type) {
1090     case GST_FORMAT_BUFFERS:
1091       return max;
1092     case GST_FORMAT_TIME:
1093     {
1094       GstBuffer *buf;
1095       int i;
1096       int len;
1097       gint64 diff;
1098       GstClockTime first = GST_CLOCK_TIME_NONE;
1099
1100       len = mhsink->bufqueue->len;
1101
1102       for (i = 0; i < len; i++) {
1103         buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1104         if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
1105           if (first == -1)
1106             first = GST_BUFFER_TIMESTAMP (buf);
1107
1108           diff = first - GST_BUFFER_TIMESTAMP (buf);
1109
1110           if (diff > max)
1111             return i + 1;
1112         }
1113       }
1114       return len + 1;
1115     }
1116     case GST_FORMAT_BYTES:
1117     {
1118       GstBuffer *buf;
1119       int i;
1120       int len;
1121       gint acc = 0;
1122
1123       len = mhsink->bufqueue->len;
1124
1125       for (i = 0; i < len; i++) {
1126         buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1127         acc += gst_buffer_get_size (buf);
1128
1129         if (acc > max)
1130           return i + 1;
1131       }
1132       return len + 1;
1133     }
1134     default:
1135       return max;
1136   }
1137 }
1138
1139 /* find the positions in the buffer queue where *_min and *_max
1140  * is satisfied
1141  */
1142 /* count the amount of data in the buffers and return the index
1143  * that satifies the given limits.
1144  *
1145  * Returns: index @idx in the buffer queue so that the given limits are
1146  * satisfied. TRUE if all the limits could be satisfied, FALSE if not
1147  * enough data was in the queue.
1148  *
1149  * FIXME, this code might now work if any of the units is in buffers...
1150  */
1151 static gboolean
1152 find_limits (GstMultiSocketSink * sink,
1153     gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
1154     gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
1155 {
1156   GstClockTime first, time;
1157   gint i, len, bytes;
1158   gboolean result, max_hit;
1159   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1160
1161   /* take length of queue */
1162   len = mhsink->bufqueue->len;
1163
1164   /* this must hold */
1165   g_assert (len > 0);
1166
1167   GST_LOG_OBJECT (sink,
1168       "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
1169       ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
1170       buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
1171       GST_TIME_ARGS (time_max));
1172
1173   /* do the trivial buffer limit test */
1174   if (buffers_min != -1 && len < buffers_min) {
1175     *min_idx = len - 1;
1176     *max_idx = len - 1;
1177     return FALSE;
1178   }
1179
1180   result = FALSE;
1181   /* else count bytes and time */
1182   first = -1;
1183   bytes = 0;
1184   /* unset limits */
1185   *min_idx = -1;
1186   *max_idx = -1;
1187   max_hit = FALSE;
1188
1189   i = 0;
1190   /* loop through the buffers, when a limit is ok, mark it 
1191    * as -1, we have at least one buffer in the queue. */
1192   do {
1193     GstBuffer *buf;
1194
1195     /* if we checked all min limits, update result */
1196     if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
1197       /* don't go below 0 */
1198       *min_idx = MAX (i - 1, 0);
1199     }
1200     /* if we reached one max limit break out */
1201     if (max_hit) {
1202       /* i > 0 when we get here, we subtract one to get the position
1203        * of the previous buffer. */
1204       *max_idx = i - 1;
1205       /* we have valid complete result if we found a min_idx too */
1206       result = *min_idx != -1;
1207       break;
1208     }
1209     buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1210
1211     bytes += gst_buffer_get_size (buf);
1212
1213     /* take timestamp and save for the base first timestamp */
1214     if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
1215       GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
1216           GST_TIME_ARGS (time));
1217       if (first == -1)
1218         first = time;
1219
1220       /* increase max usage if we did not fill enough. Note that
1221        * buffers are sorted from new to old, so the first timestamp is
1222        * bigger than the next one. */
1223       if (time_min != -1 && first - time >= time_min)
1224         time_min = -1;
1225       if (time_max != -1 && first - time >= time_max)
1226         max_hit = TRUE;
1227     } else {
1228       GST_LOG_OBJECT (sink, "No timestamp on buffer");
1229     }
1230     /* time is OK or unknown, check and increase if not enough bytes */
1231     if (bytes_min != -1) {
1232       if (bytes >= bytes_min)
1233         bytes_min = -1;
1234     }
1235     if (bytes_max != -1) {
1236       if (bytes >= bytes_max) {
1237         max_hit = TRUE;
1238       }
1239     }
1240     i++;
1241   }
1242   while (i < len);
1243
1244   /* if we did not hit the max or min limit, set to buffer size */
1245   if (*max_idx == -1)
1246     *max_idx = len - 1;
1247   /* make sure min does not exceed max */
1248   if (*min_idx == -1)
1249     *min_idx = *max_idx;
1250
1251   return result;
1252 }
1253
1254 /* parse the unit/value pair and assign it to the result value of the
1255  * right type, leave the other values untouched 
1256  *
1257  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
1258  */
1259 static gboolean
1260 assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
1261     GstClockTime * time)
1262 {
1263   gboolean res = TRUE;
1264
1265   /* set only the limit of the given format to the given value */
1266   switch (format) {
1267     case GST_FORMAT_BUFFERS:
1268       *buffers = (gint) value;
1269       break;
1270     case GST_FORMAT_TIME:
1271       *time = value;
1272       break;
1273     case GST_FORMAT_BYTES:
1274       *bytes = (gint) value;
1275       break;
1276     case GST_FORMAT_UNDEFINED:
1277     default:
1278       res = FALSE;
1279       break;
1280   }
1281   return res;
1282 }
1283
1284 /* count the index in the buffer queue to satisfy the given unit
1285  * and value pair starting from buffer at index 0.
1286  *
1287  * Returns: TRUE if there was enough data in the queue to satisfy the
1288  * burst values. @idx contains the index in the buffer that contains enough
1289  * data to satisfy the limits or the last buffer in the queue when the
1290  * function returns FALSE.
1291  */
1292 static gboolean
1293 count_burst_unit (GstMultiSocketSink * sink, gint * min_idx,
1294     GstFormat min_format, guint64 min_value, gint * max_idx,
1295     GstFormat max_format, guint64 max_value)
1296 {
1297   gint bytes_min = -1, buffers_min = -1;
1298   gint bytes_max = -1, buffers_max = -1;
1299   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
1300
1301   assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
1302   assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
1303
1304   return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
1305       max_idx, bytes_max, buffers_max, time_max);
1306 }
1307
1308 /* decide where in the current buffer queue this new client should start
1309  * receiving buffers from.
1310  * This function is called whenever a client is connected and has not yet
1311  * received a buffer.
1312  * If this returns -1, it means that we haven't found a good point to
1313  * start streaming from yet, and this function should be called again later
1314  * when more buffers have arrived.
1315  */
1316 static gint
1317 gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
1318     GstSocketClient * client)
1319 {
1320   gint result;
1321   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1322   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1323
1324   GST_DEBUG_OBJECT (sink,
1325       "[socket %p] new client, deciding where to start in queue",
1326       client->socket);
1327   GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
1328       mhsink->bufqueue->len);
1329   switch (mhclient->sync_method) {
1330     case GST_SYNC_METHOD_LATEST:
1331       /* no syncing, we are happy with whatever the client is going to get */
1332       result = mhclient->bufpos;
1333       GST_DEBUG_OBJECT (sink,
1334           "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
1335           result);
1336       break;
1337     case GST_SYNC_METHOD_NEXT_KEYFRAME:
1338     {
1339       /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
1340        * is a sync point, we can proceed, otherwise we need to keep waiting */
1341       GST_LOG_OBJECT (sink,
1342           "[socket %p] new client, bufpos %d, waiting for keyframe",
1343           client->socket, mhclient->bufpos);
1344
1345       result = find_prev_syncframe (mhsink, mhclient->bufpos);
1346       if (result != -1) {
1347         GST_DEBUG_OBJECT (sink,
1348             "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
1349             client->socket, result);
1350         break;
1351       }
1352
1353       /* client is not on a syncbuffer, need to skip these buffers and
1354        * wait some more */
1355       GST_LOG_OBJECT (sink,
1356           "[socket %p] new client, skipping buffer(s), no syncpoint found",
1357           client->socket);
1358       mhclient->bufpos = -1;
1359       break;
1360     }
1361     case GST_SYNC_METHOD_LATEST_KEYFRAME:
1362     {
1363       GST_DEBUG_OBJECT (sink,
1364           "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
1365
1366       /* for new clients we initially scan the complete buffer queue for
1367        * a sync point when a buffer is added. If we don't find a keyframe,
1368        * we need to wait for the next keyframe and so we change the client's
1369        * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
1370        */
1371       result = find_next_syncframe (mhsink, 0);
1372       if (result != -1) {
1373         GST_DEBUG_OBJECT (sink,
1374             "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
1375             client->socket, result);
1376         break;
1377       }
1378
1379       GST_DEBUG_OBJECT (sink,
1380           "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
1381           "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
1382       /* throw client to the waiting state */
1383       mhclient->bufpos = -1;
1384       /* and make client sync to next keyframe */
1385       mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1386       break;
1387     }
1388     case GST_SYNC_METHOD_BURST:
1389     {
1390       gboolean ok;
1391       gint max;
1392
1393       /* move to the position where we satisfy the client's burst
1394        * parameters. If we could not satisfy the parameters because there
1395        * is not enough data, we just send what we have (which is in result).
1396        * We use the max value to limit the search
1397        */
1398       ok = count_burst_unit (sink, &result, client->burst_min_format,
1399           client->burst_min_value, &max, client->burst_max_format,
1400           client->burst_max_value);
1401       GST_DEBUG_OBJECT (sink,
1402           "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
1403           client->socket, ok, result);
1404
1405       GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
1406
1407       /* we hit the max and it is below the min, use that then */
1408       if (max != -1 && max <= result) {
1409         result = MAX (max - 1, 0);
1410         GST_DEBUG_OBJECT (sink,
1411             "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
1412             client->socket, result);
1413       }
1414       break;
1415     }
1416     case GST_SYNC_METHOD_BURST_KEYFRAME:
1417     {
1418       gint min_idx, max_idx;
1419       gint next_syncframe, prev_syncframe;
1420
1421       /* BURST_KEYFRAME:
1422        *
1423        * _always_ start sending a keyframe to the client. We first search
1424        * a keyframe between min/max limits. If there is none, we send it the
1425        * last keyframe before min. If there is none, the behaviour is like
1426        * NEXT_KEYFRAME.
1427        */
1428       /* gather burst limits */
1429       count_burst_unit (sink, &min_idx, client->burst_min_format,
1430           client->burst_min_value, &max_idx, client->burst_max_format,
1431           client->burst_max_value);
1432
1433       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1434
1435       /* first find a keyframe after min_idx */
1436       next_syncframe = find_next_syncframe (mhsink, min_idx);
1437       if (next_syncframe != -1 && next_syncframe < max_idx) {
1438         /* we have a valid keyframe and it's below the max */
1439         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1440         result = next_syncframe;
1441         break;
1442       }
1443
1444       /* no valid keyframe, try to find one below min */
1445       prev_syncframe = find_prev_syncframe (mhsink, min_idx);
1446       if (prev_syncframe != -1) {
1447         GST_WARNING_OBJECT (sink,
1448             "using keyframe below min in BURST_KEYFRAME sync mode");
1449         result = prev_syncframe;
1450         break;
1451       }
1452
1453       /* no prev keyframe or not enough data  */
1454       GST_WARNING_OBJECT (sink,
1455           "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
1456
1457       /* throw client to the waiting state */
1458       mhclient->bufpos = -1;
1459       /* and make client sync to next keyframe */
1460       mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1461       result = -1;
1462       break;
1463     }
1464     case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
1465     {
1466       gint min_idx, max_idx;
1467       gint next_syncframe;
1468
1469       /* BURST_WITH_KEYFRAME:
1470        *
1471        * try to start sending a keyframe to the client. We first search
1472        * a keyframe between min/max limits. If there is none, we send it the
1473        * amount of data up 'till min.
1474        */
1475       /* gather enough data to burst */
1476       count_burst_unit (sink, &min_idx, client->burst_min_format,
1477           client->burst_min_value, &max_idx, client->burst_max_format,
1478           client->burst_max_value);
1479
1480       GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1481
1482       /* first find a keyframe after min_idx */
1483       next_syncframe = find_next_syncframe (mhsink, min_idx);
1484       if (next_syncframe != -1 && next_syncframe < max_idx) {
1485         /* we have a valid keyframe and it's below the max */
1486         GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1487         result = next_syncframe;
1488         break;
1489       }
1490
1491       /* no keyframe, send data from min_idx */
1492       GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
1493
1494       /* make sure we don't go over the max limit */
1495       if (max_idx != -1 && max_idx <= min_idx) {
1496         result = MAX (max_idx - 1, 0);
1497       } else {
1498         result = min_idx;
1499       }
1500
1501       break;
1502     }
1503     default:
1504       g_warning ("unknown sync method %d", mhclient->sync_method);
1505       result = mhclient->bufpos;
1506       break;
1507   }
1508   return result;
1509 }
1510
1511 /* Handle a write on a client,
1512  * which indicates a read request from a client.
1513  *
1514  * For each client we maintain a queue of GstBuffers that contain the raw bytes
1515  * we need to send to the client.
1516  *
1517  * We first check to see if we need to send streamheaders. If so, we queue them.
1518  *
1519  * Then we run into the main loop that tries to send as many buffers as
1520  * possible. It will first exhaust the mhclient->sending queue and if the queue
1521  * is empty, it will pick a buffer from the global queue.
1522  *
1523  * Sending the buffers from the mhclient->sending queue is basically writing
1524  * the bytes to the socket and maintaining a count of the bytes that were
1525  * sent. When the buffer is completely sent, it is removed from the
1526  * mhclient->sending queue and we try to pick a new buffer for sending.
1527  *
1528  * When the sending returns a partial buffer we stop sending more data as
1529  * the next send operation could block.
1530  *
1531  * This functions returns FALSE if some error occured.
1532  */
1533 static gboolean
1534 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
1535     GstSocketClient * client)
1536 {
1537   GSocket *socket = client->socket;
1538   gboolean more;
1539   gboolean flushing;
1540   GstClockTime now;
1541   GTimeVal nowtv;
1542   GError *err = NULL;
1543   GstMultiHandleSink *mhsink;
1544   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1545
1546   mhsink = GST_MULTI_HANDLE_SINK (sink);
1547
1548   g_get_current_time (&nowtv);
1549   now = GST_TIMEVAL_TO_TIME (nowtv);
1550
1551   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
1552
1553   more = TRUE;
1554   do {
1555     gint maxsize;
1556
1557     if (!mhclient->sending) {
1558       /* client is not working on a buffer */
1559       if (mhclient->bufpos == -1) {
1560         /* client is too fast, remove from write queue until new buffer is
1561          * available */
1562         if (client->source) {
1563           g_source_destroy (client->source);
1564           g_source_unref (client->source);
1565           client->source = NULL;
1566         }
1567         /* if we flushed out all of the client buffers, we can stop */
1568         if (mhclient->flushcount == 0)
1569           goto flushed;
1570
1571         return TRUE;
1572       } else {
1573         /* client can pick a buffer from the global queue */
1574         GstBuffer *buf;
1575         GstClockTime timestamp;
1576
1577         /* for new connections, we need to find a good spot in the
1578          * bufqueue to start streaming from */
1579         if (mhclient->new_connection && !flushing) {
1580           gint position = gst_multi_socket_sink_new_client (sink, client);
1581
1582           if (position >= 0) {
1583             /* we got a valid spot in the queue */
1584             mhclient->new_connection = FALSE;
1585             mhclient->bufpos = position;
1586           } else {
1587             /* cannot send data to this client yet */
1588             if (client->source) {
1589               g_source_destroy (client->source);
1590               g_source_unref (client->source);
1591               client->source = NULL;
1592             }
1593             return TRUE;
1594           }
1595         }
1596
1597         /* we flushed all remaining buffers, no need to get a new one */
1598         if (mhclient->flushcount == 0)
1599           goto flushed;
1600
1601         /* grab buffer */
1602         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
1603         mhclient->bufpos--;
1604
1605         /* update stats */
1606         timestamp = GST_BUFFER_TIMESTAMP (buf);
1607         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
1608           mhclient->first_buffer_ts = timestamp;
1609         if (timestamp != -1)
1610           mhclient->last_buffer_ts = timestamp;
1611
1612         /* decrease flushcount */
1613         if (mhclient->flushcount != -1)
1614           mhclient->flushcount--;
1615
1616         GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
1617             socket, client, mhclient->bufpos);
1618
1619         /* queueing a buffer will ref it */
1620         gst_multi_socket_sink_client_queue_buffer (sink, client, buf);
1621
1622         /* need to start from the first byte for this new buffer */
1623         mhclient->bufoffset = 0;
1624       }
1625     }
1626
1627     /* see if we need to send something */
1628     if (mhclient->sending) {
1629       gssize wrote;
1630       GstBuffer *head;
1631       GstMapInfo map;
1632
1633       /* pick first buffer from list */
1634       head = GST_BUFFER (mhclient->sending->data);
1635
1636       gst_buffer_map (head, &map, GST_MAP_READ);
1637       maxsize = map.size - mhclient->bufoffset;
1638
1639       /* try to write the complete buffer */
1640
1641       wrote =
1642           g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset,
1643           maxsize, sink->cancellable, &err);
1644       gst_buffer_unmap (head, &map);
1645
1646       if (wrote < 0) {
1647         /* hmm error.. */
1648         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
1649           goto connection_reset;
1650         } else {
1651           goto write_error;
1652         }
1653       } else {
1654         if (wrote < maxsize) {
1655           /* partial write means that the client cannot read more and we should
1656            * stop sending more */
1657           GST_LOG_OBJECT (sink,
1658               "partial write on %p of %" G_GSSIZE_FORMAT " bytes", socket,
1659               wrote);
1660           mhclient->bufoffset += wrote;
1661           more = FALSE;
1662         } else {
1663           /* complete buffer was written, we can proceed to the next one */
1664           mhclient->sending = g_slist_remove (mhclient->sending, head);
1665           gst_buffer_unref (head);
1666           /* make sure we start from byte 0 for the next buffer */
1667           mhclient->bufoffset = 0;
1668         }
1669         /* update stats */
1670         mhclient->bytes_sent += wrote;
1671         mhclient->last_activity_time = now;
1672         mhsink->bytes_served += wrote;
1673       }
1674     }
1675   } while (more);
1676
1677   return TRUE;
1678
1679   /* ERRORS */
1680 flushed:
1681   {
1682     GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket);
1683     mhclient->status = GST_CLIENT_STATUS_REMOVED;
1684     return FALSE;
1685   }
1686 connection_reset:
1687   {
1688     GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing",
1689         socket);
1690     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1691     g_clear_error (&err);
1692     return FALSE;
1693   }
1694 write_error:
1695   {
1696     GST_WARNING_OBJECT (sink,
1697         "[socket %p] could not write, removing client: %s", socket,
1698         err->message);
1699     g_clear_error (&err);
1700     mhclient->status = GST_CLIENT_STATUS_ERROR;
1701     return FALSE;
1702   }
1703 }
1704
1705 /* calculate the new position for a client after recovery. This function
1706  * does not update the client position but merely returns the required
1707  * position.
1708  */
1709 static gint
1710 gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
1711     GstSocketClient * client)
1712 {
1713   gint newbufpos;
1714   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1715   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1716
1717   GST_WARNING_OBJECT (sink,
1718       "[socket %p] client %p is lagging at %d, recover using policy %d",
1719       client->socket, client, mhclient->bufpos, mhsink->recover_policy);
1720
1721   switch (mhsink->recover_policy) {
1722     case GST_RECOVER_POLICY_NONE:
1723       /* do nothing, client will catch up or get kicked out when it reaches
1724        * the hard max */
1725       newbufpos = mhclient->bufpos;
1726       break;
1727     case GST_RECOVER_POLICY_RESYNC_LATEST:
1728       /* move to beginning of queue */
1729       newbufpos = -1;
1730       break;
1731     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
1732       /* move to beginning of soft max */
1733       newbufpos = get_buffers_max (sink, sink->units_soft_max);
1734       break;
1735     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1736       /* find keyframe in buffers, we search backwards to find the
1737        * closest keyframe relative to what this client already received. */
1738       newbufpos = MIN (mhsink->bufqueue->len - 1,
1739           get_buffers_max (sink, sink->units_soft_max) - 1);
1740
1741       while (newbufpos >= 0) {
1742         GstBuffer *buf;
1743
1744         buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
1745         if (is_sync_frame (mhsink, buf)) {
1746           /* found a buffer that is not a delta unit */
1747           break;
1748         }
1749         newbufpos--;
1750       }
1751       break;
1752     default:
1753       /* unknown recovery procedure */
1754       newbufpos = get_buffers_max (sink, sink->units_soft_max);
1755       break;
1756   }
1757   return newbufpos;
1758 }
1759
1760 /* Queue a buffer on the global queue.
1761  *
1762  * This function adds the buffer to the front of a GArray. It removes the
1763  * tail buffer if the max queue size is exceeded, unreffing the queued buffer.
1764  * Note that unreffing the buffer is not a problem as clients who
1765  * started writing out this buffer will still have a reference to it in the
1766  * mhclient->sending queue.
1767  *
1768  * After adding the buffer, we update all client positions in the queue. If
1769  * a client moves over the soft max, we start the recovery procedure for this
1770  * slow client. If it goes over the hard max, it is put into the slow list
1771  * and removed.
1772  *
1773  * Special care is taken of clients that were waiting for a new buffer (they
1774  * had a position of -1) because they can proceed after adding this new buffer.
1775  * This is done by adding the client back into the write fd_set and signaling
1776  * the select thread that the fd_set changed.
1777  */
1778 static void
1779 gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
1780 {
1781   GList *clients, *next;
1782   gint queuelen;
1783   gint max_buffer_usage;
1784   gint i;
1785   GTimeVal nowtv;
1786   GstClockTime now;
1787   gint max_buffers, soft_max_buffers;
1788   guint cookie;
1789   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1790   GstMultiHandleSinkClass *mhsinkclass =
1791       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1792
1793   g_get_current_time (&nowtv);
1794   now = GST_TIMEVAL_TO_TIME (nowtv);
1795
1796   CLIENTS_LOCK (sink);
1797   /* add buffer to queue */
1798   gst_buffer_ref (buf);
1799   g_array_prepend_val (mhsink->bufqueue, buf);
1800   queuelen = mhsink->bufqueue->len;
1801
1802   if (sink->units_max > 0)
1803     max_buffers = get_buffers_max (sink, sink->units_max);
1804   else
1805     max_buffers = -1;
1806
1807   if (sink->units_soft_max > 0)
1808     soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
1809   else
1810     soft_max_buffers = -1;
1811   GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
1812       soft_max_buffers);
1813
1814   /* then loop over the clients and update the positions */
1815   max_buffer_usage = 0;
1816
1817 restart:
1818   cookie = mhsink->clients_cookie;
1819   for (clients = mhsink->clients; clients; clients = next) {
1820     GstSocketClient *client;
1821     GstMultiHandleClient *mhclient;
1822
1823     if (cookie != mhsink->clients_cookie) {
1824       GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
1825       goto restart;
1826     }
1827
1828     client = clients->data;
1829     mhclient = (GstMultiHandleClient *) client;
1830     next = g_list_next (clients);
1831
1832     mhclient->bufpos++;
1833     GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
1834         client->socket, client, mhclient->bufpos);
1835     /* check soft max if needed, recover client */
1836     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
1837       gint newpos;
1838
1839       newpos = gst_multi_socket_sink_recover_client (sink, client);
1840       if (newpos != mhclient->bufpos) {
1841         mhclient->dropped_buffers += mhclient->bufpos - newpos;
1842         mhclient->bufpos = newpos;
1843         mhclient->discont = TRUE;
1844         GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d",
1845             client->socket, client, mhclient->bufpos);
1846       } else {
1847         GST_INFO_OBJECT (sink,
1848             "[socket %p] client %p not recovering position",
1849             client->socket, client);
1850       }
1851     }
1852     /* check hard max and timeout, remove client */
1853     if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) ||
1854         (mhsink->timeout > 0
1855             && now - mhclient->last_activity_time > mhsink->timeout)) {
1856       /* remove client */
1857       GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing",
1858           client->socket, client);
1859       /* remove the client, the fd set will be cleared and the select thread
1860        * will be signaled */
1861       mhclient->status = GST_CLIENT_STATUS_SLOW;
1862       /* set client to invalid position while being removed */
1863       mhclient->bufpos = -1;
1864       mhsinkclass->remove_client_link (mhsink, clients);
1865       continue;
1866     } else if (mhclient->bufpos == 0 || mhclient->new_connection) {
1867       /* can send data to this client now. need to signal the select thread that
1868        * the fd_set changed */
1869       if (!client->source) {
1870         client->source =
1871             g_socket_create_source (client->socket,
1872             G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
1873             sink->cancellable);
1874         g_source_set_callback (client->source,
1875             (GSourceFunc) gst_multi_socket_sink_socket_condition,
1876             gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1877         g_source_attach (client->source, sink->main_context);
1878       }
1879     }
1880     /* keep track of maximum buffer usage */
1881     if (mhclient->bufpos > max_buffer_usage) {
1882       max_buffer_usage = mhclient->bufpos;
1883     }
1884   }
1885
1886   /* make sure we respect bytes-min, buffers-min and time-min when they are set */
1887   {
1888     gint usage, max;
1889
1890     GST_LOG_OBJECT (sink,
1891         "extending queue %d to respect time_min %" GST_TIME_FORMAT
1892         ", bytes_min %d, buffers_min %d", max_buffer_usage,
1893         GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min,
1894         mhsink->buffers_min);
1895
1896     /* get index where the limits are ok, we don't really care if all limits
1897      * are ok, we just queue as much as we need. We also don't compare against
1898      * the max limits. */
1899     find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
1900         mhsink->time_min, &max, -1, -1, -1);
1901
1902     max_buffer_usage = MAX (max_buffer_usage, usage + 1);
1903     GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
1904   }
1905
1906   /* now look for sync points and make sure there is at least one
1907    * sync point in the queue. We only do this if the LATEST_KEYFRAME or 
1908    * BURST_KEYFRAME mode is selected */
1909   if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
1910       mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
1911     /* no point in searching beyond the queue length */
1912     gint limit = queuelen;
1913     GstBuffer *buf;
1914
1915     /* no point in searching beyond the soft-max if any. */
1916     if (soft_max_buffers > 0) {
1917       limit = MIN (limit, soft_max_buffers);
1918     }
1919     GST_LOG_OBJECT (sink,
1920         "extending queue to include sync point, now at %d, limit is %d",
1921         max_buffer_usage, limit);
1922     for (i = 0; i < limit; i++) {
1923       buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1924       if (is_sync_frame (mhsink, buf)) {
1925         /* found a sync frame, now extend the buffer usage to
1926          * include at least this frame. */
1927         max_buffer_usage = MAX (max_buffer_usage, i);
1928         break;
1929       }
1930     }
1931     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1932   }
1933
1934   GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
1935
1936   /* nobody is referencing units after max_buffer_usage so we can
1937    * remove them from the queue. We remove them in reverse order as
1938    * this is the most optimal for GArray. */
1939   for (i = queuelen - 1; i > max_buffer_usage; i--) {
1940     GstBuffer *old;
1941
1942     /* queue exceeded max size */
1943     queuelen--;
1944     old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1945     mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
1946
1947     /* unref tail buffer */
1948     gst_buffer_unref (old);
1949   }
1950   /* save for stats */
1951   mhsink->buffers_queued = max_buffer_usage;
1952   CLIENTS_UNLOCK (sink);
1953 }
1954
1955 /* Handle the clients. This is called when a socket becomes ready
1956  * to read or writable. Badly behaving clients are put on a
1957  * garbage list and removed.
1958  */
1959 static gboolean
1960 gst_multi_socket_sink_socket_condition (GSocket * socket,
1961     GIOCondition condition, GstMultiSocketSink * sink)
1962 {
1963   GList *clink;
1964   GstSocketClient *client;
1965   gboolean ret = TRUE;
1966   GstMultiHandleClient *mhclient;
1967   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1968   GstMultiHandleSinkClass *mhsinkclass =
1969       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1970
1971   CLIENTS_LOCK (sink);
1972   clink = g_hash_table_lookup (sink->socket_hash, socket);
1973   if (clink == NULL) {
1974     ret = FALSE;
1975     goto done;
1976   }
1977
1978   client = clink->data;
1979   mhclient = (GstMultiHandleClient *) client;
1980
1981   if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1982       && mhclient->status != GST_CLIENT_STATUS_OK) {
1983     mhsinkclass->remove_client_link (mhsink, clink);
1984     ret = FALSE;
1985     goto done;
1986   }
1987
1988   if ((condition & G_IO_ERR)) {
1989     GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket);
1990     mhclient->status = GST_CLIENT_STATUS_ERROR;
1991     mhsinkclass->remove_client_link (mhsink, clink);
1992     ret = FALSE;
1993     goto done;
1994   } else if ((condition & G_IO_HUP)) {
1995     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1996     mhsinkclass->remove_client_link (mhsink, clink);
1997     ret = FALSE;
1998     goto done;
1999   } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
2000     /* handle client read */
2001     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
2002       mhsinkclass->remove_client_link (mhsink, clink);
2003       ret = FALSE;
2004       goto done;
2005     }
2006   } else if ((condition & G_IO_OUT)) {
2007     /* handle client write */
2008     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
2009       mhsinkclass->remove_client_link (mhsink, clink);
2010       ret = FALSE;
2011       goto done;
2012     }
2013   }
2014
2015 done:
2016   CLIENTS_UNLOCK (sink);
2017
2018   return ret;
2019 }
2020
2021 static gboolean
2022 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
2023 {
2024   GstClockTime now;
2025   GTimeVal nowtv;
2026   GList *clients;
2027   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
2028   GstMultiHandleSinkClass *mhsinkclass =
2029       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
2030
2031   g_get_current_time (&nowtv);
2032   now = GST_TIMEVAL_TO_TIME (nowtv);
2033
2034   CLIENTS_LOCK (sink);
2035   for (clients = mhsink->clients; clients; clients = clients->next) {
2036     GstSocketClient *client;
2037     GstMultiHandleClient *mhclient;
2038
2039     client = clients->data;
2040     mhclient = (GstMultiHandleClient *) client;
2041     if (mhsink->timeout > 0
2042         && now - mhclient->last_activity_time > mhsink->timeout) {
2043       mhclient->status = GST_CLIENT_STATUS_SLOW;
2044       mhsinkclass->remove_client_link (mhsink, clients);
2045     }
2046   }
2047   CLIENTS_UNLOCK (sink);
2048
2049   return FALSE;
2050 }
2051
2052 /* we handle the client communication in another thread so that we do not block
2053  * the gstreamer thread while we select() on the client fds */
2054 static gpointer
2055 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
2056 {
2057   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
2058   GSource *timeout = NULL;
2059
2060   while (mhsink->running) {
2061     if (mhsink->timeout > 0) {
2062       timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
2063
2064       g_source_set_callback (timeout,
2065           (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
2066           (GDestroyNotify) gst_object_unref);
2067       g_source_attach (timeout, sink->main_context);
2068     }
2069
2070     /* Returns after handling all pending events or when
2071      * _wakeup() was called. In any case we have to add
2072      * a new timeout because something happened.
2073      */
2074     g_main_context_iteration (sink->main_context, TRUE);
2075
2076     if (timeout) {
2077       g_source_destroy (timeout);
2078       g_source_unref (timeout);
2079     }
2080   }
2081
2082   return NULL;
2083 }
2084
2085 static GstFlowReturn
2086 gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
2087 {
2088   GstMultiSocketSink *sink;
2089   gboolean in_caps;
2090 #if 0
2091   GstCaps *bufcaps, *padcaps;
2092 #endif
2093   GstMultiHandleSink *mhsink;
2094
2095   sink = GST_MULTI_SOCKET_SINK (bsink);
2096   mhsink = GST_MULTI_HANDLE_SINK (sink);
2097
2098   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
2099           GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
2100
2101 #if 0
2102   /* since we check every buffer for streamheader caps, we need to make
2103    * sure every buffer has caps set */
2104   bufcaps = gst_buffer_get_caps (buf);
2105   padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
2106
2107   /* make sure we have caps on the pad */
2108   if (!padcaps && !bufcaps)
2109     goto no_caps;
2110 #endif
2111
2112   /* get HEADER first, code below might mess with the flags */
2113   in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
2114
2115 #if 0
2116   /* stamp the buffer with previous caps if no caps set */
2117   if (!bufcaps) {
2118     if (!gst_buffer_is_writable (buf)) {
2119       /* metadata is not writable, copy will be made and original buffer
2120        * will be unreffed so we need to ref so that we don't lose the
2121        * buffer in the render method. */
2122       gst_buffer_ref (buf);
2123       /* the new buffer is ours only, we keep it out of the scope of this
2124        * function */
2125       buf = gst_buffer_make_writable (buf);
2126     } else {
2127       /* else the metadata is writable, we ref because we keep the buffer
2128        * out of the scope of this method */
2129       gst_buffer_ref (buf);
2130     }
2131     /* buffer metadata is writable now, set the caps */
2132     gst_buffer_set_caps (buf, padcaps);
2133   } else {
2134     gst_caps_unref (bufcaps);
2135
2136     /* since we keep this buffer out of the scope of this method */
2137     gst_buffer_ref (buf);
2138   }
2139 #endif
2140
2141   GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
2142       G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
2143       ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
2144       buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
2145       GST_BUFFER_OFFSET_END (buf),
2146       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
2147       GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
2148
2149   /* if we get HEADER buffers, but the previous buffer was not HEADER,
2150    * it means we're getting new streamheader buffers, and we should clear
2151    * the old ones */
2152   if (in_caps && sink->previous_buffer_in_caps == FALSE) {
2153     GST_DEBUG_OBJECT (sink,
2154         "receiving new HEADER buffers, clearing old streamheader");
2155     g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
2156     g_slist_free (mhsink->streamheader);
2157     mhsink->streamheader = NULL;
2158   }
2159
2160   /* save the current in_caps */
2161   sink->previous_buffer_in_caps = in_caps;
2162
2163   /* if the incoming buffer is marked as IN CAPS, then we assume for now
2164    * it's a streamheader that needs to be sent to each new client, so we
2165    * put it on our internal list of streamheader buffers.
2166    * FIXME: we could check if the buffer's contents are in fact part of the
2167    * current streamheader.
2168    *
2169    * We don't send the buffer to the client, since streamheaders are sent
2170    * separately when necessary. */
2171   if (in_caps) {
2172     GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
2173         G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
2174     mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
2175   } else {
2176     /* queue the buffer, this is a regular data buffer. */
2177     gst_multi_socket_sink_queue_buffer (sink, buf);
2178
2179     mhsink->bytes_to_serve += gst_buffer_get_size (buf);
2180   }
2181   return GST_FLOW_OK;
2182
2183   /* ERRORS */
2184 #if 0
2185 no_caps:
2186   {
2187     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
2188         ("Received first buffer without caps set"));
2189     return GST_FLOW_NOT_NEGOTIATED;
2190   }
2191 #endif
2192 }
2193
2194 static void
2195 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
2196     const GValue * value, GParamSpec * pspec)
2197 {
2198   GstMultiSocketSink *multisocketsink;
2199
2200   multisocketsink = GST_MULTI_SOCKET_SINK (object);
2201
2202   switch (prop_id) {
2203     case PROP_BUFFERS_MAX:
2204       multisocketsink->units_max = g_value_get_int (value);
2205       break;
2206     case PROP_BUFFERS_SOFT_MAX:
2207       multisocketsink->units_soft_max = g_value_get_int (value);
2208       break;
2209     case PROP_UNIT_TYPE:
2210       multisocketsink->unit_type = g_value_get_enum (value);
2211       break;
2212     case PROP_UNITS_MAX:
2213       multisocketsink->units_max = g_value_get_int64 (value);
2214       break;
2215     case PROP_UNITS_SOFT_MAX:
2216       multisocketsink->units_soft_max = g_value_get_int64 (value);
2217       break;
2218     case PROP_BURST_FORMAT:
2219       multisocketsink->def_burst_format = g_value_get_enum (value);
2220       break;
2221     case PROP_BURST_VALUE:
2222       multisocketsink->def_burst_value = g_value_get_uint64 (value);
2223       break;
2224     case PROP_QOS_DSCP:
2225       multisocketsink->qos_dscp = g_value_get_int (value);
2226       setup_dscp (multisocketsink);
2227       break;
2228
2229     default:
2230       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2231       break;
2232   }
2233 }
2234
2235 static void
2236 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
2237     GValue * value, GParamSpec * pspec)
2238 {
2239   GstMultiSocketSink *multisocketsink;
2240
2241   multisocketsink = GST_MULTI_SOCKET_SINK (object);
2242
2243   switch (prop_id) {
2244     case PROP_BUFFERS_MAX:
2245       g_value_set_int (value, multisocketsink->units_max);
2246       break;
2247     case PROP_BUFFERS_SOFT_MAX:
2248       g_value_set_int (value, multisocketsink->units_soft_max);
2249       break;
2250     case PROP_UNIT_TYPE:
2251       g_value_set_enum (value, multisocketsink->unit_type);
2252       break;
2253     case PROP_UNITS_MAX:
2254       g_value_set_int64 (value, multisocketsink->units_max);
2255       break;
2256     case PROP_UNITS_SOFT_MAX:
2257       g_value_set_int64 (value, multisocketsink->units_soft_max);
2258       break;
2259     case PROP_BURST_FORMAT:
2260       g_value_set_enum (value, multisocketsink->def_burst_format);
2261       break;
2262     case PROP_BURST_VALUE:
2263       g_value_set_uint64 (value, multisocketsink->def_burst_value);
2264       break;
2265     case PROP_QOS_DSCP:
2266       g_value_set_int (value, multisocketsink->qos_dscp);
2267       break;
2268     case PROP_NUM_SOCKETS:
2269       g_value_set_uint (value,
2270           g_hash_table_size (multisocketsink->socket_hash));
2271       break;
2272
2273     default:
2274       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2275       break;
2276   }
2277 }
2278
2279 static gboolean
2280 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
2281 {
2282   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2283   GList *clients;
2284
2285   GST_INFO_OBJECT (mssink, "starting");
2286
2287   mssink->main_context = g_main_context_new ();
2288
2289   CLIENTS_LOCK (mssink);
2290   for (clients = mhsink->clients; clients; clients = clients->next) {
2291     GstSocketClient *client;
2292
2293     client = clients->data;
2294     if (client->source)
2295       continue;
2296     client->source =
2297         g_socket_create_source (client->socket,
2298         G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
2299         mssink->cancellable);
2300     g_source_set_callback (client->source,
2301         (GSourceFunc) gst_multi_socket_sink_socket_condition,
2302         gst_object_ref (mssink), (GDestroyNotify) gst_object_unref);
2303     g_source_attach (client->source, mssink->main_context);
2304   }
2305   CLIENTS_UNLOCK (mssink);
2306
2307   return TRUE;
2308 }
2309
2310
2311 static gboolean
2312 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
2313 {
2314   return TRUE;
2315 }
2316
2317 static void
2318 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
2319 {
2320   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2321
2322   if (mssink->main_context)
2323     g_main_context_wakeup (mssink->main_context);
2324 }
2325
2326 static void
2327 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
2328 {
2329   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2330
2331   if (mssink->main_context) {
2332     g_main_context_unref (mssink->main_context);
2333     mssink->main_context = NULL;
2334   }
2335
2336   g_hash_table_foreach_remove (mssink->socket_hash, multisocketsink_hash_remove,
2337       mssink);
2338 }
2339
2340 static gboolean
2341 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
2342 {
2343   GstMultiSocketSink *sink;
2344
2345   sink = GST_MULTI_SOCKET_SINK (bsink);
2346
2347   GST_DEBUG_OBJECT (sink, "set to flushing");
2348   g_cancellable_cancel (sink->cancellable);
2349   if (sink->main_context)
2350     g_main_context_wakeup (sink->main_context);
2351
2352   return TRUE;
2353 }
2354
2355 /* will be called only between calls to start() and stop() */
2356 static gboolean
2357 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
2358 {
2359   GstMultiSocketSink *sink;
2360
2361   sink = GST_MULTI_SOCKET_SINK (bsink);
2362
2363   GST_DEBUG_OBJECT (sink, "unset flushing");
2364   g_cancellable_reset (sink->cancellable);
2365
2366   return TRUE;
2367 }