2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21 * Boston, MA 02111-1307, USA.
25 * SECTION:element-multisocketsink
26 * @see_also: tcpserversink
28 * This plugin writes incoming data to a set of file descriptors. The
29 * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
30 * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32 * As of version 0.10.8, a client can also be added with the #GstMultiSocketSink::add-full signal
33 * that allows for more control over what and how much data a client
36 * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
37 * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
38 * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
39 * client is not active anymore or, depending on the value of the
40 * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
41 * In all cases, multisocketsink will never close a file descriptor itself.
42 * The user of multisocketsink is responsible for closing all file descriptors.
43 * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
44 * Note that multisocketsink still has a reference to the file descriptor when the
45 * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
46 * the descriptor; it is therefore not safe to close the file descriptor in
47 * the #GstMultiSocketSink::client-removed signal handler, and you should use the
48 * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
50 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
51 * separate thread to send the buffers to the clients. This ensures that no
52 * client write can block the pipeline and that clients can read with different
55 * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
56 * which buffer in the queued buffers will be sent first to the client. Clients
57 * can be sent the most recent buffer (which might not be decodable by the
58 * client if it is not a keyframe), the next keyframe received in
59 * multisocketsink (which can take some time depending on the keyframe rate), or the
60 * last received keyframe (which will cause a simple burst-on-connect).
61 * Multisocketsink will always keep at least one keyframe in its internal buffers
62 * when the sync-mode is set to latest-keyframe.
64 * As of version 0.10.8, there are additional values for the #GstMultiSocketSink:sync-method
65 * property to allow finer control over burst-on-connect behaviour. By selecting
66 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
67 * additionally requires that the burst begin with a keyframe, and
68 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
69 * prefer a minimum burst size even if it requires not starting with a keyframe.
71 * Multisocketsink can be instructed to keep at least a minimum amount of data
72 * expressed in time or byte units in its internal queues with the
73 * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
74 * These properties are useful if the application adds clients with the
75 * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
76 * actually be honored.
78 * When streaming data, clients are allowed to read at a different rate than
79 * the rate at which multisocketsink receives data. If the client is reading too
80 * fast, no data will be send to the client until multisocketsink receives more
81 * data. If the client, however, reads too slowly, data for that client will be
82 * queued up in multisocketsink. Two properties control the amount of data
83 * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
84 * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
85 * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
87 * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
88 * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
89 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
90 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
91 * positions the client to the soft limit in the buffer queue and
92 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
95 * multisocketsink will by default synchronize on the clock before serving the
96 * buffers to the clients. This behaviour can be disabled by setting the sync
97 * property to FALSE. Multisocketsink will by default not do QoS and will never
100 * Last reviewed on 2006-09-12 (0.10.10)
107 #include <gst/gst-i18n-plugin.h>
109 #include "gstmultisocketsink.h"
110 #include "gsttcp-marshal.h"
113 #include <netinet/in.h>
116 #define NOT_IMPLEMENTED 0
118 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
121 GST_STATIC_CAPS_ANY);
123 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
124 #define GST_CAT_DEFAULT (multisocketsink_debug)
126 /* MultiSocketSink signals and args */
138 SIGNAL_CLIENT_REMOVED,
139 SIGNAL_CLIENT_SOCKET_REMOVED,
145 /* this is really arbitrarily chosen */
146 #define DEFAULT_MODE 1
147 #define DEFAULT_BUFFERS_MAX -1
148 #define DEFAULT_BUFFERS_SOFT_MAX -1
149 #define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS
150 #define DEFAULT_UNITS_MAX -1
151 #define DEFAULT_UNITS_SOFT_MAX -1
153 #define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
154 #define DEFAULT_BURST_VALUE 0
156 #define DEFAULT_QOS_DSCP -1
168 PROP_BUFFERS_SOFT_MAX,
180 static void gst_multi_socket_sink_finalize (GObject * object);
182 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
183 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
184 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
185 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
187 static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
189 static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
190 GIOCondition condition, GstMultiSocketSink * sink);
192 static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
194 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
195 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
197 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
198 const GValue * value, GParamSpec * pspec);
199 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
200 GValue * value, GParamSpec * pspec);
202 #define gst_multi_socket_sink_parent_class parent_class
203 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
204 GST_TYPE_MULTI_HANDLE_SINK);
206 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
209 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
211 GObjectClass *gobject_class;
212 GstElementClass *gstelement_class;
213 GstBaseSinkClass *gstbasesink_class;
214 GstMultiHandleSinkClass *gstmultihandlesink_class;
216 gobject_class = (GObjectClass *) klass;
217 gstelement_class = (GstElementClass *) klass;
218 gstbasesink_class = (GstBaseSinkClass *) klass;
219 gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
221 gobject_class->set_property = gst_multi_socket_sink_set_property;
222 gobject_class->get_property = gst_multi_socket_sink_get_property;
223 gobject_class->finalize = gst_multi_socket_sink_finalize;
225 g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
226 g_param_spec_int ("buffers-max", "Buffers max",
227 "max number of buffers to queue for a client (-1 = no limit)", -1,
228 G_MAXINT, DEFAULT_BUFFERS_MAX,
229 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230 g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
231 g_param_spec_int ("buffers-soft-max", "Buffers soft max",
232 "Recover client when going over this limit (-1 = no limit)", -1,
233 G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
234 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236 g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
237 g_param_spec_enum ("unit-type", "Units type",
238 "The unit to measure the max/soft-max/queued properties",
239 GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
240 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241 g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
242 g_param_spec_int64 ("units-max", "Units max",
243 "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
244 DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
245 g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
246 g_param_spec_int64 ("units-soft-max", "Units soft max",
247 "Recover client when going over this limit (-1 = no limit)", -1,
248 G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
249 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
252 g_param_spec_enum ("burst-format", "Burst format",
253 "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
254 GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
255 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256 g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
257 g_param_spec_uint64 ("burst-value", "Burst value",
258 "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
259 DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261 g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
262 g_param_spec_int ("qos-dscp", "QoS diff srv code point",
263 "Quality of Service, differentiated services code point (-1 default)",
264 -1, 63, DEFAULT_QOS_DSCP,
265 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267 g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
268 g_param_spec_uint ("num-sockets", "Number of sockets",
269 "The current number of client sockets",
270 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
273 * GstMultiSocketSink::add:
274 * @gstmultisocketsink: the multisocketsink element to emit this signal on
275 * @socket: the socket to add to multisocketsink
277 * Hand the given open socket to multisocketsink to write to.
279 gst_multi_socket_sink_signals[SIGNAL_ADD] =
280 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
281 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
282 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
283 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
285 * GstMultiSocketSink::add-full:
286 * @gstmultisocketsink: the multisocketsink element to emit this signal on
287 * @socket: the socket to add to multisocketsink
288 * @sync: the sync method to use
289 * @format_min: the format of @value_min
290 * @value_min: the minimum amount of data to burst expressed in
292 * @format_max: the format of @value_max
293 * @value_max: the maximum amount of data to burst expressed in
296 * Hand the given open socket to multisocketsink to write to and
297 * specify the burst parameters for the new connection.
299 gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
300 g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
301 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
302 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
303 gst_tcp_marshal_VOID__OBJECT_ENUM_ENUM_UINT64_ENUM_UINT64, G_TYPE_NONE, 6,
304 G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
305 GST_TYPE_FORMAT, G_TYPE_UINT64);
307 * GstMultiSocketSink::remove:
308 * @gstmultisocketsink: the multisocketsink element to emit this signal on
309 * @socket: the socket to remove from multisocketsink
311 * Remove the given open socket from multisocketsink.
313 gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
314 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
315 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
316 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
317 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
319 * GstMultiSocketSink::remove-flush:
320 * @gstmultisocketsink: the multisocketsink element to emit this signal on
321 * @socket: the socket to remove from multisocketsink
323 * Remove the given open socket from multisocketsink after flushing all
324 * the pending data to the socket.
326 gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
327 g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
328 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
329 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
330 g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
333 * GstMultiSocketSink::get-stats:
334 * @gstmultisocketsink: the multisocketsink element to emit this signal on
335 * @socket: the socket to get stats of from multisocketsink
337 * Get statistics about @socket. This function returns a GstStructure.
339 * Returns: a GstStructure with the statistics. The structure contains
340 * values that represent: total number of bytes sent, time
341 * when the client was added, time when the client was
342 * disconnected/removed, time the client is/was active, last activity
343 * time (in epoch seconds), number of buffers dropped.
344 * All times are expressed in nanoseconds (GstClockTime).
346 gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
347 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
348 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
349 G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
350 gst_tcp_marshal_BOXED__OBJECT, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
353 * GstMultiSocketSink::client-added:
354 * @gstmultisocketsink: the multisocketsink element that emitted this signal
355 * @socket: the socket that was added to multisocketsink
357 * The given socket was added to multisocketsink. This signal will
358 * be emitted from the streaming thread so application should be prepared
361 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
362 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
363 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
364 client_added), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
365 G_TYPE_NONE, 1, G_TYPE_OBJECT);
367 * GstMultiSocketSink::client-removed:
368 * @gstmultisocketsink: the multisocketsink element that emitted this signal
369 * @socket: the socket that is to be removed from multisocketsink
370 * @status: the reason why the client was removed
372 * The given socket is about to be removed from multisocketsink. This
373 * signal will be emitted from the streaming thread so applications should
374 * be prepared for that.
376 * @gstmultisocketsink still holds a handle to @socket so it is possible to call
377 * the get-stats signal from this callback. For the same reason it is
378 * not safe to close() and reuse @socket in this callback.
380 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
381 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
382 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
383 client_removed), NULL, NULL, gst_tcp_marshal_VOID__OBJECT_ENUM,
384 G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
386 * GstMultiSocketSink::client-socket-removed:
387 * @gstmultisocketsink: the multisocketsink element that emitted this signal
388 * @socket: the socket that was removed from multisocketsink
390 * The given socket was removed from multisocketsink. This signal will
391 * be emitted from the streaming thread so applications should be prepared
394 * In this callback, @gstmultisocketsink has removed all the information
395 * associated with @socket and it is therefore not possible to call get-stats
396 * with @socket. It is however safe to close() and reuse @fd in the callback.
400 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
401 g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
402 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiSocketSinkClass,
403 client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
404 G_TYPE_NONE, 1, G_TYPE_SOCKET);
406 gst_element_class_add_pad_template (gstelement_class,
407 gst_static_pad_template_get (&sinktemplate));
409 gst_element_class_set_details_simple (gstelement_class,
410 "Multi socket sink", "Sink/Network",
411 "Send data to multiple sockets",
412 "Thomas Vander Stichele <thomas at apestaart dot org>, "
413 "Wim Taymans <wim@fluendo.com>, "
414 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
416 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
418 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
419 gstbasesink_class->unlock_stop =
420 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
422 gstmultihandlesink_class->stop_pre =
423 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
424 gstmultihandlesink_class->stop_post =
425 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
426 gstmultihandlesink_class->start_pre =
427 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
428 gstmultihandlesink_class->thread =
429 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
431 gstmultihandlesink_class->remove_client_link =
432 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
434 klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
435 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
436 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
437 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
438 klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
440 GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441 "Multi socket sink");
445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
447 this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
449 this->unit_type = DEFAULT_UNIT_TYPE;
450 this->units_max = DEFAULT_UNITS_MAX;
451 this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
453 this->def_burst_format = DEFAULT_BURST_FORMAT;
454 this->def_burst_value = DEFAULT_BURST_VALUE;
456 this->qos_dscp = DEFAULT_QOS_DSCP;
458 this->header_flags = 0;
459 this->cancellable = g_cancellable_new ();
463 gst_multi_socket_sink_finalize (GObject * object)
465 GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
467 g_hash_table_destroy (this->socket_hash);
468 if (this->cancellable) {
469 g_object_unref (this->cancellable);
470 this->cancellable = NULL;
473 G_OBJECT_CLASS (parent_class)->finalize (object);
477 setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client)
488 struct sockaddr_in6 sa_in6;
489 struct sockaddr_storage sa_stor;
491 socklen_t slen = sizeof (sa);
495 if (sink->qos_dscp < 0)
498 fd = g_socket_get_fd (client->socket);
500 if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
501 GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
505 af = sa.sa.sa_family;
507 /* if this is an IPv4-mapped address then do IPv4 QoS */
508 if (af == AF_INET6) {
510 GST_DEBUG_OBJECT (sink, "check IP6 socket");
511 if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
512 GST_DEBUG_OBJECT (sink, "mapped to IPV4");
517 /* extract and shift 6 bits of the DSCP */
518 tos = (sink->qos_dscp & 0x3f) << 2;
522 ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
526 ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
531 GST_ERROR_OBJECT (sink, "unsupported AF");
535 GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
542 setup_dscp (GstMultiSocketSink * sink)
545 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
548 for (clients = mhsink->clients; clients; clients = clients->next) {
549 GstSocketClient *client;
551 client = clients->data;
553 setup_dscp_client (sink, client);
555 CLIENTS_UNLOCK (sink);
558 /* "add-full" signal implementation */
560 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
561 GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
562 GstFormat max_format, guint64 max_value)
564 GstSocketClient *client;
565 GstMultiHandleClient *mhclient;
567 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
569 GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, "
570 "min_format %d, min_value %" G_GUINT64_FORMAT
571 ", max_format %d, max_value %" G_GUINT64_FORMAT, socket,
572 sync_method, min_format, min_value, max_format, max_value);
574 /* do limits check if we can */
575 if (min_format == max_format) {
576 if (max_value != -1 && min_value != -1 && max_value < min_value)
580 /* create client datastructure */
581 client = g_new0 (GstSocketClient, 1);
582 mhclient = (GstMultiHandleClient *) client;
583 gst_multi_handle_sink_client_init (mhclient, sync_method);
584 g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
585 client->socket = G_SOCKET (g_object_ref (socket));
587 client->burst_min_format = min_format;
588 client->burst_min_value = min_value;
589 client->burst_max_format = max_format;
590 client->burst_max_value = max_value;
594 /* check the hash to find a duplicate fd */
595 clink = g_hash_table_lookup (sink->socket_hash, socket);
599 /* we can add the fd now */
600 clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
601 g_hash_table_insert (sink->socket_hash, socket, clink);
602 mhsink->clients_cookie++;
604 /* set the socket to non blocking */
605 g_socket_set_blocking (socket, FALSE);
607 /* we always read from a client */
608 if (sink->main_context) {
610 g_socket_create_source (client->socket,
611 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
612 g_source_set_callback (client->source,
613 (GSourceFunc) gst_multi_socket_sink_socket_condition,
614 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
615 g_source_attach (client->source, sink->main_context);
618 setup_dscp_client (sink, client);
620 CLIENTS_UNLOCK (sink);
622 g_signal_emit (G_OBJECT (sink),
623 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket);
630 GST_WARNING_OBJECT (sink,
631 "[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%"
632 G_GUINT64_FORMAT ", format %d specified when adding client", socket,
633 min_value, max_value, min_format);
638 mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
639 CLIENTS_UNLOCK (sink);
640 GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing",
642 g_signal_emit (G_OBJECT (sink),
643 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
650 /* "add" signal implementation */
652 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
654 GstMultiHandleSink *mhsink;
656 mhsink = GST_MULTI_HANDLE_SINK (sink);
657 gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
658 sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
662 /* "remove" signal implementation */
664 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
667 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
668 GstMultiHandleSinkClass *mhsinkclass =
669 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
671 GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket);
674 clink = g_hash_table_lookup (sink->socket_hash, socket);
676 GstSocketClient *client = clink->data;
677 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
679 if (mhclient->status != GST_CLIENT_STATUS_OK) {
680 GST_INFO_OBJECT (sink,
681 "[socket %p] Client already disconnecting with status %d",
682 socket, mhclient->status);
686 mhclient->status = GST_CLIENT_STATUS_REMOVED;
687 mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
689 GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
694 CLIENTS_UNLOCK (sink);
697 /* "remove-flush" signal implementation */
699 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
703 GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket);
706 clink = g_hash_table_lookup (sink->socket_hash, socket);
708 GstSocketClient *client = clink->data;
709 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
711 if (mhclient->status != GST_CLIENT_STATUS_OK) {
712 GST_INFO_OBJECT (sink,
713 "[socket %p] Client already disconnecting with status %d",
714 socket, mhclient->status);
718 /* take the position of the client as the number of buffers left to flush.
719 * If the client was at position -1, we flush 0 buffers, 0 == flush 1
721 mhclient->flushcount = mhclient->bufpos + 1;
722 /* mark client as flushing. We can not remove the client right away because
723 * it might have some buffers to flush in the ->sending queue. */
724 mhclient->status = GST_CLIENT_STATUS_FLUSHING;
726 GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!",
730 CLIENTS_UNLOCK (sink);
733 /* "get-stats" signal implementation
736 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
738 GstSocketClient *client;
739 GstStructure *result = NULL;
743 clink = g_hash_table_lookup (sink->socket_hash, socket);
747 client = clink->data;
748 if (client != NULL) {
749 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
752 result = gst_structure_new_empty ("multisocketsink-stats");
754 if (mhclient->disconnect_time == 0) {
757 g_get_current_time (&nowtv);
759 interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time;
761 interval = mhclient->disconnect_time - mhclient->connect_time;
764 gst_structure_set (result,
765 "bytes-sent", G_TYPE_UINT64, mhclient->bytes_sent,
766 "connect-time", G_TYPE_UINT64, mhclient->connect_time,
767 "disconnect-time", G_TYPE_UINT64, mhclient->disconnect_time,
768 "connected-duration", G_TYPE_UINT64, interval,
769 "last-activatity-time", G_TYPE_UINT64, mhclient->last_activity_time,
770 "dropped-buffers", G_TYPE_UINT64, mhclient->dropped_buffers,
771 "first-buffer-ts", G_TYPE_UINT64, mhclient->first_buffer_ts,
772 "last-buffer-ts", G_TYPE_UINT64, mhclient->last_buffer_ts, NULL);
776 CLIENTS_UNLOCK (sink);
778 /* python doesn't like a NULL pointer yet */
779 if (result == NULL) {
780 GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket);
781 result = gst_structure_new_empty ("multisocketsink-stats");
787 /* should be called with the clientslock helt.
788 * Note that we don't close the fd as we didn't open it in the first
789 * place. An application should connect to the client-fd-removed signal and
790 * close the fd itself.
793 gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
798 GstSocketClient *client = link->data;
799 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
800 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (sink);
801 GstMultiSocketSinkClass *fclass;
803 fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink);
805 socket = client->socket;
807 if (mhclient->currently_removing) {
808 GST_WARNING_OBJECT (sink, "%s client is already being removed",
812 mhclient->currently_removing = TRUE;
815 /* FIXME: if we keep track of ip we can log it here and signal */
816 switch (mhclient->status) {
817 case GST_CLIENT_STATUS_OK:
818 GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
819 mhclient->debug, client);
821 case GST_CLIENT_STATUS_CLOSED:
822 GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
823 mhclient->debug, client);
825 case GST_CLIENT_STATUS_REMOVED:
826 GST_DEBUG_OBJECT (sink,
827 "%s removing client %p because the app removed it", mhclient->debug,
830 case GST_CLIENT_STATUS_SLOW:
831 GST_INFO_OBJECT (sink,
832 "%s removing client %p because it was too slow", mhclient->debug,
835 case GST_CLIENT_STATUS_ERROR:
836 GST_WARNING_OBJECT (sink,
837 "%s removing client %p because of error", mhclient->debug, client);
839 case GST_CLIENT_STATUS_FLUSHING:
841 GST_WARNING_OBJECT (sink,
842 "%s removing client %p with invalid reason %d", mhclient->debug,
843 client, mhclient->status);
847 if (client->source) {
848 g_source_destroy (client->source);
849 g_source_unref (client->source);
850 client->source = NULL;
853 g_get_current_time (&now);
854 mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
856 /* free client buffers */
857 g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL);
858 g_slist_free (mhclient->sending);
859 mhclient->sending = NULL;
862 gst_caps_unref (mhclient->caps);
863 mhclient->caps = NULL;
865 /* unlock the mutex before signaling because the signal handler
866 * might query some properties */
867 CLIENTS_UNLOCK (sink);
869 g_signal_emit (G_OBJECT (sink),
870 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
873 /* lock again before we remove the client completely */
876 /* fd cannot be reused in the above signal callback so we can safely
877 * remove it from the hashtable here */
878 if (!g_hash_table_remove (mssink->socket_hash, socket)) {
879 GST_WARNING_OBJECT (sink,
880 "[socket %p] error removing client %p from hash", socket, client);
882 /* after releasing the lock above, the link could be invalid, more
883 * precisely, the next and prev pointers could point to invalid list
884 * links. One optimisation could be to add a cookie to the linked list
885 * and take a shortcut when it did not change between unlocking and locking
886 * our mutex. For now we just walk the list again. */
887 sink->clients = g_list_remove (sink->clients, client);
888 sink->clients_cookie++;
891 fclass->removed (mssink, socket);
894 CLIENTS_UNLOCK (sink);
896 /* and the fd is really gone now */
897 g_signal_emit (G_OBJECT (sink),
898 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket);
899 g_object_unref (socket);
904 /* handle a read on a client socket,
905 * which either indicates a close or should be ignored
906 * returns FALSE if some error occured or the client closed. */
908 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
909 GstSocketClient * client)
915 gboolean first = TRUE;
916 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
918 GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read",
923 /* just Read 'n' Drop, could also just drop the client as it's not supposed
924 * to write to us except for closing the socket, I guess it's because we
925 * like to listen to our customers. */
929 GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
932 navail = g_socket_get_available_bytes (client->socket);
937 g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
938 sink->cancellable, &err);
939 if (first && nread == 0) {
940 /* client sent close, so remove it */
941 GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing",
943 mhclient->status = GST_CLIENT_STATUS_CLOSED;
945 } else if (nread < 0) {
946 GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s",
947 client->socket, err->message);
948 mhclient->status = GST_CLIENT_STATUS_ERROR;
954 g_clear_error (&err);
959 /* queue the given buffer for the given client */
961 gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
962 GstSocketClient * client, GstBuffer * buffer)
964 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
965 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
968 /* TRUE: send them if the new caps have them */
969 gboolean send_streamheader = FALSE;
972 /* before we queue the buffer, we check if we need to queue streamheader
973 * buffers (because it's a new client, or because they changed) */
974 caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink));
976 if (!mhclient->caps) {
977 GST_DEBUG_OBJECT (sink,
978 "[socket %p] no previous caps for this client, send streamheader",
980 send_streamheader = TRUE;
981 mhclient->caps = gst_caps_ref (caps);
983 /* there were previous caps recorded, so compare */
984 if (!gst_caps_is_equal (caps, mhclient->caps)) {
985 const GValue *sh1, *sh2;
987 /* caps are not equal, but could still have the same streamheader */
988 s = gst_caps_get_structure (caps, 0);
989 if (!gst_structure_has_field (s, "streamheader")) {
990 /* no new streamheader, so nothing new to send */
991 GST_DEBUG_OBJECT (sink,
992 "[socket %p] new caps do not have streamheader, not sending",
995 /* there is a new streamheader */
996 s = gst_caps_get_structure (mhclient->caps, 0);
997 if (!gst_structure_has_field (s, "streamheader")) {
998 /* no previous streamheader, so send the new one */
999 GST_DEBUG_OBJECT (sink,
1000 "[socket %p] previous caps did not have streamheader, sending",
1002 send_streamheader = TRUE;
1004 /* both old and new caps have streamheader set */
1005 if (!mhsink->resend_streamheader) {
1006 GST_DEBUG_OBJECT (sink,
1007 "[socket %p] asked to not resend the streamheader, not sending",
1009 send_streamheader = FALSE;
1011 sh1 = gst_structure_get_value (s, "streamheader");
1012 s = gst_caps_get_structure (caps, 0);
1013 sh2 = gst_structure_get_value (s, "streamheader");
1014 if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
1015 GST_DEBUG_OBJECT (sink,
1016 "[socket %p] new streamheader different from old, sending",
1018 send_streamheader = TRUE;
1024 /* Replace the old caps */
1025 gst_caps_unref (mhclient->caps);
1026 mhclient->caps = gst_caps_ref (caps);
1029 if (G_UNLIKELY (send_streamheader)) {
1034 GST_LOG_OBJECT (sink,
1035 "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
1036 client->socket, caps);
1037 s = gst_caps_get_structure (caps, 0);
1038 if (!gst_structure_has_field (s, "streamheader")) {
1039 GST_DEBUG_OBJECT (sink,
1040 "[socket %p] no new streamheader, so nothing to send",
1043 GST_LOG_OBJECT (sink,
1044 "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
1045 client->socket, caps);
1046 sh = gst_structure_get_value (s, "streamheader");
1047 g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
1048 buffers = g_value_peek_pointer (sh);
1049 GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len);
1050 for (i = 0; i < buffers->len; ++i) {
1054 bufval = &g_array_index (buffers, GValue, i);
1055 g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
1056 buffer = g_value_peek_pointer (bufval);
1057 GST_DEBUG_OBJECT (sink,
1058 "[socket %p] queueing streamheader buffer of length %"
1059 G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer));
1060 gst_buffer_ref (buffer);
1062 mhclient->sending = g_slist_append (mhclient->sending, buffer);
1067 gst_caps_unref (caps);
1070 GST_LOG_OBJECT (sink,
1071 "[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket,
1072 gst_buffer_get_size (buffer));
1074 gst_buffer_ref (buffer);
1075 mhclient->sending = g_slist_append (mhclient->sending, buffer);
1080 /* Get the number of buffers from the buffer queue needed to satisfy
1081 * the maximum max in the configured units.
1082 * If units are not BUFFERS, and there are insufficient buffers in the
1083 * queue to satify the limit, return len(queue) + 1 */
1085 get_buffers_max (GstMultiSocketSink * sink, gint64 max)
1087 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1089 switch (sink->unit_type) {
1090 case GST_FORMAT_BUFFERS:
1092 case GST_FORMAT_TIME:
1098 GstClockTime first = GST_CLOCK_TIME_NONE;
1100 len = mhsink->bufqueue->len;
1102 for (i = 0; i < len; i++) {
1103 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1104 if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
1106 first = GST_BUFFER_TIMESTAMP (buf);
1108 diff = first - GST_BUFFER_TIMESTAMP (buf);
1116 case GST_FORMAT_BYTES:
1123 len = mhsink->bufqueue->len;
1125 for (i = 0; i < len; i++) {
1126 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1127 acc += gst_buffer_get_size (buf);
1139 /* find the positions in the buffer queue where *_min and *_max
1142 /* count the amount of data in the buffers and return the index
1143 * that satifies the given limits.
1145 * Returns: index @idx in the buffer queue so that the given limits are
1146 * satisfied. TRUE if all the limits could be satisfied, FALSE if not
1147 * enough data was in the queue.
1149 * FIXME, this code might now work if any of the units is in buffers...
1152 find_limits (GstMultiSocketSink * sink,
1153 gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
1154 gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
1156 GstClockTime first, time;
1158 gboolean result, max_hit;
1159 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1161 /* take length of queue */
1162 len = mhsink->bufqueue->len;
1164 /* this must hold */
1167 GST_LOG_OBJECT (sink,
1168 "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
1169 ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
1170 buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
1171 GST_TIME_ARGS (time_max));
1173 /* do the trivial buffer limit test */
1174 if (buffers_min != -1 && len < buffers_min) {
1181 /* else count bytes and time */
1190 /* loop through the buffers, when a limit is ok, mark it
1191 * as -1, we have at least one buffer in the queue. */
1195 /* if we checked all min limits, update result */
1196 if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
1197 /* don't go below 0 */
1198 *min_idx = MAX (i - 1, 0);
1200 /* if we reached one max limit break out */
1202 /* i > 0 when we get here, we subtract one to get the position
1203 * of the previous buffer. */
1205 /* we have valid complete result if we found a min_idx too */
1206 result = *min_idx != -1;
1209 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1211 bytes += gst_buffer_get_size (buf);
1213 /* take timestamp and save for the base first timestamp */
1214 if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
1215 GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer",
1216 GST_TIME_ARGS (time));
1220 /* increase max usage if we did not fill enough. Note that
1221 * buffers are sorted from new to old, so the first timestamp is
1222 * bigger than the next one. */
1223 if (time_min != -1 && first - time >= time_min)
1225 if (time_max != -1 && first - time >= time_max)
1228 GST_LOG_OBJECT (sink, "No timestamp on buffer");
1230 /* time is OK or unknown, check and increase if not enough bytes */
1231 if (bytes_min != -1) {
1232 if (bytes >= bytes_min)
1235 if (bytes_max != -1) {
1236 if (bytes >= bytes_max) {
1244 /* if we did not hit the max or min limit, set to buffer size */
1247 /* make sure min does not exceed max */
1249 *min_idx = *max_idx;
1254 /* parse the unit/value pair and assign it to the result value of the
1255 * right type, leave the other values untouched
1257 * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
1260 assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
1261 GstClockTime * time)
1263 gboolean res = TRUE;
1265 /* set only the limit of the given format to the given value */
1267 case GST_FORMAT_BUFFERS:
1268 *buffers = (gint) value;
1270 case GST_FORMAT_TIME:
1273 case GST_FORMAT_BYTES:
1274 *bytes = (gint) value;
1276 case GST_FORMAT_UNDEFINED:
1284 /* count the index in the buffer queue to satisfy the given unit
1285 * and value pair starting from buffer at index 0.
1287 * Returns: TRUE if there was enough data in the queue to satisfy the
1288 * burst values. @idx contains the index in the buffer that contains enough
1289 * data to satisfy the limits or the last buffer in the queue when the
1290 * function returns FALSE.
1293 count_burst_unit (GstMultiSocketSink * sink, gint * min_idx,
1294 GstFormat min_format, guint64 min_value, gint * max_idx,
1295 GstFormat max_format, guint64 max_value)
1297 gint bytes_min = -1, buffers_min = -1;
1298 gint bytes_max = -1, buffers_max = -1;
1299 GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
1301 assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
1302 assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
1304 return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
1305 max_idx, bytes_max, buffers_max, time_max);
1308 /* decide where in the current buffer queue this new client should start
1309 * receiving buffers from.
1310 * This function is called whenever a client is connected and has not yet
1311 * received a buffer.
1312 * If this returns -1, it means that we haven't found a good point to
1313 * start streaming from yet, and this function should be called again later
1314 * when more buffers have arrived.
1317 gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
1318 GstSocketClient * client)
1321 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1322 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1324 GST_DEBUG_OBJECT (sink,
1325 "[socket %p] new client, deciding where to start in queue",
1327 GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
1328 mhsink->bufqueue->len);
1329 switch (mhclient->sync_method) {
1330 case GST_SYNC_METHOD_LATEST:
1331 /* no syncing, we are happy with whatever the client is going to get */
1332 result = mhclient->bufpos;
1333 GST_DEBUG_OBJECT (sink,
1334 "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket,
1337 case GST_SYNC_METHOD_NEXT_KEYFRAME:
1339 /* if one of the new buffers (between mhclient->bufpos and 0) in the queue
1340 * is a sync point, we can proceed, otherwise we need to keep waiting */
1341 GST_LOG_OBJECT (sink,
1342 "[socket %p] new client, bufpos %d, waiting for keyframe",
1343 client->socket, mhclient->bufpos);
1345 result = find_prev_syncframe (mhsink, mhclient->bufpos);
1347 GST_DEBUG_OBJECT (sink,
1348 "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
1349 client->socket, result);
1353 /* client is not on a syncbuffer, need to skip these buffers and
1355 GST_LOG_OBJECT (sink,
1356 "[socket %p] new client, skipping buffer(s), no syncpoint found",
1358 mhclient->bufpos = -1;
1361 case GST_SYNC_METHOD_LATEST_KEYFRAME:
1363 GST_DEBUG_OBJECT (sink,
1364 "[socket %p] SYNC_METHOD_LATEST_KEYFRAME", client->socket);
1366 /* for new clients we initially scan the complete buffer queue for
1367 * a sync point when a buffer is added. If we don't find a keyframe,
1368 * we need to wait for the next keyframe and so we change the client's
1369 * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
1371 result = find_next_syncframe (mhsink, 0);
1373 GST_DEBUG_OBJECT (sink,
1374 "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
1375 client->socket, result);
1379 GST_DEBUG_OBJECT (sink,
1380 "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, "
1381 "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket);
1382 /* throw client to the waiting state */
1383 mhclient->bufpos = -1;
1384 /* and make client sync to next keyframe */
1385 mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1388 case GST_SYNC_METHOD_BURST:
1393 /* move to the position where we satisfy the client's burst
1394 * parameters. If we could not satisfy the parameters because there
1395 * is not enough data, we just send what we have (which is in result).
1396 * We use the max value to limit the search
1398 ok = count_burst_unit (sink, &result, client->burst_min_format,
1399 client->burst_min_value, &max, client->burst_max_format,
1400 client->burst_max_value);
1401 GST_DEBUG_OBJECT (sink,
1402 "[socket %p] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
1403 client->socket, ok, result);
1405 GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
1407 /* we hit the max and it is below the min, use that then */
1408 if (max != -1 && max <= result) {
1409 result = MAX (max - 1, 0);
1410 GST_DEBUG_OBJECT (sink,
1411 "[socket %p] SYNC_METHOD_BURST: result above max, taken down to %d",
1412 client->socket, result);
1416 case GST_SYNC_METHOD_BURST_KEYFRAME:
1418 gint min_idx, max_idx;
1419 gint next_syncframe, prev_syncframe;
1423 * _always_ start sending a keyframe to the client. We first search
1424 * a keyframe between min/max limits. If there is none, we send it the
1425 * last keyframe before min. If there is none, the behaviour is like
1428 /* gather burst limits */
1429 count_burst_unit (sink, &min_idx, client->burst_min_format,
1430 client->burst_min_value, &max_idx, client->burst_max_format,
1431 client->burst_max_value);
1433 GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1435 /* first find a keyframe after min_idx */
1436 next_syncframe = find_next_syncframe (mhsink, min_idx);
1437 if (next_syncframe != -1 && next_syncframe < max_idx) {
1438 /* we have a valid keyframe and it's below the max */
1439 GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1440 result = next_syncframe;
1444 /* no valid keyframe, try to find one below min */
1445 prev_syncframe = find_prev_syncframe (mhsink, min_idx);
1446 if (prev_syncframe != -1) {
1447 GST_WARNING_OBJECT (sink,
1448 "using keyframe below min in BURST_KEYFRAME sync mode");
1449 result = prev_syncframe;
1453 /* no prev keyframe or not enough data */
1454 GST_WARNING_OBJECT (sink,
1455 "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next");
1457 /* throw client to the waiting state */
1458 mhclient->bufpos = -1;
1459 /* and make client sync to next keyframe */
1460 mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME;
1464 case GST_SYNC_METHOD_BURST_WITH_KEYFRAME:
1466 gint min_idx, max_idx;
1467 gint next_syncframe;
1469 /* BURST_WITH_KEYFRAME:
1471 * try to start sending a keyframe to the client. We first search
1472 * a keyframe between min/max limits. If there is none, we send it the
1473 * amount of data up 'till min.
1475 /* gather enough data to burst */
1476 count_burst_unit (sink, &min_idx, client->burst_min_format,
1477 client->burst_min_value, &max_idx, client->burst_max_format,
1478 client->burst_max_value);
1480 GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
1482 /* first find a keyframe after min_idx */
1483 next_syncframe = find_next_syncframe (mhsink, min_idx);
1484 if (next_syncframe != -1 && next_syncframe < max_idx) {
1485 /* we have a valid keyframe and it's below the max */
1486 GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
1487 result = next_syncframe;
1491 /* no keyframe, send data from min_idx */
1492 GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode");
1494 /* make sure we don't go over the max limit */
1495 if (max_idx != -1 && max_idx <= min_idx) {
1496 result = MAX (max_idx - 1, 0);
1504 g_warning ("unknown sync method %d", mhclient->sync_method);
1505 result = mhclient->bufpos;
1511 /* Handle a write on a client,
1512 * which indicates a read request from a client.
1514 * For each client we maintain a queue of GstBuffers that contain the raw bytes
1515 * we need to send to the client.
1517 * We first check to see if we need to send streamheaders. If so, we queue them.
1519 * Then we run into the main loop that tries to send as many buffers as
1520 * possible. It will first exhaust the mhclient->sending queue and if the queue
1521 * is empty, it will pick a buffer from the global queue.
1523 * Sending the buffers from the mhclient->sending queue is basically writing
1524 * the bytes to the socket and maintaining a count of the bytes that were
1525 * sent. When the buffer is completely sent, it is removed from the
1526 * mhclient->sending queue and we try to pick a new buffer for sending.
1528 * When the sending returns a partial buffer we stop sending more data as
1529 * the next send operation could block.
1531 * This functions returns FALSE if some error occured.
1534 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
1535 GstSocketClient * client)
1537 GSocket *socket = client->socket;
1543 GstMultiHandleSink *mhsink;
1544 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1546 mhsink = GST_MULTI_HANDLE_SINK (sink);
1548 g_get_current_time (&nowtv);
1549 now = GST_TIMEVAL_TO_TIME (nowtv);
1551 flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
1557 if (!mhclient->sending) {
1558 /* client is not working on a buffer */
1559 if (mhclient->bufpos == -1) {
1560 /* client is too fast, remove from write queue until new buffer is
1562 if (client->source) {
1563 g_source_destroy (client->source);
1564 g_source_unref (client->source);
1565 client->source = NULL;
1567 /* if we flushed out all of the client buffers, we can stop */
1568 if (mhclient->flushcount == 0)
1573 /* client can pick a buffer from the global queue */
1575 GstClockTime timestamp;
1577 /* for new connections, we need to find a good spot in the
1578 * bufqueue to start streaming from */
1579 if (mhclient->new_connection && !flushing) {
1580 gint position = gst_multi_socket_sink_new_client (sink, client);
1582 if (position >= 0) {
1583 /* we got a valid spot in the queue */
1584 mhclient->new_connection = FALSE;
1585 mhclient->bufpos = position;
1587 /* cannot send data to this client yet */
1588 if (client->source) {
1589 g_source_destroy (client->source);
1590 g_source_unref (client->source);
1591 client->source = NULL;
1597 /* we flushed all remaining buffers, no need to get a new one */
1598 if (mhclient->flushcount == 0)
1602 buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
1606 timestamp = GST_BUFFER_TIMESTAMP (buf);
1607 if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
1608 mhclient->first_buffer_ts = timestamp;
1609 if (timestamp != -1)
1610 mhclient->last_buffer_ts = timestamp;
1612 /* decrease flushcount */
1613 if (mhclient->flushcount != -1)
1614 mhclient->flushcount--;
1616 GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
1617 socket, client, mhclient->bufpos);
1619 /* queueing a buffer will ref it */
1620 gst_multi_socket_sink_client_queue_buffer (sink, client, buf);
1622 /* need to start from the first byte for this new buffer */
1623 mhclient->bufoffset = 0;
1627 /* see if we need to send something */
1628 if (mhclient->sending) {
1633 /* pick first buffer from list */
1634 head = GST_BUFFER (mhclient->sending->data);
1636 gst_buffer_map (head, &map, GST_MAP_READ);
1637 maxsize = map.size - mhclient->bufoffset;
1639 /* try to write the complete buffer */
1642 g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset,
1643 maxsize, sink->cancellable, &err);
1644 gst_buffer_unmap (head, &map);
1648 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
1649 goto connection_reset;
1654 if (wrote < maxsize) {
1655 /* partial write means that the client cannot read more and we should
1656 * stop sending more */
1657 GST_LOG_OBJECT (sink,
1658 "partial write on %p of %" G_GSSIZE_FORMAT " bytes", socket,
1660 mhclient->bufoffset += wrote;
1663 /* complete buffer was written, we can proceed to the next one */
1664 mhclient->sending = g_slist_remove (mhclient->sending, head);
1665 gst_buffer_unref (head);
1666 /* make sure we start from byte 0 for the next buffer */
1667 mhclient->bufoffset = 0;
1670 mhclient->bytes_sent += wrote;
1671 mhclient->last_activity_time = now;
1672 mhsink->bytes_served += wrote;
1682 GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket);
1683 mhclient->status = GST_CLIENT_STATUS_REMOVED;
1688 GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing",
1690 mhclient->status = GST_CLIENT_STATUS_CLOSED;
1691 g_clear_error (&err);
1696 GST_WARNING_OBJECT (sink,
1697 "[socket %p] could not write, removing client: %s", socket,
1699 g_clear_error (&err);
1700 mhclient->status = GST_CLIENT_STATUS_ERROR;
1705 /* calculate the new position for a client after recovery. This function
1706 * does not update the client position but merely returns the required
1710 gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
1711 GstSocketClient * client)
1714 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1715 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1717 GST_WARNING_OBJECT (sink,
1718 "[socket %p] client %p is lagging at %d, recover using policy %d",
1719 client->socket, client, mhclient->bufpos, mhsink->recover_policy);
1721 switch (mhsink->recover_policy) {
1722 case GST_RECOVER_POLICY_NONE:
1723 /* do nothing, client will catch up or get kicked out when it reaches
1725 newbufpos = mhclient->bufpos;
1727 case GST_RECOVER_POLICY_RESYNC_LATEST:
1728 /* move to beginning of queue */
1731 case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
1732 /* move to beginning of soft max */
1733 newbufpos = get_buffers_max (sink, sink->units_soft_max);
1735 case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1736 /* find keyframe in buffers, we search backwards to find the
1737 * closest keyframe relative to what this client already received. */
1738 newbufpos = MIN (mhsink->bufqueue->len - 1,
1739 get_buffers_max (sink, sink->units_soft_max) - 1);
1741 while (newbufpos >= 0) {
1744 buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
1745 if (is_sync_frame (mhsink, buf)) {
1746 /* found a buffer that is not a delta unit */
1753 /* unknown recovery procedure */
1754 newbufpos = get_buffers_max (sink, sink->units_soft_max);
1760 /* Queue a buffer on the global queue.
1762 * This function adds the buffer to the front of a GArray. It removes the
1763 * tail buffer if the max queue size is exceeded, unreffing the queued buffer.
1764 * Note that unreffing the buffer is not a problem as clients who
1765 * started writing out this buffer will still have a reference to it in the
1766 * mhclient->sending queue.
1768 * After adding the buffer, we update all client positions in the queue. If
1769 * a client moves over the soft max, we start the recovery procedure for this
1770 * slow client. If it goes over the hard max, it is put into the slow list
1773 * Special care is taken of clients that were waiting for a new buffer (they
1774 * had a position of -1) because they can proceed after adding this new buffer.
1775 * This is done by adding the client back into the write fd_set and signaling
1776 * the select thread that the fd_set changed.
1779 gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
1781 GList *clients, *next;
1783 gint max_buffer_usage;
1787 gint max_buffers, soft_max_buffers;
1789 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1790 GstMultiHandleSinkClass *mhsinkclass =
1791 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1793 g_get_current_time (&nowtv);
1794 now = GST_TIMEVAL_TO_TIME (nowtv);
1796 CLIENTS_LOCK (sink);
1797 /* add buffer to queue */
1798 gst_buffer_ref (buf);
1799 g_array_prepend_val (mhsink->bufqueue, buf);
1800 queuelen = mhsink->bufqueue->len;
1802 if (sink->units_max > 0)
1803 max_buffers = get_buffers_max (sink, sink->units_max);
1807 if (sink->units_soft_max > 0)
1808 soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
1810 soft_max_buffers = -1;
1811 GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
1814 /* then loop over the clients and update the positions */
1815 max_buffer_usage = 0;
1818 cookie = mhsink->clients_cookie;
1819 for (clients = mhsink->clients; clients; clients = next) {
1820 GstSocketClient *client;
1821 GstMultiHandleClient *mhclient;
1823 if (cookie != mhsink->clients_cookie) {
1824 GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
1828 client = clients->data;
1829 mhclient = (GstMultiHandleClient *) client;
1830 next = g_list_next (clients);
1833 GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
1834 client->socket, client, mhclient->bufpos);
1835 /* check soft max if needed, recover client */
1836 if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
1839 newpos = gst_multi_socket_sink_recover_client (sink, client);
1840 if (newpos != mhclient->bufpos) {
1841 mhclient->dropped_buffers += mhclient->bufpos - newpos;
1842 mhclient->bufpos = newpos;
1843 mhclient->discont = TRUE;
1844 GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d",
1845 client->socket, client, mhclient->bufpos);
1847 GST_INFO_OBJECT (sink,
1848 "[socket %p] client %p not recovering position",
1849 client->socket, client);
1852 /* check hard max and timeout, remove client */
1853 if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) ||
1854 (mhsink->timeout > 0
1855 && now - mhclient->last_activity_time > mhsink->timeout)) {
1857 GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing",
1858 client->socket, client);
1859 /* remove the client, the fd set will be cleared and the select thread
1860 * will be signaled */
1861 mhclient->status = GST_CLIENT_STATUS_SLOW;
1862 /* set client to invalid position while being removed */
1863 mhclient->bufpos = -1;
1864 mhsinkclass->remove_client_link (mhsink, clients);
1866 } else if (mhclient->bufpos == 0 || mhclient->new_connection) {
1867 /* can send data to this client now. need to signal the select thread that
1868 * the fd_set changed */
1869 if (!client->source) {
1871 g_socket_create_source (client->socket,
1872 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
1874 g_source_set_callback (client->source,
1875 (GSourceFunc) gst_multi_socket_sink_socket_condition,
1876 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1877 g_source_attach (client->source, sink->main_context);
1880 /* keep track of maximum buffer usage */
1881 if (mhclient->bufpos > max_buffer_usage) {
1882 max_buffer_usage = mhclient->bufpos;
1886 /* make sure we respect bytes-min, buffers-min and time-min when they are set */
1890 GST_LOG_OBJECT (sink,
1891 "extending queue %d to respect time_min %" GST_TIME_FORMAT
1892 ", bytes_min %d, buffers_min %d", max_buffer_usage,
1893 GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min,
1894 mhsink->buffers_min);
1896 /* get index where the limits are ok, we don't really care if all limits
1897 * are ok, we just queue as much as we need. We also don't compare against
1898 * the max limits. */
1899 find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min,
1900 mhsink->time_min, &max, -1, -1, -1);
1902 max_buffer_usage = MAX (max_buffer_usage, usage + 1);
1903 GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage);
1906 /* now look for sync points and make sure there is at least one
1907 * sync point in the queue. We only do this if the LATEST_KEYFRAME or
1908 * BURST_KEYFRAME mode is selected */
1909 if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
1910 mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
1911 /* no point in searching beyond the queue length */
1912 gint limit = queuelen;
1915 /* no point in searching beyond the soft-max if any. */
1916 if (soft_max_buffers > 0) {
1917 limit = MIN (limit, soft_max_buffers);
1919 GST_LOG_OBJECT (sink,
1920 "extending queue to include sync point, now at %d, limit is %d",
1921 max_buffer_usage, limit);
1922 for (i = 0; i < limit; i++) {
1923 buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1924 if (is_sync_frame (mhsink, buf)) {
1925 /* found a sync frame, now extend the buffer usage to
1926 * include at least this frame. */
1927 max_buffer_usage = MAX (max_buffer_usage, i);
1931 GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1934 GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage);
1936 /* nobody is referencing units after max_buffer_usage so we can
1937 * remove them from the queue. We remove them in reverse order as
1938 * this is the most optimal for GArray. */
1939 for (i = queuelen - 1; i > max_buffer_usage; i--) {
1942 /* queue exceeded max size */
1944 old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
1945 mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
1947 /* unref tail buffer */
1948 gst_buffer_unref (old);
1950 /* save for stats */
1951 mhsink->buffers_queued = max_buffer_usage;
1952 CLIENTS_UNLOCK (sink);
1955 /* Handle the clients. This is called when a socket becomes ready
1956 * to read or writable. Badly behaving clients are put on a
1957 * garbage list and removed.
1960 gst_multi_socket_sink_socket_condition (GSocket * socket,
1961 GIOCondition condition, GstMultiSocketSink * sink)
1964 GstSocketClient *client;
1965 gboolean ret = TRUE;
1966 GstMultiHandleClient *mhclient;
1967 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1968 GstMultiHandleSinkClass *mhsinkclass =
1969 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1971 CLIENTS_LOCK (sink);
1972 clink = g_hash_table_lookup (sink->socket_hash, socket);
1973 if (clink == NULL) {
1978 client = clink->data;
1979 mhclient = (GstMultiHandleClient *) client;
1981 if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1982 && mhclient->status != GST_CLIENT_STATUS_OK) {
1983 mhsinkclass->remove_client_link (mhsink, clink);
1988 if ((condition & G_IO_ERR)) {
1989 GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket);
1990 mhclient->status = GST_CLIENT_STATUS_ERROR;
1991 mhsinkclass->remove_client_link (mhsink, clink);
1994 } else if ((condition & G_IO_HUP)) {
1995 mhclient->status = GST_CLIENT_STATUS_CLOSED;
1996 mhsinkclass->remove_client_link (mhsink, clink);
1999 } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
2000 /* handle client read */
2001 if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
2002 mhsinkclass->remove_client_link (mhsink, clink);
2006 } else if ((condition & G_IO_OUT)) {
2007 /* handle client write */
2008 if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
2009 mhsinkclass->remove_client_link (mhsink, clink);
2016 CLIENTS_UNLOCK (sink);
2022 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
2027 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
2028 GstMultiHandleSinkClass *mhsinkclass =
2029 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
2031 g_get_current_time (&nowtv);
2032 now = GST_TIMEVAL_TO_TIME (nowtv);
2034 CLIENTS_LOCK (sink);
2035 for (clients = mhsink->clients; clients; clients = clients->next) {
2036 GstSocketClient *client;
2037 GstMultiHandleClient *mhclient;
2039 client = clients->data;
2040 mhclient = (GstMultiHandleClient *) client;
2041 if (mhsink->timeout > 0
2042 && now - mhclient->last_activity_time > mhsink->timeout) {
2043 mhclient->status = GST_CLIENT_STATUS_SLOW;
2044 mhsinkclass->remove_client_link (mhsink, clients);
2047 CLIENTS_UNLOCK (sink);
2052 /* we handle the client communication in another thread so that we do not block
2053 * the gstreamer thread while we select() on the client fds */
2055 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
2057 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
2058 GSource *timeout = NULL;
2060 while (mhsink->running) {
2061 if (mhsink->timeout > 0) {
2062 timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
2064 g_source_set_callback (timeout,
2065 (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
2066 (GDestroyNotify) gst_object_unref);
2067 g_source_attach (timeout, sink->main_context);
2070 /* Returns after handling all pending events or when
2071 * _wakeup() was called. In any case we have to add
2072 * a new timeout because something happened.
2074 g_main_context_iteration (sink->main_context, TRUE);
2077 g_source_destroy (timeout);
2078 g_source_unref (timeout);
2085 static GstFlowReturn
2086 gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
2088 GstMultiSocketSink *sink;
2091 GstCaps *bufcaps, *padcaps;
2093 GstMultiHandleSink *mhsink;
2095 sink = GST_MULTI_SOCKET_SINK (bsink);
2096 mhsink = GST_MULTI_HANDLE_SINK (sink);
2098 g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
2099 GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
2102 /* since we check every buffer for streamheader caps, we need to make
2103 * sure every buffer has caps set */
2104 bufcaps = gst_buffer_get_caps (buf);
2105 padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
2107 /* make sure we have caps on the pad */
2108 if (!padcaps && !bufcaps)
2112 /* get HEADER first, code below might mess with the flags */
2113 in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
2116 /* stamp the buffer with previous caps if no caps set */
2118 if (!gst_buffer_is_writable (buf)) {
2119 /* metadata is not writable, copy will be made and original buffer
2120 * will be unreffed so we need to ref so that we don't lose the
2121 * buffer in the render method. */
2122 gst_buffer_ref (buf);
2123 /* the new buffer is ours only, we keep it out of the scope of this
2125 buf = gst_buffer_make_writable (buf);
2127 /* else the metadata is writable, we ref because we keep the buffer
2128 * out of the scope of this method */
2129 gst_buffer_ref (buf);
2131 /* buffer metadata is writable now, set the caps */
2132 gst_buffer_set_caps (buf, padcaps);
2134 gst_caps_unref (bufcaps);
2136 /* since we keep this buffer out of the scope of this method */
2137 gst_buffer_ref (buf);
2141 GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
2142 G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
2143 ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
2144 buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
2145 GST_BUFFER_OFFSET_END (buf),
2146 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
2147 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
2149 /* if we get HEADER buffers, but the previous buffer was not HEADER,
2150 * it means we're getting new streamheader buffers, and we should clear
2152 if (in_caps && sink->previous_buffer_in_caps == FALSE) {
2153 GST_DEBUG_OBJECT (sink,
2154 "receiving new HEADER buffers, clearing old streamheader");
2155 g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
2156 g_slist_free (mhsink->streamheader);
2157 mhsink->streamheader = NULL;
2160 /* save the current in_caps */
2161 sink->previous_buffer_in_caps = in_caps;
2163 /* if the incoming buffer is marked as IN CAPS, then we assume for now
2164 * it's a streamheader that needs to be sent to each new client, so we
2165 * put it on our internal list of streamheader buffers.
2166 * FIXME: we could check if the buffer's contents are in fact part of the
2167 * current streamheader.
2169 * We don't send the buffer to the client, since streamheaders are sent
2170 * separately when necessary. */
2172 GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
2173 G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
2174 mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
2176 /* queue the buffer, this is a regular data buffer. */
2177 gst_multi_socket_sink_queue_buffer (sink, buf);
2179 mhsink->bytes_to_serve += gst_buffer_get_size (buf);
2187 GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
2188 ("Received first buffer without caps set"));
2189 return GST_FLOW_NOT_NEGOTIATED;
2195 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
2196 const GValue * value, GParamSpec * pspec)
2198 GstMultiSocketSink *multisocketsink;
2200 multisocketsink = GST_MULTI_SOCKET_SINK (object);
2203 case PROP_BUFFERS_MAX:
2204 multisocketsink->units_max = g_value_get_int (value);
2206 case PROP_BUFFERS_SOFT_MAX:
2207 multisocketsink->units_soft_max = g_value_get_int (value);
2209 case PROP_UNIT_TYPE:
2210 multisocketsink->unit_type = g_value_get_enum (value);
2212 case PROP_UNITS_MAX:
2213 multisocketsink->units_max = g_value_get_int64 (value);
2215 case PROP_UNITS_SOFT_MAX:
2216 multisocketsink->units_soft_max = g_value_get_int64 (value);
2218 case PROP_BURST_FORMAT:
2219 multisocketsink->def_burst_format = g_value_get_enum (value);
2221 case PROP_BURST_VALUE:
2222 multisocketsink->def_burst_value = g_value_get_uint64 (value);
2225 multisocketsink->qos_dscp = g_value_get_int (value);
2226 setup_dscp (multisocketsink);
2230 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2236 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
2237 GValue * value, GParamSpec * pspec)
2239 GstMultiSocketSink *multisocketsink;
2241 multisocketsink = GST_MULTI_SOCKET_SINK (object);
2244 case PROP_BUFFERS_MAX:
2245 g_value_set_int (value, multisocketsink->units_max);
2247 case PROP_BUFFERS_SOFT_MAX:
2248 g_value_set_int (value, multisocketsink->units_soft_max);
2250 case PROP_UNIT_TYPE:
2251 g_value_set_enum (value, multisocketsink->unit_type);
2253 case PROP_UNITS_MAX:
2254 g_value_set_int64 (value, multisocketsink->units_max);
2256 case PROP_UNITS_SOFT_MAX:
2257 g_value_set_int64 (value, multisocketsink->units_soft_max);
2259 case PROP_BURST_FORMAT:
2260 g_value_set_enum (value, multisocketsink->def_burst_format);
2262 case PROP_BURST_VALUE:
2263 g_value_set_uint64 (value, multisocketsink->def_burst_value);
2266 g_value_set_int (value, multisocketsink->qos_dscp);
2268 case PROP_NUM_SOCKETS:
2269 g_value_set_uint (value,
2270 g_hash_table_size (multisocketsink->socket_hash));
2274 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2280 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
2282 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2285 GST_INFO_OBJECT (mssink, "starting");
2287 mssink->main_context = g_main_context_new ();
2289 CLIENTS_LOCK (mssink);
2290 for (clients = mhsink->clients; clients; clients = clients->next) {
2291 GstSocketClient *client;
2293 client = clients->data;
2297 g_socket_create_source (client->socket,
2298 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
2299 mssink->cancellable);
2300 g_source_set_callback (client->source,
2301 (GSourceFunc) gst_multi_socket_sink_socket_condition,
2302 gst_object_ref (mssink), (GDestroyNotify) gst_object_unref);
2303 g_source_attach (client->source, mssink->main_context);
2305 CLIENTS_UNLOCK (mssink);
2312 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
2318 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
2320 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2322 if (mssink->main_context)
2323 g_main_context_wakeup (mssink->main_context);
2327 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
2329 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
2331 if (mssink->main_context) {
2332 g_main_context_unref (mssink->main_context);
2333 mssink->main_context = NULL;
2336 g_hash_table_foreach_remove (mssink->socket_hash, multisocketsink_hash_remove,
2341 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
2343 GstMultiSocketSink *sink;
2345 sink = GST_MULTI_SOCKET_SINK (bsink);
2347 GST_DEBUG_OBJECT (sink, "set to flushing");
2348 g_cancellable_cancel (sink->cancellable);
2349 if (sink->main_context)
2350 g_main_context_wakeup (sink->main_context);
2355 /* will be called only between calls to start() and stop() */
2357 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
2359 GstMultiSocketSink *sink;
2361 sink = GST_MULTI_SOCKET_SINK (bsink);
2363 GST_DEBUG_OBJECT (sink, "unset flushing");
2364 g_cancellable_reset (sink->cancellable);