5c70e3c16fdc994e55a692becd416fdfe9d02aae
[platform/upstream/gstreamer.git] / gst / tcp / gstmultifdsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 #include <gst/gst-i18n-plugin.h>
25
26 #include <sys/ioctl.h>
27 #include <unistd.h>
28 #include <fcntl.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/stat.h>
32
33 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
34 #include <sys/filio.h>
35 #endif
36
37 #include "gstmultifdsink.h"
38 #include "gsttcp-marshal.h"
39
40 #define NOT_IMPLEMENTED 0
41
42 /* the select call is also performed on the control sockets, that way
43  * we can send special commands to unblock or restart the select call */
44 #define CONTROL_RESTART         'R'     /* restart the select call */
45 #define CONTROL_STOP            'S'     /* stop the select call */
46 #define CONTROL_SOCKETS(sink)   sink->control_sock
47 #define WRITE_SOCKET(sink)      sink->control_sock[1]
48 #define READ_SOCKET(sink)       sink->control_sock[0]
49
50 #define SEND_COMMAND(sink, command)             \
51 G_STMT_START {                                  \
52   unsigned char c; c = command;                 \
53   write (WRITE_SOCKET(sink).fd, &c, 1);         \
54 } G_STMT_END
55
56 #define READ_COMMAND(sink, command, res)        \
57 G_STMT_START {                                  \
58   res = read(READ_SOCKET(sink).fd, &command, 1);        \
59 } G_STMT_END
60
61 /* elementfactory information */
62 static GstElementDetails gst_multifdsink_details =
63 GST_ELEMENT_DETAILS ("MultiFd sink",
64     "Sink/Network",
65     "Send data to multiple filedescriptors",
66     "Thomas Vander Stichele <thomas at apestaart dot org>, "
67     "Wim Taymans <wim@fluendo.com>");
68
69 GST_DEBUG_CATEGORY (multifdsink_debug);
70 #define GST_CAT_DEFAULT (multifdsink_debug)
71
72 /* MultiFdSink signals and args */
73 enum
74 {
75   /* methods */
76   SIGNAL_ADD,
77   SIGNAL_REMOVE,
78   SIGNAL_CLEAR,
79   SIGNAL_GET_STATS,
80
81   /* signals */
82   SIGNAL_CLIENT_ADDED,
83   SIGNAL_CLIENT_REMOVED,
84
85   LAST_SIGNAL
86 };
87
88 /* this is really arbitrarily chosen */
89 #define DEFAULT_PROTOCOL                 GST_TCP_PROTOCOL_TYPE_NONE
90 #define DEFAULT_MODE                     GST_FDSET_MODE_POLL
91 #define DEFAULT_BUFFERS_MAX             -1
92 #define DEFAULT_BUFFERS_SOFT_MAX        -1
93 #define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
94 #define DEFAULT_UNITS_MAX               -1
95 #define DEFAULT_UNITS_SOFT_MAX          -1
96 #define DEFAULT_RECOVER_POLICY           GST_RECOVER_POLICY_NONE
97 #define DEFAULT_TIMEOUT                  0
98 #define DEFAULT_SYNC_METHOD              GST_SYNC_METHOD_NONE
99
100 enum
101 {
102   ARG_0,
103   ARG_PROTOCOL,
104   ARG_MODE,
105   ARG_BUFFERS_QUEUED,
106   ARG_BYTES_QUEUED,
107   ARG_TIME_QUEUED,
108
109   ARG_UNIT_TYPE,
110   ARG_UNITS_MAX,
111   ARG_UNITS_SOFT_MAX,
112
113   ARG_BUFFERS_MAX,
114   ARG_BUFFERS_SOFT_MAX,
115
116   ARG_RECOVER_POLICY,
117   ARG_TIMEOUT,
118   ARG_SYNC_CLIENTS,             /* deprecated */
119   ARG_SYNC_METHOD,
120   ARG_BYTES_TO_SERVE,
121   ARG_BYTES_SERVED,
122 };
123
124 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
125 static GType
126 gst_recover_policy_get_type (void)
127 {
128   static GType recover_policy_type = 0;
129   static GEnumValue recover_policy[] = {
130     {GST_RECOVER_POLICY_NONE, "GST_RECOVER_POLICY_NONE",
131         "Do not try to recover"},
132     {GST_RECOVER_POLICY_RESYNC_START, "GST_RECOVER_POLICY_RESYNC_START",
133         "Resync client to most recent buffer"},
134     {GST_RECOVER_POLICY_RESYNC_SOFT, "GST_RECOVER_POLICY_RESYNC_SOFT",
135         "Resync client to soft limit"},
136     {GST_RECOVER_POLICY_RESYNC_KEYFRAME, "GST_RECOVER_POLICY_RESYNC_KEYFRAME",
137         "Resync client to most recent keyframe"},
138     {0, NULL, NULL},
139   };
140
141   if (!recover_policy_type) {
142     recover_policy_type =
143         g_enum_register_static ("GstTCPRecoverPolicy", recover_policy);
144   }
145   return recover_policy_type;
146 }
147
148 #define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
149 static GType
150 gst_sync_method_get_type (void)
151 {
152   static GType sync_method_type = 0;
153   static GEnumValue sync_method[] = {
154     {GST_SYNC_METHOD_NONE, "GST_SYNC_METHOD_NONE",
155         "Serve new client the latest buffer"},
156     {GST_SYNC_METHOD_WAIT, "GST_SYNC_METHOD_WAIT",
157         "Make the new client wait for the next keyframe"},
158     {GST_SYNC_METHOD_BURST, "GST_SYNC_METHOD_BURST",
159         "Serve the new client the last keyframe, aka burst"},
160     {0, NULL, NULL},
161   };
162
163   if (!sync_method_type) {
164     sync_method_type = g_enum_register_static ("GstTCPSyncMethod", sync_method);
165   }
166   return sync_method_type;
167 }
168
169 #if NOT_IMPLEMENTED
170 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
171 static GType
172 gst_unit_type_get_type (void)
173 {
174   static GType unit_type_type = 0;
175   static GEnumValue unit_type[] = {
176     {GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
177     {GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
178     {GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
179     {0, NULL, NULL},
180   };
181
182   if (!unit_type_type) {
183     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
184   }
185   return unit_type_type;
186 }
187 #endif
188
189 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
190 static GType
191 gst_client_status_get_type (void)
192 {
193   static GType client_status_type = 0;
194   static GEnumValue client_status[] = {
195     {GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
196     {GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
197     {GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
198     {GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
199     {GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
200     {GST_CLIENT_STATUS_DUPLICATE, "GST_CLIENT_STATUS_DUPLICATE", "Duplicate"},
201     {0, NULL, NULL},
202   };
203
204   if (!client_status_type) {
205     client_status_type =
206         g_enum_register_static ("GstTCPClientStatus", client_status);
207   }
208   return client_status_type;
209 }
210
211 static void gst_multifdsink_base_init (gpointer g_class);
212 static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
213 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
214
215 static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
216     GList * link);
217
218 static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
219 static GstElementStateReturn gst_multifdsink_change_state (GstElement *
220     element);
221
222 static void gst_multifdsink_set_property (GObject * object, guint prop_id,
223     const GValue * value, GParamSpec * pspec);
224 static void gst_multifdsink_get_property (GObject * object, guint prop_id,
225     GValue * value, GParamSpec * pspec);
226
227
228 static GstElementClass *parent_class = NULL;
229
230 static guint gst_multifdsink_signals[LAST_SIGNAL] = { 0 };
231
232 GType
233 gst_multifdsink_get_type (void)
234 {
235   static GType multifdsink_type = 0;
236
237
238   if (!multifdsink_type) {
239     static const GTypeInfo multifdsink_info = {
240       sizeof (GstMultiFdSinkClass),
241       gst_multifdsink_base_init,
242       NULL,
243       (GClassInitFunc) gst_multifdsink_class_init,
244       NULL,
245       NULL,
246       sizeof (GstMultiFdSink),
247       0,
248       (GInstanceInitFunc) gst_multifdsink_init,
249       NULL
250     };
251
252     multifdsink_type =
253         g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink",
254         &multifdsink_info, 0);
255   }
256   return multifdsink_type;
257 }
258
259 static void
260 gst_multifdsink_base_init (gpointer g_class)
261 {
262   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
263
264   gst_element_class_set_details (element_class, &gst_multifdsink_details);
265 }
266
267 static void
268 gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
269 {
270   GObjectClass *gobject_class;
271   GstElementClass *gstelement_class;
272
273   gobject_class = (GObjectClass *) klass;
274   gstelement_class = (GstElementClass *) klass;
275
276   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
277
278   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
279       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
280           GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
281   g_object_class_install_property (gobject_class, ARG_MODE,
282       g_param_spec_enum ("mode", "Mode",
283           "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
284           DEFAULT_MODE, G_PARAM_READWRITE));
285
286   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
287       g_param_spec_int ("buffers-max", "Buffers max",
288           "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
289           DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
290   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
291       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
292           "Recover client when going over this limit (-1 = no limit)", -1,
293           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
294
295 #if NOT_IMPLEMENTED
296   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
297       g_param_spec_enum ("unit-type", "Units type",
298           "The unit to measure the max/soft-max/queued properties",
299           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
300   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
301       g_param_spec_int ("units-max", "Units max",
302           "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
303           DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
304   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
305       g_param_spec_int ("units-soft-max", "Units soft max",
306           "Recover client when going over this limit (-1 = no limit)", -1,
307           G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
308 #endif
309
310   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
311       g_param_spec_uint ("buffers-queued", "Buffers queued",
312           "Number of buffers currently queued", 0, G_MAXUINT, 0,
313           G_PARAM_READABLE));
314 #if NOT_IMPLEMENTED
315   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
316       g_param_spec_uint ("bytes-queued", "Bytes queued",
317           "Number of bytes currently queued", 0, G_MAXUINT, 0,
318           G_PARAM_READABLE));
319   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
320       g_param_spec_uint64 ("time-queued", "Time queued",
321           "Number of time currently queued", 0, G_MAXUINT64, 0,
322           G_PARAM_READABLE));
323 #endif
324
325   g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
326       g_param_spec_enum ("recover-policy", "Recover Policy",
327           "How to recover when client reaches the soft max",
328           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
329   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
330       g_param_spec_uint64 ("timeout", "Timeout",
331           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
332           0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
333   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_CLIENTS,
334       g_param_spec_boolean ("sync-clients", "Sync clients",
335           "(DEPRECATED) Sync clients to a keyframe",
336           DEFAULT_SYNC_METHOD == GST_SYNC_METHOD_WAIT, G_PARAM_READWRITE));
337   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
338       g_param_spec_enum ("sync-method", "Sync Method",
339           "How to sync new clients to the stream",
340           GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
341   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
342       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
343           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
344           G_PARAM_READABLE));
345   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
346       g_param_spec_uint64 ("bytes-served", "Bytes served",
347           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
348           G_PARAM_READABLE));
349
350   gst_multifdsink_signals[SIGNAL_ADD] =
351       g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
352       G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
353       NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
354   gst_multifdsink_signals[SIGNAL_REMOVE] =
355       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
356       G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
357       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
358   gst_multifdsink_signals[SIGNAL_CLEAR] =
359       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
360       G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
361       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
362   gst_multifdsink_signals[SIGNAL_GET_STATS] =
363       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
364       G_STRUCT_OFFSET (GstMultiFdSinkClass, get_stats),
365       NULL, NULL, gst_tcp_marshal_BOXED__INT, G_TYPE_VALUE_ARRAY, 1,
366       G_TYPE_INT);
367
368   gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
369       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
370       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
371       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
372   gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
373       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
374       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
375           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
376       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
377
378   gobject_class->set_property = gst_multifdsink_set_property;
379   gobject_class->get_property = gst_multifdsink_get_property;
380
381   gstelement_class->change_state = gst_multifdsink_change_state;
382
383   klass->add = gst_multifdsink_add;
384   klass->remove = gst_multifdsink_remove;
385   klass->clear = gst_multifdsink_clear;
386   klass->get_stats = gst_multifdsink_get_stats;
387
388   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
389 }
390
391 static void
392 gst_multifdsink_init (GstMultiFdSink * this)
393 {
394   /* create the sink pad */
395   this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
396   gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
397   gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain);
398
399   GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
400
401   this->protocol = DEFAULT_PROTOCOL;
402   this->mode = DEFAULT_MODE;
403
404   this->clientslock = g_mutex_new ();
405   this->clients = NULL;
406   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
407
408   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
409   this->unit_type = DEFAULT_UNIT_TYPE;
410   this->units_max = DEFAULT_UNITS_MAX;
411   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
412   this->recover_policy = DEFAULT_RECOVER_POLICY;
413
414   this->timeout = DEFAULT_TIMEOUT;
415   this->sync_method = DEFAULT_SYNC_METHOD;
416 }
417
418 void
419 gst_multifdsink_add (GstMultiFdSink * sink, int fd)
420 {
421   GstTCPClient *client;
422   GList *clink;
423   GTimeVal now;
424   gint flags, res;
425   struct stat statbuf;
426
427   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
428
429   /* create client datastructure */
430   client = g_new0 (GstTCPClient, 1);
431   client->fd.fd = fd;
432   client->status = GST_CLIENT_STATUS_OK;
433   client->bufpos = -1;
434   client->bufoffset = 0;
435   client->sending = NULL;
436   client->bytes_sent = 0;
437   client->dropped_buffers = 0;
438   client->avg_queue_size = 0;
439   client->new_connection = TRUE;
440
441   /* update start time */
442   g_get_current_time (&now);
443   client->connect_time = GST_TIMEVAL_TO_TIME (now);
444   client->disconnect_time = 0;
445   /* send last activity time to connect time */
446   client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
447
448   g_mutex_lock (sink->clientslock);
449
450   /* check the hash to find a duplicate fd */
451   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
452   if (clink != NULL) {
453     client->status = GST_CLIENT_STATUS_DUPLICATE;
454     g_mutex_unlock (sink->clientslock);
455     GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
456     g_signal_emit (G_OBJECT (sink),
457         gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
458     g_free (client);
459     return;
460   }
461
462   /* we can add the fd now */
463   clink = sink->clients = g_list_prepend (sink->clients, client);
464   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
465
466   /* set the socket to non blocking */
467   res = fcntl (fd, F_SETFL, O_NONBLOCK);
468   /* we always read from a client */
469   gst_fdset_add_fd (sink->fdset, &client->fd);
470
471   /* we don't try to read from write only fds */
472   flags = fcntl (fd, F_GETFL, 0);
473   if ((flags & O_ACCMODE) != O_WRONLY) {
474     gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
475   }
476   /* figure out the mode, can't use send() for non sockets */
477   res = fstat (fd, &statbuf);
478   if (S_ISSOCK (statbuf.st_mode)) {
479     client->is_socket = TRUE;
480   }
481
482   SEND_COMMAND (sink, CONTROL_RESTART);
483
484   g_mutex_unlock (sink->clientslock);
485
486   g_signal_emit (G_OBJECT (sink),
487       gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
488 }
489
490 void
491 gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
492 {
493   GList *clink;
494
495   GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
496
497   g_mutex_lock (sink->clientslock);
498   clink = g_hash_table_lookup (sink->fd_hash, &fd);
499   if (clink != NULL) {
500     GstTCPClient *client = (GstTCPClient *) clink->data;
501
502     client->status = GST_CLIENT_STATUS_REMOVED;
503     gst_multifdsink_remove_client_link (sink, clink);
504     SEND_COMMAND (sink, CONTROL_RESTART);
505   } else {
506     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
507   }
508   g_mutex_unlock (sink->clientslock);
509 }
510
511 void
512 gst_multifdsink_clear (GstMultiFdSink * sink)
513 {
514   GList *clients, *next;
515
516   GST_DEBUG_OBJECT (sink, "clearing all clients");
517
518   g_mutex_lock (sink->clientslock);
519   for (clients = sink->clients; clients; clients = next) {
520     GstTCPClient *client;
521
522     client = (GstTCPClient *) clients->data;
523     next = g_list_next (clients);
524
525     client->status = GST_CLIENT_STATUS_REMOVED;
526     gst_multifdsink_remove_client_link (sink, clients);
527   }
528   SEND_COMMAND (sink, CONTROL_RESTART);
529   g_mutex_unlock (sink->clientslock);
530 }
531
532 GValueArray *
533 gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
534 {
535   GstTCPClient *client;
536   GValueArray *result = NULL;
537   GList *clink;
538
539   g_mutex_lock (sink->clientslock);
540   clink = g_hash_table_lookup (sink->fd_hash, &fd);
541   client = (GstTCPClient *) clink->data;
542   if (client != NULL) {
543     GValue value = { 0 };
544     guint64 interval;
545
546     result = g_value_array_new (4);
547
548     g_value_init (&value, G_TYPE_UINT64);
549     g_value_set_uint64 (&value, client->bytes_sent);
550     result = g_value_array_append (result, &value);
551     g_value_unset (&value);
552     g_value_init (&value, G_TYPE_UINT64);
553     g_value_set_uint64 (&value, client->connect_time);
554     result = g_value_array_append (result, &value);
555     g_value_unset (&value);
556     if (client->disconnect_time == 0) {
557       GTimeVal nowtv;
558
559       g_get_current_time (&nowtv);
560
561       interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
562     } else {
563       interval = client->disconnect_time - client->connect_time;
564     }
565     g_value_init (&value, G_TYPE_UINT64);
566     g_value_set_uint64 (&value, client->disconnect_time);
567     result = g_value_array_append (result, &value);
568     g_value_unset (&value);
569     g_value_init (&value, G_TYPE_UINT64);
570     g_value_set_uint64 (&value, interval);
571     result = g_value_array_append (result, &value);
572     g_value_unset (&value);
573     g_value_init (&value, G_TYPE_UINT64);
574     g_value_set_uint64 (&value, client->last_activity_time);
575     result = g_value_array_append (result, &value);
576   }
577   g_mutex_unlock (sink->clientslock);
578
579   /* python doesn't like a NULL pointer yet */
580   if (result == NULL) {
581     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
582     result = g_value_array_new (0);
583   }
584
585   return result;
586 }
587
588 /* should be called with the clientslock held.
589  * Note that we don't close the fd as we didn't open it in the first
590  * place. An application should connect to the client-removed signal and
591  * close the fd itself.
592  */
593 static void
594 gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
595 {
596   int fd;
597   GTimeVal now;
598   GstTCPClient *client = (GstTCPClient *) link->data;
599   GstMultiFdSinkClass *fclass;
600
601   fclass = GST_MULTIFDSINK_GET_CLASS (sink);
602
603   fd = client->fd.fd;
604
605   /* FIXME: if we keep track of ip we can log it here and signal */
606   switch (client->status) {
607     case GST_CLIENT_STATUS_OK:
608       GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
609           fd, client);
610       break;
611     case GST_CLIENT_STATUS_CLOSED:
612       GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
613           fd, client);
614       break;
615     case GST_CLIENT_STATUS_REMOVED:
616       GST_DEBUG_OBJECT (sink,
617           "[fd %5d] removing client %p because the app removed it", fd, client);
618       break;
619     case GST_CLIENT_STATUS_SLOW:
620       GST_INFO_OBJECT (sink,
621           "[fd %5d] removing client %p because it was too slow", fd, client);
622       break;
623     case GST_CLIENT_STATUS_ERROR:
624       GST_WARNING_OBJECT (sink,
625           "[fd %5d] removing client %p because of error", fd, client);
626       break;
627     default:
628       GST_WARNING_OBJECT (sink,
629           "[fd %5d] removing client %p with invalid reason", fd, client);
630       break;
631   }
632
633   gst_fdset_remove_fd (sink->fdset, &client->fd);
634
635   g_get_current_time (&now);
636   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
637
638   /* free client buffers */
639   g_slist_foreach (client->sending, (GFunc) gst_data_unref, NULL);
640   g_slist_free (client->sending);
641   client->sending = NULL;
642
643   /* unlock the mutex before signaling because the signal handler
644    * might query some properties */
645   g_mutex_unlock (sink->clientslock);
646
647   g_signal_emit (G_OBJECT (sink),
648       gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
649
650   /* lock again before we remove the client completely */
651   g_mutex_lock (sink->clientslock);
652
653   if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
654     GST_WARNING_OBJECT (sink,
655         "[fd %5d] error removing client %p from hash", client->fd.fd, client);
656   }
657   sink->clients = g_list_delete_link (sink->clients, link);
658
659   if (fclass->removed)
660     fclass->removed (sink, client->fd.fd);
661
662   g_free (client);
663 }
664
665 /* handle a read on a client fd,
666  * which either indicates a close or should be ignored
667  * returns FALSE if some error occured or the client closed. */
668 static gboolean
669 gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
670     GstTCPClient * client)
671 {
672   int avail, fd;
673   gboolean ret;
674
675   fd = client->fd.fd;
676
677   if (ioctl (fd, FIONREAD, &avail) < 0) {
678     GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
679         fd, g_strerror (errno), errno);
680     client->status = GST_CLIENT_STATUS_ERROR;
681     ret = FALSE;
682     return ret;
683   }
684
685   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
686       fd, avail);
687
688   ret = TRUE;
689
690   if (avail == 0) {
691     /* client sent close, so remove it */
692     GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
693     client->status = GST_CLIENT_STATUS_CLOSED;
694     ret = FALSE;
695   } else if (avail < 0) {
696     GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
697     client->status = GST_CLIENT_STATUS_ERROR;
698     ret = FALSE;
699   } else {
700     guint8 dummy[512];
701     gint nread;
702
703     /* just Read 'n' Drop, could also just drop the client as it's not supposed
704      * to write to us except for closing the socket, I guess it's because we
705      * like to listen to our customers. */
706     do {
707       /* this is the maximum we can read */
708       gint to_read = MIN (avail, 512);
709
710       GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
711           fd, to_read);
712
713       nread = read (fd, dummy, to_read);
714       if (nread < -1) {
715         GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
716             fd, to_read, g_strerror (errno), errno);
717         client->status = GST_CLIENT_STATUS_ERROR;
718         ret = FALSE;
719         break;
720       } else if (nread == 0) {
721         GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
722         client->status = GST_CLIENT_STATUS_ERROR;
723         ret = FALSE;
724         break;
725       }
726       avail -= nread;
727     }
728     while (avail > 0);
729   }
730   return ret;
731 }
732
733 static gboolean
734 gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
735     gchar * data, gint len)
736 {
737   GstBuffer *buf;
738
739   buf = gst_buffer_new ();
740   GST_BUFFER_DATA (buf) = data;
741   GST_BUFFER_SIZE (buf) = len;
742
743   GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
744       client->fd.fd, len);
745
746   client->sending = g_slist_append (client->sending, buf);
747
748   return TRUE;
749 }
750
751 static gboolean
752 gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
753     const GstCaps * caps)
754 {
755   guint8 *header;
756   guint8 *payload;
757   guint length;
758   gchar *string;
759
760   string = gst_caps_to_string (caps);
761   GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
762       client->fd.fd, string);
763   g_free (string);
764
765   if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
766     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
767     return FALSE;
768   }
769   gst_multifdsink_client_queue_data (sink, client, header, length);
770
771   length = gst_dp_header_payload_length (header);
772   gst_multifdsink_client_queue_data (sink, client, payload, length);
773
774   return TRUE;
775 }
776
777 static gboolean
778 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
779 {
780   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
781     return FALSE;
782   } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
783     return TRUE;
784   }
785   return FALSE;
786 }
787
788 static gboolean
789 gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
790     GstTCPClient * client, GstBuffer * buffer)
791 {
792   if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
793     guint8 *header;
794     guint len;
795
796     if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
797       GST_DEBUG_OBJECT (sink,
798           "[fd %5d] could not create header, removing client", client->fd.fd);
799       return FALSE;
800     }
801     gst_multifdsink_client_queue_data (sink, client, header, len);
802   }
803
804   GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
805       client->fd.fd, GST_BUFFER_SIZE (buffer));
806
807   gst_buffer_ref (buffer);
808   client->sending = g_slist_append (client->sending, buffer);
809
810   return TRUE;
811 }
812
813 static gint
814 gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
815 {
816   gint result;
817
818   switch (sink->sync_method) {
819     case GST_SYNC_METHOD_WAIT:
820     {
821       /* if the buffer at the head of the queue is a sync point we can proceed,
822        * else we need to skip the buffer and wait for a new one */
823       GST_LOG_OBJECT (sink,
824           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
825           client->bufpos);
826
827       /* the client is not yet alligned to a buffer */
828       if (client->bufpos < 0) {
829         result = -1;
830       } else {
831         GstBuffer *buf;
832         gint i;
833
834         for (i = client->bufpos; i >= 0; i--) {
835           /* get the buffer for the client */
836           buf = g_array_index (sink->bufqueue, GstBuffer *, i);
837           if (is_sync_frame (sink, buf)) {
838             GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
839                 client->fd.fd);
840             result = i;
841             goto done;
842           } else {
843             /* client is not on a buffer, need to skip this buffer and
844              * wait some more */
845             GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
846                 client->fd.fd);
847             client->bufpos--;
848           }
849         }
850         result = -1;
851       }
852       break;
853     }
854     case GST_SYNC_METHOD_BURST:
855     {
856       /* FIXME for new clients we constantly scan the complete
857        * buffer queue for sync point whenever a buffer is added. This is
858        * suboptimal because if we cannot find a sync point the first time,
859        * the algorithm should behave as GST_SYNC_METHOD_WAIT */
860       gint i, len;
861
862       GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
863           client->fd.fd, client->bufpos);
864
865       /* take length of queued buffers */
866       len = sink->bufqueue->len;
867       /* assume we don't find a keyframe */
868       result = -1;
869       /* then loop over all buffers to find the first keyframe */
870       for (i = 0; i < len; i++) {
871         GstBuffer *buf;
872
873         buf = g_array_index (sink->bufqueue, GstBuffer *, i);
874         if (is_sync_frame (sink, buf)) {
875           /* found a keyframe, return its position */
876           GST_LOG_OBJECT (sink, "found keyframe at %d", i);
877           result = i;
878           goto done;
879         }
880       }
881       GST_LOG_OBJECT (sink, "no keyframe found");
882       /* throw client to the waiting state */
883       client->bufpos = -1;
884       break;
885     }
886     default:
887       /* no syncing, we are happy with whatever the client is going to get */
888       GST_LOG_OBJECT (sink, "no client syn needed");
889       result = client->bufpos;
890       break;
891   }
892 done:
893   return result;
894 }
895
896 /* handle a write on a client,
897  * which indicates a read request from a client.
898  *
899  * The strategy is as follows, for each client we maintain a queue of GstBuffers
900  * that contain the raw bytes we need to send to the client. In the case of the
901  * GDP protocol, we create buffers out of the header bytes so that we can only
902  * focus on sending buffers.
903  *
904  * We first check to see if we need to send caps (in GDP) and streamheaders.
905  * If so, we queue them.
906  *
907  * Then we run into the main loop that tries to send as many buffers as
908  * possible. It will first exhaust the client->sending queue and if the queue
909  * is empty, it will pick a buffer from the global queue.
910  *
911  * Sending the Buffers from the client->sending queue is basically writing
912  * the bytes to the socket and maintaining a count of the bytes that were
913  * sent. When the buffer is completely sent, it is removed from the
914  * client->sending queue and we try to pick a new buffer for sending.
915  *
916  * When the sending returns a partial buffer we stop sending more data as
917  * the next send operation could block.
918  *
919  * This functions returns FALSE if some error occured.
920  */
921 static gboolean
922 gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
923     GstTCPClient * client)
924 {
925   int fd = client->fd.fd;
926   gboolean more;
927   gboolean res;
928   GstClockTime now;
929   GTimeVal nowtv;
930
931   g_get_current_time (&nowtv);
932   now = GST_TIMEVAL_TO_TIME (nowtv);
933
934   /* when using GDP, first check if we have queued caps yet */
935   if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
936     if (!client->caps_sent) {
937       const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
938
939       /* queue caps for sending */
940       res = gst_multifdsink_client_queue_caps (sink, client, caps);
941       if (!res) {
942         GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
943         return FALSE;
944       }
945       client->caps_sent = TRUE;
946     }
947   }
948   /* if we have streamheader buffers, and haven't sent them to this client
949    * yet, send them out one by one */
950   if (!client->streamheader_sent) {
951     GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
952         g_slist_length (sink->streamheader));
953     if (sink->streamheader) {
954       GSList *l;
955
956       for (l = sink->streamheader; l; l = l->next) {
957         /* queue stream headers for sending */
958         res =
959             gst_multifdsink_client_queue_buffer (sink, client,
960             GST_BUFFER (l->data));
961         if (!res) {
962           GST_DEBUG_OBJECT (sink,
963               "Failed queueing streamheader, removing client");
964           return FALSE;
965         }
966       }
967     }
968     client->streamheader_sent = TRUE;
969   }
970
971   more = TRUE;
972   do {
973     gint maxsize;
974
975     if (!client->sending) {
976       /* client is not working on a buffer */
977       if (client->bufpos == -1) {
978         /* client is too fast, remove from write queue until new buffer is
979          * available */
980         gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
981         return TRUE;
982       } else {
983         /* client can pick a buffer from the global queue */
984         GstBuffer *buf;
985
986         /* for new connections, we need to find a good spot in the
987          * bufqueue to start streaming from */
988         if (client->new_connection) {
989           gint position = gst_multifdsink_new_client (sink, client);
990
991           if (position >= 0) {
992             /* we got a valid spot in the queue */
993             client->new_connection = FALSE;
994             client->bufpos = position;
995           } else {
996             /* cannot send data to this client yet */
997             gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
998             return TRUE;
999           }
1000         }
1001
1002         /* grab buffer */
1003         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
1004         client->bufpos--;
1005         GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1006             fd, client, client->bufpos);
1007
1008         /* queueing a buffer will ref it */
1009         gst_multifdsink_client_queue_buffer (sink, client, buf);
1010
1011         /* need to start from the first byte for this new buffer */
1012         client->bufoffset = 0;
1013       }
1014     }
1015
1016     /* see if we need to send something */
1017     if (client->sending) {
1018       ssize_t wrote;
1019       GstBuffer *head;
1020
1021       /* pick first buffer from list */
1022       head = GST_BUFFER (client->sending->data);
1023       maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
1024
1025       /* try to write the complete buffer */
1026 #ifdef MSG_NOSIGNAL
1027 #define FLAGS MSG_NOSIGNAL
1028 #else
1029 #define FLAGS 0
1030 #endif
1031       if (client->is_socket) {
1032         wrote =
1033             send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
1034             FLAGS);
1035       } else {
1036         wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
1037       }
1038
1039       if (wrote < 0) {
1040         /* hmm error.. */
1041         if (errno == EAGAIN) {
1042           /* nothing serious, resource was unavailable, try again later */
1043           more = FALSE;
1044         } else if (errno == ECONNRESET) {
1045           GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
1046               fd);
1047           client->status = GST_CLIENT_STATUS_CLOSED;
1048           return FALSE;
1049         } else {
1050           GST_WARNING_OBJECT (sink,
1051               "[fd %5d] could not write, removing client: %s (%d)", fd,
1052               g_strerror (errno), errno);
1053           client->status = GST_CLIENT_STATUS_ERROR;
1054           return FALSE;
1055         }
1056       } else {
1057         if (wrote < maxsize) {
1058           /* partial write means that the client cannot read more and we should
1059            * stop sending more */
1060           GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
1061           client->bufoffset += wrote;
1062           more = FALSE;
1063         } else {
1064           /* complete buffer was written, we can proceed to the next one */
1065           client->sending = g_slist_remove (client->sending, head);
1066           gst_buffer_unref (head);
1067           /* make sure we start from byte 0 for the next buffer */
1068           client->bufoffset = 0;
1069         }
1070         /* update stats */
1071         client->bytes_sent += wrote;
1072         client->last_activity_time = now;
1073         sink->bytes_served += wrote;
1074       }
1075     }
1076   } while (more);
1077
1078   return TRUE;
1079 }
1080
1081 /* calculate the new position for a client after recovery. This function
1082  * does not update the client position but merely returns the required
1083  * position.
1084  */
1085 static gint
1086 gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
1087 {
1088   gint newbufpos;
1089
1090   GST_WARNING_OBJECT (sink,
1091       "[fd %5d] client %p is lagging at %d, recover using policy %d",
1092       client->fd.fd, client, client->bufpos, sink->recover_policy);
1093
1094   switch (sink->recover_policy) {
1095     case GST_RECOVER_POLICY_NONE:
1096       /* do nothing, client will catch up or get kicked out when it reaches
1097        * the hard max */
1098       newbufpos = client->bufpos;
1099       break;
1100     case GST_RECOVER_POLICY_RESYNC_START:
1101       /* move to beginning of queue */
1102       newbufpos = -1;
1103       break;
1104     case GST_RECOVER_POLICY_RESYNC_SOFT:
1105       /* move to beginning of soft max */
1106       newbufpos = sink->units_soft_max;
1107       break;
1108     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1109       /* find keyframe in buffers, we search backwards to find the 
1110        * closest keyframe relative to what this client already received. */
1111       newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
1112
1113       while (newbufpos >= 0) {
1114         GstBuffer *buf;
1115
1116         buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
1117         if (is_sync_frame (sink, buf)) {
1118           /* found a buffer that is not a delta unit */
1119           break;
1120         }
1121         newbufpos--;
1122       }
1123       break;
1124     default:
1125       /* unknown recovery procedure */
1126       newbufpos = sink->units_soft_max;
1127       break;
1128   }
1129   return newbufpos;
1130 }
1131
1132 /* Queue a buffer on the global queue. 
1133  *
1134  * This functions adds the buffer to the front of a GArray. It removes the
1135  * tail buffer if the max queue size is exceeded. Unreffing the buffer that
1136  * is queued. Note that unreffing the buffer is not a problem as clients who
1137  * started writing out this buffer will still have a reference to it in the
1138  * client->sending queue.
1139  *
1140  * After adding the buffer, we update all client positions in the queue. If
1141  * a client moves over the soft max, we start the recovery procedure for this
1142  * slow client. If it goes over the hard max, it is put into the slow list
1143  * and removed.
1144  *
1145  * Special care is taken of clients that were waiting for a new buffer (they
1146  * had a position of -1) because they can proceed after adding this new buffer.
1147  * This is done by adding the client back into the write fd_set and signalling
1148  * the select thread that the fd_set changed.
1149  *
1150  */
1151 static void
1152 gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
1153 {
1154   GList *clients, *next;
1155   gint queuelen;
1156   gboolean need_signal = FALSE;
1157   gint max_buffer_usage;
1158   gint i;
1159   GTimeVal nowtv;
1160   GstClockTime now;
1161
1162   g_get_current_time (&nowtv);
1163   now = GST_TIMEVAL_TO_TIME (nowtv);
1164
1165   g_mutex_lock (sink->clientslock);
1166   /* add buffer to queue */
1167   g_array_prepend_val (sink->bufqueue, buf);
1168   queuelen = sink->bufqueue->len;
1169
1170   /* then loop over the clients and update the positions */
1171   max_buffer_usage = 0;
1172   for (clients = sink->clients; clients; clients = next) {
1173     GstTCPClient *client;
1174
1175     client = (GstTCPClient *) clients->data;
1176     next = g_list_next (clients);
1177
1178     client->bufpos++;
1179     GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1180         client->fd.fd, client, client->bufpos);
1181     /* check soft max if needed, recover client */
1182     if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
1183       gint newpos;
1184
1185       newpos = gst_multifdsink_recover_client (sink, client);
1186       if (newpos != client->bufpos) {
1187         client->bufpos = newpos;
1188         client->discont = TRUE;
1189         GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
1190             client->fd.fd, client, client->bufpos);
1191       } else {
1192         GST_INFO_OBJECT (sink,
1193             "[fd %5d] client %p not recovering position",
1194             client->fd.fd, client);
1195       }
1196     }
1197     /* check hard max and timeout, remove client */
1198     if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
1199         (sink->timeout > 0
1200             && now - client->last_activity_time > sink->timeout)) {
1201       /* remove client */
1202       GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
1203           client->fd.fd, client);
1204       /* remove the client, the fd set will be cleared and the select thread will
1205        * be signaled */
1206       client->status = GST_CLIENT_STATUS_SLOW;
1207       gst_multifdsink_remove_client_link (sink, clients);
1208       /* set client to invalid position while being removed */
1209       client->bufpos = -1;
1210       need_signal = TRUE;
1211     } else if (client->bufpos == 0 || client->new_connection) {
1212       /* can send data to this client now. need to signal the select thread that
1213        * the fd_set changed */
1214       gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
1215       need_signal = TRUE;
1216     }
1217     /* keep track of maximum buffer usage */
1218     if (client->bufpos > max_buffer_usage) {
1219       max_buffer_usage = client->bufpos;
1220     }
1221   }
1222
1223   /* now look for sync points and make sure there is at least one
1224    * sync point in the queue. We only do this if the burst mode
1225    * is enabled. */
1226   if (sink->sync_method == GST_SYNC_METHOD_BURST) {
1227     /* no point in searching beyond the queue length */
1228     gint limit = queuelen;
1229     GstBuffer *buf;
1230
1231     /* no point in searching beyond the soft-max if any. */
1232     if (sink->units_soft_max > 0) {
1233       limit = MIN (limit, sink->units_soft_max);
1234     }
1235     GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
1236         max_buffer_usage);
1237     for (i = 0; i < limit; i++) {
1238       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1239       if (is_sync_frame (sink, buf)) {
1240         /* found a sync frame, now extend the buffer usage to
1241          * include at least this frame. */
1242         max_buffer_usage = MAX (max_buffer_usage, i);
1243         break;
1244       }
1245     }
1246     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1247   }
1248
1249   /* nobody is referencing units after max_buffer_usage so we can
1250    * remove them from the queue. We remove them in reverse order as
1251    * this is the most optimal for GArray. */
1252   for (i = queuelen - 1; i > max_buffer_usage; i--) {
1253     GstBuffer *old;
1254
1255     /* queue exceeded max size */
1256     queuelen--;
1257     old = g_array_index (sink->bufqueue, GstBuffer *, i);
1258     sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
1259
1260     /* unref tail buffer */
1261     gst_buffer_unref (old);
1262   }
1263   /* save for stats */
1264   sink->buffers_queued = max_buffer_usage;
1265   g_mutex_unlock (sink->clientslock);
1266
1267   /* and send a signal to thread if fd_set changed */
1268   if (need_signal) {
1269     SEND_COMMAND (sink, CONTROL_RESTART);
1270   }
1271 }
1272
1273 /* Handle the clients. Basically does a blocking select for one
1274  * of the client fds to become read or writable. We also have a
1275  * filedescriptor to receive commands on that we need to check.
1276  *
1277  * After going out of the select call, we read and write to all
1278  * clients that can do so. Badly behaving clients are put on a
1279  * garbage list and removed.
1280  */
1281 static void
1282 gst_multifdsink_handle_clients (GstMultiFdSink * sink)
1283 {
1284   int result;
1285   GList *clients, *next;
1286   gboolean try_again;
1287   GstMultiFdSinkClass *fclass;
1288
1289   fclass = GST_MULTIFDSINK_GET_CLASS (sink);
1290
1291   do {
1292     gboolean stop = FALSE;
1293
1294     try_again = FALSE;
1295
1296     /* check for:
1297      * - server socket input (ie, new client connections)
1298      * - client socket input (ie, clients saying goodbye)
1299      * - client socket output (ie, client reads)          */
1300     GST_LOG_OBJECT (sink, "waiting on action on fdset");
1301     result = gst_fdset_wait (sink->fdset, -1);
1302
1303     /* < 0 is an error, 0 just means a timeout happened, which is impossible */
1304     if (result < 0) {
1305       GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
1306           errno);
1307       if (errno == EBADF) {
1308         /* ok, so one or more of the fds is invalid. We loop over them to find
1309          * the ones that give an error to the F_GETFL fcntl. */
1310         g_mutex_lock (sink->clientslock);
1311         for (clients = sink->clients; clients; clients = next) {
1312           GstTCPClient *client;
1313           int fd;
1314           long flags;
1315           int res;
1316
1317           client = (GstTCPClient *) clients->data;
1318           next = g_list_next (clients);
1319
1320           fd = client->fd.fd;
1321
1322           res = fcntl (fd, F_GETFL, &flags);
1323           if (res == -1) {
1324             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
1325                 fd, g_strerror (errno), errno);
1326             if (errno == EBADF) {
1327               client->status = GST_CLIENT_STATUS_ERROR;
1328               gst_multifdsink_remove_client_link (sink, clients);
1329             }
1330           }
1331         }
1332         g_mutex_unlock (sink->clientslock);
1333         /* after this, go back in the select loop as the read/writefds
1334          * are not valid */
1335         try_again = TRUE;
1336       } else if (errno == EINTR) {
1337         /* interrupted system call, just redo the select */
1338         try_again = TRUE;
1339       } else {
1340         /* this is quite bad... */
1341         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
1342             ("select failed: %s (%d)", g_strerror (errno), errno));
1343         return;
1344       }
1345     } else {
1346       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
1347       /* read all commands */
1348       if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
1349         GST_LOG_OBJECT (sink, "have a command");
1350         while (TRUE) {
1351           gchar command;
1352           int res;
1353
1354           READ_COMMAND (sink, command, res);
1355           if (res < 0) {
1356             GST_LOG_OBJECT (sink, "no more commands");
1357             /* no more commands */
1358             break;
1359           }
1360
1361           switch (command) {
1362             case CONTROL_RESTART:
1363               GST_LOG_OBJECT (sink, "restart");
1364               /* need to restart the select call as the fd_set changed */
1365               /* if other file descriptors than the READ_SOCKET had activity,
1366                * we don't restart just yet, but handle the other clients first */
1367               if (result == 1)
1368                 try_again = TRUE;
1369               break;
1370             case CONTROL_STOP:
1371               /* break out of the select loop */
1372               GST_LOG_OBJECT (sink, "stop");
1373               /* stop this function */
1374               stop = TRUE;
1375               break;
1376             default:
1377               GST_WARNING_OBJECT (sink, "unkown");
1378               g_warning ("multifdsink: unknown control message received");
1379               break;
1380           }
1381         }
1382       }
1383     }
1384     if (stop) {
1385       return;
1386     }
1387   } while (try_again);
1388
1389   /* subclasses can check fdset with this virtual function */
1390   if (fclass->wait)
1391     fclass->wait (sink, sink->fdset);
1392
1393   /* Check the clients */
1394   g_mutex_lock (sink->clientslock);
1395   for (clients = sink->clients; clients; clients = next) {
1396     GstTCPClient *client;
1397
1398     client = (GstTCPClient *) clients->data;
1399     next = g_list_next (clients);
1400
1401     if (client->status != GST_CLIENT_STATUS_OK) {
1402       gst_multifdsink_remove_client_link (sink, clients);
1403       continue;
1404     }
1405
1406     if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
1407       client->status = GST_CLIENT_STATUS_CLOSED;
1408       gst_multifdsink_remove_client_link (sink, clients);
1409       continue;
1410     }
1411     if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
1412       GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd);
1413       client->status = GST_CLIENT_STATUS_ERROR;
1414       gst_multifdsink_remove_client_link (sink, clients);
1415       continue;
1416     }
1417     if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
1418       /* handle client read */
1419       if (!gst_multifdsink_handle_client_read (sink, client)) {
1420         gst_multifdsink_remove_client_link (sink, clients);
1421         continue;
1422       }
1423     }
1424     if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
1425       /* handle client write */
1426       if (!gst_multifdsink_handle_client_write (sink, client)) {
1427         gst_multifdsink_remove_client_link (sink, clients);
1428         continue;
1429       }
1430     }
1431   }
1432   g_mutex_unlock (sink->clientslock);
1433 }
1434
1435 /* we handle the client communication in another thread so that we do not block
1436  * the gstreamer thread while we select() on the client fds */
1437 static gpointer
1438 gst_multifdsink_thread (GstMultiFdSink * sink)
1439 {
1440   while (sink->running) {
1441     gst_multifdsink_handle_clients (sink);
1442   }
1443   return NULL;
1444 }
1445
1446 static void
1447 gst_multifdsink_chain (GstPad * pad, GstData * _data)
1448 {
1449   GstBuffer *buf = GST_BUFFER (_data);
1450   GstMultiFdSink *sink;
1451
1452   g_return_if_fail (pad != NULL);
1453   g_return_if_fail (GST_IS_PAD (pad));
1454   g_return_if_fail (buf != NULL);
1455   sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad));
1456   g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN));
1457
1458   if (GST_IS_EVENT (buf)) {
1459     g_warning ("FIXME: handle events");
1460     return;
1461   }
1462
1463   GST_LOG_OBJECT (sink, "received buffer %p", buf);
1464   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
1465    * it means we're getting new streamheader buffers, and we should clear
1466    * the old ones */
1467   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
1468       sink->previous_buffer_in_caps == FALSE) {
1469     GST_DEBUG_OBJECT (sink,
1470         "receiving new IN_CAPS buffers, clearing old streamheader");
1471     g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
1472     g_slist_free (sink->streamheader);
1473     sink->streamheader = NULL;
1474   }
1475   /* if the incoming buffer is marked as IN CAPS, then we assume for now
1476    * it's a streamheader that needs to be sent to each new client, so we
1477    * put it on our internal list of streamheader buffers.
1478    * After that we return, since we only send these out when we get
1479    * non IN_CAPS buffers so we properly keep track of clients that got
1480    * streamheaders. */
1481   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
1482     sink->previous_buffer_in_caps = TRUE;
1483     GST_DEBUG_OBJECT (sink,
1484         "appending IN_CAPS buffer with length %d to streamheader",
1485         GST_BUFFER_SIZE (buf));
1486     sink->streamheader = g_slist_append (sink->streamheader, buf);
1487     return;
1488   }
1489
1490   sink->previous_buffer_in_caps = FALSE;
1491   /* queue the buffer */
1492   gst_multifdsink_queue_buffer (sink, buf);
1493
1494   sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
1495 }
1496
1497 static void
1498 gst_multifdsink_set_property (GObject * object, guint prop_id,
1499     const GValue * value, GParamSpec * pspec)
1500 {
1501   GstMultiFdSink *multifdsink;
1502
1503   g_return_if_fail (GST_IS_MULTIFDSINK (object));
1504   multifdsink = GST_MULTIFDSINK (object);
1505
1506   switch (prop_id) {
1507     case ARG_PROTOCOL:
1508       multifdsink->protocol = g_value_get_enum (value);
1509       break;
1510     case ARG_MODE:
1511       multifdsink->mode = g_value_get_enum (value);
1512       break;
1513     case ARG_BUFFERS_MAX:
1514       multifdsink->units_max = g_value_get_int (value);
1515       break;
1516     case ARG_BUFFERS_SOFT_MAX:
1517       multifdsink->units_soft_max = g_value_get_int (value);
1518       break;
1519     case ARG_UNIT_TYPE:
1520       multifdsink->unit_type = g_value_get_enum (value);
1521       break;
1522     case ARG_UNITS_MAX:
1523       multifdsink->units_max = g_value_get_int (value);
1524       break;
1525     case ARG_UNITS_SOFT_MAX:
1526       multifdsink->units_soft_max = g_value_get_int (value);
1527       break;
1528     case ARG_RECOVER_POLICY:
1529       multifdsink->recover_policy = g_value_get_enum (value);
1530       break;
1531     case ARG_TIMEOUT:
1532       multifdsink->timeout = g_value_get_uint64 (value);
1533       break;
1534     case ARG_SYNC_CLIENTS:
1535       if (g_value_get_boolean (value) == TRUE) {
1536         multifdsink->sync_method = GST_SYNC_METHOD_WAIT;
1537       } else {
1538         multifdsink->sync_method = GST_SYNC_METHOD_NONE;
1539       }
1540       break;
1541     case ARG_SYNC_METHOD:
1542       multifdsink->sync_method = g_value_get_enum (value);
1543       break;
1544
1545     default:
1546       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1547       break;
1548   }
1549 }
1550
1551 static void
1552 gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
1553     GParamSpec * pspec)
1554 {
1555   GstMultiFdSink *multifdsink;
1556
1557   g_return_if_fail (GST_IS_MULTIFDSINK (object));
1558   multifdsink = GST_MULTIFDSINK (object);
1559
1560   switch (prop_id) {
1561     case ARG_PROTOCOL:
1562       g_value_set_enum (value, multifdsink->protocol);
1563       break;
1564     case ARG_MODE:
1565       g_value_set_enum (value, multifdsink->mode);
1566       break;
1567     case ARG_BUFFERS_MAX:
1568       g_value_set_int (value, multifdsink->units_max);
1569       break;
1570     case ARG_BUFFERS_SOFT_MAX:
1571       g_value_set_int (value, multifdsink->units_soft_max);
1572       break;
1573     case ARG_BUFFERS_QUEUED:
1574       g_value_set_uint (value, multifdsink->buffers_queued);
1575       break;
1576     case ARG_BYTES_QUEUED:
1577       g_value_set_uint (value, multifdsink->bytes_queued);
1578       break;
1579     case ARG_TIME_QUEUED:
1580       g_value_set_uint64 (value, multifdsink->time_queued);
1581       break;
1582     case ARG_UNIT_TYPE:
1583       g_value_set_enum (value, multifdsink->unit_type);
1584       break;
1585     case ARG_UNITS_MAX:
1586       g_value_set_int (value, multifdsink->units_max);
1587       break;
1588     case ARG_UNITS_SOFT_MAX:
1589       g_value_set_int (value, multifdsink->units_soft_max);
1590       break;
1591     case ARG_RECOVER_POLICY:
1592       g_value_set_enum (value, multifdsink->recover_policy);
1593       break;
1594     case ARG_TIMEOUT:
1595       g_value_set_uint64 (value, multifdsink->timeout);
1596       break;
1597     case ARG_SYNC_CLIENTS:
1598       g_value_set_boolean (value,
1599           multifdsink->sync_method == GST_SYNC_METHOD_WAIT);
1600       break;
1601     case ARG_SYNC_METHOD:
1602       g_value_set_enum (value, multifdsink->sync_method);
1603       break;
1604     case ARG_BYTES_TO_SERVE:
1605       g_value_set_uint64 (value, multifdsink->bytes_to_serve);
1606       break;
1607     case ARG_BYTES_SERVED:
1608       g_value_set_uint64 (value, multifdsink->bytes_served);
1609       break;
1610
1611     default:
1612       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1613       break;
1614   }
1615 }
1616
1617
1618 /* create a socket for sending to remote machine */
1619 static gboolean
1620 gst_multifdsink_init_send (GstMultiFdSink * this)
1621 {
1622   GstMultiFdSinkClass *fclass;
1623   int control_socket[2];
1624
1625   fclass = GST_MULTIFDSINK_GET_CLASS (this);
1626
1627   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
1628   this->fdset = gst_fdset_new (this->mode);
1629
1630   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) {
1631     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
1632         GST_ERROR_SYSTEM);
1633     return FALSE;
1634   }
1635   READ_SOCKET (this).fd = control_socket[0];
1636   WRITE_SOCKET (this).fd = control_socket[1];
1637
1638   gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
1639   gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
1640
1641   fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1642   fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1643
1644   this->streamheader = NULL;
1645   this->bytes_to_serve = 0;
1646   this->bytes_served = 0;
1647
1648   if (fclass->init) {
1649     fclass->init (this);
1650   }
1651
1652   this->running = TRUE;
1653   this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
1654       this, TRUE, NULL);
1655
1656   return TRUE;
1657 }
1658
1659 static void
1660 gst_multifdsink_close (GstMultiFdSink * this)
1661 {
1662   GstMultiFdSinkClass *fclass;
1663
1664   fclass = GST_MULTIFDSINK_GET_CLASS (this);
1665
1666   this->running = FALSE;
1667
1668   SEND_COMMAND (this, CONTROL_STOP);
1669   if (this->thread) {
1670     g_thread_join (this->thread);
1671     this->thread = NULL;
1672   }
1673
1674   /* free the clients */
1675   gst_multifdsink_clear (this);
1676
1677   close (READ_SOCKET (this).fd);
1678   close (WRITE_SOCKET (this).fd);
1679
1680   if (this->streamheader) {
1681     g_slist_foreach (this->streamheader, (GFunc) gst_data_unref, NULL);
1682     g_slist_free (this->streamheader);
1683     this->streamheader = NULL;
1684   }
1685
1686   if (fclass->close)
1687     fclass->close (this);
1688
1689   if (this->fdset) {
1690     gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
1691     gst_fdset_free (this->fdset);
1692     this->fdset = NULL;
1693   }
1694 }
1695
1696 static GstElementStateReturn
1697 gst_multifdsink_change_state (GstElement * element)
1698 {
1699   GstMultiFdSink *sink;
1700
1701   g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE);
1702   sink = GST_MULTIFDSINK (element);
1703
1704   /* we disallow changing the state from the streaming thread */
1705   if (g_thread_self () == sink->thread)
1706     return GST_STATE_FAILURE;
1707
1708   switch (GST_STATE_TRANSITION (element)) {
1709     case GST_STATE_NULL_TO_READY:
1710       if (!GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
1711         if (!gst_multifdsink_init_send (sink))
1712           return GST_STATE_FAILURE;
1713         GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN);
1714       }
1715       break;
1716     case GST_STATE_READY_TO_PAUSED:
1717       break;
1718     case GST_STATE_PAUSED_TO_PLAYING:
1719       break;
1720     case GST_STATE_PLAYING_TO_PAUSED:
1721       break;
1722     case GST_STATE_PAUSED_TO_READY:
1723       break;
1724     case GST_STATE_READY_TO_NULL:
1725       if (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
1726         gst_multifdsink_close (GST_MULTIFDSINK (element));
1727         GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN);
1728       }
1729       break;
1730   }
1731
1732   if (GST_ELEMENT_CLASS (parent_class)->change_state)
1733     return GST_ELEMENT_CLASS (parent_class)->change_state (element);
1734
1735   return GST_STATE_SUCCESS;
1736 }