base: Avoid usage of deprecated API
[platform/upstream/gstreamer.git] / gst / tcp / gstmultisocketsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  * Copyright (C) <2011> Collabora Ltd.
6  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-multisocketsink
26  * @title: multisocketsink
27  * @see_also: tcpserversink
28  *
29  * This plugin writes incoming data to a set of sockets. The
30  * sockets can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31  * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32  *
33  * A client can also be added with the #GstMultiSocketSink::add-full signal
34  * that allows for more control over what and how much data a client
35  * initially receives.
36  *
37  * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38  * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39  * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40  * client is not active anymore or, depending on the value of the
41  * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
42  * In all cases, multisocketsink will never close a socket itself.
43  * The user of multisocketsink is responsible for closing all sockets.
44  * This can for example be done in response to the #GstMultiSocketSink::client-socket-removed signal.
45  * Note that multisocketsink still has a reference to the socket when the
46  * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47  * the descriptor; it is therefore not safe to close the socket in
48  * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49  * #GstMultiSocketSink::client-socket-removed signal to safely close the socket.
50  *
51  * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52  * separate thread to send the buffers to the clients. This ensures that no
53  * client write can block the pipeline and that clients can read with different
54  * speeds.
55  *
56  * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
57  * which buffer in the queued buffers will be sent first to the client. Clients
58  * can be sent the most recent buffer (which might not be decodable by the
59  * client if it is not a keyframe), the next keyframe received in
60  * multisocketsink (which can take some time depending on the keyframe rate), or the
61  * last received keyframe (which will cause a simple burst-on-connect).
62  * Multisocketsink will always keep at least one keyframe in its internal buffers
63  * when the sync-mode is set to latest-keyframe.
64  *
65  * There are additional values for the #GstMultiSocketSink:sync-method
66  * property to allow finer control over burst-on-connect behaviour. By selecting
67  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68  * additionally requires that the burst begin with a keyframe, and
69  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70  * prefer a minimum burst size even if it requires not starting with a keyframe.
71  *
72  * Multisocketsink can be instructed to keep at least a minimum amount of data
73  * expressed in time or byte units in its internal queues with the
74  * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
75  * These properties are useful if the application adds clients with the
76  * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77  * actually be honored.
78  *
79  * When streaming data, clients are allowed to read at a different rate than
80  * the rate at which multisocketsink receives data. If the client is reading too
81  * fast, no data will be send to the client until multisocketsink receives more
82  * data. If the client, however, reads too slowly, data for that client will be
83  * queued up in multisocketsink. Two properties control the amount of data
84  * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
85  * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
86  * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
87  *
88  * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
89  * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
90  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92  * positions the client to the soft limit in the buffer queue and
93  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
94  * buffer queue.
95  *
96  * multisocketsink will by default synchronize on the clock before serving the
97  * buffers to the clients. This behaviour can be disabled by setting the sync
98  * property to FALSE. Multisocketsink will by default not do QoS and will never
99  * drop late buffers.
100  */
101
102 #ifdef HAVE_CONFIG_H
103 #include "config.h"
104 #endif
105
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
108
109 #include <string.h>
110
111 #include "gstmultisocketsink.h"
112
113 #ifndef G_OS_WIN32
114 #include <netinet/in.h>
115 #endif
116
117 #define NOT_IMPLEMENTED 0
118
119 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
120 #define GST_CAT_DEFAULT (multisocketsink_debug)
121
122 /* MultiSocketSink signals and args */
123 enum
124 {
125   /* methods */
126   SIGNAL_ADD,
127   SIGNAL_ADD_BURST,
128   SIGNAL_REMOVE,
129   SIGNAL_REMOVE_FLUSH,
130   SIGNAL_GET_STATS,
131
132   /* signals */
133   SIGNAL_CLIENT_ADDED,
134   SIGNAL_CLIENT_REMOVED,
135   SIGNAL_CLIENT_SOCKET_REMOVED,
136
137   LAST_SIGNAL
138 };
139
140 #define DEFAULT_SEND_DISPATCHED FALSE
141 #define DEFAULT_SEND_MESSAGES   FALSE
142
143 enum
144 {
145   PROP_0,
146   PROP_SEND_DISPATCHED,
147   PROP_SEND_MESSAGES,
148   PROP_LAST
149 };
150
151 static void gst_multi_socket_sink_finalize (GObject * object);
152
153 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
154     GSocket * socket);
155 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
156     GSocket * socket, GstSyncMethod sync, GstFormat min_format,
157     guint64 min_value, GstFormat max_format, guint64 max_value);
158 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
159     GSocket * socket);
160 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
161     GSocket * socket);
162 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
163     GSocket * socket);
164
165 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
166     GstMultiSinkHandle handle);
167 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
168     GstMultiSinkHandle handle, GstClientStatus status);
169
170 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
171 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
172 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
173 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
174 static GstMultiHandleClient
175     * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
176     GstMultiSinkHandle handle, GstSyncMethod sync_method);
177 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
178 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
179     GstMultiHandleClient * client);
180 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
181     gchar debug[30]);
182
183 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
184     handle);
185 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
186     GstMultiHandleClient * mhclient);
187 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
188     GstMultiHandleClient * mhclient);
189 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
190     GstSocketClient * client);
191
192 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
193     handle, GIOCondition condition, GstMultiSocketSink * sink);
194
195 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
196 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
197
198 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
199     GstQuery * query);
200
201 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
202     const GValue * value, GParamSpec * pspec);
203 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
204     GValue * value, GParamSpec * pspec);
205
206 #define gst_multi_socket_sink_parent_class parent_class
207 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
208     GST_TYPE_MULTI_HANDLE_SINK);
209
210 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
211
212 static void
213 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
214 {
215   GObjectClass *gobject_class;
216   GstElementClass *gstelement_class;
217   GstBaseSinkClass *gstbasesink_class;
218   GstMultiHandleSinkClass *gstmultihandlesink_class;
219
220   gobject_class = (GObjectClass *) klass;
221   gstelement_class = (GstElementClass *) klass;
222   gstbasesink_class = (GstBaseSinkClass *) klass;
223   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
224
225   gobject_class->set_property = gst_multi_socket_sink_set_property;
226   gobject_class->get_property = gst_multi_socket_sink_get_property;
227   gobject_class->finalize = gst_multi_socket_sink_finalize;
228
229   /**
230    * GstMultiSocketSink:send-dispatched:
231    *
232    * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
233    * is sent to a client.
234    * The event is a CUSTOM event name GstNetworkMessageDispatched and
235    * contains:
236    *
237    *   "object"  G_TYPE_OBJECT     : the object identifying the client
238    *   "buffer"  GST_TYPE_BUFFER   : the buffer sent to the client
239    *
240    * Since: 1.8.0
241    */
242   g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
243       g_param_spec_boolean ("send-dispatched", "Send Dispatched",
244           "If GstNetworkMessageDispatched events should be pushed",
245           DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246   /**
247    * GstMultiSocketSink:send-messages:
248    *
249    * Sends a GstNetworkMessage event upstream whenever a buffer
250    * is received from a client.
251    * The event is a CUSTOM event name GstNetworkMessage and contains:
252    *
253    *   "object"  G_TYPE_OBJECT     : the object identifying the client
254    *   "buffer"  GST_TYPE_BUFFER   : the buffer with data received from the
255    *                                 client
256    *
257    * Since: 1.8.0
258    */
259   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
260       g_param_spec_boolean ("send-messages", "Send Messages",
261           "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
262           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263
264   /**
265    * GstMultiSocketSink::add:
266    * @gstmultisocketsink: the multisocketsink element to emit this signal on
267    * @socket:             the socket to add to multisocketsink
268    *
269    * Hand the given open socket to multisocketsink to write to.
270    */
271   gst_multi_socket_sink_signals[SIGNAL_ADD] =
272       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
274       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
275       NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
276   /**
277    * GstMultiSocketSink::add-full:
278    * @gstmultisocketsink: the multisocketsink element to emit this signal on
279    * @socket:         the socket to add to multisocketsink
280    * @sync:           the sync method to use
281    * @format_min:     the format of @value_min
282    * @value_min:      the minimum amount of data to burst expressed in
283    *                  @format_min units.
284    * @format_max:     the format of @value_max
285    * @value_max:      the maximum amount of data to burst expressed in
286    *                  @format_max units.
287    *
288    * Hand the given open socket to multisocketsink to write to and
289    * specify the burst parameters for the new connection.
290    */
291   gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
292       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
293       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
294       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
295       NULL, G_TYPE_NONE, 6, G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD,
296       GST_TYPE_FORMAT, G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
297   /**
298    * GstMultiSocketSink::remove:
299    * @gstmultisocketsink: the multisocketsink element to emit this signal on
300    * @socket:             the socket to remove from multisocketsink
301    *
302    * Remove the given open socket from multisocketsink.
303    */
304   gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
305       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
306       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
307       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL, NULL,
308       G_TYPE_NONE, 1, G_TYPE_SOCKET);
309   /**
310    * GstMultiSocketSink::remove-flush:
311    * @gstmultisocketsink: the multisocketsink element to emit this signal on
312    * @socket:             the socket to remove from multisocketsink
313    *
314    * Remove the given open socket from multisocketsink after flushing all
315    * the pending data to the socket.
316    */
317   gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
318       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
319       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
320       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, NULL,
321       G_TYPE_NONE, 1, G_TYPE_SOCKET);
322
323   /**
324    * GstMultiSocketSink::get-stats:
325    * @gstmultisocketsink: the multisocketsink element to emit this signal on
326    * @socket:             the socket to get stats of from multisocketsink
327    *
328    * Get statistics about @socket. This function returns a GstStructure.
329    *
330    * Returns: a GstStructure with the statistics. The structure contains
331    *     values that represent: total number of bytes sent, time
332    *     when the client was added, time when the client was
333    *     disconnected/removed, time the client is/was active, last activity
334    *     time (in epoch seconds), number of buffers dropped.
335    *     All times are expressed in nanoseconds (GstClockTime).
336    */
337   gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
338       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
339       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
340       G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL, NULL,
341       GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
342
343   /**
344    * GstMultiSocketSink::client-added:
345    * @gstmultisocketsink: the multisocketsink element that emitted this signal
346    * @socket:             the socket that was added to multisocketsink
347    *
348    * The given socket was added to multisocketsink. This signal will
349    * be emitted from the streaming thread so application should be prepared
350    * for that.
351    */
352   gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
353       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
354       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_OBJECT);
355   /**
356    * GstMultiSocketSink::client-removed:
357    * @gstmultisocketsink: the multisocketsink element that emitted this signal
358    * @socket:             the socket that is to be removed from multisocketsink
359    * @status:             the reason why the client was removed
360    *
361    * The given socket is about to be removed from multisocketsink. This
362    * signal will be emitted from the streaming thread so applications should
363    * be prepared for that.
364    *
365    * @gstmultisocketsink still holds a handle to @socket so it is possible to call
366    * the get-stats signal from this callback. For the same reason it is
367    * not safe to `close()` and reuse @socket in this callback.
368    */
369   gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
370       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
371       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_SOCKET,
372       GST_TYPE_CLIENT_STATUS);
373   /**
374    * GstMultiSocketSink::client-socket-removed:
375    * @gstmultisocketsink: the multisocketsink element that emitted this signal
376    * @socket:             the socket that was removed from multisocketsink
377    *
378    * The given socket was removed from multisocketsink. This signal will
379    * be emitted from the streaming thread so applications should be prepared
380    * for that.
381    *
382    * In this callback, @gstmultisocketsink has removed all the information
383    * associated with @socket and it is therefore not possible to call get-stats
384    * with @socket. It is however safe to `close()` and reuse @fd in the callback.
385    */
386   gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
387       g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
388       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
389
390   gst_element_class_set_static_metadata (gstelement_class,
391       "Multi socket sink", "Sink/Network",
392       "Send data to multiple sockets",
393       "Thomas Vander Stichele <thomas at apestaart dot org>, "
394       "Wim Taymans <wim@fluendo.com>, "
395       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
396
397   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
398   gstbasesink_class->unlock_stop =
399       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
400   gstbasesink_class->propose_allocation =
401       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
402
403   klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
404   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
405   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
406   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
407   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
408
409   gstmultihandlesink_class->emit_client_added =
410       gst_multi_socket_sink_emit_client_added;
411   gstmultihandlesink_class->emit_client_removed =
412       gst_multi_socket_sink_emit_client_removed;
413
414   gstmultihandlesink_class->stop_pre =
415       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
416   gstmultihandlesink_class->stop_post =
417       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
418   gstmultihandlesink_class->start_pre =
419       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
420   gstmultihandlesink_class->thread =
421       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
422   gstmultihandlesink_class->new_client =
423       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
424   gstmultihandlesink_class->client_get_fd =
425       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
426   gstmultihandlesink_class->client_free =
427       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
428   gstmultihandlesink_class->handle_debug =
429       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
430   gstmultihandlesink_class->handle_hash_key =
431       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
432   gstmultihandlesink_class->hash_adding =
433       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
434   gstmultihandlesink_class->hash_removing =
435       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
436
437   GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
438       "Multi socket sink");
439 }
440
441 static void
442 gst_multi_socket_sink_init (GstMultiSocketSink * this)
443 {
444   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
445
446   mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
447
448   this->cancellable = g_cancellable_new ();
449   this->send_dispatched = DEFAULT_SEND_DISPATCHED;
450   this->send_messages = DEFAULT_SEND_MESSAGES;
451 }
452
453 static void
454 gst_multi_socket_sink_finalize (GObject * object)
455 {
456   GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
457
458   if (this->cancellable) {
459     g_object_unref (this->cancellable);
460     this->cancellable = NULL;
461   }
462
463   G_OBJECT_CLASS (parent_class)->finalize (object);
464 }
465
466 /* methods to emit signals */
467
468 static void
469 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
470     GstMultiSinkHandle handle)
471 {
472   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
473       handle.socket);
474 }
475
476 static void
477 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
478     GstMultiSinkHandle handle, GstClientStatus status)
479 {
480   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
481       0, handle.socket, status);
482 }
483
484 /* action signals */
485
486 static void
487 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
488 {
489   GstMultiSinkHandle handle;
490
491   handle.socket = socket;
492   gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
493 }
494
495 static void
496 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
497     GstSyncMethod sync, GstFormat min_format, guint64 min_value,
498     GstFormat max_format, guint64 max_value)
499 {
500   GstMultiSinkHandle handle;
501
502   handle.socket = socket;
503   gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
504       sync, min_format, min_value, max_format, max_value);
505 }
506
507 static void
508 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
509 {
510   GstMultiSinkHandle handle;
511
512   handle.socket = socket;
513   gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
514 }
515
516 static void
517 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
518 {
519   GstMultiSinkHandle handle;
520
521   handle.socket = socket;
522   gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
523       handle);
524 }
525
526 static GstStructure *
527 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
528 {
529   GstMultiSinkHandle handle;
530
531   handle.socket = socket;
532   return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
533       handle);
534 }
535
536 static GstMultiHandleClient *
537 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
538     GstMultiSinkHandle handle, GstSyncMethod sync_method)
539 {
540   GstSocketClient *client;
541   GstMultiHandleClient *mhclient;
542   GstMultiHandleSinkClass *mhsinkclass =
543       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
544
545   /* create client datastructure */
546   g_assert (G_IS_SOCKET (handle.socket));
547   client = g_new0 (GstSocketClient, 1);
548   mhclient = (GstMultiHandleClient *) client;
549
550   mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
551
552   gst_multi_handle_sink_client_init (mhclient, sync_method);
553   mhsinkclass->handle_debug (handle, mhclient->debug);
554
555   /* set the socket to non blocking */
556   g_socket_set_blocking (handle.socket, FALSE);
557
558   /* we always read from a client */
559   mhsinkclass->hash_adding (mhsink, mhclient);
560
561   gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
562
563   return mhclient;
564 }
565
566 static int
567 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
568 {
569   return g_socket_get_fd (client->handle.socket);
570 }
571
572 static void
573 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
574     GstMultiHandleClient * client)
575 {
576   g_assert (G_IS_SOCKET (client->handle.socket));
577
578   g_signal_emit (mhsink,
579       gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
580       client->handle.socket);
581
582   g_object_unref (client->handle.socket);
583 }
584
585 static void
586 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
587 {
588   g_snprintf (debug, 30, "[socket %p]", handle.socket);
589 }
590
591 static gpointer
592 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
593 {
594   return handle.socket;
595 }
596
597 /* handle a read on a client socket,
598  * which either indicates a close or should be ignored
599  * returns FALSE if some error occurred or the client closed. */
600 static gboolean
601 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
602     GstSocketClient * client)
603 {
604   gboolean ret, do_event;
605   gchar dummy[256], *mem, *omem;
606   gssize nread;
607   GError *err = NULL;
608   gboolean first = TRUE;
609   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
610   gssize navail, maxmem;
611
612   GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
613
614   ret = TRUE;
615
616   navail = g_socket_get_available_bytes (mhclient->handle.socket);
617   if (navail < 0)
618     return TRUE;
619
620   /* only collect the data in a buffer when we need to send it with an event */
621   do_event = sink->send_messages && navail > 0;
622   if (do_event) {
623     omem = mem = g_malloc (navail);
624     maxmem = navail;
625   } else {
626     mem = dummy;
627     maxmem = sizeof (dummy);
628   }
629
630   /* just Read 'n' Drop, could also just drop the client as it's not supposed
631    * to write to us except for closing the socket, I guess it's because we
632    * like to listen to our customers. */
633   do {
634     GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
635
636     nread =
637         g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
638             maxmem), sink->cancellable, &err);
639
640     if (first && nread == 0) {
641       /* client sent close, so remove it */
642       GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
643           mhclient->debug);
644       mhclient->status = GST_CLIENT_STATUS_CLOSED;
645       ret = FALSE;
646       break;
647     } else if (nread < 0) {
648       if (err->code == G_IO_ERROR_WOULD_BLOCK)
649         break;
650
651       GST_WARNING_OBJECT (sink, "%s could not read: %s",
652           mhclient->debug, err->message);
653       mhclient->status = GST_CLIENT_STATUS_ERROR;
654       ret = FALSE;
655       break;
656     }
657     navail -= nread;
658     if (do_event)
659       mem += nread;
660     first = FALSE;
661   } while (navail > 0);
662   g_clear_error (&err);
663
664   if (do_event) {
665     if (ret) {
666       GstBuffer *buf;
667       GstEvent *ev;
668
669       buf = gst_buffer_new_wrapped (omem, maxmem);
670       ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
671           gst_structure_new ("GstNetworkMessage",
672               "object", G_TYPE_OBJECT, mhclient->handle.socket,
673               "buffer", GST_TYPE_BUFFER, buf, NULL));
674       gst_buffer_unref (buf);
675
676       gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
677     } else
678       g_free (omem);
679   }
680   return ret;
681 }
682
683 /**
684  * map_memory_output_vector_n:
685  * @buf: The #GstBuffer that should be mapped
686  * @offset: Offset into the buffer that should be mapped
687  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
688  * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
689  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
690  *
691  * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
692  * I/O to send the data over a socket.  The whole buffer won't be mapped into
693  * memory if it consists of more than @num_vectors #GstMemory s.
694  *
695  * Use #unmap_n_memorys after you are
696  * finished with the mappings.
697  *
698  * Returns: The number of GstMemorys mapped
699  */
700 static int
701 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
702     GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
703 {
704   guint mem_idx, mem_len;
705   gsize mem_skip;
706   size_t maxsize;
707   int i;
708
709   g_return_val_if_fail (num_vectors > 0, 0);
710   memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
711
712   maxsize = gst_buffer_get_size (buf) - offset;
713   if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
714           &mem_skip))
715     g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
716         "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
717
718   for (i = 0; i < mem_len && i < num_vectors; i++) {
719     GstMapInfo map = { 0 };
720     GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
721     if (!gst_memory_map (mem, &map, GST_MAP_READ))
722       g_error ("Unable to map memory %p.  This should never happen.", mem);
723
724     if (i == 0) {
725       vectors[i].buffer = map.data + mem_skip;
726       vectors[i].size = map.size - mem_skip;
727     } else {
728       vectors[i].buffer = map.data;
729       vectors[i].size = map.size;
730     }
731     mapinfo[i] = map;
732   }
733   return i;
734 }
735
736 /**
737  * map_n_memory_output_vector:
738  * @buf: The #GstBuffer that should be mapped
739  * @offset: Offset into the buffer that should be mapped
740  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
741  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
742  *
743  * Returns: The number of GstMemorys mapped
744  */
745 static void
746 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
747 {
748   int i;
749   g_return_if_fail (num_mappings > 0);
750
751   for (i = 0; i < num_mappings; i++)
752     gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
753 }
754
755 static gsize
756 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
757     gsize msg_space)
758 {
759   gpointer iter_state = NULL;
760   GstMeta *meta;
761   gsize msg_count = 0;
762
763   while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
764       && msg_count < msg_space) {
765     if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
766       msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
767   }
768
769   return msg_count;
770 }
771
772 #define CMSG_MAX 255
773
774 static gssize
775 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
776     GSocket * sock, GstBuffer * buffer, gsize bufoffset,
777     GCancellable * cancellable, GError ** err)
778 {
779   GstMapInfo maps[8];
780   GOutputVector vec[8];
781   guint mems_mapped;
782   gssize wrote;
783   GSocketControlMessage *cmsgs[CMSG_MAX];
784   gsize msg_count;
785
786   mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
787
788   msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
789
790   wrote =
791       g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
792       cancellable, err);
793   unmap_n_memorys (maps, mems_mapped);
794   return wrote;
795 }
796
797 /* Handle a write on a client,
798  * which indicates a read request from a client.
799  *
800  * For each client we maintain a queue of GstBuffers that contain the raw bytes
801  * we need to send to the client.
802  *
803  * We first check to see if we need to send streamheaders. If so, we queue them.
804  *
805  * Then we run into the main loop that tries to send as many buffers as
806  * possible. It will first exhaust the mhclient->sending queue and if the queue
807  * is empty, it will pick a buffer from the global queue.
808  *
809  * Sending the buffers from the mhclient->sending queue is basically writing
810  * the bytes to the socket and maintaining a count of the bytes that were
811  * sent. When the buffer is completely sent, it is removed from the
812  * mhclient->sending queue and we try to pick a new buffer for sending.
813  *
814  * When the sending returns a partial buffer we stop sending more data as
815  * the next send operation could block.
816  *
817  * This functions returns FALSE if some error occurred.
818  */
819 static gboolean
820 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
821     GstSocketClient * client)
822 {
823   gboolean more;
824   gboolean flushing;
825   GstClockTime now;
826   GError *err = NULL;
827   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
828   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
829   GstMultiHandleSinkClass *mhsinkclass =
830       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
831
832
833   now = g_get_real_time () * GST_USECOND;
834
835   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
836
837   more = TRUE;
838   do {
839     if (!mhclient->sending) {
840       /* client is not working on a buffer */
841       if (mhclient->bufpos == -1) {
842         /* client is too fast, remove from write queue until new buffer is
843          * available */
844         gst_multi_socket_sink_stop_sending (sink, client);
845
846         /* if we flushed out all of the client buffers, we can stop */
847         if (mhclient->flushcount == 0)
848           goto flushed;
849
850         return TRUE;
851       } else {
852         /* client can pick a buffer from the global queue */
853         GstBuffer *buf;
854         GstClockTime timestamp;
855
856         /* for new connections, we need to find a good spot in the
857          * bufqueue to start streaming from */
858         if (mhclient->new_connection && !flushing) {
859           gint position =
860               gst_multi_handle_sink_new_client_position (mhsink, mhclient);
861
862           if (position >= 0) {
863             /* we got a valid spot in the queue */
864             mhclient->new_connection = FALSE;
865             mhclient->bufpos = position;
866           } else {
867             /* cannot send data to this client yet */
868             gst_multi_socket_sink_stop_sending (sink, client);
869             return TRUE;
870           }
871         }
872
873         /* we flushed all remaining buffers, no need to get a new one */
874         if (mhclient->flushcount == 0)
875           goto flushed;
876
877         /* grab buffer */
878         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
879         mhclient->bufpos--;
880
881         /* update stats */
882         timestamp = GST_BUFFER_TIMESTAMP (buf);
883         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
884           mhclient->first_buffer_ts = timestamp;
885         if (timestamp != -1)
886           mhclient->last_buffer_ts = timestamp;
887
888         /* decrease flushcount */
889         if (mhclient->flushcount != -1)
890           mhclient->flushcount--;
891
892         GST_LOG_OBJECT (sink, "%s client %p at position %d",
893             mhclient->debug, client, mhclient->bufpos);
894
895         /* queueing a buffer will ref it */
896         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
897
898         /* need to start from the first byte for this new buffer */
899         mhclient->bufoffset = 0;
900       }
901     }
902
903     /* see if we need to send something */
904     if (mhclient->sending) {
905       gssize wrote;
906       GstBuffer *head;
907
908       /* pick first buffer from list */
909       head = GST_BUFFER (mhclient->sending->data);
910
911       wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
912           mhclient->bufoffset, sink->cancellable, &err);
913
914       if (wrote < 0) {
915         /* hmm error.. */
916         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
917           goto connection_reset;
918         } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
919           /* write would block, try again later */
920           GST_LOG_OBJECT (sink, "write would block %p",
921               mhclient->handle.socket);
922           more = FALSE;
923           g_clear_error (&err);
924         } else {
925           goto write_error;
926         }
927       } else {
928         if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
929           /* partial write, try again now */
930           GST_LOG_OBJECT (sink,
931               "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
932               mhclient->handle.socket, wrote);
933           mhclient->bufoffset += wrote;
934         } else {
935           if (sink->send_dispatched) {
936             gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
937                 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
938                     gst_structure_new ("GstNetworkMessageDispatched",
939                         "object", G_TYPE_OBJECT, mhclient->handle.socket,
940                         "buffer", GST_TYPE_BUFFER, head, NULL)));
941           }
942           /* complete buffer was written, we can proceed to the next one */
943           mhclient->sending = g_slist_remove (mhclient->sending, head);
944           gst_buffer_unref (head);
945           /* make sure we start from byte 0 for the next buffer */
946           mhclient->bufoffset = 0;
947         }
948         /* update stats */
949         mhclient->bytes_sent += wrote;
950         mhclient->last_activity_time = now;
951         mhsink->bytes_served += wrote;
952       }
953     }
954   } while (more);
955
956   return TRUE;
957
958   /* ERRORS */
959 flushed:
960   {
961     GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
962     mhclient->status = GST_CLIENT_STATUS_REMOVED;
963     return FALSE;
964   }
965 connection_reset:
966   {
967     GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
968         mhclient->debug);
969     mhclient->status = GST_CLIENT_STATUS_CLOSED;
970     g_clear_error (&err);
971     return FALSE;
972   }
973 write_error:
974   {
975     GST_WARNING_OBJECT (sink,
976         "%s could not write, removing client: %s", mhclient->debug,
977         err->message);
978     g_clear_error (&err);
979     mhclient->status = GST_CLIENT_STATUS_ERROR;
980     return FALSE;
981   }
982 }
983
984 static void
985 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
986     GIOCondition condition)
987 {
988   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
989
990   if (client->condition == condition)
991     return;
992
993   if (client->source) {
994     g_source_destroy (client->source);
995     g_source_unref (client->source);
996   }
997   if (condition && sink->main_context) {
998     client->source = g_socket_create_source (mhclient->handle.socket,
999         condition, sink->cancellable);
1000     g_source_set_callback (client->source,
1001         (GSourceFunc) gst_multi_socket_sink_socket_condition,
1002         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1003     g_source_attach (client->source, sink->main_context);
1004   } else {
1005     client->source = NULL;
1006     condition = 0;
1007   }
1008   client->condition = condition;
1009 }
1010
1011 static void
1012 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1013     GstMultiHandleClient * mhclient)
1014 {
1015   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1016   GstSocketClient *client = (GstSocketClient *) (mhclient);
1017
1018   ensure_condition (sink, client,
1019       G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1020 }
1021
1022 static void
1023 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1024     GstMultiHandleClient * mhclient)
1025 {
1026   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1027   GstSocketClient *client = (GstSocketClient *) (mhclient);
1028
1029   ensure_condition (sink, client, 0);
1030 }
1031
1032 static void
1033 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1034     GstSocketClient * client)
1035 {
1036   ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1037 }
1038
1039 /* Handle the clients. This is called when a socket becomes ready
1040  * to read or writable. Badly behaving clients are put on a
1041  * garbage list and removed.
1042  */
1043 static gboolean
1044 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1045     GIOCondition condition, GstMultiSocketSink * sink)
1046 {
1047   GList *clink;
1048   GstSocketClient *client;
1049   gboolean ret = TRUE;
1050   GstMultiHandleClient *mhclient;
1051   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1052   GstMultiHandleSinkClass *mhsinkclass =
1053       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1054
1055   CLIENTS_LOCK (mhsink);
1056   clink = g_hash_table_lookup (mhsink->handle_hash,
1057       mhsinkclass->handle_hash_key (handle));
1058   if (clink == NULL) {
1059     ret = FALSE;
1060     goto done;
1061   }
1062
1063   client = clink->data;
1064   mhclient = (GstMultiHandleClient *) client;
1065
1066   if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1067       && mhclient->status != GST_CLIENT_STATUS_OK) {
1068     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1069     ret = FALSE;
1070     goto done;
1071   }
1072
1073   if ((condition & G_IO_ERR)) {
1074     GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1075     mhclient->status = GST_CLIENT_STATUS_ERROR;
1076     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1077     ret = FALSE;
1078     goto done;
1079   } else if ((condition & G_IO_HUP)) {
1080     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1081     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1082     ret = FALSE;
1083     goto done;
1084   }
1085   if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1086     /* handle client read */
1087     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1088       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1089       ret = FALSE;
1090       goto done;
1091     }
1092   }
1093   if ((condition & G_IO_OUT)) {
1094     /* handle client write */
1095     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1096       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1097       ret = FALSE;
1098       goto done;
1099     }
1100   }
1101
1102 done:
1103   CLIENTS_UNLOCK (mhsink);
1104
1105   return ret;
1106 }
1107
1108 static gboolean
1109 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1110 {
1111   GstClockTime now;
1112   GList *clients;
1113   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1114
1115   now = g_get_real_time () * GST_USECOND;
1116
1117   CLIENTS_LOCK (mhsink);
1118   for (clients = mhsink->clients; clients; clients = clients->next) {
1119     GstSocketClient *client;
1120     GstMultiHandleClient *mhclient;
1121
1122     client = clients->data;
1123     mhclient = (GstMultiHandleClient *) client;
1124     if (mhsink->timeout > 0
1125         && now - mhclient->last_activity_time > mhsink->timeout) {
1126       mhclient->status = GST_CLIENT_STATUS_SLOW;
1127       gst_multi_handle_sink_remove_client_link (mhsink, clients);
1128     }
1129   }
1130   CLIENTS_UNLOCK (mhsink);
1131
1132   return FALSE;
1133 }
1134
1135 /* we handle the client communication in another thread so that we do not block
1136  * the gstreamer thread while we select() on the client fds */
1137 static gpointer
1138 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1139 {
1140   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1141   GSource *timeout = NULL;
1142
1143   while (mhsink->running) {
1144     if (mhsink->timeout > 0) {
1145       timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1146
1147       g_source_set_callback (timeout,
1148           (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1149           (GDestroyNotify) gst_object_unref);
1150       g_source_attach (timeout, sink->main_context);
1151     }
1152
1153     /* Returns after handling all pending events or when
1154      * _wakeup() was called. In any case we have to add
1155      * a new timeout because something happened.
1156      */
1157     g_main_context_iteration (sink->main_context, TRUE);
1158
1159     if (timeout) {
1160       g_source_destroy (timeout);
1161       g_source_unref (timeout);
1162     }
1163   }
1164
1165   return NULL;
1166 }
1167
1168 static void
1169 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1170     const GValue * value, GParamSpec * pspec)
1171 {
1172   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1173
1174   switch (prop_id) {
1175     case PROP_SEND_DISPATCHED:
1176       sink->send_dispatched = g_value_get_boolean (value);
1177       break;
1178     case PROP_SEND_MESSAGES:
1179       sink->send_messages = g_value_get_boolean (value);
1180       break;
1181     default:
1182       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1183       break;
1184   }
1185 }
1186
1187 static void
1188 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1189     GValue * value, GParamSpec * pspec)
1190 {
1191   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1192
1193   switch (prop_id) {
1194     case PROP_SEND_DISPATCHED:
1195       g_value_set_boolean (value, sink->send_dispatched);
1196       break;
1197     case PROP_SEND_MESSAGES:
1198       g_value_set_boolean (value, sink->send_messages);
1199       break;
1200     default:
1201       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1202       break;
1203   }
1204 }
1205
1206 static gboolean
1207 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1208 {
1209   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1210   GstMultiHandleSinkClass *mhsinkclass =
1211       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1212   GList *clients;
1213
1214   GST_INFO_OBJECT (mssink, "starting");
1215
1216   mssink->main_context = g_main_context_new ();
1217
1218   CLIENTS_LOCK (mhsink);
1219   for (clients = mhsink->clients; clients; clients = clients->next) {
1220     GstSocketClient *client = clients->data;
1221     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1222
1223     if (client->source)
1224       continue;
1225     mhsinkclass->hash_adding (mhsink, mhclient);
1226   }
1227   CLIENTS_UNLOCK (mhsink);
1228
1229   return TRUE;
1230 }
1231
1232 static gboolean
1233 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1234 {
1235   return TRUE;
1236 }
1237
1238 static void
1239 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1240 {
1241   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1242
1243   if (mssink->main_context)
1244     g_main_context_wakeup (mssink->main_context);
1245 }
1246
1247 static void
1248 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1249 {
1250   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1251
1252   if (mssink->main_context) {
1253     g_main_context_unref (mssink->main_context);
1254     mssink->main_context = NULL;
1255   }
1256
1257   g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1258       mssink);
1259 }
1260
1261 static gboolean
1262 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1263 {
1264   GstMultiSocketSink *sink;
1265
1266   sink = GST_MULTI_SOCKET_SINK (bsink);
1267
1268   GST_DEBUG_OBJECT (sink, "set to flushing");
1269   g_cancellable_cancel (sink->cancellable);
1270   if (sink->main_context)
1271     g_main_context_wakeup (sink->main_context);
1272
1273   return TRUE;
1274 }
1275
1276 /* will be called only between calls to start() and stop() */
1277 static gboolean
1278 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1279 {
1280   GstMultiSocketSink *sink;
1281
1282   sink = GST_MULTI_SOCKET_SINK (bsink);
1283
1284   GST_DEBUG_OBJECT (sink, "unset flushing");
1285   g_object_unref (sink->cancellable);
1286   sink->cancellable = g_cancellable_new ();
1287
1288   return TRUE;
1289 }
1290
1291 static gboolean
1292 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1293 {
1294   /* we support some meta */
1295   gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,
1296       NULL);
1297
1298   return TRUE;
1299 }