2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
23 * @short_description: a sink that writes to multiple file descriptors
24 * @see_also: tcpserversink
26 * Incredibly, still a sink that writes to multiple file descriptors
32 #include <gst/gst-i18n-plugin.h>
34 #include <sys/ioctl.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
41 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
42 #include <sys/filio.h>
45 #include "gstmultifdsink.h"
46 #include "gsttcp-marshal.h"
48 #define NOT_IMPLEMENTED 0
50 /* the select call is also performed on the control sockets, that way
51 * we can send special commands to unblock or restart the select call */
52 #define CONTROL_RESTART 'R' /* restart the select call */
53 #define CONTROL_STOP 'S' /* stop the select call */
54 #define CONTROL_SOCKETS(sink) sink->control_sock
55 #define WRITE_SOCKET(sink) sink->control_sock[1]
56 #define READ_SOCKET(sink) sink->control_sock[0]
58 #define SEND_COMMAND(sink, command) \
60 unsigned char c; c = command; \
61 write (WRITE_SOCKET(sink).fd, &c, 1); \
64 #define READ_COMMAND(sink, command, res) \
66 res = read(READ_SOCKET(sink).fd, &command, 1); \
69 /* elementfactory information */
70 static GstElementDetails gst_multifdsink_details =
71 GST_ELEMENT_DETAILS ("MultiFd sink",
73 "Send data to multiple filedescriptors",
74 "Thomas Vander Stichele <thomas at apestaart dot org>, "
75 "Wim Taymans <wim@fluendo.com>");
77 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
82 GST_DEBUG_CATEGORY (multifdsink_debug);
83 #define GST_CAT_DEFAULT (multifdsink_debug)
85 /* MultiFdSink signals and args */
96 SIGNAL_CLIENT_REMOVED,
101 /* this is really arbitrarily chosen */
102 #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
103 #define DEFAULT_MODE GST_FDSET_MODE_POLL
104 #define DEFAULT_BUFFERS_MAX -1
105 #define DEFAULT_BUFFERS_SOFT_MAX -1
106 #define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
107 #define DEFAULT_UNITS_MAX -1
108 #define DEFAULT_UNITS_SOFT_MAX -1
109 #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
110 #define DEFAULT_TIMEOUT 0
111 #define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_NONE
127 ARG_BUFFERS_SOFT_MAX,
131 ARG_SYNC_CLIENTS, /* deprecated */
137 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
139 gst_recover_policy_get_type (void)
141 static GType recover_policy_type = 0;
142 static GEnumValue recover_policy[] = {
143 {GST_RECOVER_POLICY_NONE, "GST_RECOVER_POLICY_NONE",
144 "Do not try to recover"},
145 {GST_RECOVER_POLICY_RESYNC_START, "GST_RECOVER_POLICY_RESYNC_START",
146 "Resync client to most recent buffer"},
147 {GST_RECOVER_POLICY_RESYNC_SOFT, "GST_RECOVER_POLICY_RESYNC_SOFT",
148 "Resync client to soft limit"},
149 {GST_RECOVER_POLICY_RESYNC_KEYFRAME, "GST_RECOVER_POLICY_RESYNC_KEYFRAME",
150 "Resync client to most recent keyframe"},
154 if (!recover_policy_type) {
155 recover_policy_type =
156 g_enum_register_static ("GstRecoverPolicy", recover_policy);
158 return recover_policy_type;
161 #define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
163 gst_sync_method_get_type (void)
165 static GType sync_method_type = 0;
166 static GEnumValue sync_method[] = {
167 {GST_SYNC_METHOD_NONE, "GST_SYNC_METHOD_NONE",
168 "Serve new client the latest buffer"},
169 {GST_SYNC_METHOD_WAIT, "GST_SYNC_METHOD_WAIT",
170 "Make the new client wait for the next keyframe"},
171 {GST_SYNC_METHOD_BURST, "GST_SYNC_METHOD_BURST",
172 "Serve the new client the last keyframe, aka burst"},
176 if (!sync_method_type) {
177 sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method);
179 return sync_method_type;
183 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
185 gst_unit_type_get_type (void)
187 static GType unit_type_type = 0;
188 static GEnumValue unit_type[] = {
189 {GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
190 {GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
191 {GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
195 if (!unit_type_type) {
196 unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
198 return unit_type_type;
202 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
204 gst_client_status_get_type (void)
206 static GType client_status_type = 0;
207 static GEnumValue client_status[] = {
208 {GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
209 {GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
210 {GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
211 {GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
212 {GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
213 {GST_CLIENT_STATUS_DUPLICATE, "GST_CLIENT_STATUS_DUPLICATE", "Duplicate"},
217 if (!client_status_type) {
219 g_enum_register_static ("GstClientStatus", client_status);
221 return client_status_type;
224 static void gst_multifdsink_base_init (gpointer g_class);
225 static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
226 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
228 static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
231 static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink,
233 static GstStateChangeReturn gst_multifdsink_change_state (GstElement *
234 element, GstStateChange transition);
236 static void gst_multifdsink_set_property (GObject * object, guint prop_id,
237 const GValue * value, GParamSpec * pspec);
238 static void gst_multifdsink_get_property (GObject * object, guint prop_id,
239 GValue * value, GParamSpec * pspec);
242 static GstElementClass *parent_class = NULL;
244 static guint gst_multifdsink_signals[LAST_SIGNAL] = { 0 };
247 gst_multifdsink_get_type (void)
249 static GType multifdsink_type = 0;
252 if (!multifdsink_type) {
253 static const GTypeInfo multifdsink_info = {
254 sizeof (GstMultiFdSinkClass),
255 gst_multifdsink_base_init,
257 (GClassInitFunc) gst_multifdsink_class_init,
260 sizeof (GstMultiFdSink),
262 (GInstanceInitFunc) gst_multifdsink_init,
267 g_type_register_static (GST_TYPE_BASE_SINK, "GstMultiFdSink",
268 &multifdsink_info, 0);
270 return multifdsink_type;
274 gst_multifdsink_base_init (gpointer g_class)
276 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
278 gst_element_class_add_pad_template (element_class,
279 gst_static_pad_template_get (&sinktemplate));
281 gst_element_class_set_details (element_class, &gst_multifdsink_details);
285 gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
287 GObjectClass *gobject_class;
288 GstElementClass *gstelement_class;
289 GstBaseSinkClass *gstbasesink_class;
291 gobject_class = (GObjectClass *) klass;
292 gstelement_class = (GstElementClass *) klass;
293 gstbasesink_class = (GstBaseSinkClass *) klass;
295 parent_class = g_type_class_ref (GST_TYPE_BASE_SINK);
297 gobject_class->set_property = gst_multifdsink_set_property;
298 gobject_class->get_property = gst_multifdsink_get_property;
300 g_object_class_install_property (gobject_class, ARG_PROTOCOL,
301 g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
302 GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
303 g_object_class_install_property (gobject_class, ARG_MODE,
304 g_param_spec_enum ("mode", "Mode",
305 "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
306 DEFAULT_MODE, G_PARAM_READWRITE));
308 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
309 g_param_spec_int ("buffers-max", "Buffers max",
310 "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
311 DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
312 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
313 g_param_spec_int ("buffers-soft-max", "Buffers soft max",
314 "Recover client when going over this limit (-1 = no limit)", -1,
315 G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
318 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
319 g_param_spec_enum ("unit-type", "Units type",
320 "The unit to measure the max/soft-max/queued properties",
321 GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
322 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
323 g_param_spec_int ("units-max", "Units max",
324 "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
325 DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
326 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
327 g_param_spec_int ("units-soft-max", "Units soft max",
328 "Recover client when going over this limit (-1 = no limit)", -1,
329 G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
332 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
333 g_param_spec_uint ("buffers-queued", "Buffers queued",
334 "Number of buffers currently queued", 0, G_MAXUINT, 0,
337 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
338 g_param_spec_uint ("bytes-queued", "Bytes queued",
339 "Number of bytes currently queued", 0, G_MAXUINT, 0,
341 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
342 g_param_spec_uint64 ("time-queued", "Time queued",
343 "Number of time currently queued", 0, G_MAXUINT64, 0,
347 g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
348 g_param_spec_enum ("recover-policy", "Recover Policy",
349 "How to recover when client reaches the soft max",
350 GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
351 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
352 g_param_spec_uint64 ("timeout", "Timeout",
353 "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
354 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
355 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_CLIENTS,
356 g_param_spec_boolean ("sync-clients", "Sync clients",
357 "(DEPRECATED) Sync clients to a keyframe",
358 DEFAULT_SYNC_METHOD == GST_SYNC_METHOD_WAIT, G_PARAM_READWRITE));
359 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
360 g_param_spec_enum ("sync-method", "Sync Method",
361 "How to sync new clients to the stream",
362 GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
363 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
364 g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
365 "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
367 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
368 g_param_spec_uint64 ("bytes-served", "Bytes served",
369 "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
373 * GstMultiFdSink::add:
374 * @gstmultifdsink: the multifdsink element to emit this signal on
375 * @arg1: the file descriptor to add to multifdsink
377 * Hand the given open file descriptor to multifdsink to write to.
379 gst_multifdsink_signals[SIGNAL_ADD] =
380 g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
381 G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
382 NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
384 * GstMultiFdSink::remove:
385 * @gstmultifdsink: the multifdsink element to emit this signal on
386 * @arg1: the file descriptor to remove from multifdsink
388 * Remove the given open file descriptor from multifdsink.
390 gst_multifdsink_signals[SIGNAL_REMOVE] =
391 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
392 G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
393 NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
395 * GstMultiFdSink::clear:
396 * @gstmultifdsink: the multifdsink element to emit this signal on
398 * Clear all file descriptors from multifdsink.
400 gst_multifdsink_signals[SIGNAL_CLEAR] =
401 g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
402 G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
403 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
404 gst_multifdsink_signals[SIGNAL_GET_STATS] =
405 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
406 G_STRUCT_OFFSET (GstMultiFdSinkClass, get_stats),
407 NULL, NULL, gst_tcp_marshal_BOXED__INT, G_TYPE_VALUE_ARRAY, 1,
411 * GstMultiFdSink::client-added:
412 * @gstmultifdsink: the multifdsink element that emitted this signal
413 * @arg1: the file descriptor that was added to multifdsink
415 * The given file descriptor was added to multifdsink.
417 gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
418 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
419 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
420 NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
422 * GstMultiFdSink::client-removed:
423 * @gstmultifdsink: the multifdsink element that emitted this signal
424 * @arg1: the file descriptor that was removed from multifdsink
426 * The given file descriptor was removed from multifdsink.
428 gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
429 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
430 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
431 client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
432 G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
434 gstelement_class->change_state =
435 GST_DEBUG_FUNCPTR (gst_multifdsink_change_state);
437 gstbasesink_class->render = gst_multifdsink_render;
439 klass->add = gst_multifdsink_add;
440 klass->remove = gst_multifdsink_remove;
441 klass->clear = gst_multifdsink_clear;
442 klass->get_stats = gst_multifdsink_get_stats;
444 GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
448 gst_multifdsink_init (GstMultiFdSink * this)
450 GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
452 this->protocol = DEFAULT_PROTOCOL;
453 this->mode = DEFAULT_MODE;
455 CLIENTS_LOCK_INIT (this);
456 this->clients = NULL;
457 this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
459 this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
460 this->unit_type = DEFAULT_UNIT_TYPE;
461 this->units_max = DEFAULT_UNITS_MAX;
462 this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
463 this->recover_policy = DEFAULT_RECOVER_POLICY;
465 this->timeout = DEFAULT_TIMEOUT;
466 this->sync_method = DEFAULT_SYNC_METHOD;
470 gst_multifdsink_add (GstMultiFdSink * sink, int fd)
472 GstTCPClient *client;
478 GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
480 /* create client datastructure */
481 client = g_new0 (GstTCPClient, 1);
483 client->status = GST_CLIENT_STATUS_OK;
485 client->bufoffset = 0;
486 client->sending = NULL;
487 client->bytes_sent = 0;
488 client->dropped_buffers = 0;
489 client->avg_queue_size = 0;
490 client->new_connection = TRUE;
492 /* update start time */
493 g_get_current_time (&now);
494 client->connect_time = GST_TIMEVAL_TO_TIME (now);
495 client->disconnect_time = 0;
496 /* send last activity time to connect time */
497 client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
501 /* check the hash to find a duplicate fd */
502 clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
504 client->status = GST_CLIENT_STATUS_DUPLICATE;
505 CLIENTS_UNLOCK (sink);
506 GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
507 g_signal_emit (G_OBJECT (sink),
508 gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
513 /* we can add the fd now */
514 clink = sink->clients = g_list_prepend (sink->clients, client);
515 g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
517 /* set the socket to non blocking */
518 res = fcntl (fd, F_SETFL, O_NONBLOCK);
519 /* we always read from a client */
520 gst_fdset_add_fd (sink->fdset, &client->fd);
522 /* we don't try to read from write only fds */
523 flags = fcntl (fd, F_GETFL, 0);
524 if ((flags & O_ACCMODE) != O_WRONLY) {
525 gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
527 /* figure out the mode, can't use send() for non sockets */
528 res = fstat (fd, &statbuf);
529 if (S_ISSOCK (statbuf.st_mode)) {
530 client->is_socket = TRUE;
533 SEND_COMMAND (sink, CONTROL_RESTART);
535 CLIENTS_UNLOCK (sink);
537 g_signal_emit (G_OBJECT (sink),
538 gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
542 gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
546 GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
549 clink = g_hash_table_lookup (sink->fd_hash, &fd);
551 GstTCPClient *client = (GstTCPClient *) clink->data;
553 client->status = GST_CLIENT_STATUS_REMOVED;
554 gst_multifdsink_remove_client_link (sink, clink);
555 SEND_COMMAND (sink, CONTROL_RESTART);
557 GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
559 CLIENTS_UNLOCK (sink);
563 gst_multifdsink_clear (GstMultiFdSink * sink)
565 GList *clients, *next;
567 GST_DEBUG_OBJECT (sink, "clearing all clients");
570 for (clients = sink->clients; clients; clients = next) {
571 GstTCPClient *client;
573 client = (GstTCPClient *) clients->data;
574 next = g_list_next (clients);
576 client->status = GST_CLIENT_STATUS_REMOVED;
577 gst_multifdsink_remove_client_link (sink, clients);
579 SEND_COMMAND (sink, CONTROL_RESTART);
580 CLIENTS_UNLOCK (sink);
584 gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
586 GstTCPClient *client;
587 GValueArray *result = NULL;
591 clink = g_hash_table_lookup (sink->fd_hash, &fd);
592 client = (GstTCPClient *) clink->data;
593 if (client != NULL) {
594 GValue value = { 0 };
597 result = g_value_array_new (4);
599 g_value_init (&value, G_TYPE_UINT64);
600 g_value_set_uint64 (&value, client->bytes_sent);
601 result = g_value_array_append (result, &value);
602 g_value_unset (&value);
603 g_value_init (&value, G_TYPE_UINT64);
604 g_value_set_uint64 (&value, client->connect_time);
605 result = g_value_array_append (result, &value);
606 g_value_unset (&value);
607 if (client->disconnect_time == 0) {
610 g_get_current_time (&nowtv);
612 interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
614 interval = client->disconnect_time - client->connect_time;
616 g_value_init (&value, G_TYPE_UINT64);
617 g_value_set_uint64 (&value, client->disconnect_time);
618 result = g_value_array_append (result, &value);
619 g_value_unset (&value);
620 g_value_init (&value, G_TYPE_UINT64);
621 g_value_set_uint64 (&value, interval);
622 result = g_value_array_append (result, &value);
623 g_value_unset (&value);
624 g_value_init (&value, G_TYPE_UINT64);
625 g_value_set_uint64 (&value, client->last_activity_time);
626 result = g_value_array_append (result, &value);
628 CLIENTS_UNLOCK (sink);
630 /* python doesn't like a NULL pointer yet */
631 if (result == NULL) {
632 GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
633 result = g_value_array_new (0);
639 /* should be called with the clientslock held.
640 * Note that we don't close the fd as we didn't open it in the first
641 * place. An application should connect to the client-removed signal and
642 * close the fd itself.
645 gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
649 GstTCPClient *client = (GstTCPClient *) link->data;
650 GstMultiFdSinkClass *fclass;
652 fclass = GST_MULTIFDSINK_GET_CLASS (sink);
656 /* FIXME: if we keep track of ip we can log it here and signal */
657 switch (client->status) {
658 case GST_CLIENT_STATUS_OK:
659 GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
662 case GST_CLIENT_STATUS_CLOSED:
663 GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
666 case GST_CLIENT_STATUS_REMOVED:
667 GST_DEBUG_OBJECT (sink,
668 "[fd %5d] removing client %p because the app removed it", fd, client);
670 case GST_CLIENT_STATUS_SLOW:
671 GST_INFO_OBJECT (sink,
672 "[fd %5d] removing client %p because it was too slow", fd, client);
674 case GST_CLIENT_STATUS_ERROR:
675 GST_WARNING_OBJECT (sink,
676 "[fd %5d] removing client %p because of error", fd, client);
679 GST_WARNING_OBJECT (sink,
680 "[fd %5d] removing client %p with invalid reason", fd, client);
684 gst_fdset_remove_fd (sink->fdset, &client->fd);
686 g_get_current_time (&now);
687 client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
689 /* free client buffers */
690 g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
691 g_slist_free (client->sending);
692 client->sending = NULL;
694 /* unlock the mutex before signaling because the signal handler
695 * might query some properties */
696 CLIENTS_UNLOCK (sink);
698 g_signal_emit (G_OBJECT (sink),
699 gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
701 /* lock again before we remove the client completely */
704 if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
705 GST_WARNING_OBJECT (sink,
706 "[fd %5d] error removing client %p from hash", client->fd.fd, client);
708 /* after releasing the lock above, the link could be invalid, more
709 * precisely, the next and prev pointers could point to invalid list
710 * links. One optimisation could be to add a cookie to the linked list
711 * and take a shortcut when it did not change between unlocking and locking
712 * our mutex. For now we just walk the list again. */
713 sink->clients = g_list_remove (sink->clients, client);
716 fclass->removed (sink, client->fd.fd);
721 /* handle a read on a client fd,
722 * which either indicates a close or should be ignored
723 * returns FALSE if some error occured or the client closed. */
725 gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
726 GstTCPClient * client)
733 if (ioctl (fd, FIONREAD, &avail) < 0) {
734 GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
735 fd, g_strerror (errno), errno);
736 client->status = GST_CLIENT_STATUS_ERROR;
741 GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
747 /* client sent close, so remove it */
748 GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
749 client->status = GST_CLIENT_STATUS_CLOSED;
751 } else if (avail < 0) {
752 GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
753 client->status = GST_CLIENT_STATUS_ERROR;
759 /* just Read 'n' Drop, could also just drop the client as it's not supposed
760 * to write to us except for closing the socket, I guess it's because we
761 * like to listen to our customers. */
763 /* this is the maximum we can read */
764 gint to_read = MIN (avail, 512);
766 GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
769 nread = read (fd, dummy, to_read);
771 GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
772 fd, to_read, g_strerror (errno), errno);
773 client->status = GST_CLIENT_STATUS_ERROR;
776 } else if (nread == 0) {
777 GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
778 client->status = GST_CLIENT_STATUS_ERROR;
790 gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
791 gchar * data, gint len)
795 buf = gst_buffer_new ();
796 GST_BUFFER_DATA (buf) = (guint8 *) data;
797 GST_BUFFER_SIZE (buf) = len;
799 GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
802 client->sending = g_slist_append (client->sending, buf);
808 gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
809 const GstCaps * caps)
816 string = gst_caps_to_string (caps);
817 GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
818 client->fd.fd, string);
821 if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
822 GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
825 gst_multifdsink_client_queue_data (sink, client, (gchar *) header, length);
827 length = gst_dp_header_payload_length (header);
828 gst_multifdsink_client_queue_data (sink, client, (gchar *) payload, length);
834 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
836 if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
838 } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
845 gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
846 GstTCPClient * client, GstBuffer * buffer)
848 if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
852 if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
853 GST_DEBUG_OBJECT (sink,
854 "[fd %5d] could not create header, removing client", client->fd.fd);
857 gst_multifdsink_client_queue_data (sink, client, (gchar *) header, len);
860 GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
861 client->fd.fd, GST_BUFFER_SIZE (buffer));
863 gst_buffer_ref (buffer);
864 client->sending = g_slist_append (client->sending, buffer);
870 gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
874 switch (sink->sync_method) {
875 case GST_SYNC_METHOD_WAIT:
877 /* if the buffer at the head of the queue is a sync point we can proceed,
878 * else we need to skip the buffer and wait for a new one */
879 GST_LOG_OBJECT (sink,
880 "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
883 /* the client is not yet alligned to a buffer */
884 if (client->bufpos < 0) {
890 for (i = client->bufpos; i >= 0; i--) {
891 /* get the buffer for the client */
892 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
893 if (is_sync_frame (sink, buf)) {
894 GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
899 /* client is not on a buffer, need to skip this buffer and
901 GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
910 case GST_SYNC_METHOD_BURST:
912 /* FIXME for new clients we constantly scan the complete
913 * buffer queue for sync point whenever a buffer is added. This is
914 * suboptimal because if we cannot find a sync point the first time,
915 * the algorithm should behave as GST_SYNC_METHOD_WAIT */
918 GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
919 client->fd.fd, client->bufpos);
921 /* take length of queued buffers */
922 len = sink->bufqueue->len;
923 /* assume we don't find a keyframe */
925 /* then loop over all buffers to find the first keyframe */
926 for (i = 0; i < len; i++) {
929 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
930 if (is_sync_frame (sink, buf)) {
931 /* found a keyframe, return its position */
932 GST_LOG_OBJECT (sink, "found keyframe at %d", i);
937 GST_LOG_OBJECT (sink, "no keyframe found");
938 /* throw client to the waiting state */
943 /* no syncing, we are happy with whatever the client is going to get */
944 GST_LOG_OBJECT (sink, "no client syn needed");
945 result = client->bufpos;
952 /* handle a write on a client,
953 * which indicates a read request from a client.
955 * The strategy is as follows, for each client we maintain a queue of GstBuffers
956 * that contain the raw bytes we need to send to the client. In the case of the
957 * GDP protocol, we create buffers out of the header bytes so that we can only
958 * focus on sending buffers.
960 * We first check to see if we need to send caps (in GDP) and streamheaders.
961 * If so, we queue them.
963 * Then we run into the main loop that tries to send as many buffers as
964 * possible. It will first exhaust the client->sending queue and if the queue
965 * is empty, it will pick a buffer from the global queue.
967 * Sending the Buffers from the client->sending queue is basically writing
968 * the bytes to the socket and maintaining a count of the bytes that were
969 * sent. When the buffer is completely sent, it is removed from the
970 * client->sending queue and we try to pick a new buffer for sending.
972 * When the sending returns a partial buffer we stop sending more data as
973 * the next send operation could block.
975 * This functions returns FALSE if some error occured.
978 gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
979 GstTCPClient * client)
981 int fd = client->fd.fd;
987 g_get_current_time (&nowtv);
988 now = GST_TIMEVAL_TO_TIME (nowtv);
990 /* when using GDP, first check if we have queued caps yet */
991 if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
992 if (!client->caps_sent) {
993 const GstCaps *caps =
994 GST_PAD_CAPS (GST_PAD_PEER (GST_BASE_SINK_PAD (sink)));
996 /* queue caps for sending */
997 res = gst_multifdsink_client_queue_caps (sink, client, caps);
999 GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
1002 client->caps_sent = TRUE;
1005 /* if we have streamheader buffers, and haven't sent them to this client
1006 * yet, send them out one by one */
1007 if (!client->streamheader_sent) {
1008 GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
1009 g_slist_length (sink->streamheader));
1010 if (sink->streamheader) {
1013 for (l = sink->streamheader; l; l = l->next) {
1014 /* queue stream headers for sending */
1016 gst_multifdsink_client_queue_buffer (sink, client,
1017 GST_BUFFER (l->data));
1019 GST_DEBUG_OBJECT (sink,
1020 "Failed queueing streamheader, removing client");
1025 client->streamheader_sent = TRUE;
1032 if (!client->sending) {
1033 /* client is not working on a buffer */
1034 if (client->bufpos == -1) {
1035 /* client is too fast, remove from write queue until new buffer is
1037 gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1040 /* client can pick a buffer from the global queue */
1043 /* for new connections, we need to find a good spot in the
1044 * bufqueue to start streaming from */
1045 if (client->new_connection) {
1046 gint position = gst_multifdsink_new_client (sink, client);
1048 if (position >= 0) {
1049 /* we got a valid spot in the queue */
1050 client->new_connection = FALSE;
1051 client->bufpos = position;
1053 /* cannot send data to this client yet */
1054 gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1060 buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
1062 GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1063 fd, client, client->bufpos);
1065 /* queueing a buffer will ref it */
1066 gst_multifdsink_client_queue_buffer (sink, client, buf);
1068 /* need to start from the first byte for this new buffer */
1069 client->bufoffset = 0;
1073 /* see if we need to send something */
1074 if (client->sending) {
1078 /* pick first buffer from list */
1079 head = GST_BUFFER (client->sending->data);
1080 maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
1082 /* try to write the complete buffer */
1084 #define FLAGS MSG_NOSIGNAL
1088 if (client->is_socket) {
1090 send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
1093 wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
1098 if (errno == EAGAIN) {
1099 /* nothing serious, resource was unavailable, try again later */
1101 } else if (errno == ECONNRESET) {
1102 GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
1104 client->status = GST_CLIENT_STATUS_CLOSED;
1107 GST_WARNING_OBJECT (sink,
1108 "[fd %5d] could not write, removing client: %s (%d)", fd,
1109 g_strerror (errno), errno);
1110 client->status = GST_CLIENT_STATUS_ERROR;
1114 if (wrote < maxsize) {
1115 /* partial write means that the client cannot read more and we should
1116 * stop sending more */
1117 GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
1118 client->bufoffset += wrote;
1121 /* complete buffer was written, we can proceed to the next one */
1122 client->sending = g_slist_remove (client->sending, head);
1123 gst_buffer_unref (head);
1124 /* make sure we start from byte 0 for the next buffer */
1125 client->bufoffset = 0;
1128 client->bytes_sent += wrote;
1129 client->last_activity_time = now;
1130 sink->bytes_served += wrote;
1138 /* calculate the new position for a client after recovery. This function
1139 * does not update the client position but merely returns the required
1143 gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
1147 GST_WARNING_OBJECT (sink,
1148 "[fd %5d] client %p is lagging at %d, recover using policy %d",
1149 client->fd.fd, client, client->bufpos, sink->recover_policy);
1151 switch (sink->recover_policy) {
1152 case GST_RECOVER_POLICY_NONE:
1153 /* do nothing, client will catch up or get kicked out when it reaches
1155 newbufpos = client->bufpos;
1157 case GST_RECOVER_POLICY_RESYNC_START:
1158 /* move to beginning of queue */
1161 case GST_RECOVER_POLICY_RESYNC_SOFT:
1162 /* move to beginning of soft max */
1163 newbufpos = sink->units_soft_max;
1165 case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1166 /* find keyframe in buffers, we search backwards to find the
1167 * closest keyframe relative to what this client already received. */
1168 newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
1170 while (newbufpos >= 0) {
1173 buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
1174 if (is_sync_frame (sink, buf)) {
1175 /* found a buffer that is not a delta unit */
1182 /* unknown recovery procedure */
1183 newbufpos = sink->units_soft_max;
1189 /* Queue a buffer on the global queue.
1191 * This functions adds the buffer to the front of a GArray. It removes the
1192 * tail buffer if the max queue size is exceeded. Unreffing the buffer that
1193 * is queued. Note that unreffing the buffer is not a problem as clients who
1194 * started writing out this buffer will still have a reference to it in the
1195 * client->sending queue.
1197 * After adding the buffer, we update all client positions in the queue. If
1198 * a client moves over the soft max, we start the recovery procedure for this
1199 * slow client. If it goes over the hard max, it is put into the slow list
1202 * Special care is taken of clients that were waiting for a new buffer (they
1203 * had a position of -1) because they can proceed after adding this new buffer.
1204 * This is done by adding the client back into the write fd_set and signalling
1205 * the select thread that the fd_set changed.
1209 gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
1211 GList *clients, *next;
1213 gboolean need_signal = FALSE;
1214 gint max_buffer_usage;
1219 g_get_current_time (&nowtv);
1220 now = GST_TIMEVAL_TO_TIME (nowtv);
1222 CLIENTS_LOCK (sink);
1223 /* add buffer to queue */
1224 g_array_prepend_val (sink->bufqueue, buf);
1225 queuelen = sink->bufqueue->len;
1227 /* then loop over the clients and update the positions */
1228 max_buffer_usage = 0;
1229 for (clients = sink->clients; clients; clients = next) {
1230 GstTCPClient *client;
1232 client = (GstTCPClient *) clients->data;
1233 next = g_list_next (clients);
1236 GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1237 client->fd.fd, client, client->bufpos);
1238 /* check soft max if needed, recover client */
1239 if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
1242 newpos = gst_multifdsink_recover_client (sink, client);
1243 if (newpos != client->bufpos) {
1244 client->bufpos = newpos;
1245 client->discont = TRUE;
1246 GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
1247 client->fd.fd, client, client->bufpos);
1249 GST_INFO_OBJECT (sink,
1250 "[fd %5d] client %p not recovering position",
1251 client->fd.fd, client);
1254 /* check hard max and timeout, remove client */
1255 if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
1257 && now - client->last_activity_time > sink->timeout)) {
1259 GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
1260 client->fd.fd, client);
1261 /* remove the client, the fd set will be cleared and the select thread will
1263 client->status = GST_CLIENT_STATUS_SLOW;
1264 gst_multifdsink_remove_client_link (sink, clients);
1265 /* set client to invalid position while being removed */
1266 client->bufpos = -1;
1268 } else if (client->bufpos == 0 || client->new_connection) {
1269 /* can send data to this client now. need to signal the select thread that
1270 * the fd_set changed */
1271 gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
1274 /* keep track of maximum buffer usage */
1275 if (client->bufpos > max_buffer_usage) {
1276 max_buffer_usage = client->bufpos;
1280 /* now look for sync points and make sure there is at least one
1281 * sync point in the queue. We only do this if the burst mode
1283 if (sink->sync_method == GST_SYNC_METHOD_BURST) {
1284 /* no point in searching beyond the queue length */
1285 gint limit = queuelen;
1288 /* no point in searching beyond the soft-max if any. */
1289 if (sink->units_soft_max > 0) {
1290 limit = MIN (limit, sink->units_soft_max);
1292 GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
1294 for (i = 0; i < limit; i++) {
1295 buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1296 if (is_sync_frame (sink, buf)) {
1297 /* found a sync frame, now extend the buffer usage to
1298 * include at least this frame. */
1299 max_buffer_usage = MAX (max_buffer_usage, i);
1303 GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1306 /* nobody is referencing units after max_buffer_usage so we can
1307 * remove them from the queue. We remove them in reverse order as
1308 * this is the most optimal for GArray. */
1309 for (i = queuelen - 1; i > max_buffer_usage; i--) {
1312 /* queue exceeded max size */
1314 old = g_array_index (sink->bufqueue, GstBuffer *, i);
1315 sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
1317 /* unref tail buffer */
1318 gst_buffer_unref (old);
1320 /* save for stats */
1321 sink->buffers_queued = max_buffer_usage;
1322 CLIENTS_UNLOCK (sink);
1324 /* and send a signal to thread if fd_set changed */
1326 SEND_COMMAND (sink, CONTROL_RESTART);
1330 /* Handle the clients. Basically does a blocking select for one
1331 * of the client fds to become read or writable. We also have a
1332 * filedescriptor to receive commands on that we need to check.
1334 * After going out of the select call, we read and write to all
1335 * clients that can do so. Badly behaving clients are put on a
1336 * garbage list and removed.
1339 gst_multifdsink_handle_clients (GstMultiFdSink * sink)
1342 GList *clients, *next;
1344 GstMultiFdSinkClass *fclass;
1346 fclass = GST_MULTIFDSINK_GET_CLASS (sink);
1349 gboolean stop = FALSE;
1354 * - server socket input (ie, new client connections)
1355 * - client socket input (ie, clients saying goodbye)
1356 * - client socket output (ie, client reads) */
1357 GST_LOG_OBJECT (sink, "waiting on action on fdset");
1358 result = gst_fdset_wait (sink->fdset, -1);
1360 /* < 0 is an error, 0 just means a timeout happened, which is impossible */
1362 GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
1364 if (errno == EBADF) {
1365 /* ok, so one or more of the fds is invalid. We loop over them to find
1366 * the ones that give an error to the F_GETFL fcntl. */
1367 CLIENTS_LOCK (sink);
1368 for (clients = sink->clients; clients; clients = next) {
1369 GstTCPClient *client;
1374 client = (GstTCPClient *) clients->data;
1375 next = g_list_next (clients);
1379 res = fcntl (fd, F_GETFL, &flags);
1381 GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
1382 fd, g_strerror (errno), errno);
1383 if (errno == EBADF) {
1384 client->status = GST_CLIENT_STATUS_ERROR;
1385 gst_multifdsink_remove_client_link (sink, clients);
1389 CLIENTS_UNLOCK (sink);
1390 /* after this, go back in the select loop as the read/writefds
1393 } else if (errno == EINTR) {
1394 /* interrupted system call, just redo the select */
1397 /* this is quite bad... */
1398 GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
1399 ("select failed: %s (%d)", g_strerror (errno), errno));
1403 GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
1404 /* read all commands */
1405 if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
1406 GST_LOG_OBJECT (sink, "have a command");
1411 READ_COMMAND (sink, command, res);
1413 GST_LOG_OBJECT (sink, "no more commands");
1414 /* no more commands */
1419 case CONTROL_RESTART:
1420 GST_LOG_OBJECT (sink, "restart");
1421 /* need to restart the select call as the fd_set changed */
1422 /* if other file descriptors than the READ_SOCKET had activity,
1423 * we don't restart just yet, but handle the other clients first */
1428 /* break out of the select loop */
1429 GST_LOG_OBJECT (sink, "stop");
1430 /* stop this function */
1434 GST_WARNING_OBJECT (sink, "unkown");
1435 g_warning ("multifdsink: unknown control message received");
1444 } while (try_again);
1446 /* subclasses can check fdset with this virtual function */
1448 fclass->wait (sink, sink->fdset);
1450 /* Check the clients */
1451 CLIENTS_LOCK (sink);
1452 for (clients = sink->clients; clients; clients = next) {
1453 GstTCPClient *client;
1455 client = (GstTCPClient *) clients->data;
1456 next = g_list_next (clients);
1458 if (client->status != GST_CLIENT_STATUS_OK) {
1459 gst_multifdsink_remove_client_link (sink, clients);
1463 if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
1464 client->status = GST_CLIENT_STATUS_CLOSED;
1465 gst_multifdsink_remove_client_link (sink, clients);
1468 if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
1469 GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd);
1470 client->status = GST_CLIENT_STATUS_ERROR;
1471 gst_multifdsink_remove_client_link (sink, clients);
1474 if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
1475 /* handle client read */
1476 if (!gst_multifdsink_handle_client_read (sink, client)) {
1477 gst_multifdsink_remove_client_link (sink, clients);
1481 if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
1482 /* handle client write */
1483 if (!gst_multifdsink_handle_client_write (sink, client)) {
1484 gst_multifdsink_remove_client_link (sink, clients);
1489 CLIENTS_UNLOCK (sink);
1492 /* we handle the client communication in another thread so that we do not block
1493 * the gstreamer thread while we select() on the client fds */
1495 gst_multifdsink_thread (GstMultiFdSink * sink)
1497 while (sink->running) {
1498 gst_multifdsink_handle_clients (sink);
1503 static GstFlowReturn
1504 gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf)
1506 GstMultiFdSink *sink;
1508 sink = GST_MULTIFDSINK (bsink);
1510 /* since we keep this buffer out of the scope of this method */
1511 gst_buffer_ref (buf);
1513 g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN),
1516 GST_LOG_OBJECT (sink, "received buffer %p", buf);
1517 /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
1518 * it means we're getting new streamheader buffers, and we should clear
1520 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
1521 sink->previous_buffer_in_caps == FALSE) {
1522 GST_DEBUG_OBJECT (sink,
1523 "receiving new IN_CAPS buffers, clearing old streamheader");
1524 g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
1525 g_slist_free (sink->streamheader);
1526 sink->streamheader = NULL;
1528 /* if the incoming buffer is marked as IN CAPS, then we assume for now
1529 * it's a streamheader that needs to be sent to each new client, so we
1530 * put it on our internal list of streamheader buffers.
1531 * After that we return, since we only send these out when we get
1532 * non IN_CAPS buffers so we properly keep track of clients that got
1534 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
1535 sink->previous_buffer_in_caps = TRUE;
1536 GST_DEBUG_OBJECT (sink,
1537 "appending IN_CAPS buffer with length %d to streamheader",
1538 GST_BUFFER_SIZE (buf));
1539 sink->streamheader = g_slist_append (sink->streamheader, buf);
1543 sink->previous_buffer_in_caps = FALSE;
1544 /* queue the buffer */
1545 gst_multifdsink_queue_buffer (sink, buf);
1547 sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
1553 gst_multifdsink_set_property (GObject * object, guint prop_id,
1554 const GValue * value, GParamSpec * pspec)
1556 GstMultiFdSink *multifdsink;
1558 g_return_if_fail (GST_IS_MULTIFDSINK (object));
1559 multifdsink = GST_MULTIFDSINK (object);
1563 multifdsink->protocol = g_value_get_enum (value);
1566 multifdsink->mode = g_value_get_enum (value);
1568 case ARG_BUFFERS_MAX:
1569 multifdsink->units_max = g_value_get_int (value);
1571 case ARG_BUFFERS_SOFT_MAX:
1572 multifdsink->units_soft_max = g_value_get_int (value);
1575 multifdsink->unit_type = g_value_get_enum (value);
1578 multifdsink->units_max = g_value_get_int (value);
1580 case ARG_UNITS_SOFT_MAX:
1581 multifdsink->units_soft_max = g_value_get_int (value);
1583 case ARG_RECOVER_POLICY:
1584 multifdsink->recover_policy = g_value_get_enum (value);
1587 multifdsink->timeout = g_value_get_uint64 (value);
1589 case ARG_SYNC_CLIENTS:
1590 if (g_value_get_boolean (value) == TRUE) {
1591 multifdsink->sync_method = GST_SYNC_METHOD_WAIT;
1593 multifdsink->sync_method = GST_SYNC_METHOD_NONE;
1596 case ARG_SYNC_METHOD:
1597 multifdsink->sync_method = g_value_get_enum (value);
1601 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1607 gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
1610 GstMultiFdSink *multifdsink;
1612 g_return_if_fail (GST_IS_MULTIFDSINK (object));
1613 multifdsink = GST_MULTIFDSINK (object);
1617 g_value_set_enum (value, multifdsink->protocol);
1620 g_value_set_enum (value, multifdsink->mode);
1622 case ARG_BUFFERS_MAX:
1623 g_value_set_int (value, multifdsink->units_max);
1625 case ARG_BUFFERS_SOFT_MAX:
1626 g_value_set_int (value, multifdsink->units_soft_max);
1628 case ARG_BUFFERS_QUEUED:
1629 g_value_set_uint (value, multifdsink->buffers_queued);
1631 case ARG_BYTES_QUEUED:
1632 g_value_set_uint (value, multifdsink->bytes_queued);
1634 case ARG_TIME_QUEUED:
1635 g_value_set_uint64 (value, multifdsink->time_queued);
1638 g_value_set_enum (value, multifdsink->unit_type);
1641 g_value_set_int (value, multifdsink->units_max);
1643 case ARG_UNITS_SOFT_MAX:
1644 g_value_set_int (value, multifdsink->units_soft_max);
1646 case ARG_RECOVER_POLICY:
1647 g_value_set_enum (value, multifdsink->recover_policy);
1650 g_value_set_uint64 (value, multifdsink->timeout);
1652 case ARG_SYNC_CLIENTS:
1653 g_value_set_boolean (value,
1654 multifdsink->sync_method == GST_SYNC_METHOD_WAIT);
1656 case ARG_SYNC_METHOD:
1657 g_value_set_enum (value, multifdsink->sync_method);
1659 case ARG_BYTES_TO_SERVE:
1660 g_value_set_uint64 (value, multifdsink->bytes_to_serve);
1662 case ARG_BYTES_SERVED:
1663 g_value_set_uint64 (value, multifdsink->bytes_served);
1667 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1673 /* create a socket for sending to remote machine */
1675 gst_multifdsink_start (GstBaseSink * bsink)
1677 GstMultiFdSinkClass *fclass;
1678 int control_socket[2];
1679 GstMultiFdSink *this;
1681 if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
1684 this = GST_MULTIFDSINK (bsink);
1685 fclass = GST_MULTIFDSINK_GET_CLASS (this);
1687 GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
1688 this->fdset = gst_fdset_new (this->mode);
1690 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
1693 READ_SOCKET (this).fd = control_socket[0];
1694 WRITE_SOCKET (this).fd = control_socket[1];
1696 gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
1697 gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
1699 fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1700 fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1702 this->streamheader = NULL;
1703 this->bytes_to_serve = 0;
1704 this->bytes_served = 0;
1707 fclass->init (this);
1710 this->running = TRUE;
1711 this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
1714 GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN);
1721 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
1728 gst_multifdsink_stop (GstBaseSink * bsink)
1730 GstMultiFdSinkClass *fclass;
1731 GstMultiFdSink *this;
1733 this = GST_MULTIFDSINK (bsink);
1734 fclass = GST_MULTIFDSINK_GET_CLASS (this);
1736 if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
1739 this->running = FALSE;
1741 SEND_COMMAND (this, CONTROL_STOP);
1743 g_thread_join (this->thread);
1744 this->thread = NULL;
1747 /* free the clients */
1748 gst_multifdsink_clear (this);
1750 close (READ_SOCKET (this).fd);
1751 close (WRITE_SOCKET (this).fd);
1753 if (this->streamheader) {
1754 g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
1755 g_slist_free (this->streamheader);
1756 this->streamheader = NULL;
1760 fclass->close (this);
1763 gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
1764 gst_fdset_free (this->fdset);
1767 GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
1768 CLIENTS_LOCK_FREE (this);
1769 g_hash_table_destroy (this->fd_hash);
1774 static GstStateChangeReturn
1775 gst_multifdsink_change_state (GstElement * element, GstStateChange transition)
1777 GstMultiFdSink *sink;
1778 GstStateChangeReturn ret;
1780 sink = GST_MULTIFDSINK (element);
1782 /* we disallow changing the state from the streaming thread */
1783 if (g_thread_self () == sink->thread)
1784 return GST_STATE_CHANGE_FAILURE;
1787 switch (transition) {
1788 case GST_STATE_CHANGE_NULL_TO_READY:
1789 if (!gst_multifdsink_start (GST_BASE_SINK (sink)))
1792 case GST_STATE_CHANGE_READY_TO_PAUSED:
1794 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1800 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1802 switch (transition) {
1803 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1805 case GST_STATE_CHANGE_PAUSED_TO_READY:
1807 case GST_STATE_CHANGE_READY_TO_NULL:
1808 gst_multifdsink_stop (GST_BASE_SINK (sink));
1818 return GST_STATE_CHANGE_FAILURE;