gst/tcp/: More multifdsink fixes, more recovery policy fixes.
authorWim Taymans <wim.taymans@gmail.com>
Sun, 27 Jun 2004 11:15:23 +0000 (11:15 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sun, 27 Jun 2004 11:15:23 +0000 (11:15 +0000)
Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_caps),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_thread),
(gst_multifdsink_init_send), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcpserversink.c:
(gst_tcpserversink_handle_server_read),
(gst_tcpserversink_handle_select), (gst_tcpserversink_close):
More multifdsink fixes, more recovery policy fixes.
Removed stupid g_print

ChangeLog
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gsttcpserversink.c

index 99f38ca..fcda8ec 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,24 @@
+2004-06-27  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
+       (gst_multifdsink_class_init), (gst_multifdsink_add),
+       (gst_multifdsink_remove), (gst_multifdsink_clear),
+       (gst_multifdsink_client_remove),
+       (gst_multifdsink_handle_client_read),
+       (gst_multifdsink_client_queue_data),
+       (gst_multifdsink_client_queue_caps),
+       (gst_multifdsink_client_queue_buffer),
+       (gst_multifdsink_handle_client_write),
+       (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
+       (gst_multifdsink_handle_clients), (gst_multifdsink_thread),
+       (gst_multifdsink_init_send), (gst_multifdsink_close):
+       * gst/tcp/gstmultifdsink.h:
+       * gst/tcp/gsttcpserversink.c:
+       (gst_tcpserversink_handle_server_read),
+       (gst_tcpserversink_handle_select), (gst_tcpserversink_close):
+       More multifdsink fixes, more recovery policy fixes.
+       Removed stupid g_print
+
 2004-06-26  Wim Taymans  <wim@fluendo.com>
 
        * gst/tcp/Makefile.am:
index f65803a..ab4726c 100644 (file)
@@ -61,6 +61,11 @@ GST_DEBUG_CATEGORY (multifdsink_debug);
 /* MultiFdSink signals and args */
 enum
 {
+  /* methods */
+  SIGNAL_ADD,
+  SIGNAL_REMOVE,
+  SIGNAL_CLEAR,
+  /* signals */
   SIGNAL_CLIENT_ADDED,
   SIGNAL_CLIENT_REMOVED,
   LAST_SIGNAL
@@ -105,9 +110,12 @@ gst_recover_policy_get_type (void)
 }
 
 static void gst_multifdsink_base_init (gpointer g_class);
-static void gst_multifdsink_class_init (GstMultiFdSink * klass);
+static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
 
+static void gst_multifdsink_client_remove (GstMultiFdSink * sink,
+    GstTCPClient * client);
+
 static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
 static GstElementStateReturn gst_multifdsink_change_state (GstElement *
     element);
@@ -158,7 +166,7 @@ gst_multifdsink_base_init (gpointer g_class)
 }
 
 static void
-gst_multifdsink_class_init (GstMultiFdSink * klass)
+gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
@@ -189,6 +197,18 @@ gst_multifdsink_class_init (GstMultiFdSink * klass)
           "How to recover when client reaches the soft max",
           GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE));
 
+  gst_multifdsink_signals[SIGNAL_ADD] =
+      g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
+      NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+  gst_multifdsink_signals[SIGNAL_REMOVE] =
+      g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
+      NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+  gst_multifdsink_signals[SIGNAL_CLEAR] =
+      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
+      NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0);
 
   gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
@@ -206,6 +226,10 @@ gst_multifdsink_class_init (GstMultiFdSink * klass)
 
   gstelement_class->change_state = gst_multifdsink_change_state;
 
+  klass->add = gst_multifdsink_add;
+  klass->remove = gst_multifdsink_remove;
+  klass->clear = gst_multifdsink_clear;
+
   GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
 }
 
@@ -243,6 +267,67 @@ gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds)
   }
 }
 
+void
+gst_multifdsink_add (GstMultiFdSink * sink, int fd)
+{
+  GstTCPClient *client;
+
+  /* create client datastructure */
+  client = g_new0 (GstTCPClient, 1);
+  client->fd = fd;
+  client->bufpos = -1;
+  client->bufoffset = 0;
+  client->sending = NULL;
+
+  g_mutex_lock (sink->clientslock);
+  sink->clients = g_list_prepend (sink->clients, client);
+  g_mutex_unlock (sink->clientslock);
+
+  /* we always read from a client */
+  FD_SET (fd, &sink->readfds);
+
+  /* set the socket to non blocking */
+  fcntl (fd, F_SETFL, O_NONBLOCK);
+
+  g_signal_emit (G_OBJECT (sink),
+      gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd);
+}
+
+void
+gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
+{
+  GList *clients;
+
+  g_mutex_lock (sink->clientslock);
+  /* loop over the clients to find the one with the fd */
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) clients->data;
+
+    if (client->fd == fd) {
+      gst_multifdsink_client_remove (sink, client);
+      break;
+    }
+  }
+  g_mutex_unlock (sink->clientslock);
+}
+
+void
+gst_multifdsink_clear (GstMultiFdSink * sink)
+{
+  GList *clients;
+
+  g_mutex_lock (sink->clientslock);
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) clients->data;
+    gst_multifdsink_client_remove (sink, client);
+  }
+  g_mutex_unlock (sink->clientslock);
+}
+
 static void
 gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
 {
@@ -440,11 +525,10 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
          * another thread */
         buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
         client->bufpos--;
-        gst_buffer_ref (buf);
 
+        /* queueing a buffer will ref it */
         gst_multifdsink_client_queue_buffer (sink, client, buf);
-        /* it is safe to unref now as queueing a buffer will ref it */
-        gst_buffer_unref (buf);
+
         /* need to start from the first byte for this new buffer */
         client->bufoffset = 0;
       }
@@ -492,9 +576,15 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
   return TRUE;
 }
 
-static void
+/* calculate the new position for a client after recovery. This function
+ * does not update the client position but merely returns the required
+ * position.
+ */
+static gint
 gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
 {
+  gint newbufpos;
+
   /* FIXME: implement recover procedure here, like moving the position to
    * the next keyframe, dropping buffers back to the beginning of the queue,
    * stuff like that... */
@@ -504,20 +594,25 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
     case GST_RECOVER_POLICY_NONE:
       /* do nothing, client will catch up or get kicked out when it reaches 
        * the hard max */
+      newbufpos = client->bufpos;
       break;
     case GST_RECOVER_POLICY_RESYNC_START:
       /* move to beginning of queue */
-      client->bufpos = -1;
+      newbufpos = -1;
       break;
     case GST_RECOVER_POLICY_RESYNC_SOFT:
       /* move to beginning of soft max */
-      client->bufpos = sink->buffers_soft_max;
+      newbufpos = sink->buffers_soft_max;
       break;
     case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
       /* FIXME, find keyframe in buffers */
-      client->bufpos = sink->buffers_soft_max;
+      newbufpos = sink->buffers_soft_max;
+      break;
+    default:
+      newbufpos = sink->buffers_soft_max;
       break;
   }
+  return newbufpos;
 }
 
 /* Queue a buffer on the global queue. 
@@ -566,7 +661,13 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
         client, client->fd, client->bufpos);
     /* check soft max if needed, recover client */
     if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) {
-      gst_multifdsink_recover_client (sink, client);
+      gint newpos;
+
+      newpos = gst_multifdsink_recover_client (sink, client);
+      if (newpos != client->bufpos) {
+        client->bufpos = newpos;
+        client->discont = TRUE;
+      }
     }
     /* check hard max, remove client */
     if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) {
@@ -615,7 +716,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
     gst_buffer_unref (old);
   }
   sink->buffers_queued = max_buffer_usage;
-  g_print ("%d\n", max_buffer_usage);
   g_mutex_unlock (sink->clientslock);
 
   /* and send a signal to thread if fd_set changed */
index 5eeecfe..4746a31 100644 (file)
@@ -84,6 +84,8 @@ typedef struct {
   GList *sending;               /* the buffers we need to send */
   gint bufoffset;               /* offset in the first buffer */
 
+  gboolean discont;
+
   GstTCPProtocolType protocol;
 
   gboolean caps_sent;
@@ -125,6 +127,12 @@ struct _GstMultiFdSink {
 struct _GstMultiFdSinkClass {
   GstElementClass parent_class;
 
+  /* element methods */
+  void (*add)    (GstMultiFdSink *sink, int fd);
+  void (*remove) (GstMultiFdSink *sink, int fd);
+  void (*clear)  (GstMultiFdSink *sink);
+
+  /* vtable */
   gboolean (*init)   (GstMultiFdSink *sink);
   gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds);
   gboolean (*close)  (GstMultiFdSink *sink);
@@ -136,6 +144,10 @@ struct _GstMultiFdSinkClass {
 
 GType gst_multifdsink_get_type (void);
 
+void gst_multifdsink_add (GstMultiFdSink *sink, int fd);
+void gst_multifdsink_remove (GstMultiFdSink *sink, int fd);
+void gst_multifdsink_clear (GstMultiFdSink *sink);
+
 
 #ifdef __cplusplus
 }
index 591f953..b449983 100644 (file)
@@ -70,8 +70,6 @@ static void gst_tcpserversink_get_property (GObject * object, guint prop_id,
 
 static GstMultiFdSinkClass *parent_class = NULL;
 
-//static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 };
-
 GType
 gst_tcpserversink_get_type (void)
 {
@@ -156,8 +154,6 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
   int client_sock_fd;
   struct sockaddr_in client_address;
   int client_address_len;
-  GstTCPClient *client;
-  GstMultiFdSink *parent = GST_MULTIFDSINK (sink);
 
   client_sock_fd =
       accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
@@ -168,22 +164,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
     return FALSE;
   }
 
-  /* create client datastructure */
-  client = g_new0 (GstTCPClient, 1);
-  client->fd = client_sock_fd;
-  client->bufpos = -1;
-  client->bufoffset = 0;
-  client->sending = NULL;
-
-  g_mutex_lock (parent->clientslock);
-  parent->clients = g_list_prepend (parent->clients, client);
-  g_mutex_unlock (parent->clientslock);
-
-  /* we always read from a client */
-  FD_SET (client_sock_fd, &parent->readfds);
-
-  /* set the socket to non blocking */
-  fcntl (client_sock_fd, F_SETFL, O_NONBLOCK);
+  gst_multifdsink_add (GST_MULTIFDSINK (sink), client_sock_fd);
 
   GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
       inet_ntoa (client_address.sin_addr), client_sock_fd);