gst/tcp/gstmultifdsink.*: Added more stats, added timeout for a client, fixed some...
authorWim Taymans <wim.taymans@gmail.com>
Tue, 20 Jul 2004 09:55:04 +0000 (09:55 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 20 Jul 2004 09:55:04 +0000 (09:55 +0000)
Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_add),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_chain),
(gst_multifdsink_set_property), (gst_multifdsink_get_property),
(gst_multifdsink_init_send):
* gst/tcp/gstmultifdsink.h:
Added more stats, added timeout for a client, fixed some typos
and added some comments.

ChangeLog
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h

index 380d56e..00b57af 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,19 @@
 2004-07-20  Wim Taymans  <wim@fluendo.com>
 
        * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),
+       (gst_multifdsink_init), (gst_multifdsink_add),
+       (gst_multifdsink_client_remove),
+       (gst_multifdsink_handle_client_write),
+       (gst_multifdsink_queue_buffer), (gst_multifdsink_chain),
+       (gst_multifdsink_set_property), (gst_multifdsink_get_property),
+       (gst_multifdsink_init_send):
+       * gst/tcp/gstmultifdsink.h:
+       Added more stats, added timeout for a client, fixed some typos
+       and added some comments.
+
+2004-07-20  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),
        (gst_multifdsink_add), (gst_multifdsink_get_stats),
        (gst_multifdsink_client_remove),
        (gst_multifdsink_handle_client_write):
index 9915c74..15cf270 100644 (file)
@@ -32,6 +32,8 @@
 #include "gstmultifdsink.h"
 #include "gsttcp-marshal.h"
 
+/* the select call is also performed on the control sockets, that way
+ * we can send special commands to unblock or restart the select call */
 #define CONTROL_RESTART                'R'     /* restart the select call */
 #define CONTROL_STOP           'S'     /* stop the select call */
 #define CONTROL_SOCKETS(sink)  sink->control_sock
@@ -50,10 +52,11 @@ G_STMT_START {                                      \
 
 /* elementfactory information */
 static GstElementDetails gst_multifdsink_details =
-GST_ELEMENT_DETAILS ("TCP Server sink",
+GST_ELEMENT_DETAILS ("MultiFd sink",
     "Sink/Network",
-    "Send data as a server over the network via TCP",
-    "Thomas Vander Stichele <thomas at apestaart dot org>");
+    "Send data to multiple filedescriptors",
+    "Thomas Vander Stichele <thomas at apestaart dot org>, "
+    "Wim Taymans <wim@fluendo.com>");
 
 GST_DEBUG_CATEGORY (multifdsink_debug);
 #define GST_CAT_DEFAULT (multifdsink_debug)
@@ -66,15 +69,20 @@ enum
   SIGNAL_REMOVE,
   SIGNAL_CLEAR,
   SIGNAL_GET_STATS,
+
   /* signals */
   SIGNAL_CLIENT_ADDED,
   SIGNAL_CLIENT_REMOVED,
+
   LAST_SIGNAL
 };
 
 /* this is really arbitrary choosen */
+#define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_TYPE_NONE
 #define DEFAULT_BUFFERS_MAX            -1
 #define DEFAULT_BUFFERS_SOFT_MAX       -1
+#define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
+#define DEFAULT_TIMEOUT                         0
 
 enum
 {
@@ -84,6 +92,9 @@ enum
   ARG_BUFFERS_SOFT_MAX,
   ARG_BUFFERS_QUEUED,
   ARG_RECOVER_POLICY,
+  ARG_TIMEOUT,
+  ARG_BYTES_TO_SERVE,
+  ARG_BYTES_SERVED,
 };
 
 #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
@@ -179,8 +190,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
 
   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
-          GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
-          G_PARAM_READWRITE));
+          GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
       g_param_spec_int ("buffers-max", "Buffers max",
           "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
@@ -191,12 +201,24 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
       g_param_spec_int ("buffers-queued", "Buffers queued",
-          "Number of buffers current queued", 0, G_MAXINT, 0,
+          "Number of buffers currently queued", 0, G_MAXINT, 0,
           G_PARAM_READABLE));
   g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
       g_param_spec_enum ("recover-policy", "Recover Policy",
           "How to recover when client reaches the soft max",
-          GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE));
+          GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
+      g_param_spec_uint64 ("timeout", "Timeout",
+          "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
+          0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
+      g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
+          "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
+          G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
+      g_param_spec_uint64 ("bytes-served", "Bytes served",
+          "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
+          G_PARAM_READABLE));
 
   gst_multifdsink_signals[SIGNAL_ADD] =
       g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
@@ -250,16 +272,17 @@ gst_multifdsink_init (GstMultiFdSink * this)
 
   GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
 
-  this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
+  this->protocol = DEFAULT_PROTOCOL;
 
+  this->clientslock = g_mutex_new ();
   this->clients = NULL;
 
   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
   this->buffers_max = DEFAULT_BUFFERS_MAX;
   this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+  this->recover_policy = DEFAULT_RECOVER_POLICY;
 
-  this->clientslock = g_mutex_new ();
-  this->recover_policy = GST_RECOVER_POLICY_NONE;
+  this->timeout = DEFAULT_TIMEOUT;
 }
 
 static void
@@ -293,6 +316,8 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
   /* update start time */
   g_get_current_time (&now);
   client->connect_time = GST_TIMEVAL_TO_TIME (now);
+  /* send last activity time to connect time */
+  client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
 
   g_mutex_lock (sink->clientslock);
 
@@ -404,8 +429,6 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
   }
   SEND_COMMAND (sink, CONTROL_RESTART);
 
-  sink->clients = g_list_remove (sink->clients, client);
-
   g_get_current_time (&now);
   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
   client->connect_interval = client->disconnect_time = client->connect_time;
@@ -413,6 +436,8 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
   g_signal_emit (G_OBJECT (sink),
       gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
 
+  sink->clients = g_list_remove (sink->clients, client);
+
   g_free (client);
 }
 
@@ -552,6 +577,11 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
   int fd = client->fd;
   gboolean more;
   gboolean res;
+  GstClockTime now;
+  GTimeVal nowtv;
+
+  g_get_current_time (&nowtv);
+  now = GST_TIMEVAL_TO_TIME (nowtv);
 
   /* when using GDP, first check if we have queued caps yet */
   if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
@@ -643,20 +673,24 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
               fd);
           return FALSE;
         }
-      } else if (wrote < maxsize) {
-        /* partial write means that the client cannot read more and we should
-         * stop sending more */
-        GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
-        client->bufoffset += wrote;
-        client->bytes_sent += wrote;
-        more = FALSE;
       } else {
-        /* complete buffer was written, we can proceed to the next one */
-        client->sending = g_list_remove (client->sending, head);
-        gst_buffer_unref (head);
-        /* make sure we start from byte 0 for the next buffer */
-        client->bufoffset = 0;
+        if (wrote < maxsize) {
+          /* partial write means that the client cannot read more and we should
+           * stop sending more */
+          GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
+          client->bufoffset += wrote;
+          more = FALSE;
+        } else {
+          /* complete buffer was written, we can proceed to the next one */
+          client->sending = g_list_remove (client->sending, head);
+          gst_buffer_unref (head);
+          /* make sure we start from byte 0 for the next buffer */
+          client->bufoffset = 0;
+        }
+        /* update stats */
         client->bytes_sent += wrote;
+        client->last_activity_time = now;
+        sink->bytes_served += wrote;
       }
     }
   } while (more);
@@ -732,6 +766,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
   gboolean need_signal = FALSE;
   gint max_buffer_usage;
   gint i;
+  GTimeVal nowtv;
+  GstClockTime now;
+
+  g_get_current_time (&nowtv);
+  now = GST_TIMEVAL_TO_TIME (nowtv);
 
   g_mutex_lock (sink->clientslock);
   /* add buffer to queue */
@@ -763,8 +802,10 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
             "client %p with fd %d not recovering position", client, client->fd);
       }
     }
-    /* check hard max, remove client */
-    if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) {
+    /* check hard max and timeout, remove client */
+    if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) ||
+        (sink->timeout > 0
+            && now - client->last_activity_time > sink->timeout)) {
       /* remove client */
       GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing",
           client, client->fd);
@@ -943,6 +984,8 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
   g_mutex_unlock (sink->clientslock);
 }
 
+/* we handle the client communication in another thread so that we do not block
+ * the gstreamer thread while we select() on the client fds */
 static gpointer
 gst_multifdsink_thread (GstMultiFdSink * sink)
 {
@@ -986,7 +1029,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
   /* queue the buffer */
   gst_multifdsink_queue_buffer (sink, buf);
 
-  sink->data_written += GST_BUFFER_SIZE (buf);
+  sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
 }
 
 static void
@@ -1011,6 +1054,9 @@ gst_multifdsink_set_property (GObject * object, guint prop_id,
     case ARG_RECOVER_POLICY:
       multifdsink->recover_policy = g_value_get_enum (value);
       break;
+    case ARG_TIMEOUT:
+      multifdsink->timeout = g_value_get_uint64 (value);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1043,6 +1089,15 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
     case ARG_RECOVER_POLICY:
       g_value_set_enum (value, multifdsink->recover_policy);
       break;
+    case ARG_TIMEOUT:
+      g_value_set_uint64 (value, multifdsink->timeout);
+      break;
+    case ARG_BYTES_TO_SERVE:
+      g_value_set_uint64 (value, multifdsink->bytes_to_serve);
+      break;
+    case ARG_BYTES_SERVED:
+      g_value_set_uint64 (value, multifdsink->bytes_served);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1070,7 +1125,8 @@ gst_multifdsink_init_send (GstMultiFdSink * this)
   fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK);
 
   this->streamheader = NULL;
-  this->data_written = 0;
+  this->bytes_to_serve = 0;
+  this->bytes_served = 0;
 
   if (fclass->init) {
     fclass->init (this);
index a05653d..6efa9ac 100644 (file)
@@ -96,6 +96,7 @@ typedef struct {
   guint64 connect_time;
   guint64 disconnect_time;
   guint64 connect_interval;
+  guint64 last_activity_time;
   guint64 dropped_buffers;
   guint64 avg_queue_size;
   
@@ -107,7 +108,8 @@ struct _GstMultiFdSink {
   /* pad */
   GstPad *sinkpad;
 
-  size_t data_written; /* how much bytes have we written ? */
+  guint64 bytes_to_serve; /* how much bytes we must serve */
+  guint64 bytes_served; /* how much bytes have we served */
 
   GMutex *clientslock; /* lock to protect the clients list */
   GList *clients;      /* list of clients we are serving */
@@ -127,7 +129,7 @@ struct _GstMultiFdSink {
   GThread *thread;     /* the sender thread */
 
   gint buffers_max;    /* max buffers to queue */
-  gint buffers_soft_max;       /* max buffers a client can lay before recoevery starts */
+  gint buffers_soft_max;       /* max buffers a client can lag before recovery starts */
   GstRecoverPolicy recover_policy;
   GstClockTime timeout;        /* max amount of nanoseconds to remain idle */
   /* stats */