expand tabs
[platform/upstream/gstreamer.git] / gst / tcp / gstmultifdsink.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-multifdsink
23  * @see_also: tcpserversink
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include <gst/gst-i18n-plugin.h>
30
31 #include <sys/ioctl.h>
32 #include <unistd.h>
33 #include <fcntl.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37
38 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
39 #include <sys/filio.h>
40 #endif
41
42 #include "gstmultifdsink.h"
43 #include "gsttcp-marshal.h"
44
45 #define NOT_IMPLEMENTED 0
46
47 /* the select call is also performed on the control sockets, that way
48  * we can send special commands to unblock or restart the select call */
49 #define CONTROL_RESTART         'R'     /* restart the select call */
50 #define CONTROL_STOP            'S'     /* stop the select call */
51 #define CONTROL_SOCKETS(sink)   sink->control_sock
52 #define WRITE_SOCKET(sink)      sink->control_sock[1]
53 #define READ_SOCKET(sink)       sink->control_sock[0]
54
55 #define SEND_COMMAND(sink, command)             \
56 G_STMT_START {                                  \
57   unsigned char c; c = command;                 \
58   write (WRITE_SOCKET(sink).fd, &c, 1);         \
59 } G_STMT_END
60
61 #define READ_COMMAND(sink, command, res)        \
62 G_STMT_START {                                  \
63   res = read(READ_SOCKET(sink).fd, &command, 1);        \
64 } G_STMT_END
65
66 /* elementfactory information */
67 static GstElementDetails gst_multi_fd_sink_details =
68 GST_ELEMENT_DETAILS ("MultiFd sink",
69     "Sink/Network",
70     "Send data to multiple filedescriptors",
71     "Thomas Vander Stichele <thomas at apestaart dot org>, "
72     "Wim Taymans <wim@fluendo.com>");
73
74 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
75     GST_PAD_SINK,
76     GST_PAD_ALWAYS,
77     GST_STATIC_CAPS_ANY);
78
79 GST_DEBUG_CATEGORY (multifdsink_debug);
80 #define GST_CAT_DEFAULT (multifdsink_debug)
81
82 /* MultiFdSink signals and args */
83 enum
84 {
85   /* methods */
86   SIGNAL_ADD,
87   SIGNAL_REMOVE,
88   SIGNAL_CLEAR,
89   SIGNAL_GET_STATS,
90
91   /* signals */
92   SIGNAL_CLIENT_ADDED,
93   SIGNAL_CLIENT_REMOVED,
94
95   LAST_SIGNAL
96 };
97
98 /* this is really arbitrarily chosen */
99 #define DEFAULT_PROTOCOL                 GST_TCP_PROTOCOL_NONE
100 #define DEFAULT_MODE                     GST_FDSET_MODE_POLL
101 #define DEFAULT_BUFFERS_MAX             -1
102 #define DEFAULT_BUFFERS_SOFT_MAX        -1
103 #define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
104 #define DEFAULT_UNITS_MAX               -1
105 #define DEFAULT_UNITS_SOFT_MAX          -1
106 #define DEFAULT_RECOVER_POLICY           GST_RECOVER_POLICY_NONE
107 #define DEFAULT_TIMEOUT                  0
108 #define DEFAULT_SYNC_METHOD              GST_SYNC_METHOD_LATEST
109
110 enum
111 {
112   ARG_0,
113   ARG_PROTOCOL,
114   ARG_MODE,
115   ARG_BUFFERS_QUEUED,
116   ARG_BYTES_QUEUED,
117   ARG_TIME_QUEUED,
118
119   ARG_UNIT_TYPE,
120   ARG_UNITS_MAX,
121   ARG_UNITS_SOFT_MAX,
122
123   ARG_BUFFERS_MAX,
124   ARG_BUFFERS_SOFT_MAX,
125
126   ARG_RECOVER_POLICY,
127   ARG_TIMEOUT,
128   ARG_SYNC_METHOD,
129   ARG_BYTES_TO_SERVE,
130   ARG_BYTES_SERVED,
131 };
132
133 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
134 static GType
135 gst_recover_policy_get_type (void)
136 {
137   static GType recover_policy_type = 0;
138   static GEnumValue recover_policy[] = {
139     {GST_RECOVER_POLICY_NONE,
140         "Do not try to recover", "none"},
141     {GST_RECOVER_POLICY_RESYNC_LATEST,
142         "Resync client to latest buffer", "latest"},
143     {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
144         "Resync client to soft limit", "soft-limit"},
145     {GST_RECOVER_POLICY_RESYNC_KEYFRAME,
146         "Resync client to most recent keyframe", "keyframe"},
147     {0, NULL, NULL},
148   };
149
150   if (!recover_policy_type) {
151     recover_policy_type =
152         g_enum_register_static ("GstRecoverPolicy", recover_policy);
153   }
154   return recover_policy_type;
155 }
156
157 #define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
158 static GType
159 gst_sync_method_get_type (void)
160 {
161   static GType sync_method_type = 0;
162   static GEnumValue sync_method[] = {
163     {GST_SYNC_METHOD_LATEST,
164         "Serve starting from the latest buffer", "latest"},
165     {GST_SYNC_METHOD_NEXT_KEYFRAME,
166         "Serve starting from the next keyframe", "next-keyframe"},
167     {GST_SYNC_METHOD_LATEST_KEYFRAME,
168           "Serve everything since the latest keyframe (burst)",
169         "latest-keyframe"},
170     {0, NULL, NULL},
171   };
172
173   if (!sync_method_type) {
174     sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method);
175   }
176   return sync_method_type;
177 }
178
179 #if NOT_IMPLEMENTED
180 #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
181 static GType
182 gst_unit_type_get_type (void)
183 {
184   static GType unit_type_type = 0;
185   static GEnumValue unit_type[] = {
186     {GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
187     {GST_UNIT_TYPE_BYTES, "Bytes", "bytes"},
188     {GST_UNIT_TYPE_TIME, "Time", "time"},
189     {0, NULL, NULL},
190   };
191
192   if (!unit_type_type) {
193     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
194   }
195   return unit_type_type;
196 }
197 #endif
198
199 #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
200 static GType
201 gst_client_status_get_type (void)
202 {
203   static GType client_status_type = 0;
204   static GEnumValue client_status[] = {
205     {GST_CLIENT_STATUS_OK, "ok"},
206     {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"},
207     {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"},
208     {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
209     {GST_CLIENT_STATUS_ERROR, "Error", "error"},
210     {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
211     {0, NULL, NULL},
212   };
213
214   if (!client_status_type) {
215     client_status_type =
216         g_enum_register_static ("GstClientStatus", client_status);
217   }
218   return client_status_type;
219 }
220
221 static void gst_multi_fd_sink_finalize (GObject * object);
222
223 static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink,
224     GList * link);
225
226 static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
227     GstBuffer * buf);
228 static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement *
229     element, GstStateChange transition);
230
231 static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
232     const GValue * value, GParamSpec * pspec);
233 static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
234     GValue * value, GParamSpec * pspec);
235
236
237 GST_BOILERPLATE (GstMultiFdSink, gst_multi_fd_sink, GstBaseSink,
238     GST_TYPE_BASE_SINK);
239
240
241 static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 };
242
243
244 static void
245 gst_multi_fd_sink_base_init (gpointer g_class)
246 {
247   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
248
249   gst_element_class_add_pad_template (element_class,
250       gst_static_pad_template_get (&sinktemplate));
251
252   gst_element_class_set_details (element_class, &gst_multi_fd_sink_details);
253 }
254
255 static void
256 gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
257 {
258   GObjectClass *gobject_class;
259   GstElementClass *gstelement_class;
260   GstBaseSinkClass *gstbasesink_class;
261
262   gobject_class = (GObjectClass *) klass;
263   gstelement_class = (GstElementClass *) klass;
264   gstbasesink_class = (GstBaseSinkClass *) klass;
265
266   gobject_class->set_property = gst_multi_fd_sink_set_property;
267   gobject_class->get_property = gst_multi_fd_sink_get_property;
268   gobject_class->finalize = gst_multi_fd_sink_finalize;
269
270   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
271       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
272           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
273   g_object_class_install_property (gobject_class, ARG_MODE,
274       g_param_spec_enum ("mode", "Mode",
275           "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
276           DEFAULT_MODE, G_PARAM_READWRITE));
277
278   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
279       g_param_spec_int ("buffers-max", "Buffers max",
280           "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
281           DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
282   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
283       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
284           "Recover client when going over this limit (-1 = no limit)", -1,
285           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
286
287 #if NOT_IMPLEMENTED
288   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
289       g_param_spec_enum ("unit-type", "Units type",
290           "The unit to measure the max/soft-max/queued properties",
291           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
292   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
293       g_param_spec_int ("units-max", "Units max",
294           "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
295           DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
296   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
297       g_param_spec_int ("units-soft-max", "Units soft max",
298           "Recover client when going over this limit (-1 = no limit)", -1,
299           G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
300 #endif
301
302   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
303       g_param_spec_uint ("buffers-queued", "Buffers queued",
304           "Number of buffers currently queued", 0, G_MAXUINT, 0,
305           G_PARAM_READABLE));
306 #if NOT_IMPLEMENTED
307   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
308       g_param_spec_uint ("bytes-queued", "Bytes queued",
309           "Number of bytes currently queued", 0, G_MAXUINT, 0,
310           G_PARAM_READABLE));
311   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
312       g_param_spec_uint64 ("time-queued", "Time queued",
313           "Number of time currently queued", 0, G_MAXUINT64, 0,
314           G_PARAM_READABLE));
315 #endif
316
317   g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
318       g_param_spec_enum ("recover-policy", "Recover Policy",
319           "How to recover when client reaches the soft max",
320           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
321   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
322       g_param_spec_uint64 ("timeout", "Timeout",
323           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
324           0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
325   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
326       g_param_spec_enum ("sync-method", "Sync Method",
327           "How to sync new clients to the stream",
328           GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
329   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
330       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
331           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
332           G_PARAM_READABLE));
333   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
334       g_param_spec_uint64 ("bytes-served", "Bytes served",
335           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
336           G_PARAM_READABLE));
337
338   /**
339    * GstMultiFdSink::add:
340    * @gstmultifdsink: the multifdsink element to emit this signal on
341    * @arg1:           the file descriptor to add to multifdsink
342    *
343    * Hand the given open file descriptor to multifdsink to write to.
344    */
345   gst_multi_fd_sink_signals[SIGNAL_ADD] =
346       g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
347       G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
348       NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
349   /**
350    * GstMultiFdSink::remove:
351    * @gstmultifdsink: the multifdsink element to emit this signal on
352    * @arg1:           the file descriptor to remove from multifdsink
353    *
354    * Remove the given open file descriptor from multifdsink.
355    */
356   gst_multi_fd_sink_signals[SIGNAL_REMOVE] =
357       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
358       G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
359       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
360   /**
361    * GstMultiFdSink::clear:
362    * @gstmultifdsink: the multifdsink element to emit this signal on
363    *
364    * Clear all file descriptors from multifdsink.
365    */
366   gst_multi_fd_sink_signals[SIGNAL_CLEAR] =
367       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
368       G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
369       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
370   gst_multi_fd_sink_signals[SIGNAL_GET_STATS] =
371       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
372       G_STRUCT_OFFSET (GstMultiFdSinkClass, get_stats),
373       NULL, NULL, gst_tcp_marshal_BOXED__INT, G_TYPE_VALUE_ARRAY, 1,
374       G_TYPE_INT);
375
376   /**
377    * GstMultiFdSink::client-added:
378    * @gstmultifdsink: the multifdsink element that emitted this signal
379    * @arg1:           the file descriptor that was added to multifdsink
380    *
381    * The given file descriptor was added to multifdsink.
382    */
383   gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] =
384       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
385       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
386       NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
387   /**
388    * GstMultiFdSink::client-removed:
389    * @gstmultifdsink: the multifdsink element that emitted this signal
390    * @arg1:           the file descriptor that was removed from multifdsink
391    *
392    * The given file descriptor was removed from multifdsink.
393    */
394   gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] =
395       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
396       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
397           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
398       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
399
400   gstelement_class->change_state =
401       GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state);
402
403   gstbasesink_class->render = gst_multi_fd_sink_render;
404
405   klass->add = gst_multi_fd_sink_add;
406   klass->remove = gst_multi_fd_sink_remove;
407   klass->clear = gst_multi_fd_sink_clear;
408   klass->get_stats = gst_multi_fd_sink_get_stats;
409
410   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
411 }
412
413 static void
414 gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
415 {
416   GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
417
418   this->protocol = DEFAULT_PROTOCOL;
419   this->mode = DEFAULT_MODE;
420
421   CLIENTS_LOCK_INIT (this);
422   this->clients = NULL;
423   this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
424
425   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
426   this->unit_type = DEFAULT_UNIT_TYPE;
427   this->units_max = DEFAULT_UNITS_MAX;
428   this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
429   this->recover_policy = DEFAULT_RECOVER_POLICY;
430
431   this->timeout = DEFAULT_TIMEOUT;
432   this->sync_method = DEFAULT_SYNC_METHOD;
433 }
434
435 static void
436 gst_multi_fd_sink_finalize (GObject * object)
437 {
438   GstMultiFdSink *this;
439
440   this = GST_MULTI_FD_SINK (object);
441
442   CLIENTS_LOCK_FREE (this);
443   g_hash_table_destroy (this->fd_hash);
444   g_array_free (this->bufqueue, TRUE);
445
446   G_OBJECT_CLASS (parent_class)->finalize (object);
447 }
448
449 void
450 gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
451 {
452   GstTCPClient *client;
453   GList *clink;
454   GTimeVal now;
455   gint flags, res;
456   struct stat statbuf;
457
458   GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd);
459
460   /* create client datastructure */
461   client = g_new0 (GstTCPClient, 1);
462   client->fd.fd = fd;
463   client->status = GST_CLIENT_STATUS_OK;
464   client->bufpos = -1;
465   client->bufoffset = 0;
466   client->sending = NULL;
467   client->bytes_sent = 0;
468   client->dropped_buffers = 0;
469   client->avg_queue_size = 0;
470   client->new_connection = TRUE;
471
472   /* update start time */
473   g_get_current_time (&now);
474   client->connect_time = GST_TIMEVAL_TO_TIME (now);
475   client->disconnect_time = 0;
476   /* send last activity time to connect time */
477   client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
478
479   CLIENTS_LOCK (sink);
480
481   /* check the hash to find a duplicate fd */
482   clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
483   if (clink != NULL) {
484     client->status = GST_CLIENT_STATUS_DUPLICATE;
485     CLIENTS_UNLOCK (sink);
486     GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
487     g_signal_emit (G_OBJECT (sink),
488         gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
489         client->status);
490     g_free (client);
491     return;
492   }
493
494   /* we can add the fd now */
495   clink = sink->clients = g_list_prepend (sink->clients, client);
496   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
497
498   /* set the socket to non blocking */
499   res = fcntl (fd, F_SETFL, O_NONBLOCK);
500   /* we always read from a client */
501   gst_fdset_add_fd (sink->fdset, &client->fd);
502
503   /* we don't try to read from write only fds */
504   flags = fcntl (fd, F_GETFL, 0);
505   if ((flags & O_ACCMODE) != O_WRONLY) {
506     gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
507   }
508   /* figure out the mode, can't use send() for non sockets */
509   res = fstat (fd, &statbuf);
510   if (S_ISSOCK (statbuf.st_mode)) {
511     client->is_socket = TRUE;
512   }
513
514   SEND_COMMAND (sink, CONTROL_RESTART);
515
516   CLIENTS_UNLOCK (sink);
517
518   g_signal_emit (G_OBJECT (sink),
519       gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd);
520 }
521
522 void
523 gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
524 {
525   GList *clink;
526
527   GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
528
529   CLIENTS_LOCK (sink);
530   clink = g_hash_table_lookup (sink->fd_hash, &fd);
531   if (clink != NULL) {
532     GstTCPClient *client = (GstTCPClient *) clink->data;
533
534     client->status = GST_CLIENT_STATUS_REMOVED;
535     gst_multi_fd_sink_remove_client_link (sink, clink);
536     SEND_COMMAND (sink, CONTROL_RESTART);
537   } else {
538     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
539   }
540   CLIENTS_UNLOCK (sink);
541 }
542
543 void
544 gst_multi_fd_sink_clear (GstMultiFdSink * sink)
545 {
546   GList *clients, *next;
547
548   GST_DEBUG_OBJECT (sink, "clearing all clients");
549
550   CLIENTS_LOCK (sink);
551   for (clients = sink->clients; clients; clients = next) {
552     GstTCPClient *client;
553
554     client = (GstTCPClient *) clients->data;
555     next = g_list_next (clients);
556
557     client->status = GST_CLIENT_STATUS_REMOVED;
558     gst_multi_fd_sink_remove_client_link (sink, clients);
559   }
560   SEND_COMMAND (sink, CONTROL_RESTART);
561   CLIENTS_UNLOCK (sink);
562 }
563
564 GValueArray *
565 gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
566 {
567   GstTCPClient *client;
568   GValueArray *result = NULL;
569   GList *clink;
570
571   CLIENTS_LOCK (sink);
572   clink = g_hash_table_lookup (sink->fd_hash, &fd);
573   client = (GstTCPClient *) clink->data;
574   if (client != NULL) {
575     GValue value = { 0 };
576     guint64 interval;
577
578     result = g_value_array_new (4);
579
580     g_value_init (&value, G_TYPE_UINT64);
581     g_value_set_uint64 (&value, client->bytes_sent);
582     result = g_value_array_append (result, &value);
583     g_value_unset (&value);
584     g_value_init (&value, G_TYPE_UINT64);
585     g_value_set_uint64 (&value, client->connect_time);
586     result = g_value_array_append (result, &value);
587     g_value_unset (&value);
588     if (client->disconnect_time == 0) {
589       GTimeVal nowtv;
590
591       g_get_current_time (&nowtv);
592
593       interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
594     } else {
595       interval = client->disconnect_time - client->connect_time;
596     }
597     g_value_init (&value, G_TYPE_UINT64);
598     g_value_set_uint64 (&value, client->disconnect_time);
599     result = g_value_array_append (result, &value);
600     g_value_unset (&value);
601     g_value_init (&value, G_TYPE_UINT64);
602     g_value_set_uint64 (&value, interval);
603     result = g_value_array_append (result, &value);
604     g_value_unset (&value);
605     g_value_init (&value, G_TYPE_UINT64);
606     g_value_set_uint64 (&value, client->last_activity_time);
607     result = g_value_array_append (result, &value);
608   }
609   CLIENTS_UNLOCK (sink);
610
611   /* python doesn't like a NULL pointer yet */
612   if (result == NULL) {
613     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
614     result = g_value_array_new (0);
615   }
616
617   return result;
618 }
619
620 /* should be called with the clientslock held.
621  * Note that we don't close the fd as we didn't open it in the first
622  * place. An application should connect to the client-removed signal and
623  * close the fd itself.
624  */
625 static void
626 gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
627 {
628   int fd;
629   GTimeVal now;
630   GstTCPClient *client = (GstTCPClient *) link->data;
631   GstMultiFdSinkClass *fclass;
632
633   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
634
635   fd = client->fd.fd;
636
637   /* FIXME: if we keep track of ip we can log it here and signal */
638   switch (client->status) {
639     case GST_CLIENT_STATUS_OK:
640       GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
641           fd, client);
642       break;
643     case GST_CLIENT_STATUS_CLOSED:
644       GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
645           fd, client);
646       break;
647     case GST_CLIENT_STATUS_REMOVED:
648       GST_DEBUG_OBJECT (sink,
649           "[fd %5d] removing client %p because the app removed it", fd, client);
650       break;
651     case GST_CLIENT_STATUS_SLOW:
652       GST_INFO_OBJECT (sink,
653           "[fd %5d] removing client %p because it was too slow", fd, client);
654       break;
655     case GST_CLIENT_STATUS_ERROR:
656       GST_WARNING_OBJECT (sink,
657           "[fd %5d] removing client %p because of error", fd, client);
658       break;
659     default:
660       GST_WARNING_OBJECT (sink,
661           "[fd %5d] removing client %p with invalid reason", fd, client);
662       break;
663   }
664
665   gst_fdset_remove_fd (sink->fdset, &client->fd);
666
667   g_get_current_time (&now);
668   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
669
670   /* free client buffers */
671   g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
672   g_slist_free (client->sending);
673   client->sending = NULL;
674
675   /* unlock the mutex before signaling because the signal handler
676    * might query some properties */
677   CLIENTS_UNLOCK (sink);
678
679   g_signal_emit (G_OBJECT (sink),
680       gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
681
682   /* lock again before we remove the client completely */
683   CLIENTS_LOCK (sink);
684
685   if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
686     GST_WARNING_OBJECT (sink,
687         "[fd %5d] error removing client %p from hash", client->fd.fd, client);
688   }
689   /* after releasing the lock above, the link could be invalid, more
690    * precisely, the next and prev pointers could point to invalid list
691    * links. One optimisation could be to add a cookie to the linked list
692    * and take a shortcut when it did not change between unlocking and locking
693    * our mutex. For now we just walk the list again. */
694   sink->clients = g_list_remove (sink->clients, client);
695
696   if (fclass->removed)
697     fclass->removed (sink, client->fd.fd);
698
699   g_free (client);
700 }
701
702 /* handle a read on a client fd,
703  * which either indicates a close or should be ignored
704  * returns FALSE if some error occured or the client closed. */
705 static gboolean
706 gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
707     GstTCPClient * client)
708 {
709   int avail, fd;
710   gboolean ret;
711
712   fd = client->fd.fd;
713
714   if (ioctl (fd, FIONREAD, &avail) < 0) {
715     GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
716         fd, g_strerror (errno), errno);
717     client->status = GST_CLIENT_STATUS_ERROR;
718     ret = FALSE;
719     return ret;
720   }
721
722   GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
723       fd, avail);
724
725   ret = TRUE;
726
727   if (avail == 0) {
728     /* client sent close, so remove it */
729     GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
730     client->status = GST_CLIENT_STATUS_CLOSED;
731     ret = FALSE;
732   } else if (avail < 0) {
733     GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
734     client->status = GST_CLIENT_STATUS_ERROR;
735     ret = FALSE;
736   } else {
737     guint8 dummy[512];
738     gint nread;
739
740     /* just Read 'n' Drop, could also just drop the client as it's not supposed
741      * to write to us except for closing the socket, I guess it's because we
742      * like to listen to our customers. */
743     do {
744       /* this is the maximum we can read */
745       gint to_read = MIN (avail, 512);
746
747       GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
748           fd, to_read);
749
750       nread = read (fd, dummy, to_read);
751       if (nread < -1) {
752         GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
753             fd, to_read, g_strerror (errno), errno);
754         client->status = GST_CLIENT_STATUS_ERROR;
755         ret = FALSE;
756         break;
757       } else if (nread == 0) {
758         GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
759         client->status = GST_CLIENT_STATUS_ERROR;
760         ret = FALSE;
761         break;
762       }
763       avail -= nread;
764     }
765     while (avail > 0);
766   }
767   return ret;
768 }
769
770 /* Queue raw data, creating a new buffer. This takes ownership of the data by
771  * setting it as GST_BUFFER_MALLOCDATA() on the created buffer
772  */
773 static gboolean
774 gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink,
775     GstTCPClient * client, gchar * data, gint len)
776 {
777   GstBuffer *buf;
778
779   buf = gst_buffer_new ();
780   GST_BUFFER_DATA (buf) = (guint8 *) data;
781   GST_BUFFER_MALLOCDATA (buf) = (guint8 *) data;
782   GST_BUFFER_SIZE (buf) = len;
783
784   GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
785       client->fd.fd, len);
786
787   client->sending = g_slist_append (client->sending, buf);
788
789   return TRUE;
790 }
791
792 static gboolean
793 gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink,
794     GstTCPClient * client, const GstCaps * caps)
795 {
796   guint8 *header;
797   guint8 *payload;
798   guint length;
799   gchar *string;
800
801   g_return_val_if_fail (caps != NULL, FALSE);
802
803   string = gst_caps_to_string (caps);
804   GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
805       client->fd.fd, string);
806   g_free (string);
807
808   if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
809     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
810     return FALSE;
811   }
812   gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, length);
813
814   length = gst_dp_header_payload_length (header);
815   gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) payload, length);
816
817   return TRUE;
818 }
819
820 static gboolean
821 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
822 {
823   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
824     return FALSE;
825   } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
826     return TRUE;
827   }
828   return FALSE;
829 }
830
831 static gboolean
832 gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
833     GstTCPClient * client, GstBuffer * buffer)
834 {
835   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
836     guint8 *header;
837     guint len;
838
839     if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
840       GST_DEBUG_OBJECT (sink,
841           "[fd %5d] could not create header, removing client", client->fd.fd);
842       return FALSE;
843     }
844     gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, len);
845   }
846
847   GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
848       client->fd.fd, GST_BUFFER_SIZE (buffer));
849
850   gst_buffer_ref (buffer);
851   client->sending = g_slist_append (client->sending, buffer);
852
853   return TRUE;
854 }
855
856 static gint
857 gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
858 {
859   gint result;
860
861   switch (sink->sync_method) {
862     case GST_SYNC_METHOD_NEXT_KEYFRAME:
863     {
864       /* if the buffer at the head of the queue is a sync point we can proceed,
865        * else we need to skip the buffer and wait for a new one */
866       GST_LOG_OBJECT (sink,
867           "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
868           client->bufpos);
869
870       /* the client is not yet alligned to a buffer */
871       if (client->bufpos < 0) {
872         result = -1;
873       } else {
874         GstBuffer *buf;
875         gint i;
876
877         for (i = client->bufpos; i >= 0; i--) {
878           /* get the buffer for the client */
879           buf = g_array_index (sink->bufqueue, GstBuffer *, i);
880           if (is_sync_frame (sink, buf)) {
881             GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync",
882                 client->fd.fd);
883             result = i;
884             goto done;
885           } else {
886             /* client is not on a buffer, need to skip this buffer and
887              * wait some more */
888             GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer",
889                 client->fd.fd);
890             client->bufpos--;
891           }
892         }
893         result = -1;
894       }
895       break;
896     }
897     case GST_SYNC_METHOD_LATEST_KEYFRAME:
898     {
899       /* FIXME for new clients we constantly scan the complete
900        * buffer queue for sync point whenever a buffer is added. This is
901        * suboptimal because if we cannot find a sync point the first time,
902        * the algorithm should behave as GST_SYNC_METHOD_NEXT_KEYFRAME */
903       gint i, len;
904
905       GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe",
906           client->fd.fd, client->bufpos);
907
908       /* take length of queued buffers */
909       len = sink->bufqueue->len;
910       /* assume we don't find a keyframe */
911       result = -1;
912       /* then loop over all buffers to find the first keyframe */
913       for (i = 0; i < len; i++) {
914         GstBuffer *buf;
915
916         buf = g_array_index (sink->bufqueue, GstBuffer *, i);
917         if (is_sync_frame (sink, buf)) {
918           /* found a keyframe, return its position */
919           GST_LOG_OBJECT (sink, "found keyframe at %d", i);
920           result = i;
921           goto done;
922         }
923       }
924       GST_LOG_OBJECT (sink, "no keyframe found");
925       /* throw client to the waiting state */
926       client->bufpos = -1;
927       break;
928     }
929     default:
930       /* no syncing, we are happy with whatever the client is going to get */
931       GST_LOG_OBJECT (sink, "no client syn needed");
932       result = client->bufpos;
933       break;
934   }
935 done:
936   return result;
937 }
938
939 /* handle a write on a client,
940  * which indicates a read request from a client.
941  *
942  * The strategy is as follows, for each client we maintain a queue of GstBuffers
943  * that contain the raw bytes we need to send to the client. In the case of the
944  * GDP protocol, we create buffers out of the header bytes so that we can only
945  * focus on sending buffers.
946  *
947  * We first check to see if we need to send caps (in GDP) and streamheaders.
948  * If so, we queue them.
949  *
950  * Then we run into the main loop that tries to send as many buffers as
951  * possible. It will first exhaust the client->sending queue and if the queue
952  * is empty, it will pick a buffer from the global queue.
953  *
954  * Sending the Buffers from the client->sending queue is basically writing
955  * the bytes to the socket and maintaining a count of the bytes that were
956  * sent. When the buffer is completely sent, it is removed from the
957  * client->sending queue and we try to pick a new buffer for sending.
958  *
959  * When the sending returns a partial buffer we stop sending more data as
960  * the next send operation could block.
961  *
962  * This functions returns FALSE if some error occured.
963  */
964 static gboolean
965 gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
966     GstTCPClient * client)
967 {
968   int fd = client->fd.fd;
969   gboolean more;
970   gboolean res;
971   GstClockTime now;
972   GTimeVal nowtv;
973
974   g_get_current_time (&nowtv);
975   now = GST_TIMEVAL_TO_TIME (nowtv);
976
977   /* when using GDP, first check if we have queued caps yet */
978   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
979     if (!client->caps_sent) {
980       GstPad *peer;
981       GstCaps *caps;
982
983       peer = gst_pad_get_peer (GST_BASE_SINK_PAD (sink));
984       if (!peer) {
985         GST_WARNING_OBJECT (sink, "pad has no peer");
986         return FALSE;
987       }
988
989       caps = gst_pad_get_negotiated_caps (peer);
990       gst_object_unref (peer);
991
992       /* queue caps for sending */
993       res = gst_multi_fd_sink_client_queue_caps (sink, client, caps);
994
995       gst_caps_unref (caps);
996
997       if (!res) {
998         GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
999         return FALSE;
1000       }
1001       client->caps_sent = TRUE;
1002     }
1003   }
1004   /* if we have streamheader buffers, and haven't sent them to this client
1005    * yet, send them out one by one */
1006   if (!client->streamheader_sent) {
1007     GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
1008         g_slist_length (sink->streamheader));
1009     if (sink->streamheader) {
1010       GSList *l;
1011
1012       for (l = sink->streamheader; l; l = l->next) {
1013         /* queue stream headers for sending */
1014         res =
1015             gst_multi_fd_sink_client_queue_buffer (sink, client,
1016             GST_BUFFER (l->data));
1017         if (!res) {
1018           GST_DEBUG_OBJECT (sink,
1019               "Failed queueing streamheader, removing client");
1020           return FALSE;
1021         }
1022       }
1023     }
1024     client->streamheader_sent = TRUE;
1025   }
1026
1027   more = TRUE;
1028   do {
1029     gint maxsize;
1030
1031     if (!client->sending) {
1032       /* client is not working on a buffer */
1033       if (client->bufpos == -1) {
1034         /* client is too fast, remove from write queue until new buffer is
1035          * available */
1036         gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1037         return TRUE;
1038       } else {
1039         /* client can pick a buffer from the global queue */
1040         GstBuffer *buf;
1041
1042         /* for new connections, we need to find a good spot in the
1043          * bufqueue to start streaming from */
1044         if (client->new_connection) {
1045           gint position = gst_multi_fd_sink_new_client (sink, client);
1046
1047           if (position >= 0) {
1048             /* we got a valid spot in the queue */
1049             client->new_connection = FALSE;
1050             client->bufpos = position;
1051           } else {
1052             /* cannot send data to this client yet */
1053             gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
1054             return TRUE;
1055           }
1056         }
1057
1058         /* grab buffer */
1059         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
1060         client->bufpos--;
1061         GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1062             fd, client, client->bufpos);
1063
1064         /* queueing a buffer will ref it */
1065         gst_multi_fd_sink_client_queue_buffer (sink, client, buf);
1066
1067         /* need to start from the first byte for this new buffer */
1068         client->bufoffset = 0;
1069       }
1070     }
1071
1072     /* see if we need to send something */
1073     if (client->sending) {
1074       ssize_t wrote;
1075       GstBuffer *head;
1076
1077       /* pick first buffer from list */
1078       head = GST_BUFFER (client->sending->data);
1079       maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
1080
1081       /* try to write the complete buffer */
1082 #ifdef MSG_NOSIGNAL
1083 #define FLAGS MSG_NOSIGNAL
1084 #else
1085 #define FLAGS 0
1086 #endif
1087       if (client->is_socket) {
1088         wrote =
1089             send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
1090             FLAGS);
1091       } else {
1092         wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
1093       }
1094
1095       if (wrote < 0) {
1096         /* hmm error.. */
1097         if (errno == EAGAIN) {
1098           /* nothing serious, resource was unavailable, try again later */
1099           more = FALSE;
1100         } else if (errno == ECONNRESET) {
1101           GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing",
1102               fd);
1103           client->status = GST_CLIENT_STATUS_CLOSED;
1104           return FALSE;
1105         } else {
1106           GST_WARNING_OBJECT (sink,
1107               "[fd %5d] could not write, removing client: %s (%d)", fd,
1108               g_strerror (errno), errno);
1109           client->status = GST_CLIENT_STATUS_ERROR;
1110           return FALSE;
1111         }
1112       } else {
1113         if (wrote < maxsize) {
1114           /* partial write means that the client cannot read more and we should
1115            * stop sending more */
1116           GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
1117           client->bufoffset += wrote;
1118           more = FALSE;
1119         } else {
1120           /* complete buffer was written, we can proceed to the next one */
1121           client->sending = g_slist_remove (client->sending, head);
1122           gst_buffer_unref (head);
1123           /* make sure we start from byte 0 for the next buffer */
1124           client->bufoffset = 0;
1125         }
1126         /* update stats */
1127         client->bytes_sent += wrote;
1128         client->last_activity_time = now;
1129         sink->bytes_served += wrote;
1130       }
1131     }
1132   } while (more);
1133
1134   return TRUE;
1135 }
1136
1137 /* calculate the new position for a client after recovery. This function
1138  * does not update the client position but merely returns the required
1139  * position.
1140  */
1141 static gint
1142 gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
1143 {
1144   gint newbufpos;
1145
1146   GST_WARNING_OBJECT (sink,
1147       "[fd %5d] client %p is lagging at %d, recover using policy %d",
1148       client->fd.fd, client, client->bufpos, sink->recover_policy);
1149
1150   switch (sink->recover_policy) {
1151     case GST_RECOVER_POLICY_NONE:
1152       /* do nothing, client will catch up or get kicked out when it reaches
1153        * the hard max */
1154       newbufpos = client->bufpos;
1155       break;
1156     case GST_RECOVER_POLICY_RESYNC_LATEST:
1157       /* move to beginning of queue */
1158       newbufpos = -1;
1159       break;
1160     case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
1161       /* move to beginning of soft max */
1162       newbufpos = sink->units_soft_max;
1163       break;
1164     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
1165       /* find keyframe in buffers, we search backwards to find the 
1166        * closest keyframe relative to what this client already received. */
1167       newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
1168
1169       while (newbufpos >= 0) {
1170         GstBuffer *buf;
1171
1172         buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
1173         if (is_sync_frame (sink, buf)) {
1174           /* found a buffer that is not a delta unit */
1175           break;
1176         }
1177         newbufpos--;
1178       }
1179       break;
1180     default:
1181       /* unknown recovery procedure */
1182       newbufpos = sink->units_soft_max;
1183       break;
1184   }
1185   return newbufpos;
1186 }
1187
1188 /* Queue a buffer on the global queue. 
1189  *
1190  * This functions adds the buffer to the front of a GArray. It removes the
1191  * tail buffer if the max queue size is exceeded. Unreffing the buffer that
1192  * is queued. Note that unreffing the buffer is not a problem as clients who
1193  * started writing out this buffer will still have a reference to it in the
1194  * client->sending queue.
1195  *
1196  * After adding the buffer, we update all client positions in the queue. If
1197  * a client moves over the soft max, we start the recovery procedure for this
1198  * slow client. If it goes over the hard max, it is put into the slow list
1199  * and removed.
1200  *
1201  * Special care is taken of clients that were waiting for a new buffer (they
1202  * had a position of -1) because they can proceed after adding this new buffer.
1203  * This is done by adding the client back into the write fd_set and signalling
1204  * the select thread that the fd_set changed.
1205  *
1206  */
1207 static void
1208 gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
1209 {
1210   GList *clients, *next;
1211   gint queuelen;
1212   gboolean need_signal = FALSE;
1213   gint max_buffer_usage;
1214   gint i;
1215   GTimeVal nowtv;
1216   GstClockTime now;
1217
1218   g_get_current_time (&nowtv);
1219   now = GST_TIMEVAL_TO_TIME (nowtv);
1220
1221   CLIENTS_LOCK (sink);
1222   /* add buffer to queue */
1223   g_array_prepend_val (sink->bufqueue, buf);
1224   queuelen = sink->bufqueue->len;
1225
1226   /* then loop over the clients and update the positions */
1227   max_buffer_usage = 0;
1228   for (clients = sink->clients; clients; clients = next) {
1229     GstTCPClient *client;
1230
1231     client = (GstTCPClient *) clients->data;
1232     next = g_list_next (clients);
1233
1234     client->bufpos++;
1235     GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
1236         client->fd.fd, client, client->bufpos);
1237     /* check soft max if needed, recover client */
1238     if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
1239       gint newpos;
1240
1241       newpos = gst_multi_fd_sink_recover_client (sink, client);
1242       if (newpos != client->bufpos) {
1243         client->bufpos = newpos;
1244         client->discont = TRUE;
1245         GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
1246             client->fd.fd, client, client->bufpos);
1247       } else {
1248         GST_INFO_OBJECT (sink,
1249             "[fd %5d] client %p not recovering position",
1250             client->fd.fd, client);
1251       }
1252     }
1253     /* check hard max and timeout, remove client */
1254     if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
1255         (sink->timeout > 0
1256             && now - client->last_activity_time > sink->timeout)) {
1257       /* remove client */
1258       GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
1259           client->fd.fd, client);
1260       /* remove the client, the fd set will be cleared and the select thread will
1261        * be signaled */
1262       client->status = GST_CLIENT_STATUS_SLOW;
1263       gst_multi_fd_sink_remove_client_link (sink, clients);
1264       /* set client to invalid position while being removed */
1265       client->bufpos = -1;
1266       need_signal = TRUE;
1267     } else if (client->bufpos == 0 || client->new_connection) {
1268       /* can send data to this client now. need to signal the select thread that
1269        * the fd_set changed */
1270       gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
1271       need_signal = TRUE;
1272     }
1273     /* keep track of maximum buffer usage */
1274     if (client->bufpos > max_buffer_usage) {
1275       max_buffer_usage = client->bufpos;
1276     }
1277   }
1278
1279   /* now look for sync points and make sure there is at least one
1280    * sync point in the queue. We only do this if the burst mode
1281    * is enabled. */
1282   if (sink->sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
1283     /* no point in searching beyond the queue length */
1284     gint limit = queuelen;
1285     GstBuffer *buf;
1286
1287     /* no point in searching beyond the soft-max if any. */
1288     if (sink->units_soft_max > 0) {
1289       limit = MIN (limit, sink->units_soft_max);
1290     }
1291     GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
1292         max_buffer_usage);
1293     for (i = 0; i < limit; i++) {
1294       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
1295       if (is_sync_frame (sink, buf)) {
1296         /* found a sync frame, now extend the buffer usage to
1297          * include at least this frame. */
1298         max_buffer_usage = MAX (max_buffer_usage, i);
1299         break;
1300       }
1301     }
1302     GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
1303   }
1304
1305   /* nobody is referencing units after max_buffer_usage so we can
1306    * remove them from the queue. We remove them in reverse order as
1307    * this is the most optimal for GArray. */
1308   for (i = queuelen - 1; i > max_buffer_usage; i--) {
1309     GstBuffer *old;
1310
1311     /* queue exceeded max size */
1312     queuelen--;
1313     old = g_array_index (sink->bufqueue, GstBuffer *, i);
1314     sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
1315
1316     /* unref tail buffer */
1317     gst_buffer_unref (old);
1318   }
1319   /* save for stats */
1320   sink->buffers_queued = max_buffer_usage;
1321   CLIENTS_UNLOCK (sink);
1322
1323   /* and send a signal to thread if fd_set changed */
1324   if (need_signal) {
1325     SEND_COMMAND (sink, CONTROL_RESTART);
1326   }
1327 }
1328
1329 /* Handle the clients. Basically does a blocking select for one
1330  * of the client fds to become read or writable. We also have a
1331  * filedescriptor to receive commands on that we need to check.
1332  *
1333  * After going out of the select call, we read and write to all
1334  * clients that can do so. Badly behaving clients are put on a
1335  * garbage list and removed.
1336  */
1337 static void
1338 gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
1339 {
1340   int result;
1341   GList *clients, *next;
1342   gboolean try_again;
1343   GstMultiFdSinkClass *fclass;
1344
1345   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
1346
1347   do {
1348     gboolean stop = FALSE;
1349
1350     try_again = FALSE;
1351
1352     /* check for:
1353      * - server socket input (ie, new client connections)
1354      * - client socket input (ie, clients saying goodbye)
1355      * - client socket output (ie, client reads)          */
1356     GST_LOG_OBJECT (sink, "waiting on action on fdset");
1357     result = gst_fdset_wait (sink->fdset, -1);
1358
1359     /* < 0 is an error, 0 just means a timeout happened, which is impossible */
1360     if (result < 0) {
1361       GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno),
1362           errno);
1363       if (errno == EBADF) {
1364         /* ok, so one or more of the fds is invalid. We loop over them to find
1365          * the ones that give an error to the F_GETFL fcntl. */
1366         CLIENTS_LOCK (sink);
1367         for (clients = sink->clients; clients; clients = next) {
1368           GstTCPClient *client;
1369           int fd;
1370           long flags;
1371           int res;
1372
1373           client = (GstTCPClient *) clients->data;
1374           next = g_list_next (clients);
1375
1376           fd = client->fd.fd;
1377
1378           res = fcntl (fd, F_GETFL, &flags);
1379           if (res == -1) {
1380             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
1381                 fd, g_strerror (errno), errno);
1382             if (errno == EBADF) {
1383               client->status = GST_CLIENT_STATUS_ERROR;
1384               gst_multi_fd_sink_remove_client_link (sink, clients);
1385             }
1386           }
1387         }
1388         CLIENTS_UNLOCK (sink);
1389         /* after this, go back in the select loop as the read/writefds
1390          * are not valid */
1391         try_again = TRUE;
1392       } else if (errno == EINTR) {
1393         /* interrupted system call, just redo the select */
1394         try_again = TRUE;
1395       } else {
1396         /* this is quite bad... */
1397         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
1398             ("select failed: %s (%d)", g_strerror (errno), errno));
1399         return;
1400       }
1401     } else {
1402       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
1403       /* read all commands */
1404       if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
1405         GST_LOG_OBJECT (sink, "have a command");
1406         while (TRUE) {
1407           gchar command;
1408           int res;
1409
1410           READ_COMMAND (sink, command, res);
1411           if (res < 0) {
1412             GST_LOG_OBJECT (sink, "no more commands");
1413             /* no more commands */
1414             break;
1415           }
1416
1417           switch (command) {
1418             case CONTROL_RESTART:
1419               GST_LOG_OBJECT (sink, "restart");
1420               /* need to restart the select call as the fd_set changed */
1421               /* if other file descriptors than the READ_SOCKET had activity,
1422                * we don't restart just yet, but handle the other clients first */
1423               if (result == 1)
1424                 try_again = TRUE;
1425               break;
1426             case CONTROL_STOP:
1427               /* break out of the select loop */
1428               GST_LOG_OBJECT (sink, "stop");
1429               /* stop this function */
1430               stop = TRUE;
1431               break;
1432             default:
1433               GST_WARNING_OBJECT (sink, "unkown");
1434               g_warning ("multifdsink: unknown control message received");
1435               break;
1436           }
1437         }
1438       }
1439     }
1440     if (stop) {
1441       return;
1442     }
1443   } while (try_again);
1444
1445   /* subclasses can check fdset with this virtual function */
1446   if (fclass->wait)
1447     fclass->wait (sink, sink->fdset);
1448
1449   /* Check the clients */
1450   CLIENTS_LOCK (sink);
1451   for (clients = sink->clients; clients; clients = next) {
1452     GstTCPClient *client;
1453
1454     client = (GstTCPClient *) clients->data;
1455     next = g_list_next (clients);
1456
1457     if (client->status != GST_CLIENT_STATUS_OK) {
1458       gst_multi_fd_sink_remove_client_link (sink, clients);
1459       continue;
1460     }
1461
1462     if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
1463       client->status = GST_CLIENT_STATUS_CLOSED;
1464       gst_multi_fd_sink_remove_client_link (sink, clients);
1465       continue;
1466     }
1467     if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
1468       GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd);
1469       client->status = GST_CLIENT_STATUS_ERROR;
1470       gst_multi_fd_sink_remove_client_link (sink, clients);
1471       continue;
1472     }
1473     if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
1474       /* handle client read */
1475       if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
1476         gst_multi_fd_sink_remove_client_link (sink, clients);
1477         continue;
1478       }
1479     }
1480     if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
1481       /* handle client write */
1482       if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
1483         gst_multi_fd_sink_remove_client_link (sink, clients);
1484         continue;
1485       }
1486     }
1487   }
1488   CLIENTS_UNLOCK (sink);
1489 }
1490
1491 /* we handle the client communication in another thread so that we do not block
1492  * the gstreamer thread while we select() on the client fds */
1493 static gpointer
1494 gst_multi_fd_sink_thread (GstMultiFdSink * sink)
1495 {
1496   while (sink->running) {
1497     gst_multi_fd_sink_handle_clients (sink);
1498   }
1499   return NULL;
1500 }
1501
1502 static GstFlowReturn
1503 gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
1504 {
1505   GstMultiFdSink *sink;
1506
1507   sink = GST_MULTI_FD_SINK (bsink);
1508
1509   /* since we keep this buffer out of the scope of this method */
1510   gst_buffer_ref (buf);
1511
1512   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN),
1513       GST_FLOW_ERROR);
1514
1515   GST_LOG_OBJECT (sink, "received buffer %p", buf);
1516   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
1517    * it means we're getting new streamheader buffers, and we should clear
1518    * the old ones */
1519   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
1520       sink->previous_buffer_in_caps == FALSE) {
1521     GST_DEBUG_OBJECT (sink,
1522         "receiving new IN_CAPS buffers, clearing old streamheader");
1523     g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
1524     g_slist_free (sink->streamheader);
1525     sink->streamheader = NULL;
1526   }
1527
1528   /* if the incoming buffer is marked as IN CAPS, then we assume for now
1529    * it's a streamheader that needs to be sent to each new client, so we
1530    * put it on our internal list of streamheader buffers.
1531    * After that we return, since we only send these out when we get
1532    * non IN_CAPS buffers so we properly keep track of clients that got
1533    * streamheaders. */
1534   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
1535     sink->previous_buffer_in_caps = TRUE;
1536     GST_DEBUG_OBJECT (sink,
1537         "appending IN_CAPS buffer with length %d to streamheader",
1538         GST_BUFFER_SIZE (buf));
1539     sink->streamheader = g_slist_append (sink->streamheader, buf);
1540     return GST_FLOW_OK;
1541   }
1542
1543   sink->previous_buffer_in_caps = FALSE;
1544   /* queue the buffer */
1545   gst_multi_fd_sink_queue_buffer (sink, buf);
1546
1547   sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
1548
1549   return GST_FLOW_OK;
1550 }
1551
1552 static void
1553 gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
1554     const GValue * value, GParamSpec * pspec)
1555 {
1556   GstMultiFdSink *multifdsink;
1557
1558   g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
1559   multifdsink = GST_MULTI_FD_SINK (object);
1560
1561   switch (prop_id) {
1562     case ARG_PROTOCOL:
1563       multifdsink->protocol = g_value_get_enum (value);
1564       break;
1565     case ARG_MODE:
1566       multifdsink->mode = g_value_get_enum (value);
1567       break;
1568     case ARG_BUFFERS_MAX:
1569       multifdsink->units_max = g_value_get_int (value);
1570       break;
1571     case ARG_BUFFERS_SOFT_MAX:
1572       multifdsink->units_soft_max = g_value_get_int (value);
1573       break;
1574     case ARG_UNIT_TYPE:
1575       multifdsink->unit_type = g_value_get_enum (value);
1576       break;
1577     case ARG_UNITS_MAX:
1578       multifdsink->units_max = g_value_get_int (value);
1579       break;
1580     case ARG_UNITS_SOFT_MAX:
1581       multifdsink->units_soft_max = g_value_get_int (value);
1582       break;
1583     case ARG_RECOVER_POLICY:
1584       multifdsink->recover_policy = g_value_get_enum (value);
1585       break;
1586     case ARG_TIMEOUT:
1587       multifdsink->timeout = g_value_get_uint64 (value);
1588       break;
1589     case ARG_SYNC_METHOD:
1590       multifdsink->sync_method = g_value_get_enum (value);
1591       break;
1592
1593     default:
1594       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1595       break;
1596   }
1597 }
1598
1599 static void
1600 gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
1601     GParamSpec * pspec)
1602 {
1603   GstMultiFdSink *multifdsink;
1604
1605   g_return_if_fail (GST_IS_MULTI_FD_SINK (object));
1606   multifdsink = GST_MULTI_FD_SINK (object);
1607
1608   switch (prop_id) {
1609     case ARG_PROTOCOL:
1610       g_value_set_enum (value, multifdsink->protocol);
1611       break;
1612     case ARG_MODE:
1613       g_value_set_enum (value, multifdsink->mode);
1614       break;
1615     case ARG_BUFFERS_MAX:
1616       g_value_set_int (value, multifdsink->units_max);
1617       break;
1618     case ARG_BUFFERS_SOFT_MAX:
1619       g_value_set_int (value, multifdsink->units_soft_max);
1620       break;
1621     case ARG_BUFFERS_QUEUED:
1622       g_value_set_uint (value, multifdsink->buffers_queued);
1623       break;
1624     case ARG_BYTES_QUEUED:
1625       g_value_set_uint (value, multifdsink->bytes_queued);
1626       break;
1627     case ARG_TIME_QUEUED:
1628       g_value_set_uint64 (value, multifdsink->time_queued);
1629       break;
1630     case ARG_UNIT_TYPE:
1631       g_value_set_enum (value, multifdsink->unit_type);
1632       break;
1633     case ARG_UNITS_MAX:
1634       g_value_set_int (value, multifdsink->units_max);
1635       break;
1636     case ARG_UNITS_SOFT_MAX:
1637       g_value_set_int (value, multifdsink->units_soft_max);
1638       break;
1639     case ARG_RECOVER_POLICY:
1640       g_value_set_enum (value, multifdsink->recover_policy);
1641       break;
1642     case ARG_TIMEOUT:
1643       g_value_set_uint64 (value, multifdsink->timeout);
1644       break;
1645     case ARG_SYNC_METHOD:
1646       g_value_set_enum (value, multifdsink->sync_method);
1647       break;
1648     case ARG_BYTES_TO_SERVE:
1649       g_value_set_uint64 (value, multifdsink->bytes_to_serve);
1650       break;
1651     case ARG_BYTES_SERVED:
1652       g_value_set_uint64 (value, multifdsink->bytes_served);
1653       break;
1654
1655     default:
1656       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1657       break;
1658   }
1659 }
1660
1661
1662 /* create a socket for sending to remote machine */
1663 static gboolean
1664 gst_multi_fd_sink_start (GstBaseSink * bsink)
1665 {
1666   GstMultiFdSinkClass *fclass;
1667   int control_socket[2];
1668   GstMultiFdSink *this;
1669
1670   if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
1671     return TRUE;
1672
1673   this = GST_MULTI_FD_SINK (bsink);
1674   fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
1675
1676   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
1677   this->fdset = gst_fdset_new (this->mode);
1678
1679   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
1680     goto socket_pair;
1681
1682   READ_SOCKET (this).fd = control_socket[0];
1683   WRITE_SOCKET (this).fd = control_socket[1];
1684
1685   gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
1686   gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
1687
1688   fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1689   fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
1690
1691   this->streamheader = NULL;
1692   this->bytes_to_serve = 0;
1693   this->bytes_served = 0;
1694
1695   if (fclass->init) {
1696     fclass->init (this);
1697   }
1698
1699   this->running = TRUE;
1700   this->thread = g_thread_create ((GThreadFunc) gst_multi_fd_sink_thread,
1701       this, TRUE, NULL);
1702
1703   GST_OBJECT_FLAG_SET (this, GST_MULTI_FD_SINK_OPEN);
1704
1705   return TRUE;
1706
1707   /* ERRORS */
1708 socket_pair:
1709   {
1710     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
1711         GST_ERROR_SYSTEM);
1712     return FALSE;
1713   }
1714 }
1715
1716 static gboolean
1717 multifdsink_hash_remove (gpointer key, gpointer value, gpointer data)
1718 {
1719   return TRUE;
1720 }
1721
1722 static gboolean
1723 gst_multi_fd_sink_stop (GstBaseSink * bsink)
1724 {
1725   GstMultiFdSinkClass *fclass;
1726   GstMultiFdSink *this;
1727
1728   this = GST_MULTI_FD_SINK (bsink);
1729   fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
1730
1731   if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
1732     return TRUE;
1733
1734   this->running = FALSE;
1735
1736   SEND_COMMAND (this, CONTROL_STOP);
1737   if (this->thread) {
1738     g_thread_join (this->thread);
1739     this->thread = NULL;
1740   }
1741
1742   /* free the clients */
1743   gst_multi_fd_sink_clear (this);
1744
1745   close (READ_SOCKET (this).fd);
1746   close (WRITE_SOCKET (this).fd);
1747
1748   if (this->streamheader) {
1749     g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
1750     g_slist_free (this->streamheader);
1751     this->streamheader = NULL;
1752   }
1753
1754   if (fclass->close)
1755     fclass->close (this);
1756
1757   if (this->fdset) {
1758     gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
1759     gst_fdset_free (this->fdset);
1760     this->fdset = NULL;
1761   }
1762   g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);
1763   GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
1764
1765   return TRUE;
1766 }
1767
1768 static GstStateChangeReturn
1769 gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition)
1770 {
1771   GstMultiFdSink *sink;
1772   GstStateChangeReturn ret;
1773
1774   sink = GST_MULTI_FD_SINK (element);
1775
1776   /* we disallow changing the state from the streaming thread */
1777   if (g_thread_self () == sink->thread)
1778     return GST_STATE_CHANGE_FAILURE;
1779
1780
1781   switch (transition) {
1782     case GST_STATE_CHANGE_NULL_TO_READY:
1783       if (!gst_multi_fd_sink_start (GST_BASE_SINK (sink)))
1784         goto start_failed;
1785       break;
1786     case GST_STATE_CHANGE_READY_TO_PAUSED:
1787       break;
1788     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1789       break;
1790     default:
1791       break;
1792   }
1793
1794   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1795
1796   switch (transition) {
1797     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1798       break;
1799     case GST_STATE_CHANGE_PAUSED_TO_READY:
1800       break;
1801     case GST_STATE_CHANGE_READY_TO_NULL:
1802       gst_multi_fd_sink_stop (GST_BASE_SINK (sink));
1803       break;
1804     default:
1805       break;
1806   }
1807   return ret;
1808
1809   /* ERRORS */
1810 start_failed:
1811   {
1812     return GST_STATE_CHANGE_FAILURE;
1813   }
1814 }