All plugins updated for element state changes.
[platform/upstream/gstreamer.git] / gst / tcp / gstmultifdsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:multifdsink
23  * @short_description: a sink that writes to multiple file descriptors
24  * @see_also: tcpserversink
25  *
26  * Incredibly, still a sink that writes to multiple file descriptors
27  */
28
29 #ifdef HAVE_CONFIG_H
30 #include "config.h"
31 #endif
32 #include <gst/gst-i18n-plugin.h>
33
34 #include <sys/ioctl.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <sys/stat.h>
40
41 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
42 #include <sys/filio.h>
43 #endif
44
45 #include "gstmultifdsink.h"
46 #include "gsttcp-marshal.h"
47
48 #define NOT_IMPLEMENTED 0
49
50 /* the select call is also performed on the control sockets, that way
51  * we can send special commands to unblock or restart the select call */
52 #define CONTROL_RESTART         'R'     /* restart the select call */
53 #define CONTROL_STOP            'S'     /* stop the select call */
54 #define CONTROL_SOCKETS(sink)   sink->control_sock
55 #define WRITE_SOCKET(sink)      sink->control_sock[1]
56 #define READ_SOCKET(sink)       sink->control_sock[0]
57
58 #define SEND_COMMAND(sink, command)             \
59 G_STMT_START {                                  \
60   unsigned char c; c = command;                 \
61   write (WRITE_SOCKET(sink).fd, &c, 1);         \
62 } G_STMT_END
63
64 #define READ_COMMAND(sink, command, res)        \
65 G_STMT_START {                                  \
66   res = read(READ_SOCKET(sink).fd, &command, 1);        \
67 } G_STMT_END
68
69 /* elementfactory information */
70 static GstElementDetails gst_multifdsink_details =
71 GST_ELEMENT_DETAILS ("MultiFd sink",
72     "Sink/Network",
73     "Send data to multiple filedescriptors",
74     "Thomas Vander Stichele <thomas at apestaart dot org>, "
75     "Wim Taymans <wim@fluendo.com>");
76
77 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
78     GST_PAD_SINK,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS_ANY);
81
82 GST_DEBUG_CATEGORY (multifdsink_debug);
83 #define GST_CAT_DEFAULT (multifdsink_debug)
84
85 /* MultiFdSink signals and args */
86 enum
87 {
88   /* methods */
89   SIGNAL_ADD,
90   SIGNAL_REMOVE,
91   SIGNAL_CLEAR,
92   SIGNAL_GET_STATS,
93
94   /* signals */
95   SIGNAL_CLIENT_ADDED,
96   SIGNAL_CLIENT_REMOVED,
97
98   LAST_SIGNAL
99 };
100
101 /* this is really arbitrarily chosen */
102 #define DEFAULT_PROTOCOL                 GST_TCP_PROTOCOL_NONE
103 #define DEFAULT_MODE                     GST_FDSET_MODE_POLL
104 #define DEFAULT_BUFFERS_MAX             -1
105 #define DEFAULT_BUFFERS_SOFT_MAX        -1
106 #define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
107 #define DEFAULT_UNITS_MAX               -1
108 #define DEFAULT_UNITS_SOFT_MAX          -1
109 #define DEFAULT_RECOVER_POLICY           GST_RECOVER_POLICY_NONE
110 #define DEFAULT_TIMEOUT                  0
111 #define DEFAULT_SYNC_METHOD              GST_SYNC_METHOD_NONE
112
113 enum
114 {
115   ARG_0,
116   ARG_PROTOCOL,
117   ARG_MODE,
118   ARG_BUFFERS_QUEUED,
119   ARG_BYTES_QUEUED,
120   ARG_TIME_QUEUED,
121
122   ARG_UNIT_TYPE,
123   ARG_UNITS_MAX,
124   ARG_UNITS_SOFT_MAX,
125
126   ARG_BUFFERS_MAX,
127   ARG_BUFFERS_SOFT_MAX,
128
129   ARG_RECOVER_POLICY,
130   ARG_TIMEOUT,
131   ARG_SYNC_CLIENTS,             /* deprecated */
132   ARG_SYNC_METHOD,
133   ARG_BYTES_TO_SERVE,
134   ARG_BYTES_SERVED,
135 };
136
137 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
138 static GType
139 gst_recover_policy_get_type (void)
140 {
141   static GType recover_policy_type = 0;
142   static GEnumValue recover_policy[] = {
143     {GST_RECOVER_POLICY_NONE, "GST_RECOVER_POLICY_NONE",
144         "Do not try to recover"},
145     {GST_RECOVER_POLICY_RESYNC_START, "GST_RECOVER_POLICY_RESYNC_START",
146         "Resync client to most recent buffer"},
147     {GST_RECOVER_POLICY_RESYNC_SOFT, "GST_RECOVER_POLICY_RESYNC_SOFT",
148         "Resync client to soft limit"},
149     {GST_RECOVER_POLICY_RESYNC_KEYFRAME, "GST_RECOVER_POLICY_RESYNC_KEYFRAME",
150         "Resync client to most recent keyframe"},
151     {0, NULL, NULL},
152   };
153
154   if (!recover_policy_type) {
155     recover_policy_type =
156         g_enum_register_static ("GstRecoverPolicy", recover_policy);
157   }
158   return recover_policy_type;
159 }
160
161 #define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
162 static GType
163 gst_sync_method_get_type (void)
164 {
165   static GType sync_method_type = 0;
166   static GEnumValue sync_method[] = {
167     {GST_SYNC_METHOD_NONE, "GST_SYNC_METHOD_NONE",
168         "Serve new client the latest buffer"},
169     {GST_SYNC_METHOD_WAIT, "GST_SYNC_METHOD_WAIT",
170         "Make the new client wait for the next keyframe"},
171     {GST_SYNC_METHOD_BURST, "GST_SYNC_METHOD_BURST",
172         "Serve the new client the last keyframe, aka burst"},
173     {0, NULL, NULL},
174   };
175
176   if (!sync_method_type) {
177     sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method);
178   }
179   return sync_method_type;
180 }
181
182 #if NOT_IMPLEMENTED
183 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
184 static GType
185 gst_unit_type_get_type (void)
186 {
187   static GType unit_type_type = 0;
188   static GEnumValue unit_type[] = {
189     {GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
190     {GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
191     {GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
192     {0, NULL, NULL},
193   };
194
195   if (!unit_type_type) {
196     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
197   }
198   return unit_type_type;
199 }
200 #endif
201
202 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
203 static GType
204 gst_client_status_get_type (void)
205 {
206   static GType client_status_type = 0;
207   static GEnumValue client_status[] = {
208     {GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
209     {GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
210     {GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
211     {GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
212     {GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
213     {GST_CLIENT_STATUS_DUPLICATE, "GST_CLIENT_STATUS_DUPLICATE", "Duplicate"},
214     {0, NULL, NULL},
215   };
216
217   if (!client_status_type) {
218     client_status_type =
219         g_enum_register_static ("GstClientStatus", client_status);
220   }
221   return client_status_type;
222 }
223
224 static void gst_multifdsink_base_init (gpointer g_class);
225 static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
226 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
227
228 static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
229     GList * link);
230
231 static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink,
232     GstBuffer * buf);
233 static GstStateChangeReturn gst_multifdsink_change_state (GstElement *
234     element, GstStateChange transition);
235
236 static void gst_multifdsink_set_property (GObject * object, guint prop_id,
237     const GValue * value, GParamSpec * pspec);
238 static void gst_multifdsink_get_property (GObject * object, guint prop_id,
239     GValue * value, GParamSpec * pspec);
240
241
242 static GstElementClass *parent_class = NULL;
243
244 static guint gst_multifdsink_signals[LAST_SIGNAL] = { 0 };
245
246 GType
247 gst_multifdsink_get_type (void)
248 {
249   static GType multifdsink_type = 0;
250
251
252   if (!multifdsink_type) {
253     static const GTypeInfo multifdsink_info = {
254       sizeof (GstMultiFdSinkClass),
255       gst_multifdsink_base_init,
256       NULL,
257       (GClassInitFunc) gst_multifdsink_class_init,
258       NULL,
259       NULL,
260       sizeof (GstMultiFdSink),
261       0,
262       (GInstanceInitFunc) gst_multifdsink_init,
263       NULL
264     };
265
266     multifdsink_type =
267         g_type_register_static (GST_TYPE_BASE_SINK, "GstMultiFdSink",
268         &multifdsink_info, 0);
269   }
270   return multifdsink_type;
271 }
272
273 static void
274 gst_multifdsink_base_init (gpointer g_class)
275 {
276   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
277
278   gst_element_class_add_pad_template (element_class,
279       gst_static_pad_template_get (&sinktemplate));
280
281   gst_element_class_set_details (element_class, &gst_multifdsink_details);
282 }
283
284 static void
285 gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
286 {
287   GObjectClass *gobject_class;
288   GstElementClass *gstelement_class;
289   GstBaseSinkClass *gstbasesink_class;
290
291   gobject_class = (GObjectClass *) klass;
292   gstelement_class = (GstElementClass *) klass;
293   gstbasesink_class = (GstBaseSinkClass *) klass;
294
295   parent_class = g_type_class_ref (GST_TYPE_BASE_SINK);
296
297   gobject_class->set_property = gst_multifdsink_set_property;
298   gobject_class->get_property = gst_multifdsink_get_property;
299
300   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
301       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
302           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
303   g_object_class_install_property (gobject_class, ARG_MODE,
304       g_param_spec_enum ("mode", "Mode",
305           "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
306           DEFAULT_MODE, G_PARAM_READWRITE));
307
308   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
309       g_param_spec_int ("buffers-max", "Buffers max",
310           "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
311           DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
312   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
313       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
314           "Recover client when going over this limit (-1 = no limit)", -1,
315           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
316
317 #if NOT_IMPLEMENTED
318   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
319       g_param_spec_enum ("unit-type", "Units type",
320           "The unit to measure the max/soft-max/queued properties",
321           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
322   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
323       g_param_spec_int ("units-max", "Units max",
324           "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
325           DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
326   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
327       g_param_spec_int ("units-soft-max", "Units soft max",
328           "Recover client when going over this limit (-1 = no limit)", -1,
329           G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
330 #endif
331
332   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
333       g_param_spec_uint ("buffers-queued", "Buffers queued",
334           "Number of buffers currently queued", 0, G_MAXUINT, 0,
335           G_PARAM_READABLE));
336 #if NOT_IMPLEMENTED
337   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
338       g_param_spec_uint ("bytes-queued", "Bytes queued",
339           "Number of bytes currently queued", 0, G_MAXUINT, 0,
340           G_PARAM_READABLE));
341   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
342       g_param_spec_uint64 ("time-queued", "Time queued",
343           "Number of time currently queued", 0, G_MAXUINT64, 0,
344           G_PARAM_READABLE));
345 #endif
346
347   g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
348       g_param_spec_enum ("recover-policy", "Recover Policy",
349           "How to recover when client reaches the soft max",
350           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
351   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
352       g_param_spec_uint64 ("timeout", "Timeout",
353           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
354           0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
355   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_CLIENTS,
356       g_param_spec_boolean ("sync-clients", "Sync clients",
357           "(DEPRECATED) Sync clients to a keyframe",
358           DEFAULT_SYNC_METHOD == GST_SYNC_METHOD_WAIT, G_PARAM_READWRITE));
359   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
360       g_param_spec_enum ("sync-method", "Sync Method",
361           "How to sync new clients to the stream",
362           GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
363   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
364       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
365           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
366           G_PARAM_READABLE));
367   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
368       g_param_spec_uint64 ("bytes-served", "Bytes served",
369           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
370           G_PARAM_READABLE));
371
372   /**
373    * GstMultiFdSink::add:
374    * @gstmultifdsink: the multifdsink element to emit this signal on
375    * @arg1:           the file descriptor to add to multifdsink
376    *
377    * Hand the given open file descriptor to multifdsink to write to.
378    */
379   gst_multifdsink_signals[SIGNAL_ADD] =
380       g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
381       G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
382       NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
383   /**
384    * GstMultiFdSink::remove:
385    * @gstmultifdsink: the multifdsink element to emit this signal on
386    * @arg1:           the file descriptor to remove from multifdsink
387    *
388    * Remove the given open file descriptor from multifdsink.
389    */
390   gst_multifdsink_signals[SIGNAL_REMOVE] =
391       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
392       G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
393       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
394   /**
395    * GstMultiFdSink::clear:
396    * @gstmultifdsink: the multifdsink element to emit this signal on
397    *
398    * Clear all file descriptors from multifdsink.
399    */
400   gst_multifdsink_signals[SIGNAL_CLEAR] =
401       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
402       G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
403       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
404   gst_multifdsink_signals[SIGNAL_GET_STATS] =
405       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
406       G_STRUCT_OFFSET (GstMultiFdSinkClass, get_stats),
407       NULL, NULL, gst_tcp_marshal_BOXED__INT, G_TYPE_VALUE_ARRAY, 1,
408       G_TYPE_INT);
409
410   /**
411    * GstMultiFdSink::client-added:
412    * @gstmultifdsink: the multifdsink element that emitted this signal
413    * @arg1:           the file descriptor that was added to multifdsink
414    *
415    * The given file descriptor was added to multifdsink.
416    */
417   gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
418       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
419       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
420       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
421   /**
422    * GstMultiFdSink::client-removed:
423    * @gstmultifdsink: the multifdsink element that emitted this signal
424    * @arg1:           the file descriptor that was removed from multifdsink
425    *
426    * The given file descriptor was removed from multifdsink.
427    */
428   gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
429       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
430       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
431           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
432       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
433
434   gstelement_class->change_state =
435       GST_DEBUG_FUNCPTR (gst_multifdsink_change_state);
436
437   gstbasesink_class->render = gst_multifdsink_render;
438
439   klass->add = gst_multifdsink_add;
440   klass->remove = gst_multifdsink_remove;
441   klass->clear = gst_multifdsink_clear;
442   klass->get_stats = gst_multifdsink_get_stats;
443
444   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
445 }
446
447 static void
448 gst_multifdsink_init (GstMultiFdSink * this)
449 {
450   GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
451
452   this->protocol = DEFAULT_PROTOCOL;
453   this->mode = DEFAULT_MODE;
454
455   CLIENTS_LOCK_INIT (this);
456   this->clients = NULL;
457   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
458
459   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
460   this->unit_type = DEFAULT_UNIT_TYPE;
461   this->units_max = DEFAULT_UNITS_MAX;
462   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
463   this->recover_policy = DEFAULT_RECOVER_POLICY;
464
465   this->timeout = DEFAULT_TIMEOUT;
466   this->sync_method = DEFAULT_SYNC_METHOD;
467 }
468
469 void
470 gst_multifdsink_add (GstMultiFdSink * sink, int fd)
471 {
472   GstTCPClient *client;
473   GList *clink;
474   GTimeVal now;
475   gint flags, res;
476   struct stat statbuf;
477
478   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
479
480   /* create client datastructure */
481   client = g_new0 (GstTCPClient, 1);
482   client->fd.fd = fd;
483   client->status = GST_CLIENT_STATUS_OK;
484   client->bufpos = -1;
485   client->bufoffset = 0;
486   client->sending = NULL;
487   client->bytes_sent = 0;
488   client->dropped_buffers = 0;
489   client->avg_queue_size = 0;
490   client->new_connection = TRUE;
491
492   /* update start time */
493   g_get_current_time (&now);
494   client->connect_time = GST_TIMEVAL_TO_TIME (now);
495   client->disconnect_time = 0;
496   /* send last activity time to connect time */
497   client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
498
499   CLIENTS_LOCK (sink);
500
501   /* check the hash to find a duplicate fd */
502   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
503   if (clink != NULL) {
504     client->status = GST_CLIENT_STATUS_DUPLICATE;
505     CLIENTS_UNLOCK (sink);
506     GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
507     g_signal_emit (G_OBJECT (sink),
508         gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
509     g_free (client);
510     return;
511   }
512
513   /* we can add the fd now */
514   clink = sink->clients = g_list_prepend (sink->clients, client);
515   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
516
517   /* set the socket to non blocking */
518   res = fcntl (fd, F_SETFL, O_NONBLOCK);
519   /* we always read from a client */
520   gst_fdset_add_fd (sink->fdset, &client->fd);
521
522   /* we don't try to read from write only fds */
523   flags = fcntl (fd, F_GETFL, 0);
524   if ((flags & O_ACCMODE) != O_WRONLY) {
525     gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
526   }
527   /* figure out the mode, can't use send() for non sockets */
528   res = fstat (fd, &statbuf);
529   if (S_ISSOCK (statbuf.st_mode)) {
530     client->is_socket = TRUE;
531   }
532
533   SEND_COMMAND (sink, CONTROL_RESTART);
534
535   CLIENTS_UNLOCK (sink);
536
537   g_signal_emit (G_OBJECT (sink),
538       gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
539 }
540
541 void
542 gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
543 {
544   GList *clink;
545
546   GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
547
548   CLIENTS_LOCK (sink);
549   clink = g_hash_table_lookup (sink->fd_hash, &fd);
550   if (clink != NULL) {
551     GstTCPClient *client = (GstTCPClient *) clink->data;
552
553     client->status = GST_CLIENT_STATUS_REMOVED;
554     gst_multifdsink_remove_client_link (sink, clink);
555     SEND_COMMAND (sink, CONTROL_RESTART);
556   } else {
557     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
558   }
559   CLIENTS_UNLOCK (sink);
560 }
561
562 void
563 gst_multifdsink_clear (GstMultiFdSink * sink)
564 {
565   GList *clients, *next;
566
567   GST_DEBUG_OBJECT (sink, "clearing all clients");
568
569   CLIENTS_LOCK (sink);
570   for (clients = sink->clients; clients; clients = next) {
571     GstTCPClient *client;
572
573     client = (GstTCPClient *) clients->data;
574     next = g_list_next (clients);
575
576     client->status = GST_CLIENT_STATUS_REMOVED;
577     gst_multifdsink_remove_client_link (sink, clients);
578   }
579   SEND_COMMAND (sink, CONTROL_RESTART);
580   CLIENTS_UNLOCK (sink);
581 }
582
583 GValueArray *
584 gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
585 {
586   GstTCPClient *client;
587   GValueArray *result = NULL;
588   GList *clink;
589
590   CLIENTS_LOCK (sink);
591   clink = g_hash_table_lookup (sink->fd_hash, &fd);
592   client = (GstTCPClient *) clink->data;
593   if (client != NULL) {
594     GValue value = { 0 };
595     guint64 interval;
596
597     result = g_value_array_new (4);
598
599     g_value_init (&value, G_TYPE_UINT64);
600     g_value_set_uint64 (&value, client->bytes_sent);
601     result = g_value_array_append (result, &value);
602     g_value_unset (&value);
603     g_value_init (&value, G_TYPE_UINT64);
604     g_value_set_uint64 (&value, client->connect_time);
605     result = g_value_array_append (result, &value);
606     g_value_unset (&value);
607     if (client->disconnect_time == 0) {
608       GTimeVal nowtv;
609
610       g_get_current_time (&nowtv);
611
612       interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
613     } else {
614       interval = client->disconnect_time - client->connect_time;
615     }
616     g_value_init (&value, G_TYPE_UINT64);
617     g_value_set_uint64 (&value, client->disconnect_time);
618     result = g_value_array_append (result, &value);
619     g_value_unset (&value);
620     g_value_init (&value, G_TYPE_UINT64);
621     g_value_set_uint64 (&value, interval);
622     result = g_value_array_append (result, &value);
623     g_value_unset (&value);
624     g_value_init (&value, G_TYPE_UINT64);
625     g_value_set_uint64 (&value, client->last_activity_time);
626     result = g_value_array_append (result, &value);
627   }
628   CLIENTS_UNLOCK (sink);
629
630   /* python doesn't like a NULL pointer yet */
631   if (result == NULL) {
632     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
633     result = g_value_array_new (0);
634   }
635
636   return result;
637 }
638
639 /* should be called with the clientslock held.
640  * Note that we don't close the fd as we didn't open it in the first
641  * place. An application should connect to the client-removed signal and
642  * close the fd itself.
643  */
644 static void
645 gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
646 {
647   int fd;
648   GTimeVal now;
649   GstTCPClient *client = (GstTCPClient *) link->data;
650   GstMultiFdSinkClass *fclass;
651
652   fclass = GST_MULTIFDSINK_GET_CLASS (sink);
653
654   fd = client->fd.fd;
655
656   /* FIXME: if we keep track of ip we can log it here and signal */
657   switch (client->status) {
658     case GST_CLIENT_STATUS_OK:
659       GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
660           fd, client);
661       break;
662     case GST_CLIENT_STATUS_CLOSED:
663       GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
664           fd, client);
665       break;
666     case GST_CLIENT_STATUS_REMOVED:
667       GST_DEBUG_OBJECT (sink,
668           "[fd %5d] removing client %p because the app removed it", fd, client);
669       break;
670     case GST_CLIENT_STATUS_SLOW:
671       GST_INFO_OBJECT (sink,
672           "[fd %5d] removing client %p because it was too slow", fd, client);
673       break;
674     case GST_CLIENT_STATUS_ERROR:
675       GST_WARNING_OBJECT (sink,
676           "[fd %5d] removing client %p because of error", fd, client);
677       break;
678     default:
679       GST_WARNING_OBJECT (sink,
680           "[fd %5d] removing client %p with invalid reason", fd, client);
681       break;
682   }
683
684   gst_fdset_remove_fd (sink->fdset, &client->fd);
685
686   g_get_current_time (&now);
687   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
688
689   /* free client buffers */
690   g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
691   g_slist_free (client->sending);
692   client->sending = NULL;
693
694   /* unlock the mutex before signaling because the signal handler
695    * might query some properties */
696   CLIENTS_UNLOCK (sink);
697
698   g_signal_emit (G_OBJECT (sink),
699       gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
700
701   /* lock again before we remove the client completely */
702   CLIENTS_LOCK (sink);
703
704   if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
705     GST_WARNING_OBJECT (sink,
706         "[fd %5d] error removing client %p from hash", client->fd.fd, client);
707   }
708   /* after releasing the lock above, the link could be invalid, more
709    * precisely, the next and prev pointers could point to invalid list
710    * links. One optimisation could be to add a cookie to the linked list
711    * and take a shortcut when it did not change between unlocking and locking
712    * our mutex. For now we just walk the list again. */
713   sink->clients = g_list_remove (sink->clients, client);
714
715   if (fclass->removed)
716     fclass->removed (sink, client->fd.fd);
717
718   g_free (client);
719 }
720
721 /* handle a read on a client fd,
722  * which either indicates a close or should be ignored
723  * returns FALSE if some error occured or the client closed. */
724 static gboolean
725 gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
726     GstTCPClient * client)
727 {
728   int avail, fd;
729   gboolean ret;
730
731   fd = client->fd.fd;
732
733   if (ioctl (fd, FIONREAD, &avail) < 0) {
734     GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
735         fd, g_strerror (errno), errno);
736     client->status = GST_CLIENT_STATUS_ERROR;
737     ret = FALSE;
738     return ret;
739   }
740
741   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
742       fd, avail);
743
744   ret = TRUE;
745
746   if (avail == 0) {
747     /* client sent close, so remove it */
748     GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
749     client->status = GST_CLIENT_STATUS_CLOSED;
750     ret = FALSE;
751   } else if (avail < 0) {
752     GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
753     client->status = GST_CLIENT_STATUS_ERROR;
754     ret = FALSE;
755   } else {
756     guint8 dummy[512];
757     gint nread;
758
759     /* just Read 'n' Drop, could also just drop the client as it's not supposed
760      * to write to us except for closing the socket, I guess it's because we
761      * like to listen to our customers. */
762     do {
763       /* this is the maximum we can read */
764       gint to_read = MIN (avail, 512);
765
766       GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
767           fd, to_read);
768
769       nread = read (fd, dummy, to_read);
770       if (nread < -1) {
771         GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
772             fd, to_read, g_strerror (errno), errno);
773         client->status = GST_CLIENT_STATUS_ERROR;
774         ret = FALSE;
775         break;
776       } else if (nread == 0) {
777         GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
778         client->status = GST_CLIENT_STATUS_ERROR;
779         ret = FALSE;
780         break;
781       }
782       avail -= nread;
783     }
784     while (avail > 0);
785   }
786   return ret;
787 }
788
789 static gboolean
790 gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
791     gchar * data, gint len)
792 {
793   GstBuffer *buf;
794
795   buf = gst_buffer_new ();
796   GST_BUFFER_DATA (buf) = (guint8 *) data;
797   GST_BUFFER_SIZE (buf) = len;
798
799   GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
800       client->fd.fd, len);
801
802   client->sending = g_slist_append (client->sending, buf);
803
804   return TRUE;
805 }
806
807 static gboolean
808 gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
809     const GstCaps * caps)
810 {
811   guint8 *header;
812   guint8 *payload;
813   guint length;
814   gchar *string;
815
816   string = gst_caps_to_string (caps);
817   GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
818       client->fd.fd, string);
819   g_free (string);
820
821   if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
822     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
823     return FALSE;
824   }
825   gst_multifdsink_client_queue_data (sink, client, (gchar *) header, length);
826
827   length = gst_dp_header_payload_length (header);
828   gst_multifdsink_client_queue_data (sink, client, (gchar *) payload, length);
829
830   return TRUE;
831 }
832
833 static gboolean
834 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
835 {
836   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
837     return FALSE;
838   } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
839     return TRUE;
840   }
841   return FALSE;
842 }
843
844 static gboolean
845 gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
846     GstTCPClient * client, GstBuffer * buffer)
847 {
848   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
849     guint8 *header;
850     guint len;
851
852     if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
853       GST_DEBUG_OBJECT (sink,
854           "[fd %5d] could not create header, removing client", client->fd.fd);
855       return FALSE;
856     }
857     gst_multifdsink_client_queue_data (sink, client, (gchar *) header, len);
858   }
859
860   GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
861       client->fd.fd, GST_BUFFER_SIZE (buffer));
862
863   gst_buffer_ref (buffer);
864   client->sending = g_slist_append (client->sending, buffer);
865
866   return TRUE;
867 }
868
869 static gint
870 gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
871 {
872   gint result;
873
874   switch (sink->sync_method) {
875     case GST_SYNC_METHOD_WAIT:
876     {
877       /* if the buffer at the head of the queue is a sync point we can proceed,
878        * else we need to skip the buffer and wait for a new one */
879       GST_LOG_OBJECT (sink,
880           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
881           client->bufpos);
882
883       /* the client is not yet alligned to a buffer */
884       if (client->bufpos < 0) {
885         result = -1;
886       } else {
887         GstBuffer *buf;
888         gint i;
889
890         for (i = client->bufpos; i >= 0; i--) {
891           /* get the buffer for the client */
892           buf = g_array_index (sink->bufqueue, GstBuffer *, i);
893           if (is_sync_frame (sink, buf)) {
894             GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
895                 client->fd.fd);
896             result = i;
897             goto done;
898           } else {
899             /* client is not on a buffer, need to skip this buffer and
900              * wait some more */
901             GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
902                 client->fd.fd);
903             client->bufpos--;
904           }
905         }
906         result = -1;
907       }
908       break;
909     }
910     case GST_SYNC_METHOD_BURST:
911     {
912       /* FIXME for new clients we constantly scan the complete
913        * buffer queue for sync point whenever a buffer is added. This is
914        * suboptimal because if we cannot find a sync point the first time,
915        * the algorithm should behave as GST_SYNC_METHOD_WAIT */
916       gint i, len;
917
918       GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
919           client->fd.fd, client->bufpos);
920
921       /* take length of queued buffers */
922       len = sink->bufqueue->len;
923       /* assume we don't find a keyframe */
924       result = -1;
925       /* then loop over all buffers to find the first keyframe */
926       for (i = 0; i < len; i++) {
927         GstBuffer *buf;
928
929         buf = g_array_index (sink->bufqueue, GstBuffer *, i);
930         if (is_sync_frame (sink, buf)) {
931           /* found a keyframe, return its position */
932           GST_LOG_OBJECT (sink, "found keyframe at %d", i);
933           result = i;
934           goto done;
935         }
936       }
937       GST_LOG_OBJECT (sink, "no keyframe found");
938       /* throw client to the waiting state */
939       client->bufpos = -1;
940       break;
941     }
942     default:
943       /* no syncing, we are happy with whatever the client is going to get */
944       GST_LOG_OBJECT (sink, "no client syn needed");
945       result = client->bufpos;
946       break;
947   }
948 done:
949   return result;
950 }
951
952 /* handle a write on a client,
953  * which indicates a read request from a client.
954  *
955  * The strategy is as follows, for each client we maintain a queue of GstBuffers
956  * that contain the raw bytes we need to send to the client. In the case of the
957  * GDP protocol, we create buffers out of the header bytes so that we can only
958  * focus on sending buffers.
959  *
960  * We first check to see if we need to send caps (in GDP) and streamheaders.
961  * If so, we queue them.
962  *
963  * Then we run into the main loop that tries to send as many buffers as
964  * possible. It will first exhaust the client->sending queue and if the queue
965  * is empty, it will pick a buffer from the global queue.
966  *
967  * Sending the Buffers from the client->sending queue is basically writing
968  * the bytes to the socket and maintaining a count of the bytes that were
969  * sent. When the buffer is completely sent, it is removed from the
970  * client->sending queue and we try to pick a new buffer for sending.
971  *
972  * When the sending returns a partial buffer we stop sending more data as
973  * the next send operation could block.
974  *
975  * This functions returns FALSE if some error occured.
976  */
977 static gboolean
978 gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
979     GstTCPClient * client)
980 {
981   int fd = client->fd.fd;
982   gboolean more;
983   gboolean res;
984   GstClockTime now;
985   GTimeVal nowtv;
986
987   g_get_current_time (&nowtv);
988   now = GST_TIMEVAL_TO_TIME (nowtv);
989
990   /* when using GDP, first check if we have queued caps yet */
991   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
992     if (!client->caps_sent) {
993       const GstCaps *caps =
994           GST_PAD_CAPS (GST_PAD_PEER (GST_BASE_SINK_PAD (sink)));
995
996       /* queue caps for sending */
997       res = gst_multifdsink_client_queue_caps (sink, client, caps);
998       if (!res) {
999         GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
1000         return FALSE;
1001       }
1002       client->caps_sent = TRUE;
1003     }
1004   }
1005   /* if we have streamheader buffers, and haven't sent them to this client
1006    * yet, send them out one by one */
1007   if (!client->streamheader_sent) {
1008     GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
1009         g_slist_length (sink->streamheader));
1010     if (sink->streamheader) {
1011       GSList *l;
1012
1013       for (l = sink->streamheader; l; l = l->next) {
1014         /* queue stream headers for sending */
1015         res =
1016             gst_multifdsink_client_queue_buffer (sink, client,
1017             GST_BUFFER (l->data));
1018         if (!res) {
1019           GST_DEBUG_OBJECT (sink,
1020               "Failed queueing streamheader, removing client");
1021           return FALSE;
1022         }
1023       }
1024     }
1025     client->streamheader_sent = TRUE;
1026   }
1027
1028   more = TRUE;
1029   do {
1030     gint maxsize;
1031
1032     if (!client->sending) {
1033       /* client is not working on a buffer */
1034       if (client->bufpos == -1) {
1035         /* client is too fast, remove from write queue until new buffer is
1036          * available */
1037         gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1038         return TRUE;
1039       } else {
1040         /* client can pick a buffer from the global queue */
1041         GstBuffer *buf;
1042
1043         /* for new connections, we need to find a good spot in the
1044          * bufqueue to start streaming from */
1045         if (client->new_connection) {
1046           gint position = gst_multifdsink_new_client (sink, client);
1047
1048           if (position >= 0) {
1049             /* we got a valid spot in the queue */
1050             client->new_connection = FALSE;
1051             client->bufpos = position;
1052           } else {
1053             /* cannot send data to this client yet */
1054             gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1055             return TRUE;
1056           }
1057         }
1058
1059         /* grab buffer */
1060         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
1061         client->bufpos--;
1062         GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1063             fd, client, client->bufpos);
1064
1065         /* queueing a buffer will ref it */
1066         gst_multifdsink_client_queue_buffer (sink, client, buf);
1067
1068         /* need to start from the first byte for this new buffer */
1069         client->bufoffset = 0;
1070       }
1071     }
1072
1073     /* see if we need to send something */
1074     if (client->sending) {
1075       ssize_t wrote;
1076       GstBuffer *head;
1077
1078       /* pick first buffer from list */
1079       head = GST_BUFFER (client->sending->data);
1080       maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
1081
1082       /* try to write the complete buffer */
1083 #ifdef MSG_NOSIGNAL
1084 #define FLAGS MSG_NOSIGNAL
1085 #else
1086 #define FLAGS 0
1087 #endif
1088       if (client->is_socket) {
1089         wrote =
1090             send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
1091             FLAGS);
1092       } else {
1093         wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
1094       }
1095
1096       if (wrote < 0) {
1097         /* hmm error.. */
1098         if (errno == EAGAIN) {
1099           /* nothing serious, resource was unavailable, try again later */
1100           more = FALSE;
1101         } else if (errno == ECONNRESET) {
1102           GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
1103               fd);
1104           client->status = GST_CLIENT_STATUS_CLOSED;
1105           return FALSE;
1106         } else {
1107           GST_WARNING_OBJECT (sink,
1108               "[fd %5d] could not write, removing client: %s (%d)", fd,
1109               g_strerror (errno), errno);
1110           client->status = GST_CLIENT_STATUS_ERROR;
1111           return FALSE;
1112         }
1113       } else {
1114         if (wrote < maxsize) {
1115           /* partial write means that the client cannot read more and we should
1116            * stop sending more */
1117           GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
1118           client->bufoffset += wrote;
1119           more = FALSE;
1120         } else {
1121           /* complete buffer was written, we can proceed to the next one */
1122           client->sending = g_slist_remove (client->sending, head);
1123           gst_buffer_unref (head);
1124           /* make sure we start from byte 0 for the next buffer */
1125           client->bufoffset = 0;
1126         }
1127         /* update stats */
1128         client->bytes_sent += wrote;
1129         client->last_activity_time = now;
1130         sink->bytes_served += wrote;
1131       }
1132     }
1133   } while (more);
1134
1135   return TRUE;
1136 }
1137
1138 /* calculate the new position for a client after recovery. This function
1139  * does not update the client position but merely returns the required
1140  * position.
1141  */
1142 static gint
1143 gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
1144 {
1145   gint newbufpos;
1146
1147   GST_WARNING_OBJECT (sink,
1148       "[fd %5d] client %p is lagging at %d, recover using policy %d",
1149       client->fd.fd, client, client->bufpos, sink->recover_policy);
1150
1151   switch (sink->recover_policy) {
1152     case GST_RECOVER_POLICY_NONE:
1153       /* do nothing, client will catch up or get kicked out when it reaches
1154        * the hard max */
1155       newbufpos = client->bufpos;
1156       break;
1157     case GST_RECOVER_POLICY_RESYNC_START:
1158       /* move to beginning of queue */
1159       newbufpos = -1;
1160       break;
1161     case GST_RECOVER_POLICY_RESYNC_SOFT:
1162       /* move to beginning of soft max */
1163       newbufpos = sink->units_soft_max;
1164       break;
1165     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1166       /* find keyframe in buffers, we search backwards to find the 
1167        * closest keyframe relative to what this client already received. */
1168       newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
1169
1170       while (newbufpos >= 0) {
1171         GstBuffer *buf;
1172
1173         buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
1174         if (is_sync_frame (sink, buf)) {
1175           /* found a buffer that is not a delta unit */
1176           break;
1177         }
1178         newbufpos--;
1179       }
1180       break;
1181     default:
1182       /* unknown recovery procedure */
1183       newbufpos = sink->units_soft_max;
1184       break;
1185   }
1186   return newbufpos;
1187 }
1188
1189 /* Queue a buffer on the global queue. 
1190  *
1191  * This functions adds the buffer to the front of a GArray. It removes the
1192  * tail buffer if the max queue size is exceeded. Unreffing the buffer that
1193  * is queued. Note that unreffing the buffer is not a problem as clients who
1194  * started writing out this buffer will still have a reference to it in the
1195  * client->sending queue.
1196  *
1197  * After adding the buffer, we update all client positions in the queue. If
1198  * a client moves over the soft max, we start the recovery procedure for this
1199  * slow client. If it goes over the hard max, it is put into the slow list
1200  * and removed.
1201  *
1202  * Special care is taken of clients that were waiting for a new buffer (they
1203  * had a position of -1) because they can proceed after adding this new buffer.
1204  * This is done by adding the client back into the write fd_set and signalling
1205  * the select thread that the fd_set changed.
1206  *
1207  */
1208 static void
1209 gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
1210 {
1211   GList *clients, *next;
1212   gint queuelen;
1213   gboolean need_signal = FALSE;
1214   gint max_buffer_usage;
1215   gint i;
1216   GTimeVal nowtv;
1217   GstClockTime now;
1218
1219   g_get_current_time (&nowtv);
1220   now = GST_TIMEVAL_TO_TIME (nowtv);
1221
1222   CLIENTS_LOCK (sink);
1223   /* add buffer to queue */
1224   g_array_prepend_val (sink->bufqueue, buf);
1225   queuelen = sink->bufqueue->len;
1226
1227   /* then loop over the clients and update the positions */
1228   max_buffer_usage = 0;
1229   for (clients = sink->clients; clients; clients = next) {
1230     GstTCPClient *client;
1231
1232     client = (GstTCPClient *) clients->data;
1233     next = g_list_next (clients);
1234
1235     client->bufpos++;
1236     GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1237         client->fd.fd, client, client->bufpos);
1238     /* check soft max if needed, recover client */
1239     if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
1240       gint newpos;
1241
1242       newpos = gst_multifdsink_recover_client (sink, client);
1243       if (newpos != client->bufpos) {
1244         client->bufpos = newpos;
1245         client->discont = TRUE;
1246         GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
1247             client->fd.fd, client, client->bufpos);
1248       } else {
1249         GST_INFO_OBJECT (sink,
1250             "[fd %5d] client %p not recovering position",
1251             client->fd.fd, client);
1252       }
1253     }
1254     /* check hard max and timeout, remove client */
1255     if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
1256         (sink->timeout > 0
1257             && now - client->last_activity_time > sink->timeout)) {
1258       /* remove client */
1259       GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
1260           client->fd.fd, client);
1261       /* remove the client, the fd set will be cleared and the select thread will
1262        * be signaled */
1263       client->status = GST_CLIENT_STATUS_SLOW;
1264       gst_multifdsink_remove_client_link (sink, clients);
1265       /* set client to invalid position while being removed */
1266       client->bufpos = -1;
1267       need_signal = TRUE;
1268     } else if (client->bufpos == 0 || client->new_connection) {
1269       /* can send data to this client now. need to signal the select thread that
1270        * the fd_set changed */
1271       gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
1272       need_signal = TRUE;
1273     }
1274     /* keep track of maximum buffer usage */
1275     if (client->bufpos > max_buffer_usage) {
1276       max_buffer_usage = client->bufpos;
1277     }
1278   }
1279
1280   /* now look for sync points and make sure there is at least one
1281    * sync point in the queue. We only do this if the burst mode
1282    * is enabled. */
1283   if (sink->sync_method == GST_SYNC_METHOD_BURST) {
1284     /* no point in searching beyond the queue length */
1285     gint limit = queuelen;
1286     GstBuffer *buf;
1287
1288     /* no point in searching beyond the soft-max if any. */
1289     if (sink->units_soft_max > 0) {
1290       limit = MIN (limit, sink->units_soft_max);
1291     }
1292     GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
1293         max_buffer_usage);
1294     for (i = 0; i < limit; i++) {
1295       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1296       if (is_sync_frame (sink, buf)) {
1297         /* found a sync frame, now extend the buffer usage to
1298          * include at least this frame. */
1299         max_buffer_usage = MAX (max_buffer_usage, i);
1300         break;
1301       }
1302     }
1303     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1304   }
1305
1306   /* nobody is referencing units after max_buffer_usage so we can
1307    * remove them from the queue. We remove them in reverse order as
1308    * this is the most optimal for GArray. */
1309   for (i = queuelen - 1; i > max_buffer_usage; i--) {
1310     GstBuffer *old;
1311
1312     /* queue exceeded max size */
1313     queuelen--;
1314     old = g_array_index (sink->bufqueue, GstBuffer *, i);
1315     sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
1316
1317     /* unref tail buffer */
1318     gst_buffer_unref (old);
1319   }
1320   /* save for stats */
1321   sink->buffers_queued = max_buffer_usage;
1322   CLIENTS_UNLOCK (sink);
1323
1324   /* and send a signal to thread if fd_set changed */
1325   if (need_signal) {
1326     SEND_COMMAND (sink, CONTROL_RESTART);
1327   }
1328 }
1329
1330 /* Handle the clients. Basically does a blocking select for one
1331  * of the client fds to become read or writable. We also have a
1332  * filedescriptor to receive commands on that we need to check.
1333  *
1334  * After going out of the select call, we read and write to all
1335  * clients that can do so. Badly behaving clients are put on a
1336  * garbage list and removed.
1337  */
1338 static void
1339 gst_multifdsink_handle_clients (GstMultiFdSink * sink)
1340 {
1341   int result;
1342   GList *clients, *next;
1343   gboolean try_again;
1344   GstMultiFdSinkClass *fclass;
1345
1346   fclass = GST_MULTIFDSINK_GET_CLASS (sink);
1347
1348   do {
1349     gboolean stop = FALSE;
1350
1351     try_again = FALSE;
1352
1353     /* check for:
1354      * - server socket input (ie, new client connections)
1355      * - client socket input (ie, clients saying goodbye)
1356      * - client socket output (ie, client reads)          */
1357     GST_LOG_OBJECT (sink, "waiting on action on fdset");
1358     result = gst_fdset_wait (sink->fdset, -1);
1359
1360     /* < 0 is an error, 0 just means a timeout happened, which is impossible */
1361     if (result < 0) {
1362       GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
1363           errno);
1364       if (errno == EBADF) {
1365         /* ok, so one or more of the fds is invalid. We loop over them to find
1366          * the ones that give an error to the F_GETFL fcntl. */
1367         CLIENTS_LOCK (sink);
1368         for (clients = sink->clients; clients; clients = next) {
1369           GstTCPClient *client;
1370           int fd;
1371           long flags;
1372           int res;
1373
1374           client = (GstTCPClient *) clients->data;
1375           next = g_list_next (clients);
1376
1377           fd = client->fd.fd;
1378
1379           res = fcntl (fd, F_GETFL, &flags);
1380           if (res == -1) {
1381             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
1382                 fd, g_strerror (errno), errno);
1383             if (errno == EBADF) {
1384               client->status = GST_CLIENT_STATUS_ERROR;
1385               gst_multifdsink_remove_client_link (sink, clients);
1386             }
1387           }
1388         }
1389         CLIENTS_UNLOCK (sink);
1390         /* after this, go back in the select loop as the read/writefds
1391          * are not valid */
1392         try_again = TRUE;
1393       } else if (errno == EINTR) {
1394         /* interrupted system call, just redo the select */
1395         try_again = TRUE;
1396       } else {
1397         /* this is quite bad... */
1398         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
1399             ("select failed: %s (%d)", g_strerror (errno), errno));
1400         return;
1401       }
1402     } else {
1403       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
1404       /* read all commands */
1405       if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
1406         GST_LOG_OBJECT (sink, "have a command");
1407         while (TRUE) {
1408           gchar command;
1409           int res;
1410
1411           READ_COMMAND (sink, command, res);
1412           if (res < 0) {
1413             GST_LOG_OBJECT (sink, "no more commands");
1414             /* no more commands */
1415             break;
1416           }
1417
1418           switch (command) {
1419             case CONTROL_RESTART:
1420               GST_LOG_OBJECT (sink, "restart");
1421               /* need to restart the select call as the fd_set changed */
1422               /* if other file descriptors than the READ_SOCKET had activity,
1423                * we don't restart just yet, but handle the other clients first */
1424               if (result == 1)
1425                 try_again = TRUE;
1426               break;
1427             case CONTROL_STOP:
1428               /* break out of the select loop */
1429               GST_LOG_OBJECT (sink, "stop");
1430               /* stop this function */
1431               stop = TRUE;
1432               break;
1433             default:
1434               GST_WARNING_OBJECT (sink, "unkown");
1435               g_warning ("multifdsink: unknown control message received");
1436               break;
1437           }
1438         }
1439       }
1440     }
1441     if (stop) {
1442       return;
1443     }
1444   } while (try_again);
1445
1446   /* subclasses can check fdset with this virtual function */
1447   if (fclass->wait)
1448     fclass->wait (sink, sink->fdset);
1449
1450   /* Check the clients */
1451   CLIENTS_LOCK (sink);
1452   for (clients = sink->clients; clients; clients = next) {
1453     GstTCPClient *client;
1454
1455     client = (GstTCPClient *) clients->data;
1456     next = g_list_next (clients);
1457
1458     if (client->status != GST_CLIENT_STATUS_OK) {
1459       gst_multifdsink_remove_client_link (sink, clients);
1460       continue;
1461     }
1462
1463     if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
1464       client->status = GST_CLIENT_STATUS_CLOSED;
1465       gst_multifdsink_remove_client_link (sink, clients);
1466       continue;
1467     }
1468     if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
1469       GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd);
1470       client->status = GST_CLIENT_STATUS_ERROR;
1471       gst_multifdsink_remove_client_link (sink, clients);
1472       continue;
1473     }
1474     if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
1475       /* handle client read */
1476       if (!gst_multifdsink_handle_client_read (sink, client)) {
1477         gst_multifdsink_remove_client_link (sink, clients);
1478         continue;
1479       }
1480     }
1481     if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
1482       /* handle client write */
1483       if (!gst_multifdsink_handle_client_write (sink, client)) {
1484         gst_multifdsink_remove_client_link (sink, clients);
1485         continue;
1486       }
1487     }
1488   }
1489   CLIENTS_UNLOCK (sink);
1490 }
1491
1492 /* we handle the client communication in another thread so that we do not block
1493  * the gstreamer thread while we select() on the client fds */
1494 static gpointer
1495 gst_multifdsink_thread (GstMultiFdSink * sink)
1496 {
1497   while (sink->running) {
1498     gst_multifdsink_handle_clients (sink);
1499   }
1500   return NULL;
1501 }
1502
1503 static GstFlowReturn
1504 gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf)
1505 {
1506   GstMultiFdSink *sink;
1507
1508   sink = GST_MULTIFDSINK (bsink);
1509
1510   /* since we keep this buffer out of the scope of this method */
1511   gst_buffer_ref (buf);
1512
1513   g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN),
1514       GST_FLOW_ERROR);
1515
1516   GST_LOG_OBJECT (sink, "received buffer %p", buf);
1517   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
1518    * it means we're getting new streamheader buffers, and we should clear
1519    * the old ones */
1520   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
1521       sink->previous_buffer_in_caps == FALSE) {
1522     GST_DEBUG_OBJECT (sink,
1523         "receiving new IN_CAPS buffers, clearing old streamheader");
1524     g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
1525     g_slist_free (sink->streamheader);
1526     sink->streamheader = NULL;
1527   }
1528   /* if the incoming buffer is marked as IN CAPS, then we assume for now
1529    * it's a streamheader that needs to be sent to each new client, so we
1530    * put it on our internal list of streamheader buffers.
1531    * After that we return, since we only send these out when we get
1532    * non IN_CAPS buffers so we properly keep track of clients that got
1533    * streamheaders. */
1534   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
1535     sink->previous_buffer_in_caps = TRUE;
1536     GST_DEBUG_OBJECT (sink,
1537         "appending IN_CAPS buffer with length %d to streamheader",
1538         GST_BUFFER_SIZE (buf));
1539     sink->streamheader = g_slist_append (sink->streamheader, buf);
1540     return GST_FLOW_OK;
1541   }
1542
1543   sink->previous_buffer_in_caps = FALSE;
1544   /* queue the buffer */
1545   gst_multifdsink_queue_buffer (sink, buf);
1546
1547   sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
1548
1549   return GST_FLOW_OK;
1550 }
1551
1552 static void
1553 gst_multifdsink_set_property (GObject * object, guint prop_id,
1554     const GValue * value, GParamSpec * pspec)
1555 {
1556   GstMultiFdSink *multifdsink;
1557
1558   g_return_if_fail (GST_IS_MULTIFDSINK (object));
1559   multifdsink = GST_MULTIFDSINK (object);
1560
1561   switch (prop_id) {
1562     case ARG_PROTOCOL:
1563       multifdsink->protocol = g_value_get_enum (value);
1564       break;
1565     case ARG_MODE:
1566       multifdsink->mode = g_value_get_enum (value);
1567       break;
1568     case ARG_BUFFERS_MAX:
1569       multifdsink->units_max = g_value_get_int (value);
1570       break;
1571     case ARG_BUFFERS_SOFT_MAX:
1572       multifdsink->units_soft_max = g_value_get_int (value);
1573       break;
1574     case ARG_UNIT_TYPE:
1575       multifdsink->unit_type = g_value_get_enum (value);
1576       break;
1577     case ARG_UNITS_MAX:
1578       multifdsink->units_max = g_value_get_int (value);
1579       break;
1580     case ARG_UNITS_SOFT_MAX:
1581       multifdsink->units_soft_max = g_value_get_int (value);
1582       break;
1583     case ARG_RECOVER_POLICY:
1584       multifdsink->recover_policy = g_value_get_enum (value);
1585       break;
1586     case ARG_TIMEOUT:
1587       multifdsink->timeout = g_value_get_uint64 (value);
1588       break;
1589     case ARG_SYNC_CLIENTS:
1590       if (g_value_get_boolean (value) == TRUE) {
1591         multifdsink->sync_method = GST_SYNC_METHOD_WAIT;
1592       } else {
1593         multifdsink->sync_method = GST_SYNC_METHOD_NONE;
1594       }
1595       break;
1596     case ARG_SYNC_METHOD:
1597       multifdsink->sync_method = g_value_get_enum (value);
1598       break;
1599
1600     default:
1601       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1602       break;
1603   }
1604 }
1605
1606 static void
1607 gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
1608     GParamSpec * pspec)
1609 {
1610   GstMultiFdSink *multifdsink;
1611
1612   g_return_if_fail (GST_IS_MULTIFDSINK (object));
1613   multifdsink = GST_MULTIFDSINK (object);
1614
1615   switch (prop_id) {
1616     case ARG_PROTOCOL:
1617       g_value_set_enum (value, multifdsink->protocol);
1618       break;
1619     case ARG_MODE:
1620       g_value_set_enum (value, multifdsink->mode);
1621       break;
1622     case ARG_BUFFERS_MAX:
1623       g_value_set_int (value, multifdsink->units_max);
1624       break;
1625     case ARG_BUFFERS_SOFT_MAX:
1626       g_value_set_int (value, multifdsink->units_soft_max);
1627       break;
1628     case ARG_BUFFERS_QUEUED:
1629       g_value_set_uint (value, multifdsink->buffers_queued);
1630       break;
1631     case ARG_BYTES_QUEUED:
1632       g_value_set_uint (value, multifdsink->bytes_queued);
1633       break;
1634     case ARG_TIME_QUEUED:
1635       g_value_set_uint64 (value, multifdsink->time_queued);
1636       break;
1637     case ARG_UNIT_TYPE:
1638       g_value_set_enum (value, multifdsink->unit_type);
1639       break;
1640     case ARG_UNITS_MAX:
1641       g_value_set_int (value, multifdsink->units_max);
1642       break;
1643     case ARG_UNITS_SOFT_MAX:
1644       g_value_set_int (value, multifdsink->units_soft_max);
1645       break;
1646     case ARG_RECOVER_POLICY:
1647       g_value_set_enum (value, multifdsink->recover_policy);
1648       break;
1649     case ARG_TIMEOUT:
1650       g_value_set_uint64 (value, multifdsink->timeout);
1651       break;
1652     case ARG_SYNC_CLIENTS:
1653       g_value_set_boolean (value,
1654           multifdsink->sync_method == GST_SYNC_METHOD_WAIT);
1655       break;
1656     case ARG_SYNC_METHOD:
1657       g_value_set_enum (value, multifdsink->sync_method);
1658       break;
1659     case ARG_BYTES_TO_SERVE:
1660       g_value_set_uint64 (value, multifdsink->bytes_to_serve);
1661       break;
1662     case ARG_BYTES_SERVED:
1663       g_value_set_uint64 (value, multifdsink->bytes_served);
1664       break;
1665
1666     default:
1667       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1668       break;
1669   }
1670 }
1671
1672
1673 /* create a socket for sending to remote machine */
1674 static gboolean
1675 gst_multifdsink_start (GstBaseSink * bsink)
1676 {
1677   GstMultiFdSinkClass *fclass;
1678   int control_socket[2];
1679   GstMultiFdSink *this;
1680
1681   if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
1682     return TRUE;
1683
1684   this = GST_MULTIFDSINK (bsink);
1685   fclass = GST_MULTIFDSINK_GET_CLASS (this);
1686
1687   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
1688   this->fdset = gst_fdset_new (this->mode);
1689
1690   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
1691     goto socket_pair;
1692
1693   READ_SOCKET (this).fd = control_socket[0];
1694   WRITE_SOCKET (this).fd = control_socket[1];
1695
1696   gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
1697   gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
1698
1699   fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1700   fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1701
1702   this->streamheader = NULL;
1703   this->bytes_to_serve = 0;
1704   this->bytes_served = 0;
1705
1706   if (fclass->init) {
1707     fclass->init (this);
1708   }
1709
1710   this->running = TRUE;
1711   this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
1712       this, TRUE, NULL);
1713
1714   GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN);
1715
1716   return TRUE;
1717
1718   /* ERRORS */
1719 socket_pair:
1720   {
1721     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
1722         GST_ERROR_SYSTEM);
1723     return FALSE;
1724   }
1725 }
1726
1727 static gboolean
1728 gst_multifdsink_stop (GstBaseSink * bsink)
1729 {
1730   GstMultiFdSinkClass *fclass;
1731   GstMultiFdSink *this;
1732
1733   this = GST_MULTIFDSINK (bsink);
1734   fclass = GST_MULTIFDSINK_GET_CLASS (this);
1735
1736   if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
1737     return TRUE;
1738
1739   this->running = FALSE;
1740
1741   SEND_COMMAND (this, CONTROL_STOP);
1742   if (this->thread) {
1743     g_thread_join (this->thread);
1744     this->thread = NULL;
1745   }
1746
1747   /* free the clients */
1748   gst_multifdsink_clear (this);
1749
1750   close (READ_SOCKET (this).fd);
1751   close (WRITE_SOCKET (this).fd);
1752
1753   if (this->streamheader) {
1754     g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
1755     g_slist_free (this->streamheader);
1756     this->streamheader = NULL;
1757   }
1758
1759   if (fclass->close)
1760     fclass->close (this);
1761
1762   if (this->fdset) {
1763     gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
1764     gst_fdset_free (this->fdset);
1765     this->fdset = NULL;
1766   }
1767   GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
1768   CLIENTS_LOCK_FREE (this);
1769   g_hash_table_destroy (this->fd_hash);
1770
1771   return TRUE;
1772 }
1773
1774 static GstStateChangeReturn
1775 gst_multifdsink_change_state (GstElement * element, GstStateChange transition)
1776 {
1777   GstMultiFdSink *sink;
1778   GstStateChangeReturn ret;
1779
1780   sink = GST_MULTIFDSINK (element);
1781
1782   /* we disallow changing the state from the streaming thread */
1783   if (g_thread_self () == sink->thread)
1784     return GST_STATE_CHANGE_FAILURE;
1785
1786
1787   switch (transition) {
1788     case GST_STATE_CHANGE_NULL_TO_READY:
1789       if (!gst_multifdsink_start (GST_BASE_SINK (sink)))
1790         goto start_failed;
1791       break;
1792     case GST_STATE_CHANGE_READY_TO_PAUSED:
1793       break;
1794     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1795       break;
1796     default:
1797       break;
1798   }
1799
1800   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1801
1802   switch (transition) {
1803     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1804       break;
1805     case GST_STATE_CHANGE_PAUSED_TO_READY:
1806       break;
1807     case GST_STATE_CHANGE_READY_TO_NULL:
1808       gst_multifdsink_stop (GST_BASE_SINK (sink));
1809       break;
1810     default:
1811       break;
1812   }
1813   return ret;
1814
1815   /* ERRORS */
1816 start_failed:
1817   {
1818     return GST_STATE_CHANGE_FAILURE;
1819   }
1820 }