gst/tcp/: Starting to prepare for specifying buffer time in other units than buffers...
authorWim Taymans <wim.taymans@gmail.com>
Tue, 10 Aug 2004 15:23:19 +0000 (15:23 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 10 Aug 2004 15:23:19 +0000 (15:23 +0000)
Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type),
(gst_client_status_get_type), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
(gst_multifdsink_get_property):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp-marshal.list:
Starting to prepare for specifying buffer time in other units
than buffers. Expose remove reason in signal.

ChangeLog
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gsttcp-marshal.list

index 5f5643f..4ed7f84 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,20 @@
 2004-08-10  Wim Taymans  <wim@fluendo.com>
 
+       * gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type),
+       (gst_client_status_get_type), (gst_multifdsink_class_init),
+       (gst_multifdsink_init), (gst_multifdsink_remove_client_link),
+       (gst_multifdsink_handle_client_read),
+       (gst_multifdsink_handle_client_write),
+       (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
+       (gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
+       (gst_multifdsink_get_property):
+       * gst/tcp/gstmultifdsink.h:
+       * gst/tcp/gsttcp-marshal.list:
+       Starting to prepare for specifying buffer time in other units
+       than buffers. Expose remove reason in signal.
+
+2004-08-10  Wim Taymans  <wim@fluendo.com>
+
        * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
        (gst_multifdsink_remove), (gst_multifdsink_clear),
        (gst_multifdsink_remove_client_link),
index 60eb000..20057ed 100644 (file)
@@ -81,6 +81,9 @@ enum
 #define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_TYPE_NONE
 #define DEFAULT_BUFFERS_MAX            -1
 #define DEFAULT_BUFFERS_SOFT_MAX       -1
+#define DEFAULT_UNIT_TYPE              GST_UNIT_TYPE_BUFFERS
+#define DEFAULT_UNITS_MAX              -1
+#define DEFAULT_UNITS_SOFT_MAX         -1
 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
 #define DEFAULT_TIMEOUT                         0
 
@@ -88,9 +91,17 @@ enum
 {
   ARG_0,
   ARG_PROTOCOL,
+  ARG_BUFFERS_QUEUED,
+  ARG_BYTES_QUEUED,
+  ARG_TIME_QUEUED,
+
+  ARG_UNIT_TYPE,
+  ARG_UNITS_MAX,
+  ARG_UNITS_SOFT_MAX,
+
   ARG_BUFFERS_MAX,
   ARG_BUFFERS_SOFT_MAX,
-  ARG_BUFFERS_QUEUED,
+
   ARG_RECOVER_POLICY,
   ARG_TIMEOUT,
   ARG_BYTES_TO_SERVE,
@@ -121,6 +132,45 @@ gst_recover_policy_get_type (void)
   return recover_policy_type;
 }
 
+#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
+static GType
+gst_unit_type_get_type (void)
+{
+  static GType unit_type_type = 0;
+  static GEnumValue unit_type[] = {
+    {GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
+    {GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
+    {GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
+    {0, NULL, NULL},
+  };
+
+  if (!unit_type_type) {
+    unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
+  }
+  return unit_type_type;
+}
+
+#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
+static GType
+gst_client_status_get_type (void)
+{
+  static GType client_status_type = 0;
+  static GEnumValue client_status[] = {
+    {GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
+    {GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
+    {GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
+    {GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
+    {GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
+    {0, NULL, NULL},
+  };
+
+  if (!client_status_type) {
+    client_status_type =
+        g_enum_register_static ("GstTCPClientStatus", client_status);
+  }
+  return client_status_type;
+}
+
 static void gst_multifdsink_base_init (gpointer g_class);
 static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
@@ -191,6 +241,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
           GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
+
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
       g_param_spec_int ("buffers-max", "Buffers max",
           "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
@@ -199,10 +250,33 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
           "Recover client when going over this limit (-1 = no limit)", -1,
           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
+      g_param_spec_enum ("unit-type", "Units type",
+          "The unit to measure the max/soft-max/queued properties",
+          GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
+      g_param_spec_int ("units-max", "Units max",
+          "max number of units to queue (-1 = no limit)", -1, G_MAXINT,
+          DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
+      g_param_spec_int ("units-soft-max", "Units soft max",
+          "Recover client when going over this limit (-1 = no limit)", -1,
+          G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
+
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
-      g_param_spec_int ("buffers-queued", "Buffers queued",
-          "Number of buffers currently queued", 0, G_MAXINT, 0,
+      g_param_spec_uint ("buffers-queued", "Buffers queued",
+          "Number of buffers currently queued", 0, G_MAXUINT, 0,
           G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
+      g_param_spec_uint ("bytes-queued", "Bytes queued",
+          "Number of bytes currently queued", 0, G_MAXUINT, 0,
+          G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
+      g_param_spec_uint64 ("time-queued", "Time queued",
+          "Number of time currently queued", 0, G_MAXUINT64, 0,
+          G_PARAM_READABLE));
+
   g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
       g_param_spec_enum ("recover-policy", "Recover Policy",
           "How to recover when client reaches the soft max",
@@ -227,7 +301,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
   gst_multifdsink_signals[SIGNAL_REMOVE] =
       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
-      NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+      NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
   gst_multifdsink_signals[SIGNAL_CLEAR] =
       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
@@ -245,8 +319,8 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
   gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
-          client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
-      G_TYPE_NONE, 1, G_TYPE_INT);
+          client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
+      G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
 
   gobject_class->set_property = gst_multifdsink_set_property;
   gobject_class->get_property = gst_multifdsink_get_property;
@@ -277,8 +351,9 @@ gst_multifdsink_init (GstMultiFdSink * this)
   this->clients = NULL;
 
   this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
-  this->buffers_max = DEFAULT_BUFFERS_MAX;
-  this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+  this->unit_type = DEFAULT_UNIT_TYPE;
+  this->units_max = DEFAULT_UNITS_MAX;
+  this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
   this->recover_policy = DEFAULT_RECOVER_POLICY;
 
   this->timeout = DEFAULT_TIMEOUT;
@@ -501,7 +576,7 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
   g_mutex_unlock (sink->clientslock);
 
   g_signal_emit (G_OBJECT (sink),
-      gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd);
+      gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
 
   /* lock again before we remove the client completely */
   g_mutex_lock (sink->clientslock);
@@ -734,8 +809,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
         /* client can pick a buffer from the global queue */
         GstBuffer *buf;
 
-        /* grab buffer and ref, we need to ref since it could be unreffed in
-         * another thread */
+        /* grab buffer */
         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
         client->bufpos--;
 
@@ -809,12 +883,10 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
 {
   gint newbufpos;
 
-  /* FIXME: implement recover procedure here, like moving the position to
-   * the next keyframe, dropping buffers back to the beginning of the queue,
-   * stuff like that... */
   GST_WARNING_OBJECT (sink,
-      "client %p with fd %d is lagging, recover using policy %d", client,
-      client->fd, sink->recover_policy);
+      "client %p with fd %d is lagging at %d, recover using policy %d", client,
+      client->fd, client->bufpos, sink->recover_policy);
+
   switch (sink->recover_policy) {
     case GST_RECOVER_POLICY_NONE:
       /* do nothing, client will catch up or get kicked out when it reaches 
@@ -827,14 +899,15 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
       break;
     case GST_RECOVER_POLICY_RESYNC_SOFT:
       /* move to beginning of soft max */
-      newbufpos = sink->buffers_soft_max;
+      newbufpos = sink->units_soft_max;
       break;
     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
       /* FIXME, find keyframe in buffers */
-      newbufpos = sink->buffers_soft_max;
+      newbufpos = sink->units_soft_max;
       break;
     default:
-      newbufpos = sink->buffers_soft_max;
+      /* unknown recovery procedure */
+      newbufpos = sink->units_soft_max;
       break;
   }
   return newbufpos;
@@ -890,7 +963,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
     GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
         client, client->fd, client->bufpos);
     /* check soft max if needed, recover client */
-    if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) {
+    if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
       gint newpos;
 
       newpos = gst_multifdsink_recover_client (sink, client);
@@ -905,7 +978,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
       }
     }
     /* check hard max and timeout, remove client */
-    if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) ||
+    if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
         (sink->timeout > 0
             && now - client->last_activity_time > sink->timeout)) {
       /* remove client */
@@ -931,7 +1004,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
       max_buffer_usage = client->bufpos;
     }
   }
-  /* nobody is referencing buffers after max_buffer_usage so we can
+  /* nobody is referencing units after max_buffer_usage so we can
    * remove them from the queue */
   for (i = queuelen - 1; i > max_buffer_usage; i--) {
     GstBuffer *old;
@@ -1171,10 +1244,19 @@ gst_multifdsink_set_property (GObject * object, guint prop_id,
       multifdsink->protocol = g_value_get_enum (value);
       break;
     case ARG_BUFFERS_MAX:
-      multifdsink->buffers_max = g_value_get_int (value);
+      multifdsink->units_max = g_value_get_int (value);
       break;
     case ARG_BUFFERS_SOFT_MAX:
-      multifdsink->buffers_soft_max = g_value_get_int (value);
+      multifdsink->units_soft_max = g_value_get_int (value);
+      break;
+    case ARG_UNIT_TYPE:
+      multifdsink->unit_type = g_value_get_enum (value);
+      break;
+    case ARG_UNITS_MAX:
+      multifdsink->units_max = g_value_get_int (value);
+      break;
+    case ARG_UNITS_SOFT_MAX:
+      multifdsink->units_soft_max = g_value_get_int (value);
       break;
     case ARG_RECOVER_POLICY:
       multifdsink->recover_policy = g_value_get_enum (value);
@@ -1203,13 +1285,28 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
       g_value_set_enum (value, multifdsink->protocol);
       break;
     case ARG_BUFFERS_MAX:
-      g_value_set_int (value, multifdsink->buffers_max);
+      g_value_set_int (value, multifdsink->units_max);
       break;
     case ARG_BUFFERS_SOFT_MAX:
-      g_value_set_int (value, multifdsink->buffers_soft_max);
+      g_value_set_int (value, multifdsink->units_soft_max);
       break;
     case ARG_BUFFERS_QUEUED:
-      g_value_set_int (value, multifdsink->buffers_queued);
+      g_value_set_uint (value, multifdsink->buffers_queued);
+      break;
+    case ARG_BYTES_QUEUED:
+      g_value_set_uint (value, multifdsink->bytes_queued);
+      break;
+    case ARG_TIME_QUEUED:
+      g_value_set_uint64 (value, multifdsink->time_queued);
+      break;
+    case ARG_UNIT_TYPE:
+      g_value_set_enum (value, multifdsink->unit_type);
+      break;
+    case ARG_UNITS_MAX:
+      g_value_set_int (value, multifdsink->units_max);
+      break;
+    case ARG_UNITS_SOFT_MAX:
+      g_value_set_int (value, multifdsink->units_soft_max);
       break;
     case ARG_RECOVER_POLICY:
       g_value_set_enum (value, multifdsink->recover_policy);
index deeae5a..03ca73b 100644 (file)
@@ -77,11 +77,18 @@ typedef enum
 
 typedef enum
 {
-  GST_CLIENT_STATUS_OK,
-  GST_CLIENT_STATUS_CLOSED,
-  GST_CLIENT_STATUS_REMOVED,
-  GST_CLIENT_STATUS_SLOW,
-  GST_CLIENT_STATUS_ERROR,
+  GST_UNIT_TYPE_BUFFERS,
+  GST_UNIT_TYPE_TIME,
+  GST_UNIT_TYPE_BYTES,
+} GstUnitType;
+
+typedef enum
+{
+  GST_CLIENT_STATUS_OK         = 0,
+  GST_CLIENT_STATUS_CLOSED     = 1,
+  GST_CLIENT_STATUS_REMOVED    = 2,
+  GST_CLIENT_STATUS_SLOW       = 3,
+  GST_CLIENT_STATUS_ERROR      = 4,
 } GstClientStatus;
 
 /* structure for a client
@@ -138,12 +145,16 @@ struct _GstMultiFdSink {
   gboolean running;    /* the thread state */
   GThread *thread;     /* the sender thread */
 
-  gint buffers_max;    /* max buffers to queue */
-  gint buffers_soft_max;       /* max buffers a client can lag before recovery starts */
+  GstUnitType unit_type;/* the type of the units */
+  gint units_max;      /* max units to queue */
+  gint units_soft_max; /* max units a client can lag before recovery starts */
   GstRecoverPolicy recover_policy;
   GstClockTime timeout;        /* max amount of nanoseconds to remain idle */
+
   /* stats */
   gint buffers_queued; /* number of queued buffers */
+  gint bytes_queued;   /* number of queued bytes */
+  gint time_queued;    /* number of queued time */
 };
 
 struct _GstMultiFdSinkClass {
index c50d52a..49f4e52 100644 (file)
@@ -1,3 +1,4 @@
 VOID:STRING,UINT
 VOID:INT
+VOID:INT,BOXED
 BOXED:INT