gst/tcp/gstmultifdsink.*: Added more debugging info. Changed the way clients are...
authorWim Taymans <wim.taymans@gmail.com>
Tue, 10 Aug 2004 11:35:44 +0000 (11:35 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 10 Aug 2004 11:35:44 +0000 (11:35 +0000)
Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients),
(gst_multifdsink_chain), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
Added more debugging info. Changed the way clients are
removed from the lists. Fixed a bug where a bad file descriptor
could cause many clients to be removed.

ChangeLog
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h

index 020046a..5f5643f 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,19 @@
+2004-08-10  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
+       (gst_multifdsink_remove), (gst_multifdsink_clear),
+       (gst_multifdsink_remove_client_link),
+       (gst_multifdsink_handle_client_read),
+       (gst_multifdsink_client_queue_data),
+       (gst_multifdsink_client_queue_buffer),
+       (gst_multifdsink_handle_client_write),
+       (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients),
+       (gst_multifdsink_chain), (gst_multifdsink_close):
+       * gst/tcp/gstmultifdsink.h:
+       Added more debugging info. Changed the way clients are 
+       removed from the lists. Fixed a bug where a bad file descriptor
+       could cause many clients to be removed.
+
 2004-08-06  Benjamin Otte  <in7y118@public.uni-hamburg.de>
 
        * gst/videotestsrc/gstvideotestsrc.c: (generate_capslist):
index 9f9952e..60eb000 100644 (file)
@@ -125,8 +125,8 @@ static void gst_multifdsink_base_init (gpointer g_class);
 static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
 static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
 
-static void gst_multifdsink_client_remove (GstMultiFdSink * sink,
-    GstTCPClient * client);
+static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
+    GList * link);
 
 static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
 static GstElementStateReturn gst_multifdsink_change_state (GstElement *
@@ -307,7 +307,7 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
   /* create client datastructure */
   client = g_new0 (GstTCPClient, 1);
   client->fd = fd;
-  client->bad = FALSE;
+  client->status = GST_CLIENT_STATUS_OK;
   client->bufpos = -1;
   client->bufoffset = 0;
   client->sending = NULL;
@@ -341,19 +341,21 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
 void
 gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
 {
-  GList *clients;
+  GList *clients, *next;
 
   GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
 
   g_mutex_lock (sink->clientslock);
   /* loop over the clients to find the one with the fd */
-  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+  for (clients = sink->clients; clients; clients = next) {
     GstTCPClient *client;
 
     client = (GstTCPClient *) clients->data;
+    next = g_list_next (clients);
 
     if (client->fd == fd) {
-      gst_multifdsink_client_remove (sink, client);
+      client->status = GST_CLIENT_STATUS_REMOVED;
+      gst_multifdsink_remove_client_link (sink, clients);
       break;
     }
   }
@@ -363,14 +365,19 @@ gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
 void
 gst_multifdsink_clear (GstMultiFdSink * sink)
 {
+  GList *clients, *next;
+
   GST_DEBUG_OBJECT (sink, "clearing all clients");
 
   g_mutex_lock (sink->clientslock);
-  while (sink->clients) {
+  for (clients = sink->clients; clients; clients = next) {
     GstTCPClient *client;
 
-    client = (GstTCPClient *) sink->clients->data;
-    gst_multifdsink_client_remove (sink, client);
+    client = (GstTCPClient *) clients->data;
+    next = g_list_next (clients);
+
+    client->status = GST_CLIENT_STATUS_REMOVED;
+    gst_multifdsink_remove_client_link (sink, clients);
   }
   g_mutex_unlock (sink->clientslock);
 }
@@ -437,16 +444,51 @@ gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
 
 /* should be called with the clientslock held */
 static void
-gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
+gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
 {
-  int fd = client->fd;
+  int fd;
   GTimeVal now;
+  GstTCPClient *client = (GstTCPClient *) link->data;
+
+  fd = client->fd;
 
   /* FIXME: if we keep track of ip we can log it here and signal */
   GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
+
+  switch (client->status) {
+    case GST_CLIENT_STATUS_OK:
+      GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason",
+          client, client->fd);
+      break;
+    case GST_CLIENT_STATUS_CLOSED:
+      GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
+          client, client->fd);
+      break;
+    case GST_CLIENT_STATUS_REMOVED:
+      GST_DEBUG_OBJECT (sink,
+          "removing client %p with fd %d because the app removed it", client,
+          client->fd);
+      break;
+    case GST_CLIENT_STATUS_SLOW:
+      GST_INFO_OBJECT (sink,
+          "removing client %p with fd %d because it was too slow", client,
+          client->fd);
+      break;
+    case GST_CLIENT_STATUS_ERROR:
+      GST_WARNING_OBJECT (sink,
+          "removing client %p with fd %d because of error", client, client->fd);
+      break;
+    default:
+      GST_WARNING_OBJECT (sink,
+          "removing client %p with fd %d with invalid reason", client,
+          client->fd);
+      break;
+  }
+
   FD_CLR (fd, &sink->readfds);
   FD_CLR (fd, &sink->writefds);
   if (close (fd) != 0) {
+    /* this is not really an error */
     GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
   }
   SEND_COMMAND (sink, CONTROL_RESTART);
@@ -454,21 +496,24 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
   g_get_current_time (&now);
   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
 
+  /* unlock the mutex before signaling because the signal handler
+   * might query some properties */
   g_mutex_unlock (sink->clientslock);
 
   g_signal_emit (G_OBJECT (sink),
       gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd);
 
+  /* lock again before we remove the client completely */
   g_mutex_lock (sink->clientslock);
 
-  sink->clients = g_list_remove (sink->clients, client);
+  sink->clients = g_list_delete_link (sink->clients, link);
 
   g_free (client);
 }
 
 /* handle a read on a client fd,
  * which either indicates a close or should be ignored
- * returns FALSE if the client has been closed. */
+ * returns FALSE if some error occured or the client closed. */
 static gboolean
 gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
     GstTCPClient * client)
@@ -479,7 +524,9 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
   fd = client->fd;
 
   if (ioctl (fd, FIONREAD, &avail) < 0) {
-    GST_WARNING_OBJECT (sink, "ioctl failed for fd %d", fd);
+    GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s",
+        fd, g_strerror (errno));
+    client->status = GST_CLIENT_STATUS_ERROR;
     ret = FALSE;
     return ret;
   }
@@ -492,9 +539,11 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
   if (avail == 0) {
     /* client sent close, so remove it */
     GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
+    client->status = GST_CLIENT_STATUS_CLOSED;
     ret = FALSE;
   } else if (avail < 0) {
     GST_WARNING_OBJECT (sink, "avail < 0, removing on fd %d", fd);
+    client->status = GST_CLIENT_STATUS_ERROR;
     ret = FALSE;
   } else {
     guint8 dummy[512];
@@ -514,10 +563,12 @@ gst_multifdsink_handle_client_read (GstMultiFdSink * sink,
       if (nread < -1) {
         GST_WARNING_OBJECT (sink, "could not read bytes from fd %d: %s",
             fd, g_strerror (errno));
+        client->status = GST_CLIENT_STATUS_ERROR;
         ret = FALSE;
         break;
       } else if (nread == 0) {
         GST_WARNING_OBJECT (sink, "fd %d: gave 0 bytes in read, removing", fd);
+        client->status = GST_CLIENT_STATUS_ERROR;
         ret = FALSE;
         break;
       }
@@ -541,7 +592,7 @@ gst_multifdsink_client_queue_data (GstMultiFdSink * sink, GstTCPClient * client,
   GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d",
       len, client->fd);
 
-  client->sending = g_list_append (client->sending, buf);
+  client->sending = g_slist_append (client->sending, buf);
 
   return TRUE;
 }
@@ -589,7 +640,7 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
   }
 
   gst_buffer_ref (buffer);
-  client->sending = g_list_append (client->sending, buffer);
+  client->sending = g_slist_append (client->sending, buffer);
 
   return TRUE;
 }
@@ -617,6 +668,8 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
  *
  * When the sending returns a partial buffer we stop sending more data as the next send
  * operation could block.
+ *
+ * This functions returns FALSE if some error occured.
  */
 static gboolean
 gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
@@ -649,7 +702,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
    * yet, send them out one by one */
   if (!client->streamheader_sent) {
     if (sink->streamheader) {
-      GList *l;
+      GSList *l;
 
       for (l = sink->streamheader; l; l = l->next) {
         /* queue stream headers for sending */
@@ -719,6 +772,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
         } else {
           GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d",
               fd);
+          client->status = GST_CLIENT_STATUS_ERROR;
           return FALSE;
         }
       } else {
@@ -730,7 +784,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
           more = FALSE;
         } else {
           /* complete buffer was written, we can proceed to the next one */
-          client->sending = g_list_remove (client->sending, head);
+          client->sending = g_slist_remove (client->sending, head);
           gst_buffer_unref (head);
           /* make sure we start from byte 0 for the next buffer */
           client->bufoffset = 0;
@@ -808,9 +862,8 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
 static void
 gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
 {
-  GList *clients;
+  GList *clients, *next;
   gint queuelen;
-  GList *slow = NULL;
   gboolean need_signal = FALSE;
   gint max_buffer_usage;
   gint i;
@@ -827,10 +880,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
 
   /* then loop over the clients and update the positions */
   max_buffer_usage = 0;
-  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+  for (clients = sink->clients; clients; clients = next) {
     GstTCPClient *client;
 
     client = (GstTCPClient *) clients->data;
+    next = g_list_next (clients);
 
     client->bufpos++;
     GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
@@ -859,7 +913,8 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
           client, client->fd);
       FD_CLR (client->fd, &sink->readfds);
       FD_CLR (client->fd, &sink->writefds);
-      slow = g_list_prepend (slow, client);
+      client->status = GST_CLIENT_STATUS_SLOW;
+      gst_multifdsink_remove_client_link (sink, clients);
       /* cannot send data to this client anymore. need to signal the select thread that
        * the fd_set changed */
       need_signal = TRUE;
@@ -876,15 +931,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
       max_buffer_usage = client->bufpos;
     }
   }
-  /* remove crap clients */
-  for (clients = slow; clients; clients = g_list_next (clients)) {
-    GstTCPClient *client;
-
-    client = (GstTCPClient *) clients->data;
-
-    gst_multifdsink_client_remove (sink, client);
-  }
-  g_list_free (slow);
   /* nobody is referencing buffers after max_buffer_usage so we can
    * remove them from the queue */
   for (i = queuelen - 1; i > max_buffer_usage; i--) {
@@ -920,7 +966,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
 {
   int result;
   fd_set testreadfds, testwritefds;
-  GList *clients, *closed = NULL;
+  GList *clients, *next;
   gboolean try_again;
   GstMultiFdSinkClass *fclass;
 
@@ -951,29 +997,35 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
       GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno));
       if (errno == EBADF) {
         /* ok, so one of the fds is invalid. We loop over them to find one
-         * that gives an error to the F_GETFL fcntl. 
-         */
+         * that gives an error to the F_GETFL fcntl.  */
         g_mutex_lock (sink->clientslock);
-        for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+        for (clients = sink->clients; clients; clients = next) {
           GstTCPClient *client;
           int fd;
           long flags;
           int res;
 
           client = (GstTCPClient *) clients->data;
+          next = g_list_next (clients);
+
           fd = client->fd;
 
           res = fcntl (fd, F_GETFL, &flags);
           if (res == -1) {
-            GST_WARNING_OBJECT (sink, "fnctl failed for %d, marking as bad: %s",
+            GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s",
                 fd, g_strerror (errno));
             if (errno == EBADF) {
-              client->bad = TRUE;
+              client->status = GST_CLIENT_STATUS_ERROR;
+              gst_multifdsink_remove_client_link (sink, clients);
             }
           }
         }
         g_mutex_unlock (sink->clientslock);
+        /* after this, go back in the select loop as the read/writefds
+         * are not valid */
+        try_again = TRUE;
       } else if (errno == EINTR) {
+        /* interrupted system call, just redo the select */
         try_again = TRUE;
       } else {
         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
@@ -1025,13 +1077,15 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
 
   /* Check the reads */
   g_mutex_lock (sink->clientslock);
-  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+  for (clients = sink->clients; clients; clients = next) {
     GstTCPClient *client;
     int fd;
 
     client = (GstTCPClient *) clients->data;
-    if (client->bad) {
-      closed = g_list_prepend (closed, client);
+    next = g_list_next (clients);
+
+    if (client->status != GST_CLIENT_STATUS_OK) {
+      gst_multifdsink_remove_client_link (sink, clients);
       continue;
     }
 
@@ -1040,29 +1094,18 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
     if (FD_ISSET (fd, &testreadfds)) {
       /* handle client read */
       if (!gst_multifdsink_handle_client_read (sink, client)) {
-        closed = g_list_prepend (closed, client);
+        gst_multifdsink_remove_client_link (sink, clients);
         continue;
       }
     }
     if (FD_ISSET (fd, &testwritefds)) {
       /* handle client write */
       if (!gst_multifdsink_handle_client_write (sink, client)) {
-        closed = g_list_prepend (closed, client);
+        gst_multifdsink_remove_client_link (sink, clients);
         continue;
       }
     }
   }
-  /* remove crappy clients */
-  for (clients = closed; clients; clients = g_list_next (clients)) {
-    GstTCPClient *client;
-
-    client = (GstTCPClient *) clients->data;
-
-    GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close",
-        client, client->fd);
-    gst_multifdsink_client_remove (sink, client);
-  }
-  g_list_free (closed);
   g_mutex_unlock (sink->clientslock);
 }
 
@@ -1104,7 +1147,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
     GST_DEBUG_OBJECT (sink,
         "appending IN_CAPS buffer with length %d to streamheader",
         GST_BUFFER_SIZE (buf));
-    sink->streamheader = g_list_append (sink->streamheader, buf);
+    sink->streamheader = g_slist_append (sink->streamheader, buf);
     return;
   }
 
@@ -1239,12 +1282,12 @@ gst_multifdsink_close (GstMultiFdSink * this)
   close (WRITE_SOCKET (this));
 
   if (this->streamheader) {
-    GList *l;
+    GSList *l;
 
     for (l = this->streamheader; l; l = l->next) {
       gst_buffer_unref (l->data);
     }
-    g_list_free (this->streamheader);
+    g_slist_free (this->streamheader);
   }
 
   if (fclass->close)
index 66558e4..deeae5a 100644 (file)
@@ -75,15 +75,24 @@ typedef enum
   GST_RECOVER_POLICY_RESYNC_KEYFRAME,
 } GstRecoverPolicy;
 
+typedef enum
+{
+  GST_CLIENT_STATUS_OK,
+  GST_CLIENT_STATUS_CLOSED,
+  GST_CLIENT_STATUS_REMOVED,
+  GST_CLIENT_STATUS_SLOW,
+  GST_CLIENT_STATUS_ERROR,
+} GstClientStatus;
+
 /* structure for a client
  *  */
 typedef struct {
   int fd;
   gint bufpos;                  /* position of this client in the global queue */
 
-  gboolean bad;
+  GstClientStatus status;
 
-  GList *sending;               /* the buffers we need to send */
+  GSList *sending;              /* the buffers we need to send */
   gint bufoffset;               /* offset in the first buffer */
 
   gboolean discont;
@@ -120,7 +129,7 @@ struct _GstMultiFdSink {
 
   int control_sock[2]; /* sockets for controlling the select call */
 
-  GList *streamheader; /* GList of GstBuffers to use as streamheader */
+  GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
   GstTCPProtocolType protocol;
   guint mtu;