2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
25 * SECTION:element-multisocketsink
26 * @title: multisocketsink
27 * @see_also: tcpserversink
29 * This plugin writes incoming data to a set of sockets. The
30 * sockets can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31 * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
33 * A client can also be added with the #GstMultiSocketSink::add-full signal
34 * that allows for more control over what and how much data a client
37 * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38 * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39 * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40 * client is not active anymore or, depending on the value of the
41 * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
42 * In all cases, multisocketsink will never close a socket itself.
43 * The user of multisocketsink is responsible for closing all sockets.
44 * This can for example be done in response to the #GstMultiSocketSink::client-socket-removed signal.
45 * Note that multisocketsink still has a reference to the socket when the
46 * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47 * the descriptor; it is therefore not safe to close the socket in
48 * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49 * #GstMultiSocketSink::client-socket-removed signal to safely close the socket.
51 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52 * separate thread to send the buffers to the clients. This ensures that no
53 * client write can block the pipeline and that clients can read with different
56 * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
57 * which buffer in the queued buffers will be sent first to the client. Clients
58 * can be sent the most recent buffer (which might not be decodable by the
59 * client if it is not a keyframe), the next keyframe received in
60 * multisocketsink (which can take some time depending on the keyframe rate), or the
61 * last received keyframe (which will cause a simple burst-on-connect).
62 * Multisocketsink will always keep at least one keyframe in its internal buffers
63 * when the sync-mode is set to latest-keyframe.
65 * There are additional values for the #GstMultiSocketSink:sync-method
66 * property to allow finer control over burst-on-connect behaviour. By selecting
67 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68 * additionally requires that the burst begin with a keyframe, and
69 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70 * prefer a minimum burst size even if it requires not starting with a keyframe.
72 * Multisocketsink can be instructed to keep at least a minimum amount of data
73 * expressed in time or byte units in its internal queues with the
74 * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
75 * These properties are useful if the application adds clients with the
76 * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77 * actually be honored.
79 * When streaming data, clients are allowed to read at a different rate than
80 * the rate at which multisocketsink receives data. If the client is reading too
81 * fast, no data will be send to the client until multisocketsink receives more
82 * data. If the client, however, reads too slowly, data for that client will be
83 * queued up in multisocketsink. Two properties control the amount of data
84 * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
85 * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
86 * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
88 * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
89 * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
90 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92 * positions the client to the soft limit in the buffer queue and
93 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
96 * multisocketsink will by default synchronize on the clock before serving the
97 * buffers to the clients. This behaviour can be disabled by setting the sync
98 * property to FALSE. Multisocketsink will by default not do QoS and will never
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
111 #include "gstmultisocketsink.h"
114 #include <netinet/in.h>
117 #define NOT_IMPLEMENTED 0
119 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
120 #define GST_CAT_DEFAULT (multisocketsink_debug)
122 /* MultiSocketSink signals and args */
134 SIGNAL_CLIENT_REMOVED,
135 SIGNAL_CLIENT_SOCKET_REMOVED,
140 #define DEFAULT_SEND_DISPATCHED FALSE
141 #define DEFAULT_SEND_MESSAGES FALSE
146 PROP_SEND_DISPATCHED,
151 static void gst_multi_socket_sink_finalize (GObject * object);
153 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
155 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
156 GSocket * socket, GstSyncMethod sync, GstFormat min_format,
157 guint64 min_value, GstFormat max_format, guint64 max_value);
158 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
160 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
162 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
165 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
166 GstMultiSinkHandle handle);
167 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
168 GstMultiSinkHandle handle, GstClientStatus status);
170 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
171 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
172 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
173 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
174 static GstMultiHandleClient
175 * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
176 GstMultiSinkHandle handle, GstSyncMethod sync_method);
177 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
178 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
179 GstMultiHandleClient * client);
180 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
183 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
185 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
186 GstMultiHandleClient * mhclient);
187 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
188 GstMultiHandleClient * mhclient);
189 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
190 GstSocketClient * client);
192 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
193 handle, GIOCondition condition, GstMultiSocketSink * sink);
195 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
196 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
198 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
201 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
202 const GValue * value, GParamSpec * pspec);
203 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
204 GValue * value, GParamSpec * pspec);
206 #define gst_multi_socket_sink_parent_class parent_class
207 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
208 GST_TYPE_MULTI_HANDLE_SINK);
210 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
213 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
215 GObjectClass *gobject_class;
216 GstElementClass *gstelement_class;
217 GstBaseSinkClass *gstbasesink_class;
218 GstMultiHandleSinkClass *gstmultihandlesink_class;
220 gobject_class = (GObjectClass *) klass;
221 gstelement_class = (GstElementClass *) klass;
222 gstbasesink_class = (GstBaseSinkClass *) klass;
223 gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
225 gobject_class->set_property = gst_multi_socket_sink_set_property;
226 gobject_class->get_property = gst_multi_socket_sink_get_property;
227 gobject_class->finalize = gst_multi_socket_sink_finalize;
230 * GstMultiSocketSink:send-dispatched:
232 * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
233 * is sent to a client.
234 * The event is a CUSTOM event name GstNetworkMessageDispatched and
237 * "object" G_TYPE_OBJECT : the object identifying the client
238 * "buffer" GST_TYPE_BUFFER : the buffer sent to the client
242 g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
243 g_param_spec_boolean ("send-dispatched", "Send Dispatched",
244 "If GstNetworkMessageDispatched events should be pushed",
245 DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247 * GstMultiSocketSink:send-messages:
249 * Sends a GstNetworkMessage event upstream whenever a buffer
250 * is received from a client.
251 * The event is a CUSTOM event name GstNetworkMessage and contains:
253 * "object" G_TYPE_OBJECT : the object identifying the client
254 * "buffer" GST_TYPE_BUFFER : the buffer with data received from the
259 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
260 g_param_spec_boolean ("send-messages", "Send Messages",
261 "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
262 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 * GstMultiSocketSink::add:
266 * @gstmultisocketsink: the multisocketsink element to emit this signal on
267 * @socket: the socket to add to multisocketsink
269 * Hand the given open socket to multisocketsink to write to.
271 gst_multi_socket_sink_signals[SIGNAL_ADD] =
272 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
273 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
274 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
275 NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
277 * GstMultiSocketSink::add-full:
278 * @gstmultisocketsink: the multisocketsink element to emit this signal on
279 * @socket: the socket to add to multisocketsink
280 * @sync: the sync method to use
281 * @format_min: the format of @value_min
282 * @value_min: the minimum amount of data to burst expressed in
284 * @format_max: the format of @value_max
285 * @value_max: the maximum amount of data to burst expressed in
288 * Hand the given open socket to multisocketsink to write to and
289 * specify the burst parameters for the new connection.
291 gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
292 g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
293 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
294 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
295 NULL, G_TYPE_NONE, 6, G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD,
296 GST_TYPE_FORMAT, G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
298 * GstMultiSocketSink::remove:
299 * @gstmultisocketsink: the multisocketsink element to emit this signal on
300 * @socket: the socket to remove from multisocketsink
302 * Remove the given open socket from multisocketsink.
304 gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
305 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
306 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
307 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL, NULL,
308 G_TYPE_NONE, 1, G_TYPE_SOCKET);
310 * GstMultiSocketSink::remove-flush:
311 * @gstmultisocketsink: the multisocketsink element to emit this signal on
312 * @socket: the socket to remove from multisocketsink
314 * Remove the given open socket from multisocketsink after flushing all
315 * the pending data to the socket.
317 gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
318 g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
319 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
320 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, NULL,
321 G_TYPE_NONE, 1, G_TYPE_SOCKET);
324 * GstMultiSocketSink::get-stats:
325 * @gstmultisocketsink: the multisocketsink element to emit this signal on
326 * @socket: the socket to get stats of from multisocketsink
328 * Get statistics about @socket. This function returns a GstStructure.
330 * Returns: a GstStructure with the statistics. The structure contains
331 * values that represent: total number of bytes sent, time
332 * when the client was added, time when the client was
333 * disconnected/removed, time the client is/was active, last activity
334 * time (in epoch seconds), number of buffers dropped.
335 * All times are expressed in nanoseconds (GstClockTime).
337 gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
338 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
339 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
340 G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL, NULL,
341 GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
344 * GstMultiSocketSink::client-added:
345 * @gstmultisocketsink: the multisocketsink element that emitted this signal
346 * @socket: the socket that was added to multisocketsink
348 * The given socket was added to multisocketsink. This signal will
349 * be emitted from the streaming thread so application should be prepared
352 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
353 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
354 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_OBJECT);
356 * GstMultiSocketSink::client-removed:
357 * @gstmultisocketsink: the multisocketsink element that emitted this signal
358 * @socket: the socket that is to be removed from multisocketsink
359 * @status: the reason why the client was removed
361 * The given socket is about to be removed from multisocketsink. This
362 * signal will be emitted from the streaming thread so applications should
363 * be prepared for that.
365 * @gstmultisocketsink still holds a handle to @socket so it is possible to call
366 * the get-stats signal from this callback. For the same reason it is
367 * not safe to `close()` and reuse @socket in this callback.
369 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
370 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
371 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_SOCKET,
372 GST_TYPE_CLIENT_STATUS);
374 * GstMultiSocketSink::client-socket-removed:
375 * @gstmultisocketsink: the multisocketsink element that emitted this signal
376 * @socket: the socket that was removed from multisocketsink
378 * The given socket was removed from multisocketsink. This signal will
379 * be emitted from the streaming thread so applications should be prepared
382 * In this callback, @gstmultisocketsink has removed all the information
383 * associated with @socket and it is therefore not possible to call get-stats
384 * with @socket. It is however safe to `close()` and reuse @fd in the callback.
386 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
387 g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
388 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
390 gst_element_class_set_static_metadata (gstelement_class,
391 "Multi socket sink", "Sink/Network",
392 "Send data to multiple sockets",
393 "Thomas Vander Stichele <thomas at apestaart dot org>, "
394 "Wim Taymans <wim@fluendo.com>, "
395 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
397 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
398 gstbasesink_class->unlock_stop =
399 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
400 gstbasesink_class->propose_allocation =
401 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
403 klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
404 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
405 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
406 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
407 klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
409 gstmultihandlesink_class->emit_client_added =
410 gst_multi_socket_sink_emit_client_added;
411 gstmultihandlesink_class->emit_client_removed =
412 gst_multi_socket_sink_emit_client_removed;
414 gstmultihandlesink_class->stop_pre =
415 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
416 gstmultihandlesink_class->stop_post =
417 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
418 gstmultihandlesink_class->start_pre =
419 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
420 gstmultihandlesink_class->thread =
421 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
422 gstmultihandlesink_class->new_client =
423 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
424 gstmultihandlesink_class->client_get_fd =
425 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
426 gstmultihandlesink_class->client_free =
427 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
428 gstmultihandlesink_class->handle_debug =
429 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
430 gstmultihandlesink_class->handle_hash_key =
431 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
432 gstmultihandlesink_class->hash_adding =
433 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
434 gstmultihandlesink_class->hash_removing =
435 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
437 GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
438 "Multi socket sink");
442 gst_multi_socket_sink_init (GstMultiSocketSink * this)
444 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
446 mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
448 this->cancellable = g_cancellable_new ();
449 this->send_dispatched = DEFAULT_SEND_DISPATCHED;
450 this->send_messages = DEFAULT_SEND_MESSAGES;
454 gst_multi_socket_sink_finalize (GObject * object)
456 GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
458 if (this->cancellable) {
459 g_object_unref (this->cancellable);
460 this->cancellable = NULL;
463 G_OBJECT_CLASS (parent_class)->finalize (object);
466 /* methods to emit signals */
469 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
470 GstMultiSinkHandle handle)
472 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
477 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
478 GstMultiSinkHandle handle, GstClientStatus status)
480 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
481 0, handle.socket, status);
487 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
489 GstMultiSinkHandle handle;
491 handle.socket = socket;
492 gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
496 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
497 GstSyncMethod sync, GstFormat min_format, guint64 min_value,
498 GstFormat max_format, guint64 max_value)
500 GstMultiSinkHandle handle;
502 handle.socket = socket;
503 gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
504 sync, min_format, min_value, max_format, max_value);
508 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
510 GstMultiSinkHandle handle;
512 handle.socket = socket;
513 gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
517 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
519 GstMultiSinkHandle handle;
521 handle.socket = socket;
522 gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
526 static GstStructure *
527 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
529 GstMultiSinkHandle handle;
531 handle.socket = socket;
532 return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
536 static GstMultiHandleClient *
537 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
538 GstMultiSinkHandle handle, GstSyncMethod sync_method)
540 GstSocketClient *client;
541 GstMultiHandleClient *mhclient;
542 GstMultiHandleSinkClass *mhsinkclass =
543 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
545 /* create client datastructure */
546 g_assert (G_IS_SOCKET (handle.socket));
547 client = g_new0 (GstSocketClient, 1);
548 mhclient = (GstMultiHandleClient *) client;
550 mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
552 gst_multi_handle_sink_client_init (mhclient, sync_method);
553 mhsinkclass->handle_debug (handle, mhclient->debug);
555 /* set the socket to non blocking */
556 g_socket_set_blocking (handle.socket, FALSE);
558 /* we always read from a client */
559 mhsinkclass->hash_adding (mhsink, mhclient);
561 gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
567 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
569 return g_socket_get_fd (client->handle.socket);
573 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
574 GstMultiHandleClient * client)
576 g_assert (G_IS_SOCKET (client->handle.socket));
578 g_signal_emit (mhsink,
579 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
580 client->handle.socket);
582 g_object_unref (client->handle.socket);
586 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
588 g_snprintf (debug, 30, "[socket %p]", handle.socket);
592 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
594 return handle.socket;
597 /* handle a read on a client socket,
598 * which either indicates a close or should be ignored
599 * returns FALSE if some error occurred or the client closed. */
601 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
602 GstSocketClient * client)
604 gboolean ret, do_event;
605 gchar dummy[256], *mem, *omem;
608 gboolean first = TRUE;
609 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
610 gssize navail, maxmem;
612 GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
616 navail = g_socket_get_available_bytes (mhclient->handle.socket);
620 /* only collect the data in a buffer when we need to send it with an event */
621 do_event = sink->send_messages && navail > 0;
623 omem = mem = g_malloc (navail);
627 maxmem = sizeof (dummy);
630 /* just Read 'n' Drop, could also just drop the client as it's not supposed
631 * to write to us except for closing the socket, I guess it's because we
632 * like to listen to our customers. */
634 GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
637 g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
638 maxmem), sink->cancellable, &err);
640 if (first && nread == 0) {
641 /* client sent close, so remove it */
642 GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
644 mhclient->status = GST_CLIENT_STATUS_CLOSED;
647 } else if (nread < 0) {
648 if (err->code == G_IO_ERROR_WOULD_BLOCK)
651 GST_WARNING_OBJECT (sink, "%s could not read: %s",
652 mhclient->debug, err->message);
653 mhclient->status = GST_CLIENT_STATUS_ERROR;
661 } while (navail > 0);
662 g_clear_error (&err);
669 buf = gst_buffer_new_wrapped (omem, maxmem);
670 ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
671 gst_structure_new ("GstNetworkMessage",
672 "object", G_TYPE_OBJECT, mhclient->handle.socket,
673 "buffer", GST_TYPE_BUFFER, buf, NULL));
674 gst_buffer_unref (buf);
676 gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
684 * map_memory_output_vector_n:
685 * @buf: The #GstBuffer that should be mapped
686 * @offset: Offset into the buffer that should be mapped
687 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
688 * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
689 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
691 * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
692 * I/O to send the data over a socket. The whole buffer won't be mapped into
693 * memory if it consists of more than @num_vectors #GstMemory s.
695 * Use #unmap_n_memorys after you are
696 * finished with the mappings.
698 * Returns: The number of GstMemorys mapped
701 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
702 GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
704 guint mem_idx, mem_len;
709 g_return_val_if_fail (num_vectors > 0, 0);
710 memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
712 maxsize = gst_buffer_get_size (buf) - offset;
713 if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
715 g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
716 "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
718 for (i = 0; i < mem_len && i < num_vectors; i++) {
719 GstMapInfo map = { 0 };
720 GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
721 if (!gst_memory_map (mem, &map, GST_MAP_READ))
722 g_error ("Unable to map memory %p. This should never happen.", mem);
725 vectors[i].buffer = map.data + mem_skip;
726 vectors[i].size = map.size - mem_skip;
728 vectors[i].buffer = map.data;
729 vectors[i].size = map.size;
737 * map_n_memory_output_vector:
738 * @buf: The #GstBuffer that should be mapped
739 * @offset: Offset into the buffer that should be mapped
740 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
741 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
743 * Returns: The number of GstMemorys mapped
746 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
749 g_return_if_fail (num_mappings > 0);
751 for (i = 0; i < num_mappings; i++)
752 gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
756 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
759 gpointer iter_state = NULL;
763 while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
764 && msg_count < msg_space) {
765 if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
766 msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
775 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
776 GSocket * sock, GstBuffer * buffer, gsize bufoffset,
777 GCancellable * cancellable, GError ** err)
780 GOutputVector vec[8];
783 GSocketControlMessage *cmsgs[CMSG_MAX];
786 mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
788 msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
791 g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
793 unmap_n_memorys (maps, mems_mapped);
797 /* Handle a write on a client,
798 * which indicates a read request from a client.
800 * For each client we maintain a queue of GstBuffers that contain the raw bytes
801 * we need to send to the client.
803 * We first check to see if we need to send streamheaders. If so, we queue them.
805 * Then we run into the main loop that tries to send as many buffers as
806 * possible. It will first exhaust the mhclient->sending queue and if the queue
807 * is empty, it will pick a buffer from the global queue.
809 * Sending the buffers from the mhclient->sending queue is basically writing
810 * the bytes to the socket and maintaining a count of the bytes that were
811 * sent. When the buffer is completely sent, it is removed from the
812 * mhclient->sending queue and we try to pick a new buffer for sending.
814 * When the sending returns a partial buffer we stop sending more data as
815 * the next send operation could block.
817 * This functions returns FALSE if some error occurred.
820 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
821 GstSocketClient * client)
827 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
828 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
829 GstMultiHandleSinkClass *mhsinkclass =
830 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
833 now = g_get_real_time () * GST_USECOND;
835 flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
839 if (!mhclient->sending) {
840 /* client is not working on a buffer */
841 if (mhclient->bufpos == -1) {
842 /* client is too fast, remove from write queue until new buffer is
844 gst_multi_socket_sink_stop_sending (sink, client);
846 /* if we flushed out all of the client buffers, we can stop */
847 if (mhclient->flushcount == 0)
852 /* client can pick a buffer from the global queue */
854 GstClockTime timestamp;
856 /* for new connections, we need to find a good spot in the
857 * bufqueue to start streaming from */
858 if (mhclient->new_connection && !flushing) {
860 gst_multi_handle_sink_new_client_position (mhsink, mhclient);
863 /* we got a valid spot in the queue */
864 mhclient->new_connection = FALSE;
865 mhclient->bufpos = position;
867 /* cannot send data to this client yet */
868 gst_multi_socket_sink_stop_sending (sink, client);
873 /* we flushed all remaining buffers, no need to get a new one */
874 if (mhclient->flushcount == 0)
878 buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
882 timestamp = GST_BUFFER_TIMESTAMP (buf);
883 if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
884 mhclient->first_buffer_ts = timestamp;
886 mhclient->last_buffer_ts = timestamp;
888 /* decrease flushcount */
889 if (mhclient->flushcount != -1)
890 mhclient->flushcount--;
892 GST_LOG_OBJECT (sink, "%s client %p at position %d",
893 mhclient->debug, client, mhclient->bufpos);
895 /* queueing a buffer will ref it */
896 mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
898 /* need to start from the first byte for this new buffer */
899 mhclient->bufoffset = 0;
903 /* see if we need to send something */
904 if (mhclient->sending) {
908 /* pick first buffer from list */
909 head = GST_BUFFER (mhclient->sending->data);
911 wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
912 mhclient->bufoffset, sink->cancellable, &err);
916 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
917 goto connection_reset;
918 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
919 /* write would block, try again later */
920 GST_LOG_OBJECT (sink, "write would block %p",
921 mhclient->handle.socket);
923 g_clear_error (&err);
928 if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
929 /* partial write, try again now */
930 GST_LOG_OBJECT (sink,
931 "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
932 mhclient->handle.socket, wrote);
933 mhclient->bufoffset += wrote;
935 if (sink->send_dispatched) {
936 gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
937 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
938 gst_structure_new ("GstNetworkMessageDispatched",
939 "object", G_TYPE_OBJECT, mhclient->handle.socket,
940 "buffer", GST_TYPE_BUFFER, head, NULL)));
942 /* complete buffer was written, we can proceed to the next one */
943 mhclient->sending = g_slist_remove (mhclient->sending, head);
944 gst_buffer_unref (head);
945 /* make sure we start from byte 0 for the next buffer */
946 mhclient->bufoffset = 0;
949 mhclient->bytes_sent += wrote;
950 mhclient->last_activity_time = now;
951 mhsink->bytes_served += wrote;
961 GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
962 mhclient->status = GST_CLIENT_STATUS_REMOVED;
967 GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
969 mhclient->status = GST_CLIENT_STATUS_CLOSED;
970 g_clear_error (&err);
975 GST_WARNING_OBJECT (sink,
976 "%s could not write, removing client: %s", mhclient->debug,
978 g_clear_error (&err);
979 mhclient->status = GST_CLIENT_STATUS_ERROR;
985 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
986 GIOCondition condition)
988 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
990 if (client->condition == condition)
993 if (client->source) {
994 g_source_destroy (client->source);
995 g_source_unref (client->source);
997 if (condition && sink->main_context) {
998 client->source = g_socket_create_source (mhclient->handle.socket,
999 condition, sink->cancellable);
1000 g_source_set_callback (client->source,
1001 (GSourceFunc) gst_multi_socket_sink_socket_condition,
1002 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1003 g_source_attach (client->source, sink->main_context);
1005 client->source = NULL;
1008 client->condition = condition;
1012 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1013 GstMultiHandleClient * mhclient)
1015 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1016 GstSocketClient *client = (GstSocketClient *) (mhclient);
1018 ensure_condition (sink, client,
1019 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1023 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1024 GstMultiHandleClient * mhclient)
1026 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1027 GstSocketClient *client = (GstSocketClient *) (mhclient);
1029 ensure_condition (sink, client, 0);
1033 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1034 GstSocketClient * client)
1036 ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1039 /* Handle the clients. This is called when a socket becomes ready
1040 * to read or writable. Badly behaving clients are put on a
1041 * garbage list and removed.
1044 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1045 GIOCondition condition, GstMultiSocketSink * sink)
1048 GstSocketClient *client;
1049 gboolean ret = TRUE;
1050 GstMultiHandleClient *mhclient;
1051 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1052 GstMultiHandleSinkClass *mhsinkclass =
1053 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1055 CLIENTS_LOCK (mhsink);
1056 clink = g_hash_table_lookup (mhsink->handle_hash,
1057 mhsinkclass->handle_hash_key (handle));
1058 if (clink == NULL) {
1063 client = clink->data;
1064 mhclient = (GstMultiHandleClient *) client;
1066 if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1067 && mhclient->status != GST_CLIENT_STATUS_OK) {
1068 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1073 if ((condition & G_IO_ERR)) {
1074 GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1075 mhclient->status = GST_CLIENT_STATUS_ERROR;
1076 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1079 } else if ((condition & G_IO_HUP)) {
1080 mhclient->status = GST_CLIENT_STATUS_CLOSED;
1081 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1085 if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1086 /* handle client read */
1087 if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1088 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1093 if ((condition & G_IO_OUT)) {
1094 /* handle client write */
1095 if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1096 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1103 CLIENTS_UNLOCK (mhsink);
1109 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1113 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1115 now = g_get_real_time () * GST_USECOND;
1117 CLIENTS_LOCK (mhsink);
1118 for (clients = mhsink->clients; clients; clients = clients->next) {
1119 GstSocketClient *client;
1120 GstMultiHandleClient *mhclient;
1122 client = clients->data;
1123 mhclient = (GstMultiHandleClient *) client;
1124 if (mhsink->timeout > 0
1125 && now - mhclient->last_activity_time > mhsink->timeout) {
1126 mhclient->status = GST_CLIENT_STATUS_SLOW;
1127 gst_multi_handle_sink_remove_client_link (mhsink, clients);
1130 CLIENTS_UNLOCK (mhsink);
1135 /* we handle the client communication in another thread so that we do not block
1136 * the gstreamer thread while we select() on the client fds */
1138 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1140 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1141 GSource *timeout = NULL;
1143 while (mhsink->running) {
1144 if (mhsink->timeout > 0) {
1145 timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1147 g_source_set_callback (timeout,
1148 (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1149 (GDestroyNotify) gst_object_unref);
1150 g_source_attach (timeout, sink->main_context);
1153 /* Returns after handling all pending events or when
1154 * _wakeup() was called. In any case we have to add
1155 * a new timeout because something happened.
1157 g_main_context_iteration (sink->main_context, TRUE);
1160 g_source_destroy (timeout);
1161 g_source_unref (timeout);
1169 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1170 const GValue * value, GParamSpec * pspec)
1172 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1175 case PROP_SEND_DISPATCHED:
1176 sink->send_dispatched = g_value_get_boolean (value);
1178 case PROP_SEND_MESSAGES:
1179 sink->send_messages = g_value_get_boolean (value);
1182 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1188 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1189 GValue * value, GParamSpec * pspec)
1191 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1194 case PROP_SEND_DISPATCHED:
1195 g_value_set_boolean (value, sink->send_dispatched);
1197 case PROP_SEND_MESSAGES:
1198 g_value_set_boolean (value, sink->send_messages);
1201 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1207 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1209 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1210 GstMultiHandleSinkClass *mhsinkclass =
1211 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1214 GST_INFO_OBJECT (mssink, "starting");
1216 mssink->main_context = g_main_context_new ();
1218 CLIENTS_LOCK (mhsink);
1219 for (clients = mhsink->clients; clients; clients = clients->next) {
1220 GstSocketClient *client = clients->data;
1221 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1225 mhsinkclass->hash_adding (mhsink, mhclient);
1227 CLIENTS_UNLOCK (mhsink);
1233 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1239 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1241 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1243 if (mssink->main_context)
1244 g_main_context_wakeup (mssink->main_context);
1248 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1250 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1252 if (mssink->main_context) {
1253 g_main_context_unref (mssink->main_context);
1254 mssink->main_context = NULL;
1257 g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1262 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1264 GstMultiSocketSink *sink;
1266 sink = GST_MULTI_SOCKET_SINK (bsink);
1268 GST_DEBUG_OBJECT (sink, "set to flushing");
1269 g_cancellable_cancel (sink->cancellable);
1270 if (sink->main_context)
1271 g_main_context_wakeup (sink->main_context);
1276 /* will be called only between calls to start() and stop() */
1278 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1280 GstMultiSocketSink *sink;
1282 sink = GST_MULTI_SOCKET_SINK (bsink);
1284 GST_DEBUG_OBJECT (sink, "unset flushing");
1285 g_object_unref (sink->cancellable);
1286 sink->cancellable = g_cancellable_new ();
1292 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1294 /* we support some meta */
1295 gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,