subparse: fix off by one offset calculation
[platform/upstream/gstreamer.git] / gst / tcp / gstmultisocketsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  * Copyright (C) <2011> Collabora Ltd.
6  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-multisocketsink
26  * @see_also: tcpserversink
27  *
28  * This plugin writes incoming data to a set of file descriptors. The
29  * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal. 
30  * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
31  *
32  * As of version 0.10.8, a client can also be added with the #GstMultiSocketSink::add-full signal
33  * that allows for more control over what and how much data a client 
34  * initially receives.
35  *
36  * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
37  * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
38  * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
39  * client is not active anymore or, depending on the value of the
40  * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
41  * In all cases, multisocketsink will never close a file descriptor itself.
42  * The user of multisocketsink is responsible for closing all file descriptors.
43  * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
44  * Note that multisocketsink still has a reference to the file descriptor when the
45  * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
46  * the descriptor; it is therefore not safe to close the file descriptor in
47  * the #GstMultiSocketSink::client-removed signal handler, and you should use the 
48  * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
49  *
50  * Multisocketsink internally keeps a queue of the incoming buffers and uses a
51  * separate thread to send the buffers to the clients. This ensures that no
52  * client write can block the pipeline and that clients can read with different
53  * speeds.
54  *
55  * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
56  * which buffer in the queued buffers will be sent first to the client. Clients 
57  * can be sent the most recent buffer (which might not be decodable by the 
58  * client if it is not a keyframe), the next keyframe received in 
59  * multisocketsink (which can take some time depending on the keyframe rate), or the
60  * last received keyframe (which will cause a simple burst-on-connect). 
61  * Multisocketsink will always keep at least one keyframe in its internal buffers
62  * when the sync-mode is set to latest-keyframe.
63  *
64  * As of version 0.10.8, there are additional values for the #GstMultiSocketSink:sync-method 
65  * property to allow finer control over burst-on-connect behaviour. By selecting
66  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
67  * additionally requires that the burst begin with a keyframe, and 
68  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
69  * prefer a minimum burst size even if it requires not starting with a keyframe.
70  *
71  * Multisocketsink can be instructed to keep at least a minimum amount of data
72  * expressed in time or byte units in its internal queues with the 
73  * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
74  * These properties are useful if the application adds clients with the 
75  * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
76  * actually be honored. 
77  *
78  * When streaming data, clients are allowed to read at a different rate than
79  * the rate at which multisocketsink receives data. If the client is reading too
80  * fast, no data will be send to the client until multisocketsink receives more
81  * data. If the client, however, reads too slowly, data for that client will be 
82  * queued up in multisocketsink. Two properties control the amount of data 
83  * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and 
84  * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
85  * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
86  *
87  * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
88  * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
89  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
90  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
91  * positions the client to the soft limit in the buffer queue and
92  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
93  * buffer queue.
94  *
95  * multisocketsink will by default synchronize on the clock before serving the 
96  * buffers to the clients. This behaviour can be disabled by setting the sync 
97  * property to FALSE. Multisocketsink will by default not do QoS and will never
98  * drop late buffers.
99  *
100  * Last reviewed on 2006-09-12 (0.10.10)
101  */
102
103 #ifdef HAVE_CONFIG_H
104 #include "config.h"
105 #endif
106
107 #include <gst/gst-i18n-plugin.h>
108
109 #include <string.h>
110
111 #include "gstmultisocketsink.h"
112 #include "gsttcp-marshal.h"
113
114 #ifndef G_OS_WIN32
115 #include <netinet/in.h>
116 #endif
117
118 #define NOT_IMPLEMENTED 0
119
120 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
121 #define GST_CAT_DEFAULT (multisocketsink_debug)
122
123 /* MultiSocketSink signals and args */
124 enum
125 {
126   /* methods */
127   SIGNAL_ADD,
128   SIGNAL_ADD_BURST,
129   SIGNAL_REMOVE,
130   SIGNAL_REMOVE_FLUSH,
131   SIGNAL_GET_STATS,
132
133   /* signals */
134   SIGNAL_CLIENT_ADDED,
135   SIGNAL_CLIENT_REMOVED,
136   SIGNAL_CLIENT_SOCKET_REMOVED,
137
138   LAST_SIGNAL
139 };
140
141 enum
142 {
143   PROP_0,
144
145   PROP_LAST
146 };
147
148 static void gst_multi_socket_sink_finalize (GObject * object);
149
150 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
151     GSocket * socket);
152 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
153     GSocket * socket, GstSyncMethod sync, GstFormat min_format,
154     guint64 min_value, GstFormat max_format, guint64 max_value);
155 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
156     GSocket * socket);
157 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
158     GSocket * socket);
159 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
160     GSocket * socket);
161
162 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
163     GstMultiSinkHandle handle);
164 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
165     GstMultiSinkHandle handle, GstClientStatus status);
166
167 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
168 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
169 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
170 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
171 static GstMultiHandleClient
172     * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
173     GstMultiSinkHandle handle, GstSyncMethod sync_method);
174 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
175 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
176     GstMultiHandleClient * client);
177 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
178     gchar debug[30]);
179
180 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
181     handle);
182 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
183     GstMultiHandleClient * mhclient);
184 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
185     GstMultiHandleClient * mhclient);
186
187 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
188     handle, GIOCondition condition, GstMultiSocketSink * sink);
189
190 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
191 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
192
193 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
194     const GValue * value, GParamSpec * pspec);
195 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
196     GValue * value, GParamSpec * pspec);
197
198 #define gst_multi_socket_sink_parent_class parent_class
199 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
200     GST_TYPE_MULTI_HANDLE_SINK);
201
202 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
203
204 static void
205 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
206 {
207   GObjectClass *gobject_class;
208   GstElementClass *gstelement_class;
209   GstBaseSinkClass *gstbasesink_class;
210   GstMultiHandleSinkClass *gstmultihandlesink_class;
211
212   gobject_class = (GObjectClass *) klass;
213   gstelement_class = (GstElementClass *) klass;
214   gstbasesink_class = (GstBaseSinkClass *) klass;
215   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
216
217   gobject_class->set_property = gst_multi_socket_sink_set_property;
218   gobject_class->get_property = gst_multi_socket_sink_get_property;
219   gobject_class->finalize = gst_multi_socket_sink_finalize;
220
221   /**
222    * GstMultiSocketSink::add:
223    * @gstmultisocketsink: the multisocketsink element to emit this signal on
224    * @socket:             the socket to add to multisocketsink
225    *
226    * Hand the given open socket to multisocketsink to write to.
227    */
228   gst_multi_socket_sink_signals[SIGNAL_ADD] =
229       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
230       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
231       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
232       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
233   /**
234    * GstMultiSocketSink::add-full:
235    * @gstmultisocketsink: the multisocketsink element to emit this signal on
236    * @socket:         the socket to add to multisocketsink
237    * @sync:           the sync method to use
238    * @format_min:     the format of @value_min
239    * @value_min:      the minimum amount of data to burst expressed in
240    *                  @format_min units.
241    * @format_max:     the format of @value_max
242    * @value_max:      the maximum amount of data to burst expressed in
243    *                  @format_max units.
244    *
245    * Hand the given open socket to multisocketsink to write to and
246    * specify the burst parameters for the new connection.
247    */
248   gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
249       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
251       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
252       gst_tcp_marshal_VOID__OBJECT_ENUM_ENUM_UINT64_ENUM_UINT64, G_TYPE_NONE, 6,
253       G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
254       GST_TYPE_FORMAT, G_TYPE_UINT64);
255   /**
256    * GstMultiSocketSink::remove:
257    * @gstmultisocketsink: the multisocketsink element to emit this signal on
258    * @socket:             the socket to remove from multisocketsink
259    *
260    * Remove the given open socket from multisocketsink.
261    */
262   gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
263       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
264       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
265       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
266       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
267   /**
268    * GstMultiSocketSink::remove-flush:
269    * @gstmultisocketsink: the multisocketsink element to emit this signal on
270    * @socket:             the socket to remove from multisocketsink
271    *
272    * Remove the given open socket from multisocketsink after flushing all
273    * the pending data to the socket.
274    */
275   gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
276       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
277       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
278       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
279       g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
280
281   /**
282    * GstMultiSocketSink::get-stats:
283    * @gstmultisocketsink: the multisocketsink element to emit this signal on
284    * @socket:             the socket to get stats of from multisocketsink
285    *
286    * Get statistics about @socket. This function returns a GstStructure.
287    *
288    * Returns: a GstStructure with the statistics. The structure contains
289    *     values that represent: total number of bytes sent, time
290    *     when the client was added, time when the client was
291    *     disconnected/removed, time the client is/was active, last activity
292    *     time (in epoch seconds), number of buffers dropped.
293    *     All times are expressed in nanoseconds (GstClockTime).
294    */
295   gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
296       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
297       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
298       G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
299       gst_tcp_marshal_BOXED__OBJECT, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
300
301   /**
302    * GstMultiSocketSink::client-added:
303    * @gstmultisocketsink: the multisocketsink element that emitted this signal
304    * @socket:             the socket that was added to multisocketsink
305    *
306    * The given socket was added to multisocketsink. This signal will
307    * be emitted from the streaming thread so application should be prepared
308    * for that.
309    */
310   gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
311       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
312       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
313       G_TYPE_NONE, 1, G_TYPE_OBJECT);
314   /**
315    * GstMultiSocketSink::client-removed:
316    * @gstmultisocketsink: the multisocketsink element that emitted this signal
317    * @socket:             the socket that is to be removed from multisocketsink
318    * @status:             the reason why the client was removed
319    *
320    * The given socket is about to be removed from multisocketsink. This
321    * signal will be emitted from the streaming thread so applications should
322    * be prepared for that.
323    *
324    * @gstmultisocketsink still holds a handle to @socket so it is possible to call
325    * the get-stats signal from this callback. For the same reason it is
326    * not safe to close() and reuse @socket in this callback.
327    */
328   gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
329       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
330       G_SIGNAL_RUN_LAST, 0, NULL, NULL, gst_tcp_marshal_VOID__OBJECT_ENUM,
331       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
332   /**
333    * GstMultiSocketSink::client-socket-removed:
334    * @gstmultisocketsink: the multisocketsink element that emitted this signal
335    * @socket:             the socket that was removed from multisocketsink
336    *
337    * The given socket was removed from multisocketsink. This signal will
338    * be emitted from the streaming thread so applications should be prepared
339    * for that.
340    *
341    * In this callback, @gstmultisocketsink has removed all the information
342    * associated with @socket and it is therefore not possible to call get-stats
343    * with @socket. It is however safe to close() and reuse @fd in the callback.
344    *
345    * Since: 0.10.7
346    */
347   gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
348       g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
349       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
350       G_TYPE_NONE, 1, G_TYPE_SOCKET);
351
352   gst_element_class_set_static_metadata (gstelement_class,
353       "Multi socket sink", "Sink/Network",
354       "Send data to multiple sockets",
355       "Thomas Vander Stichele <thomas at apestaart dot org>, "
356       "Wim Taymans <wim@fluendo.com>, "
357       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
358
359   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
360   gstbasesink_class->unlock_stop =
361       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
362
363   klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
364   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
365   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
366   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
367   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
368
369   gstmultihandlesink_class->emit_client_added =
370       gst_multi_socket_sink_emit_client_added;
371   gstmultihandlesink_class->emit_client_removed =
372       gst_multi_socket_sink_emit_client_removed;
373
374   gstmultihandlesink_class->stop_pre =
375       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
376   gstmultihandlesink_class->stop_post =
377       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
378   gstmultihandlesink_class->start_pre =
379       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
380   gstmultihandlesink_class->thread =
381       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
382   gstmultihandlesink_class->new_client =
383       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
384   gstmultihandlesink_class->client_get_fd =
385       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
386   gstmultihandlesink_class->client_free =
387       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
388   gstmultihandlesink_class->handle_debug =
389       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
390   gstmultihandlesink_class->handle_hash_key =
391       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
392   gstmultihandlesink_class->hash_adding =
393       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
394   gstmultihandlesink_class->hash_removing =
395       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
396
397   GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
398       "Multi socket sink");
399 }
400
401 static void
402 gst_multi_socket_sink_init (GstMultiSocketSink * this)
403 {
404   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
405
406   mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
407
408   this->cancellable = g_cancellable_new ();
409 }
410
411 static void
412 gst_multi_socket_sink_finalize (GObject * object)
413 {
414   GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
415
416   if (this->cancellable) {
417     g_object_unref (this->cancellable);
418     this->cancellable = NULL;
419   }
420
421   G_OBJECT_CLASS (parent_class)->finalize (object);
422 }
423
424 /* methods to emit signals */
425
426 static void
427 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
428     GstMultiSinkHandle handle)
429 {
430   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
431       handle.socket);
432 }
433
434 static void
435 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
436     GstMultiSinkHandle handle, GstClientStatus status)
437 {
438   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
439       0, handle.socket, status);
440 }
441
442 /* action signals */
443
444 static void
445 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
446 {
447   GstMultiSinkHandle handle;
448
449   handle.socket = socket;
450   gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
451 }
452
453 static void
454 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
455     GstSyncMethod sync, GstFormat min_format, guint64 min_value,
456     GstFormat max_format, guint64 max_value)
457 {
458   GstMultiSinkHandle handle;
459
460   handle.socket = socket;
461   gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
462       sync, min_format, min_value, max_format, max_value);
463 }
464
465 static void
466 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
467 {
468   GstMultiSinkHandle handle;
469
470   handle.socket = socket;
471   gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
472 }
473
474 static void
475 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
476 {
477   GstMultiSinkHandle handle;
478
479   handle.socket = socket;
480   gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
481       handle);
482 }
483
484 static GstStructure *
485 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
486 {
487   GstMultiSinkHandle handle;
488
489   handle.socket = socket;
490   return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
491       handle);
492 }
493
494 static GstMultiHandleClient *
495 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
496     GstMultiSinkHandle handle, GstSyncMethod sync_method)
497 {
498   GstSocketClient *client;
499   GstMultiHandleClient *mhclient;
500   GstMultiHandleSinkClass *mhsinkclass =
501       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
502
503   /* create client datastructure */
504   g_assert (G_IS_SOCKET (handle.socket));
505   client = g_new0 (GstSocketClient, 1);
506   mhclient = (GstMultiHandleClient *) client;
507
508   mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
509
510   gst_multi_handle_sink_client_init (mhclient, sync_method);
511   mhsinkclass->handle_debug (handle, mhclient->debug);
512
513   /* set the socket to non blocking */
514   g_socket_set_blocking (handle.socket, FALSE);
515
516   /* we always read from a client */
517   mhsinkclass->hash_adding (mhsink, mhclient);
518
519   gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
520
521   return mhclient;
522 }
523
524 static int
525 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
526 {
527   return g_socket_get_fd (client->handle.socket);
528 }
529
530 static void
531 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
532     GstMultiHandleClient * client)
533 {
534   g_assert (G_IS_SOCKET (client->handle.socket));
535
536   g_signal_emit (mhsink,
537       gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
538       client->handle.socket);
539
540   g_object_unref (client->handle.socket);
541 }
542
543 static void
544 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
545 {
546   g_snprintf (debug, 30, "[socket %p]", handle.socket);
547 }
548
549 static gpointer
550 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
551 {
552   return handle.socket;
553 }
554
555 /* handle a read on a client socket,
556  * which either indicates a close or should be ignored
557  * returns FALSE if some error occured or the client closed. */
558 static gboolean
559 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
560     GstSocketClient * client)
561 {
562   gboolean ret;
563   gchar dummy[256];
564   gssize nread;
565   GError *err = NULL;
566   gboolean first = TRUE;
567   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
568
569   GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
570
571   ret = TRUE;
572
573   /* just Read 'n' Drop, could also just drop the client as it's not supposed
574    * to write to us except for closing the socket, I guess it's because we
575    * like to listen to our customers. */
576   do {
577     gssize navail;
578
579     GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
580
581     navail = g_socket_get_available_bytes (mhclient->handle.socket);
582     if (navail < 0)
583       break;
584
585     nread =
586         g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
587             sizeof (dummy)), sink->cancellable, &err);
588     if (first && nread == 0) {
589       /* client sent close, so remove it */
590       GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
591           mhclient->debug);
592       mhclient->status = GST_CLIENT_STATUS_CLOSED;
593       ret = FALSE;
594     } else if (nread < 0) {
595       GST_WARNING_OBJECT (sink, "%s could not read: %s",
596           mhclient->debug, err->message);
597       mhclient->status = GST_CLIENT_STATUS_ERROR;
598       ret = FALSE;
599       break;
600     }
601     first = FALSE;
602   } while (nread > 0);
603   g_clear_error (&err);
604
605   return ret;
606 }
607
608 /* Handle a write on a client,
609  * which indicates a read request from a client.
610  *
611  * For each client we maintain a queue of GstBuffers that contain the raw bytes
612  * we need to send to the client.
613  *
614  * We first check to see if we need to send streamheaders. If so, we queue them.
615  *
616  * Then we run into the main loop that tries to send as many buffers as
617  * possible. It will first exhaust the mhclient->sending queue and if the queue
618  * is empty, it will pick a buffer from the global queue.
619  *
620  * Sending the buffers from the mhclient->sending queue is basically writing
621  * the bytes to the socket and maintaining a count of the bytes that were
622  * sent. When the buffer is completely sent, it is removed from the
623  * mhclient->sending queue and we try to pick a new buffer for sending.
624  *
625  * When the sending returns a partial buffer we stop sending more data as
626  * the next send operation could block.
627  *
628  * This functions returns FALSE if some error occured.
629  */
630 static gboolean
631 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
632     GstSocketClient * client)
633 {
634   gboolean more;
635   gboolean flushing;
636   GstClockTime now;
637   GTimeVal nowtv;
638   GError *err = NULL;
639   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
640   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
641   GstMultiHandleSinkClass *mhsinkclass =
642       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
643
644
645   g_get_current_time (&nowtv);
646   now = GST_TIMEVAL_TO_TIME (nowtv);
647
648   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
649
650   more = TRUE;
651   do {
652     gint maxsize;
653
654     if (!mhclient->sending) {
655       /* client is not working on a buffer */
656       if (mhclient->bufpos == -1) {
657         /* client is too fast, remove from write queue until new buffer is
658          * available */
659         /* FIXME: specific */
660         if (client->source) {
661           g_source_destroy (client->source);
662           g_source_unref (client->source);
663           client->source = NULL;
664         }
665
666         /* if we flushed out all of the client buffers, we can stop */
667         if (mhclient->flushcount == 0)
668           goto flushed;
669
670         return TRUE;
671       } else {
672         /* client can pick a buffer from the global queue */
673         GstBuffer *buf;
674         GstClockTime timestamp;
675
676         /* for new connections, we need to find a good spot in the
677          * bufqueue to start streaming from */
678         if (mhclient->new_connection && !flushing) {
679           gint position =
680               gst_multi_handle_sink_new_client_position (mhsink, mhclient);
681
682           if (position >= 0) {
683             /* we got a valid spot in the queue */
684             mhclient->new_connection = FALSE;
685             mhclient->bufpos = position;
686           } else {
687             /* cannot send data to this client yet */
688             /* FIXME: specific */
689             if (client->source) {
690               g_source_destroy (client->source);
691               g_source_unref (client->source);
692               client->source = NULL;
693             }
694
695             return TRUE;
696           }
697         }
698
699         /* we flushed all remaining buffers, no need to get a new one */
700         if (mhclient->flushcount == 0)
701           goto flushed;
702
703         /* grab buffer */
704         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
705         mhclient->bufpos--;
706
707         /* update stats */
708         timestamp = GST_BUFFER_TIMESTAMP (buf);
709         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
710           mhclient->first_buffer_ts = timestamp;
711         if (timestamp != -1)
712           mhclient->last_buffer_ts = timestamp;
713
714         /* decrease flushcount */
715         if (mhclient->flushcount != -1)
716           mhclient->flushcount--;
717
718         GST_LOG_OBJECT (sink, "%s client %p at position %d",
719             mhclient->debug, client, mhclient->bufpos);
720
721         /* queueing a buffer will ref it */
722         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
723
724         /* need to start from the first byte for this new buffer */
725         mhclient->bufoffset = 0;
726       }
727     }
728
729     /* see if we need to send something */
730     if (mhclient->sending) {
731       gssize wrote;
732       GstBuffer *head;
733       GstMapInfo map;
734
735       /* pick first buffer from list */
736       head = GST_BUFFER (mhclient->sending->data);
737
738       gst_buffer_map (head, &map, GST_MAP_READ);
739       maxsize = map.size - mhclient->bufoffset;
740
741       /* FIXME: specific */
742       /* try to write the complete buffer */
743
744       wrote =
745           g_socket_send (mhclient->handle.socket,
746           (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
747           &err);
748       gst_buffer_unmap (head, &map);
749
750       if (wrote < 0) {
751         /* hmm error.. */
752         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
753           goto connection_reset;
754         } else {
755           goto write_error;
756         }
757       } else {
758         if (wrote < maxsize) {
759           /* partial write means that the client cannot read more and we should
760            * stop sending more */
761           GST_LOG_OBJECT (sink,
762               "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
763               mhclient->handle.socket, wrote);
764           mhclient->bufoffset += wrote;
765           more = FALSE;
766         } else {
767           /* complete buffer was written, we can proceed to the next one */
768           mhclient->sending = g_slist_remove (mhclient->sending, head);
769           gst_buffer_unref (head);
770           /* make sure we start from byte 0 for the next buffer */
771           mhclient->bufoffset = 0;
772         }
773         /* update stats */
774         mhclient->bytes_sent += wrote;
775         mhclient->last_activity_time = now;
776         mhsink->bytes_served += wrote;
777       }
778     }
779   } while (more);
780
781   return TRUE;
782
783   /* ERRORS */
784 flushed:
785   {
786     GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
787     mhclient->status = GST_CLIENT_STATUS_REMOVED;
788     return FALSE;
789   }
790 connection_reset:
791   {
792     GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
793         mhclient->debug);
794     mhclient->status = GST_CLIENT_STATUS_CLOSED;
795     g_clear_error (&err);
796     return FALSE;
797   }
798 write_error:
799   {
800     GST_WARNING_OBJECT (sink,
801         "%s could not write, removing client: %s", mhclient->debug,
802         err->message);
803     g_clear_error (&err);
804     mhclient->status = GST_CLIENT_STATUS_ERROR;
805     return FALSE;
806   }
807 }
808
809 static void
810 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
811     GstMultiHandleClient * mhclient)
812 {
813   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
814   GstSocketClient *client = (GstSocketClient *) (mhclient);
815
816   if (!sink->main_context)
817     return;
818
819   if (!client->source) {
820     client->source =
821         g_socket_create_source (mhclient->handle.socket,
822         G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
823     g_source_set_callback (client->source,
824         (GSourceFunc) gst_multi_socket_sink_socket_condition,
825         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
826     g_source_attach (client->source, sink->main_context);
827   }
828 }
829
830 static void
831 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
832     GstMultiHandleClient * mhclient)
833 {
834   GstSocketClient *client = (GstSocketClient *) (mhclient);
835
836   if (client->source) {
837     g_source_destroy (client->source);
838     g_source_unref (client->source);
839     client->source = NULL;
840   }
841 }
842
843 /* Handle the clients. This is called when a socket becomes ready
844  * to read or writable. Badly behaving clients are put on a
845  * garbage list and removed.
846  */
847 static gboolean
848 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
849     GIOCondition condition, GstMultiSocketSink * sink)
850 {
851   GList *clink;
852   GstSocketClient *client;
853   gboolean ret = TRUE;
854   GstMultiHandleClient *mhclient;
855   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
856   GstMultiHandleSinkClass *mhsinkclass =
857       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
858
859   CLIENTS_LOCK (mhsink);
860   clink = g_hash_table_lookup (mhsink->handle_hash,
861       mhsinkclass->handle_hash_key (handle));
862   if (clink == NULL) {
863     ret = FALSE;
864     goto done;
865   }
866
867   client = clink->data;
868   mhclient = (GstMultiHandleClient *) client;
869
870   if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
871       && mhclient->status != GST_CLIENT_STATUS_OK) {
872     gst_multi_handle_sink_remove_client_link (mhsink, clink);
873     ret = FALSE;
874     goto done;
875   }
876
877   if ((condition & G_IO_ERR)) {
878     GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
879     mhclient->status = GST_CLIENT_STATUS_ERROR;
880     gst_multi_handle_sink_remove_client_link (mhsink, clink);
881     ret = FALSE;
882     goto done;
883   } else if ((condition & G_IO_HUP)) {
884     mhclient->status = GST_CLIENT_STATUS_CLOSED;
885     gst_multi_handle_sink_remove_client_link (mhsink, clink);
886     ret = FALSE;
887     goto done;
888   } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
889     /* handle client read */
890     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
891       gst_multi_handle_sink_remove_client_link (mhsink, clink);
892       ret = FALSE;
893       goto done;
894     }
895   } else if ((condition & G_IO_OUT)) {
896     /* handle client write */
897     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
898       gst_multi_handle_sink_remove_client_link (mhsink, clink);
899       ret = FALSE;
900       goto done;
901     }
902   }
903
904 done:
905   CLIENTS_UNLOCK (mhsink);
906
907   return ret;
908 }
909
910 static gboolean
911 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
912 {
913   GstClockTime now;
914   GTimeVal nowtv;
915   GList *clients;
916   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
917
918   g_get_current_time (&nowtv);
919   now = GST_TIMEVAL_TO_TIME (nowtv);
920
921   CLIENTS_LOCK (mhsink);
922   for (clients = mhsink->clients; clients; clients = clients->next) {
923     GstSocketClient *client;
924     GstMultiHandleClient *mhclient;
925
926     client = clients->data;
927     mhclient = (GstMultiHandleClient *) client;
928     if (mhsink->timeout > 0
929         && now - mhclient->last_activity_time > mhsink->timeout) {
930       mhclient->status = GST_CLIENT_STATUS_SLOW;
931       gst_multi_handle_sink_remove_client_link (mhsink, clients);
932     }
933   }
934   CLIENTS_UNLOCK (mhsink);
935
936   return FALSE;
937 }
938
939 /* we handle the client communication in another thread so that we do not block
940  * the gstreamer thread while we select() on the client fds */
941 static gpointer
942 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
943 {
944   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
945   GSource *timeout = NULL;
946
947   while (mhsink->running) {
948     if (mhsink->timeout > 0) {
949       timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
950
951       g_source_set_callback (timeout,
952           (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
953           (GDestroyNotify) gst_object_unref);
954       g_source_attach (timeout, sink->main_context);
955     }
956
957     /* Returns after handling all pending events or when
958      * _wakeup() was called. In any case we have to add
959      * a new timeout because something happened.
960      */
961     g_main_context_iteration (sink->main_context, TRUE);
962
963     if (timeout) {
964       g_source_destroy (timeout);
965       g_source_unref (timeout);
966     }
967   }
968
969   return NULL;
970 }
971
972 static void
973 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
974     const GValue * value, GParamSpec * pspec)
975 {
976   switch (prop_id) {
977
978     default:
979       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
980       break;
981   }
982 }
983
984 static void
985 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
986     GValue * value, GParamSpec * pspec)
987 {
988   switch (prop_id) {
989     default:
990       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
991       break;
992   }
993 }
994
995 static gboolean
996 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
997 {
998   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
999   GstMultiHandleSinkClass *mhsinkclass =
1000       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1001   GList *clients;
1002
1003   GST_INFO_OBJECT (mssink, "starting");
1004
1005   mssink->main_context = g_main_context_new ();
1006
1007   CLIENTS_LOCK (mhsink);
1008   for (clients = mhsink->clients; clients; clients = clients->next) {
1009     GstSocketClient *client = clients->data;
1010     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1011
1012     if (client->source)
1013       continue;
1014     mhsinkclass->hash_adding (mhsink, mhclient);
1015   }
1016   CLIENTS_UNLOCK (mhsink);
1017
1018   return TRUE;
1019 }
1020
1021 static gboolean
1022 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1023 {
1024   return TRUE;
1025 }
1026
1027 static void
1028 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1029 {
1030   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1031
1032   if (mssink->main_context)
1033     g_main_context_wakeup (mssink->main_context);
1034 }
1035
1036 static void
1037 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1038 {
1039   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1040
1041   if (mssink->main_context) {
1042     g_main_context_unref (mssink->main_context);
1043     mssink->main_context = NULL;
1044   }
1045
1046   g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1047       mssink);
1048 }
1049
1050 static gboolean
1051 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1052 {
1053   GstMultiSocketSink *sink;
1054
1055   sink = GST_MULTI_SOCKET_SINK (bsink);
1056
1057   GST_DEBUG_OBJECT (sink, "set to flushing");
1058   g_cancellable_cancel (sink->cancellable);
1059   if (sink->main_context)
1060     g_main_context_wakeup (sink->main_context);
1061
1062   return TRUE;
1063 }
1064
1065 /* will be called only between calls to start() and stop() */
1066 static gboolean
1067 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1068 {
1069   GstMultiSocketSink *sink;
1070
1071   sink = GST_MULTI_SOCKET_SINK (bsink);
1072
1073   GST_DEBUG_OBJECT (sink, "unset flushing");
1074   g_cancellable_reset (sink->cancellable);
1075
1076   return TRUE;
1077 }