2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
25 * SECTION:element-multisocketsink
26 * @see_also: tcpserversink
28 * This plugin writes incoming data to a set of file descriptors. The
29 * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
30 * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32 * As of version 0.10.8, a client can also be added with the #GstMultiSocketSink::add-full signal
33 * that allows for more control over what and how much data a client
36 * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
37 * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
38 * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
39 * client is not active anymore or, depending on the value of the
40 * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
41 * In all cases, multisocketsink will never close a file descriptor itself.
42 * The user of multisocketsink is responsible for closing all file descriptors.
43 * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
44 * Note that multisocketsink still has a reference to the file descriptor when the
45 * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
46 * the descriptor; it is therefore not safe to close the file descriptor in
47 * the #GstMultiSocketSink::client-removed signal handler, and you should use the
48 * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
50 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
51 * separate thread to send the buffers to the clients. This ensures that no
52 * client write can block the pipeline and that clients can read with different
55 * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
56 * which buffer in the queued buffers will be sent first to the client. Clients
57 * can be sent the most recent buffer (which might not be decodable by the
58 * client if it is not a keyframe), the next keyframe received in
59 * multisocketsink (which can take some time depending on the keyframe rate), or the
60 * last received keyframe (which will cause a simple burst-on-connect).
61 * Multisocketsink will always keep at least one keyframe in its internal buffers
62 * when the sync-mode is set to latest-keyframe.
64 * As of version 0.10.8, there are additional values for the #GstMultiSocketSink:sync-method
65 * property to allow finer control over burst-on-connect behaviour. By selecting
66 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
67 * additionally requires that the burst begin with a keyframe, and
68 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
69 * prefer a minimum burst size even if it requires not starting with a keyframe.
71 * Multisocketsink can be instructed to keep at least a minimum amount of data
72 * expressed in time or byte units in its internal queues with the
73 * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
74 * These properties are useful if the application adds clients with the
75 * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
76 * actually be honored.
78 * When streaming data, clients are allowed to read at a different rate than
79 * the rate at which multisocketsink receives data. If the client is reading too
80 * fast, no data will be send to the client until multisocketsink receives more
81 * data. If the client, however, reads too slowly, data for that client will be
82 * queued up in multisocketsink. Two properties control the amount of data
83 * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
84 * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
85 * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
87 * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
88 * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
89 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
90 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
91 * positions the client to the soft limit in the buffer queue and
92 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
95 * multisocketsink will by default synchronize on the clock before serving the
96 * buffers to the clients. This behaviour can be disabled by setting the sync
97 * property to FALSE. Multisocketsink will by default not do QoS and will never
100 * Last reviewed on 2006-09-12 (0.10.10)
107 #include <gst/gst-i18n-plugin.h>
111 #include "gstmultisocketsink.h"
112 #include "gsttcp-marshal.h"
115 #include <netinet/in.h>
118 #define NOT_IMPLEMENTED 0
120 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
121 #define GST_CAT_DEFAULT (multisocketsink_debug)
123 /* MultiSocketSink signals and args */
135 SIGNAL_CLIENT_REMOVED,
136 SIGNAL_CLIENT_SOCKET_REMOVED,
148 static void gst_multi_socket_sink_finalize (GObject * object);
150 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
152 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
153 GSocket * socket, GstSyncMethod sync, GstFormat min_format,
154 guint64 min_value, GstFormat max_format, guint64 max_value);
155 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
157 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
159 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
162 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
163 GstMultiSinkHandle handle);
164 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
165 GstMultiSinkHandle handle, GstClientStatus status);
167 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
168 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
169 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
170 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
171 static GstMultiHandleClient
172 * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
173 GstMultiSinkHandle handle, GstSyncMethod sync_method);
174 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
175 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
176 GstMultiHandleClient * client);
177 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
180 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
182 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
183 GstMultiHandleClient * mhclient);
184 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
185 GstMultiHandleClient * mhclient);
187 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
188 handle, GIOCondition condition, GstMultiSocketSink * sink);
190 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
191 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
193 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
194 const GValue * value, GParamSpec * pspec);
195 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
196 GValue * value, GParamSpec * pspec);
198 #define gst_multi_socket_sink_parent_class parent_class
199 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
200 GST_TYPE_MULTI_HANDLE_SINK);
202 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
205 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
207 GObjectClass *gobject_class;
208 GstElementClass *gstelement_class;
209 GstBaseSinkClass *gstbasesink_class;
210 GstMultiHandleSinkClass *gstmultihandlesink_class;
212 gobject_class = (GObjectClass *) klass;
213 gstelement_class = (GstElementClass *) klass;
214 gstbasesink_class = (GstBaseSinkClass *) klass;
215 gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
217 gobject_class->set_property = gst_multi_socket_sink_set_property;
218 gobject_class->get_property = gst_multi_socket_sink_get_property;
219 gobject_class->finalize = gst_multi_socket_sink_finalize;
222 * GstMultiSocketSink::add:
223 * @gstmultisocketsink: the multisocketsink element to emit this signal on
224 * @socket: the socket to add to multisocketsink
226 * Hand the given open socket to multisocketsink to write to.
228 gst_multi_socket_sink_signals[SIGNAL_ADD] =
229 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
230 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
231 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
232 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
234 * GstMultiSocketSink::add-full:
235 * @gstmultisocketsink: the multisocketsink element to emit this signal on
236 * @socket: the socket to add to multisocketsink
237 * @sync: the sync method to use
238 * @format_min: the format of @value_min
239 * @value_min: the minimum amount of data to burst expressed in
241 * @format_max: the format of @value_max
242 * @value_max: the maximum amount of data to burst expressed in
245 * Hand the given open socket to multisocketsink to write to and
246 * specify the burst parameters for the new connection.
248 gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
249 g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
250 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
251 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
252 gst_tcp_marshal_VOID__OBJECT_ENUM_ENUM_UINT64_ENUM_UINT64, G_TYPE_NONE, 6,
253 G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
254 GST_TYPE_FORMAT, G_TYPE_UINT64);
256 * GstMultiSocketSink::remove:
257 * @gstmultisocketsink: the multisocketsink element to emit this signal on
258 * @socket: the socket to remove from multisocketsink
260 * Remove the given open socket from multisocketsink.
262 gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
263 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
264 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
265 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
266 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
268 * GstMultiSocketSink::remove-flush:
269 * @gstmultisocketsink: the multisocketsink element to emit this signal on
270 * @socket: the socket to remove from multisocketsink
272 * Remove the given open socket from multisocketsink after flushing all
273 * the pending data to the socket.
275 gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
276 g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
277 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
278 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
279 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
282 * GstMultiSocketSink::get-stats:
283 * @gstmultisocketsink: the multisocketsink element to emit this signal on
284 * @socket: the socket to get stats of from multisocketsink
286 * Get statistics about @socket. This function returns a GstStructure.
288 * Returns: a GstStructure with the statistics. The structure contains
289 * values that represent: total number of bytes sent, time
290 * when the client was added, time when the client was
291 * disconnected/removed, time the client is/was active, last activity
292 * time (in epoch seconds), number of buffers dropped.
293 * All times are expressed in nanoseconds (GstClockTime).
295 gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
296 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
297 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
298 G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
299 gst_tcp_marshal_BOXED__OBJECT, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
302 * GstMultiSocketSink::client-added:
303 * @gstmultisocketsink: the multisocketsink element that emitted this signal
304 * @socket: the socket that was added to multisocketsink
306 * The given socket was added to multisocketsink. This signal will
307 * be emitted from the streaming thread so application should be prepared
310 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
311 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
312 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
313 G_TYPE_NONE, 1, G_TYPE_OBJECT);
315 * GstMultiSocketSink::client-removed:
316 * @gstmultisocketsink: the multisocketsink element that emitted this signal
317 * @socket: the socket that is to be removed from multisocketsink
318 * @status: the reason why the client was removed
320 * The given socket is about to be removed from multisocketsink. This
321 * signal will be emitted from the streaming thread so applications should
322 * be prepared for that.
324 * @gstmultisocketsink still holds a handle to @socket so it is possible to call
325 * the get-stats signal from this callback. For the same reason it is
326 * not safe to close() and reuse @socket in this callback.
328 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
329 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
330 G_SIGNAL_RUN_LAST, 0, NULL, NULL, gst_tcp_marshal_VOID__OBJECT_ENUM,
331 G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
333 * GstMultiSocketSink::client-socket-removed:
334 * @gstmultisocketsink: the multisocketsink element that emitted this signal
335 * @socket: the socket that was removed from multisocketsink
337 * The given socket was removed from multisocketsink. This signal will
338 * be emitted from the streaming thread so applications should be prepared
341 * In this callback, @gstmultisocketsink has removed all the information
342 * associated with @socket and it is therefore not possible to call get-stats
343 * with @socket. It is however safe to close() and reuse @fd in the callback.
347 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
348 g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
349 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
350 G_TYPE_NONE, 1, G_TYPE_SOCKET);
352 gst_element_class_set_static_metadata (gstelement_class,
353 "Multi socket sink", "Sink/Network",
354 "Send data to multiple sockets",
355 "Thomas Vander Stichele <thomas at apestaart dot org>, "
356 "Wim Taymans <wim@fluendo.com>, "
357 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
359 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
360 gstbasesink_class->unlock_stop =
361 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
363 klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
364 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
365 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
366 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
367 klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
369 gstmultihandlesink_class->emit_client_added =
370 gst_multi_socket_sink_emit_client_added;
371 gstmultihandlesink_class->emit_client_removed =
372 gst_multi_socket_sink_emit_client_removed;
374 gstmultihandlesink_class->stop_pre =
375 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
376 gstmultihandlesink_class->stop_post =
377 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
378 gstmultihandlesink_class->start_pre =
379 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
380 gstmultihandlesink_class->thread =
381 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
382 gstmultihandlesink_class->new_client =
383 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
384 gstmultihandlesink_class->client_get_fd =
385 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
386 gstmultihandlesink_class->client_free =
387 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
388 gstmultihandlesink_class->handle_debug =
389 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
390 gstmultihandlesink_class->handle_hash_key =
391 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
392 gstmultihandlesink_class->hash_adding =
393 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
394 gstmultihandlesink_class->hash_removing =
395 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
397 GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
398 "Multi socket sink");
402 gst_multi_socket_sink_init (GstMultiSocketSink * this)
404 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
406 mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
408 this->cancellable = g_cancellable_new ();
412 gst_multi_socket_sink_finalize (GObject * object)
414 GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
416 if (this->cancellable) {
417 g_object_unref (this->cancellable);
418 this->cancellable = NULL;
421 G_OBJECT_CLASS (parent_class)->finalize (object);
424 /* methods to emit signals */
427 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
428 GstMultiSinkHandle handle)
430 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
435 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
436 GstMultiSinkHandle handle, GstClientStatus status)
438 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
439 0, handle.socket, status);
445 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
447 GstMultiSinkHandle handle;
449 handle.socket = socket;
450 gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
454 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
455 GstSyncMethod sync, GstFormat min_format, guint64 min_value,
456 GstFormat max_format, guint64 max_value)
458 GstMultiSinkHandle handle;
460 handle.socket = socket;
461 gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
462 sync, min_format, min_value, max_format, max_value);
466 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
468 GstMultiSinkHandle handle;
470 handle.socket = socket;
471 gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
475 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
477 GstMultiSinkHandle handle;
479 handle.socket = socket;
480 gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
484 static GstStructure *
485 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
487 GstMultiSinkHandle handle;
489 handle.socket = socket;
490 return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
494 static GstMultiHandleClient *
495 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
496 GstMultiSinkHandle handle, GstSyncMethod sync_method)
498 GstSocketClient *client;
499 GstMultiHandleClient *mhclient;
500 GstMultiHandleSinkClass *mhsinkclass =
501 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
503 /* create client datastructure */
504 g_assert (G_IS_SOCKET (handle.socket));
505 client = g_new0 (GstSocketClient, 1);
506 mhclient = (GstMultiHandleClient *) client;
508 mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
510 gst_multi_handle_sink_client_init (mhclient, sync_method);
511 mhsinkclass->handle_debug (handle, mhclient->debug);
513 /* set the socket to non blocking */
514 g_socket_set_blocking (handle.socket, FALSE);
516 /* we always read from a client */
517 mhsinkclass->hash_adding (mhsink, mhclient);
519 gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
525 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
527 return g_socket_get_fd (client->handle.socket);
531 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
532 GstMultiHandleClient * client)
534 g_assert (G_IS_SOCKET (client->handle.socket));
536 g_signal_emit (mhsink,
537 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
538 client->handle.socket);
540 g_object_unref (client->handle.socket);
544 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
546 g_snprintf (debug, 30, "[socket %p]", handle.socket);
550 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
552 return handle.socket;
555 /* handle a read on a client socket,
556 * which either indicates a close or should be ignored
557 * returns FALSE if some error occured or the client closed. */
559 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
560 GstSocketClient * client)
566 gboolean first = TRUE;
567 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
569 GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
573 /* just Read 'n' Drop, could also just drop the client as it's not supposed
574 * to write to us except for closing the socket, I guess it's because we
575 * like to listen to our customers. */
579 GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
581 navail = g_socket_get_available_bytes (mhclient->handle.socket);
586 g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
587 sizeof (dummy)), sink->cancellable, &err);
588 if (first && nread == 0) {
589 /* client sent close, so remove it */
590 GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
592 mhclient->status = GST_CLIENT_STATUS_CLOSED;
594 } else if (nread < 0) {
595 GST_WARNING_OBJECT (sink, "%s could not read: %s",
596 mhclient->debug, err->message);
597 mhclient->status = GST_CLIENT_STATUS_ERROR;
603 g_clear_error (&err);
608 /* Handle a write on a client,
609 * which indicates a read request from a client.
611 * For each client we maintain a queue of GstBuffers that contain the raw bytes
612 * we need to send to the client.
614 * We first check to see if we need to send streamheaders. If so, we queue them.
616 * Then we run into the main loop that tries to send as many buffers as
617 * possible. It will first exhaust the mhclient->sending queue and if the queue
618 * is empty, it will pick a buffer from the global queue.
620 * Sending the buffers from the mhclient->sending queue is basically writing
621 * the bytes to the socket and maintaining a count of the bytes that were
622 * sent. When the buffer is completely sent, it is removed from the
623 * mhclient->sending queue and we try to pick a new buffer for sending.
625 * When the sending returns a partial buffer we stop sending more data as
626 * the next send operation could block.
628 * This functions returns FALSE if some error occured.
631 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
632 GstSocketClient * client)
639 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
640 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
641 GstMultiHandleSinkClass *mhsinkclass =
642 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
645 g_get_current_time (&nowtv);
646 now = GST_TIMEVAL_TO_TIME (nowtv);
648 flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
654 if (!mhclient->sending) {
655 /* client is not working on a buffer */
656 if (mhclient->bufpos == -1) {
657 /* client is too fast, remove from write queue until new buffer is
659 /* FIXME: specific */
660 if (client->source) {
661 g_source_destroy (client->source);
662 g_source_unref (client->source);
663 client->source = NULL;
666 /* if we flushed out all of the client buffers, we can stop */
667 if (mhclient->flushcount == 0)
672 /* client can pick a buffer from the global queue */
674 GstClockTime timestamp;
676 /* for new connections, we need to find a good spot in the
677 * bufqueue to start streaming from */
678 if (mhclient->new_connection && !flushing) {
680 gst_multi_handle_sink_new_client_position (mhsink, mhclient);
683 /* we got a valid spot in the queue */
684 mhclient->new_connection = FALSE;
685 mhclient->bufpos = position;
687 /* cannot send data to this client yet */
688 /* FIXME: specific */
689 if (client->source) {
690 g_source_destroy (client->source);
691 g_source_unref (client->source);
692 client->source = NULL;
699 /* we flushed all remaining buffers, no need to get a new one */
700 if (mhclient->flushcount == 0)
704 buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
708 timestamp = GST_BUFFER_TIMESTAMP (buf);
709 if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
710 mhclient->first_buffer_ts = timestamp;
712 mhclient->last_buffer_ts = timestamp;
714 /* decrease flushcount */
715 if (mhclient->flushcount != -1)
716 mhclient->flushcount--;
718 GST_LOG_OBJECT (sink, "%s client %p at position %d",
719 mhclient->debug, client, mhclient->bufpos);
721 /* queueing a buffer will ref it */
722 mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
724 /* need to start from the first byte for this new buffer */
725 mhclient->bufoffset = 0;
729 /* see if we need to send something */
730 if (mhclient->sending) {
735 /* pick first buffer from list */
736 head = GST_BUFFER (mhclient->sending->data);
738 gst_buffer_map (head, &map, GST_MAP_READ);
739 maxsize = map.size - mhclient->bufoffset;
741 /* FIXME: specific */
742 /* try to write the complete buffer */
745 g_socket_send (mhclient->handle.socket,
746 (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
748 gst_buffer_unmap (head, &map);
752 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
753 goto connection_reset;
758 if (wrote < maxsize) {
759 /* partial write means that the client cannot read more and we should
760 * stop sending more */
761 GST_LOG_OBJECT (sink,
762 "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
763 mhclient->handle.socket, wrote);
764 mhclient->bufoffset += wrote;
767 /* complete buffer was written, we can proceed to the next one */
768 mhclient->sending = g_slist_remove (mhclient->sending, head);
769 gst_buffer_unref (head);
770 /* make sure we start from byte 0 for the next buffer */
771 mhclient->bufoffset = 0;
774 mhclient->bytes_sent += wrote;
775 mhclient->last_activity_time = now;
776 mhsink->bytes_served += wrote;
786 GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
787 mhclient->status = GST_CLIENT_STATUS_REMOVED;
792 GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
794 mhclient->status = GST_CLIENT_STATUS_CLOSED;
795 g_clear_error (&err);
800 GST_WARNING_OBJECT (sink,
801 "%s could not write, removing client: %s", mhclient->debug,
803 g_clear_error (&err);
804 mhclient->status = GST_CLIENT_STATUS_ERROR;
810 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
811 GstMultiHandleClient * mhclient)
813 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
814 GstSocketClient *client = (GstSocketClient *) (mhclient);
816 if (!sink->main_context)
819 if (!client->source) {
821 g_socket_create_source (mhclient->handle.socket,
822 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
823 g_source_set_callback (client->source,
824 (GSourceFunc) gst_multi_socket_sink_socket_condition,
825 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
826 g_source_attach (client->source, sink->main_context);
831 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
832 GstMultiHandleClient * mhclient)
834 GstSocketClient *client = (GstSocketClient *) (mhclient);
836 if (client->source) {
837 g_source_destroy (client->source);
838 g_source_unref (client->source);
839 client->source = NULL;
843 /* Handle the clients. This is called when a socket becomes ready
844 * to read or writable. Badly behaving clients are put on a
845 * garbage list and removed.
848 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
849 GIOCondition condition, GstMultiSocketSink * sink)
852 GstSocketClient *client;
854 GstMultiHandleClient *mhclient;
855 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
856 GstMultiHandleSinkClass *mhsinkclass =
857 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
859 CLIENTS_LOCK (mhsink);
860 clink = g_hash_table_lookup (mhsink->handle_hash,
861 mhsinkclass->handle_hash_key (handle));
867 client = clink->data;
868 mhclient = (GstMultiHandleClient *) client;
870 if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
871 && mhclient->status != GST_CLIENT_STATUS_OK) {
872 gst_multi_handle_sink_remove_client_link (mhsink, clink);
877 if ((condition & G_IO_ERR)) {
878 GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
879 mhclient->status = GST_CLIENT_STATUS_ERROR;
880 gst_multi_handle_sink_remove_client_link (mhsink, clink);
883 } else if ((condition & G_IO_HUP)) {
884 mhclient->status = GST_CLIENT_STATUS_CLOSED;
885 gst_multi_handle_sink_remove_client_link (mhsink, clink);
888 } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
889 /* handle client read */
890 if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
891 gst_multi_handle_sink_remove_client_link (mhsink, clink);
895 } else if ((condition & G_IO_OUT)) {
896 /* handle client write */
897 if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
898 gst_multi_handle_sink_remove_client_link (mhsink, clink);
905 CLIENTS_UNLOCK (mhsink);
911 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
916 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
918 g_get_current_time (&nowtv);
919 now = GST_TIMEVAL_TO_TIME (nowtv);
921 CLIENTS_LOCK (mhsink);
922 for (clients = mhsink->clients; clients; clients = clients->next) {
923 GstSocketClient *client;
924 GstMultiHandleClient *mhclient;
926 client = clients->data;
927 mhclient = (GstMultiHandleClient *) client;
928 if (mhsink->timeout > 0
929 && now - mhclient->last_activity_time > mhsink->timeout) {
930 mhclient->status = GST_CLIENT_STATUS_SLOW;
931 gst_multi_handle_sink_remove_client_link (mhsink, clients);
934 CLIENTS_UNLOCK (mhsink);
939 /* we handle the client communication in another thread so that we do not block
940 * the gstreamer thread while we select() on the client fds */
942 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
944 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
945 GSource *timeout = NULL;
947 while (mhsink->running) {
948 if (mhsink->timeout > 0) {
949 timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
951 g_source_set_callback (timeout,
952 (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
953 (GDestroyNotify) gst_object_unref);
954 g_source_attach (timeout, sink->main_context);
957 /* Returns after handling all pending events or when
958 * _wakeup() was called. In any case we have to add
959 * a new timeout because something happened.
961 g_main_context_iteration (sink->main_context, TRUE);
964 g_source_destroy (timeout);
965 g_source_unref (timeout);
973 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
974 const GValue * value, GParamSpec * pspec)
979 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
985 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
986 GValue * value, GParamSpec * pspec)
990 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
996 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
998 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
999 GstMultiHandleSinkClass *mhsinkclass =
1000 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1003 GST_INFO_OBJECT (mssink, "starting");
1005 mssink->main_context = g_main_context_new ();
1007 CLIENTS_LOCK (mhsink);
1008 for (clients = mhsink->clients; clients; clients = clients->next) {
1009 GstSocketClient *client = clients->data;
1010 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1014 mhsinkclass->hash_adding (mhsink, mhclient);
1016 CLIENTS_UNLOCK (mhsink);
1022 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1028 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1030 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1032 if (mssink->main_context)
1033 g_main_context_wakeup (mssink->main_context);
1037 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1039 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1041 if (mssink->main_context) {
1042 g_main_context_unref (mssink->main_context);
1043 mssink->main_context = NULL;
1046 g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1051 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1053 GstMultiSocketSink *sink;
1055 sink = GST_MULTI_SOCKET_SINK (bsink);
1057 GST_DEBUG_OBJECT (sink, "set to flushing");
1058 g_cancellable_cancel (sink->cancellable);
1059 if (sink->main_context)
1060 g_main_context_wakeup (sink->main_context);
1065 /* will be called only between calls to start() and stop() */
1067 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1069 GstMultiSocketSink *sink;
1071 sink = GST_MULTI_SOCKET_SINK (bsink);
1073 GST_DEBUG_OBJECT (sink, "unset flushing");
1074 g_cancellable_reset (sink->cancellable);