gst/tcp/gsttcpserversink.*: Serversink rewrite. Really do non blocking writes to...
authorWim Taymans <wim.taymans@gmail.com>
Fri, 25 Jun 2004 17:06:51 +0000 (17:06 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Fri, 25 Jun 2004 17:06:51 +0000 (17:06 +0000)
Original commit message from CVS:
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init),
(gst_tcpserversink_init), (gst_tcpserversink_handle_server_read),
(gst_tcpserversink_client_remove),
(gst_tcpserversink_handle_client_read),
(gst_tcpserversink_client_queue_data),
(gst_tcpserversink_client_queue_caps),
(gst_tcpserversink_client_queue_buffer),
(gst_tcpserversink_handle_client_write),
(gst_tcpserversink_queue_buffer),
(gst_tcpserversink_handle_clients), (gst_tcpserversink_thread),
(gst_tcpserversink_chain), (gst_tcpserversink_set_property),
(gst_tcpserversink_get_property), (gst_tcpserversink_init_send),
(gst_tcpserversink_close):
* gst/tcp/gsttcpserversink.h:
Serversink rewrite. Really do non blocking writes to clients and
maintain an internal queue to handle slower clients while not
disturbing fast clients.

ChangeLog
gst/tcp/gsttcpserversink.c
gst/tcp/gsttcpserversink.h

index 55c7259..0fc1abc 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,23 @@
+2004-06-25  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init),
+       (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read),
+       (gst_tcpserversink_client_remove),
+       (gst_tcpserversink_handle_client_read),
+       (gst_tcpserversink_client_queue_data),
+       (gst_tcpserversink_client_queue_caps),
+       (gst_tcpserversink_client_queue_buffer),
+       (gst_tcpserversink_handle_client_write),
+       (gst_tcpserversink_queue_buffer),
+       (gst_tcpserversink_handle_clients), (gst_tcpserversink_thread),
+       (gst_tcpserversink_chain), (gst_tcpserversink_set_property),
+       (gst_tcpserversink_get_property), (gst_tcpserversink_init_send),
+       (gst_tcpserversink_close):
+       * gst/tcp/gsttcpserversink.h:
+       Serversink rewrite. Really do non blocking writes to clients and
+       maintain an internal queue to handle slower clients while not
+       disturbing fast clients.
+
 2004-06-25  Thomas Vander Stichele  <thomas at apestaart dot org>
 
        * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get):
index 42780a1..3c11d13 100644 (file)
 #define TCP_DEFAULT_PORT       4953
 #define TCP_BACKLOG            5
 
+#define CONTROL_RESTART                'R'     /* restart the select call */
+#define CONTROL_STOP           'S'     /* stop the select call */
+#define SEND_COMMAND(sink, command)            \
+G_STMT_START {                                 \
+  unsigned char c; c = command;                        \
+  write (sink->control_sock[1], &c, 1);                \
+} G_STMT_END
+
+#define READ_COMMAND(sink, command)            \
+G_STMT_START {                                 \
+  read(sink->control_sock[0], &command, 1);    \
+} G_STMT_END
+
 /* elementfactory information */
 static GstElementDetails gst_tcpserversink_details =
 GST_ELEMENT_DETAILS ("TCP Server sink",
@@ -46,6 +59,19 @@ GST_ELEMENT_DETAILS ("TCP Server sink",
 GST_DEBUG_CATEGORY (tcpserversink_debug);
 #define GST_CAT_DEFAULT (tcpserversink_debug)
 
+typedef struct
+{
+  int fd;
+  int bufpos;                   /* position of this client in the global queue */
+
+  GList *sending;               /* the buffers we need to send */
+  int bufoffset;                /* offset in the first buffer */
+
+  gboolean caps_sent;
+  gboolean streamheader_sent;
+}
+GstTCPClient;
+
 /* TCPServerSink signals and args */
 enum
 {
@@ -54,12 +80,17 @@ enum
   LAST_SIGNAL
 };
 
+#define DEFAULT_BUFFERS_MAX            25
+#define DEFAULT_BUFFERS_SOFT_MAX       20
+
 enum
 {
   ARG_0,
   ARG_HOST,
   ARG_PORT,
-  ARG_PROTOCOL
+  ARG_PROTOCOL,
+  ARG_BUFFERS_MAX,
+  ARG_BUFFERS_SOFT_MAX,
 };
 
 static void gst_tcpserversink_base_init (gpointer g_class);
@@ -139,6 +170,14 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
           GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
           G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
+      g_param_spec_int ("buffers-max", "Buffers max",
+          "max number of buffers to queue (0 = no limit)", 0, G_MAXINT,
+          DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX,
+      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
+          "Recover client when going over this limit (0 = no limit)", 0,
+          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
 
   gst_tcpserversink_signals[SIGNAL_CLIENT_ADDED] =
       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
@@ -187,6 +226,17 @@ gst_tcpserversink_init (GstTCPServerSink * this)
 
   this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
   this->clock = NULL;
+
+  this->clients = NULL;
+
+  this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
+  this->queuelock = g_mutex_new ();
+  this->queuecond = g_cond_new ();
+  this->buffers_max = DEFAULT_BUFFERS_MAX;
+  this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
+
+  this->clientslock = g_mutex_new ();
+
 }
 
 static void
@@ -210,6 +260,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
   int client_sock_fd;
   struct sockaddr_in client_address;
   int client_address_len;
+  GstTCPClient *client;
 
   client_sock_fd =
       accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
@@ -219,7 +270,24 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
         ("Could not accept client on server socket: %s", g_strerror (errno)));
     return FALSE;
   }
-  FD_SET (client_sock_fd, &(sink->clientfds));
+
+  /* create client datastructure */
+  client = g_new0 (GstTCPClient, 1);
+  client->fd = client_sock_fd;
+  client->bufpos = -1;
+  client->bufoffset = 0;
+  client->sending = NULL;
+
+  g_mutex_lock (sink->clientslock);
+  sink->clients = g_list_prepend (sink->clients, client);
+  g_mutex_unlock (sink->clientslock);
+
+  /* we always read from a client */
+  FD_SET (client_sock_fd, &sink->readfds);
+
+  /* set the socket to non blocking */
+  fcntl (client_sock_fd, F_SETFL, O_NONBLOCK);
+
   GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
       inet_ntoa (client_address.sin_addr), client_sock_fd);
   g_signal_emit (G_OBJECT (sink),
@@ -230,16 +298,22 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
 }
 
 static void
-gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
+gst_tcpserversink_client_remove (GstTCPServerSink * sink, GstTCPClient * client)
 {
+  int fd = client->fd;
+
   /* FIXME: if we keep track of ip we can log it here and signal */
   GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
   if (close (fd) != 0) {
     GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
   }
-  FD_CLR (fd, &sink->clientfds);
-  FD_CLR (fd, &sink->caps_sent);
-  FD_CLR (fd, &sink->streamheader_sent);
+  FD_CLR (fd, &sink->readfds);
+  FD_CLR (fd, &sink->writefds);
+
+  sink->clients = g_list_remove (sink->clients, client);
+
+  g_free (client);
+
   g_signal_emit (G_OBJECT (sink),
       gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
 }
@@ -248,9 +322,12 @@ gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
  * which either indicates a close or should be ignored
  * returns FALSE if the client has been closed. */
 static gboolean
-gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
+gst_tcpserversink_handle_client_read (GstTCPServerSink * sink,
+    GstTCPClient * client)
 {
-  int nread;
+  int nread, fd;
+
+  fd = client->fd;
 
   GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd);
 
@@ -258,7 +335,6 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
   if (nread == 0) {
     /* client sent close, so remove it */
     GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
-    gst_tcpserversink_client_remove (sink, fd);
     return FALSE;
   } else {
     /* FIXME: we should probably just Read 'n' Drop */
@@ -267,128 +343,435 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
   return TRUE;
 }
 
-/* Write a buffer to the given fd for the given element using the given
- * protocol.
- * Return number of buffer bytes written.
- */
-static gint
-gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element,
-    GstTCPProtocolType protocol)
+static gboolean
+gst_tcpserversink_client_queue_data (GstTCPServerSink * sink,
+    GstTCPClient * client, gchar * data, gint len)
 {
-  gint wrote = 0;
+  GstBuffer *buf;
 
-  /* write the buffer header if we have one */
-  switch (protocol) {
-    case GST_TCP_PROTOCOL_TYPE_NONE:
-      break;
+  buf = gst_buffer_new ();
+  GST_BUFFER_DATA (buf) = data;
+  GST_BUFFER_SIZE (buf) = len;
 
-    case GST_TCP_PROTOCOL_TYPE_GDP:
-      GST_LOG_OBJECT (element, "Sending buffer header through GDP");
-      if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0))
-        return 0;
-      break;
-    default:
-      g_warning ("Unhandled protocol type");
-      break;
+  GST_DEBUG_OBJECT (sink, "Queueing data of length %d for fd %d",
+      len, client->fd);
+  client->sending = g_list_append (client->sending, buf);
+
+  return TRUE;
+}
+
+static gboolean
+gst_tcpserversink_client_queue_caps (GstTCPServerSink * sink,
+    GstTCPClient * client, const GstCaps * caps)
+{
+  guint8 *header;
+  guint8 *payload;
+  guint length;
+  gchar *string;
+
+  string = gst_caps_to_string (caps);
+  GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string,
+      client->fd);
+  g_free (string);
+
+  if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+    GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
+    return FALSE;
   }
+  gst_tcpserversink_client_queue_data (sink, client, header, length);
 
-  /* serve data to client */
-  GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d",
-      GST_BUFFER_SIZE (buf), fd);
-
-  wrote =
-      gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
-
-  if (wrote < GST_BUFFER_SIZE (buf)) {
-/* FIXME: keep track of client ip and port and so on */
-/*
-              GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
-                (_("Error while sending data to \"%s:%d\"."),
-                    sink->host, sink->port),
-                ("Only %d of %d bytes written: %s",
-                  bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
-*/
-    /* FIXME: there should be a better way to report problems, since we
-       want to continue for other clients and just drop this particular one */
-    GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote,
-        GST_BUFFER_SIZE (buf));
+  length = gst_dp_header_payload_length (header);
+  gst_tcpserversink_client_queue_data (sink, client, payload, length);
+
+  return TRUE;
+}
+
+static gboolean
+gst_tcpserversink_client_queue_buffer (GstTCPServerSink * sink,
+    GstTCPClient * client, GstBuffer * buffer)
+{
+  if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+    guint8 *header;
+    guint len;
+
+    if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
+      GST_DEBUG_OBJECT (sink,
+          "could not create header, removing client on fd %d", client->fd);
+      return FALSE;
+    }
+    gst_tcpserversink_client_queue_data (sink, client, header, len);
   }
 
-  return wrote;
+  gst_buffer_ref (buffer);
+  client->sending = g_list_append (client->sending, buffer);
+
+  return TRUE;
 }
 
-/* handle a write on a client fd,
- * which indicates a read request from a client */
+
+/* handle a write on a client,
+ * which indicates a read request from a client.
+ *
+ * The strategy is as follows, for each client we maintain a queue of GstBuffers
+ * that contain the raw bytes we need to send to the client. In the case of the
+ * GDP protocol, we create buffers out of the header bytes so that we can only focus
+ * on sending buffers.
+ *
+ * We first check to see if we need to send caps (in GDP) and streamheaders. If so,
+ * we queue them. 
+ *
+ * Then we run into the main loop that tries to send as many buffers as possible. It
+ * will first exhaust the client->sending queue and if the queue is empty, it will
+ * pick a buffer from the global queue.
+ * 
+ * Sending the Buffers from the client->sending queue is basically writing the bytes
+ * to the socket and maintaining a count of the bytes that were sent. When the buffer
+ * is completely sent, it is removed from the client->sending queue and we try to pick
+ * a new buffer for sending.
+ *
+ * When the sending returns a partial buffer we stop sending more data as the next send
+ * operation could block.
+ */
 static gboolean
-gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
-    GstPad * pad, GstBuffer * buf)
+gst_tcpserversink_handle_client_write (GstTCPServerSink * sink,
+    GstTCPClient * client)
 {
-  gint wrote = 0;
+  int fd = client->fd;
+  gboolean more;
+  gboolean res;
 
-  /* when using GDP, first check if we have sent caps yet */
+  /* when using GDP, first check if we have queued caps yet */
   if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
-    if (!FD_ISSET (fd, &(sink->caps_sent))) {
-      const GstCaps *caps;
-      gchar *string;
-
-      caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
-      string = gst_caps_to_string (caps);
-      GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
-          fd);
-      if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
-              "unknown", 0)) {
-        GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client");
-        gst_tcpserversink_client_remove (sink, fd);
-        g_free (string);
+    if (!client->caps_sent) {
+      const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
+
+      /* queue caps for sending */
+      res = gst_tcpserversink_client_queue_caps (sink, client, caps);
+      if (!res) {
+        GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
         return FALSE;
       }
-      g_free (string);
-      FD_SET (fd, &(sink->caps_sent));
+      client->caps_sent = TRUE;
     }
   }
   /* if we have streamheader buffers, and haven't sent them to this client
    * yet, send them out one by one */
-  if (!FD_ISSET (fd, &(sink->streamheader_sent))) {
+  if (!client->streamheader_sent) {
     if (sink->streamheader) {
       GList *l;
 
       for (l = sink->streamheader; l; l = l->next) {
-        wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink),
-            sink->protocol);
-        if (wrote < GST_BUFFER_SIZE (l->data)) {
+        /* queue stream headers for sending */
+        res =
+            gst_tcpserversink_client_queue_buffer (sink, client,
+            GST_BUFFER (l->data));
+        if (!res) {
           GST_DEBUG_OBJECT (sink,
-              "Failed sending streamheader, removing client");
-          gst_tcpserversink_client_remove (sink, fd);
+              "Failed queueing streamheader, removing client");
+          return FALSE;
         }
       }
     }
-    FD_SET (fd, &(sink->streamheader_sent));
+    client->streamheader_sent = TRUE;
   }
 
-  /* now we write the data buffer */
-  wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol);
-  if (wrote < GST_BUFFER_SIZE (buf)) {
-    gst_tcpserversink_client_remove (sink, fd);
-    /* write failed, so drop the client */
-    GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
-    if (close (fd) != 0) {
-      GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s",
-          fd, g_strerror (errno));
+  more = TRUE;
+  do {
+    gint maxsize;
+
+    if (!client->sending) {
+      /* client is not working on a buffer */
+      if (client->bufpos == -1) {
+        /* client is too fast, remove from write queue until new buffer is
+         * available */
+        FD_CLR (fd, &sink->writefds);
+        return TRUE;
+      } else {
+        /* client can pick a buffer from the global queue */
+        GstBuffer *buf;
+
+        /* grab buffer and ref, we need to ref since it could be unreffed in
+         * another thread when we unlock the queuelock */
+        g_mutex_lock (sink->queuelock);
+        buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
+        client->bufpos--;
+        gst_buffer_ref (buf);
+        g_mutex_unlock (sink->queuelock);
+
+        gst_tcpserversink_client_queue_buffer (sink, client, buf);
+        /* it is safe to unref now as queueing a buffer will ref it */
+        gst_buffer_unref (buf);
+        /* need to start from the first byte for this new buffer */
+        client->bufoffset = 0;
+      }
     }
-    return FALSE;
-  }
+
+    /* see if we need to send something */
+    if (client->sending) {
+      ssize_t wrote;
+      GstBuffer *head;
+
+      /* pick first buffer from list */
+      head = GST_BUFFER (client->sending->data);
+      maxsize = GST_BUFFER_SIZE (head) - client->bufoffset;
+
+      /* try to write the complete buffer */
+      wrote =
+          send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
+          MSG_NOSIGNAL);
+      if (wrote < 0) {
+        /* hmm error.. */
+        if (errno == EAGAIN) {
+          /* nothing serious, resource was unavailable, try again later */
+          more = FALSE;
+        } else {
+          GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d",
+              fd);
+          return FALSE;
+        }
+      } else if (wrote < maxsize) {
+        /* partial write means that the client cannot read more and we should
+         * stop sending more */
+        GST_DEBUG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
+        client->bufoffset += wrote;
+        more = FALSE;
+      } else {
+        /* complete buffer was written, we can proceed to the next one */
+        client->sending = g_list_remove (client->sending, head);
+        gst_buffer_unref (head);
+        /* make sure we start from byte 0 for the next buffer */
+        client->bufoffset = 0;
+      }
+    }
+  } while (more);
+
   return TRUE;
 }
 
+/* Queue a buffer on the global queue. 
+ *
+ * This functions adds the buffer to the front of a GArray. It removes the
+ * tail buffer if the max queue size is exceeded. Unreffing the buffer that
+ * is queued. Note that unreffing the buffer is not a problem as clients who
+ * started writing out this buffer will still have a reference to it in the
+ * client->sending queue.
+ *
+ * After adding the buffer, we update all client positions in the queue. If
+ * a client moves of the soft max, we start the recovery procedure for this
+ * slow client. If it goes over the hard max, it is put into the slow list
+ * and removed.
+ *
+ * Special care is taken of clients that were waiting for a new buffer (they
+ * had a position of -1) because they can proceed after adding this new buffer.
+ * This is done by adding the client back into the write fd_set and signalling
+ * the select thread that the fd_set changed.
+ *
+ */
 static void
-gst_tcpserversink_chain (GstPad * pad, GstData * _data)
+gst_tcpserversink_queue_buffer (GstTCPServerSink * sink, GstBuffer * buf)
+{
+  GList *clients;
+  gint queuelen;
+  GList *slow = NULL;
+  gboolean need_signal = FALSE;
+
+  g_mutex_lock (sink->queuelock);
+  /* add buffer to queue */
+  g_array_prepend_val (sink->bufqueue, buf);
+  queuelen = sink->bufqueue->len;
+  if (queuelen > sink->buffers_max) {
+    GstBuffer *old;
+
+    /* queue exceeded max size */
+    queuelen--;
+    old = g_array_index (sink->bufqueue, GstBuffer *, queuelen);
+    sink->bufqueue = g_array_remove_index (sink->bufqueue, queuelen);
+
+    /* unref tail buffer */
+    gst_buffer_unref (old);
+  }
+  g_mutex_unlock (sink->queuelock);
+
+  /* then loop over the clients and update the positions */
+  g_mutex_lock (sink->clientslock);
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) clients->data;
+
+    client->bufpos++;
+    GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
+        client, client->fd, client->bufpos);
+    if (client->bufpos >= sink->buffers_soft_max) {
+      if (client->bufpos == sink->buffers_soft_max) {
+        g_warning ("client %p with fd %d is lagging...", client, client->fd);
+      }
+      GST_LOG_OBJECT (sink, "client %p with fd %d is lagging",
+          client, client->fd);
+    }
+    if (client->bufpos >= queuelen) {
+      /* remove client */
+      GST_LOG_OBJECT (sink, "client %p with fd %d is too slow, removing",
+          client, client->fd);
+      g_warning ("client %p with fd %d too slow, removing", client, client->fd);
+      FD_CLR (client->fd, &sink->readfds);
+      FD_CLR (client->fd, &sink->writefds);
+      slow = g_list_prepend (slow, client);
+      /* cannot send data to this client anymore. need to signal the select thread that
+       * the fd_set changed */
+      need_signal = TRUE;
+    } else if (client->bufpos == 0) {
+      /* can send data to this client now. need to signal the select thread that
+       * the fd_set changed */
+      FD_SET (client->fd, &sink->writefds);
+      need_signal = TRUE;
+    }
+  }
+  /* remove crap clients */
+  for (clients = slow; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) slow->data;
+
+    gst_tcpserversink_client_remove (sink, client);
+  }
+  g_list_free (slow);
+  g_mutex_unlock (sink->clientslock);
+
+  /* and send a signal to thread if fd_set changed */
+  if (need_signal) {
+    SEND_COMMAND (sink, CONTROL_RESTART);
+  }
+}
+
+/* Handle the clients. Basically does a blocking select for one
+ * of the client fds to become read or writable. We also have a 
+ * filedescriptor to receive commands on that we need to check.
+ *
+ * After going out of the select call, we read and write to all
+ * clients that can do so. Badly behaving clients are put on a
+ * garbage list and removed.
+ */
+static void
+gst_tcpserversink_handle_clients (GstTCPServerSink * sink)
 {
   int result;
-  int fd;
   fd_set testreadfds, testwritefds;
-  struct timeval timeout;
-  struct timeval *timeoutp;
+  GList *clients, *error = NULL;
+  gboolean try_again;
+
+  do {
+    try_again = FALSE;
+
+    /* check for:
+     * - server socket input (ie, new client connections)
+     * - client socket input (ie, clients saying goodbye)
+     * - client socket output (ie, client reads)          */
+    testwritefds = sink->writefds;
+    testreadfds = sink->readfds;
+    FD_SET (sink->server_sock_fd, &testreadfds);
+    FD_SET (sink->control_sock[0], &testreadfds);
+
+    GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
+    gst_tcpserversink_debug_fdset (sink, &testreadfds);
+    GST_LOG_OBJECT (sink, "doing select on client fds for writes");
+    gst_tcpserversink_debug_fdset (sink, &testwritefds);
+
+    result =
+        select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL);
+
+    /* < 0 is an error, 0 just means a timeout happened */
+    if (result < 0) {
+      GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+          ("select failed: %s", g_strerror (errno)));
+      return;
+    }
+
+    GST_LOG_OBJECT (sink, "%d sockets had action", result);
+    GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
+    gst_tcpserversink_debug_fdset (sink, &testreadfds);
+    GST_LOG_OBJECT (sink, "done select on client fds for writes");
+    gst_tcpserversink_debug_fdset (sink, &testwritefds);
 
+    if (FD_ISSET (sink->control_sock[0], &testreadfds)) {
+      gchar command;
+
+      READ_COMMAND (sink, command);
+
+      switch (command) {
+        case CONTROL_RESTART:
+          /* need to restart the select call as the fd_set changed */
+          try_again = TRUE;
+          break;
+        case CONTROL_STOP:
+          /* stop this function */
+          return;
+        default:
+          g_warning ("tcpserversink: unknown control message received");
+          break;
+      }
+    }
+  } while (try_again);
+
+  if (FD_ISSET (sink->server_sock_fd, &testreadfds)) {
+    /* handle new client connection on server socket */
+    if (!gst_tcpserversink_handle_server_read (sink)) {
+      GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+          ("client connection failed: %s", g_strerror (errno)));
+      return;
+    }
+  }
+
+  /* Check the reads */
+  g_mutex_lock (sink->clientslock);
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+    int fd;
+
+    client = (GstTCPClient *) clients->data;
+    fd = client->fd;
+
+    if (FD_ISSET (fd, &testreadfds)) {
+      /* handle client read */
+      if (!gst_tcpserversink_handle_client_read (sink, client)) {
+        error = g_list_prepend (error, client);
+        continue;
+      }
+    }
+    if (FD_ISSET (fd, &testwritefds)) {
+      /* handle client write */
+      if (!gst_tcpserversink_handle_client_write (sink, client)) {
+        error = g_list_prepend (error, client);
+        continue;
+      }
+    }
+  }
+  /* remove crappy clients */
+  for (clients = error; clients; clients = g_list_next (clients)) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) error->data;
+
+    GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client,
+        client->fd);
+    gst_tcpserversink_client_remove (sink, client);
+  }
+  g_list_free (error);
+  g_mutex_unlock (sink->clientslock);
+}
+
+static gpointer
+gst_tcpserversink_thread (GstTCPServerSink * sink)
+{
+  while (sink->running) {
+    gst_tcpserversink_handle_clients (sink);
+  }
+  return NULL;
+}
+
+static void
+gst_tcpserversink_chain (GstPad * pad, GstData * _data)
+{
   GstBuffer *buf = GST_BUFFER (_data);
   GstTCPServerSink *sink;
 
@@ -416,76 +799,11 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data)
     sink->streamheader = g_list_append (sink->streamheader, buf);
     return;
   }
-  /* if the incoming buffer has a duration, we can use that as the timeout
-   * value; otherwise, we block */
-  timeout.tv_sec = 0;
-  timeout.tv_usec = 0;
-  timeoutp = NULL;
-  GST_LOG_OBJECT (sink, "incoming buffer duration: %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-  if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) {
-    GST_TIME_TO_TIMEVAL (GST_BUFFER_DURATION (buf), timeout);
-    timeoutp = &timeout;
-    GST_LOG_OBJECT (sink, "select will be with timeout %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
-    GST_LOG_OBJECT (sink, "select will be with timeout %d.%d",
-        timeout.tv_sec, timeout.tv_usec);
-  }
-  /* check for:
-   * - server socket input (ie, new client connections)
-   * - client socket input (ie, clients saying goodbye)
-   * - client socket output (ie, client reads)          */
-  testwritefds = sink->clientfds;
-  testreadfds = sink->clientfds;
-  FD_SET (sink->server_sock_fd, &testreadfds);
-
-  GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
-  gst_tcpserversink_debug_fdset (sink, &testreadfds);
-  GST_LOG_OBJECT (sink, "doing select on client fds for writes");
-  gst_tcpserversink_debug_fdset (sink, &testwritefds);
-
-  result = select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0,
-      timeoutp);
-  /* < 0 is an error, 0 just means a timeout happened */
-  if (result < 0) {
-    GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
-        ("select failed: %s", g_strerror (errno)));
-    return;
-  }
-  GST_LOG_OBJECT (sink, "%d sockets had action", result);
-  GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
-  gst_tcpserversink_debug_fdset (sink, &testreadfds);
-  GST_LOG_OBJECT (sink, "done select on client fds for writes");
-  gst_tcpserversink_debug_fdset (sink, &testwritefds);
 
-  /* Check the reads */
-  for (fd = 0; fd < FD_SETSIZE; fd++) {
-    if (FD_ISSET (fd, &testreadfds)) {
-      if (fd == sink->server_sock_fd) {
-        /* handle new client connection on server socket */
-        if (!gst_tcpserversink_handle_server_read (sink))
-          return;
-      } else {
-        /* handle client read */
-        if (!gst_tcpserversink_handle_client_read (sink, fd))
-          return;
-      }
-    }
-  }
+  /* queue the buffer */
+  gst_tcpserversink_queue_buffer (sink, buf);
 
-  /* Check the writes */
-  for (fd = 0; fd < FD_SETSIZE; fd++) {
-    if (FD_ISSET (fd, &testwritefds)) {
-      if (!gst_tcpserversink_handle_client_write (sink, fd, pad, buf)) {
-        gst_buffer_unref (buf);
-        return;
-      }
-    }
-  }
   sink->data_written += GST_BUFFER_SIZE (buf);
-
-  gst_buffer_unref (buf);
-
 }
 
 static void
@@ -508,6 +826,12 @@ gst_tcpserversink_set_property (GObject * object, guint prop_id,
     case ARG_PROTOCOL:
       tcpserversink->protocol = g_value_get_enum (value);
       break;
+    case ARG_BUFFERS_MAX:
+      tcpserversink->buffers_max = g_value_get_int (value);
+      break;
+    case ARG_BUFFERS_SOFT_MAX:
+      tcpserversink->buffers_soft_max = g_value_get_int (value);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -534,6 +858,12 @@ gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value,
     case ARG_PROTOCOL:
       g_value_set_enum (value, tcpserversink->protocol);
       break;
+    case ARG_BUFFERS_MAX:
+      g_value_set_int (value, tcpserversink->buffers_max);
+      break;
+    case ARG_BUFFERS_SOFT_MAX:
+      g_value_set_int (value, tcpserversink->buffers_soft_max);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -606,10 +936,18 @@ gst_tcpserversink_init_send (GstTCPServerSink * this)
       "listened on server socket %d, returning from connection setup",
       this->server_sock_fd);
 
-  FD_ZERO (&this->clientfds);
-  FD_ZERO (&this->caps_sent);
-  FD_ZERO (&this->streamheader_sent);
-  FD_SET (this->server_sock_fd, &this->clientfds);
+  FD_ZERO (&this->readfds);
+  FD_ZERO (&this->writefds);
+  FD_SET (this->server_sock_fd, &this->readfds);
+
+  if (socketpair (PF_UNIX, SOCK_STREAM, 0, this->control_sock) < 0) {
+    perror ("creating socket pair");
+  }
+
+  this->running = TRUE;
+  this->thread = g_thread_create ((GThreadFunc) gst_tcpserversink_thread,
+      this, TRUE, NULL);
+
   GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
   this->streamheader = NULL;
 
@@ -621,6 +959,15 @@ gst_tcpserversink_init_send (GstTCPServerSink * this)
 static void
 gst_tcpserversink_close (GstTCPServerSink * this)
 {
+  this->running = FALSE;
+
+  SEND_COMMAND (this, CONTROL_STOP);
+
+  g_thread_join (this->thread);
+
+  close (this->control_sock[0]);
+  close (this->control_sock[1]);
+
   if (this->server_sock_fd != -1) {
     close (this->server_sock_fd);
     this->server_sock_fd = -1;
index 4499623..903b62c 100644 (file)
@@ -80,16 +80,28 @@ struct _GstTCPServerSink {
 
   size_t data_written; /* how much bytes have we written ? */
 
-  fd_set clientfds; /* all the client file descriptors that are open */
-  fd_set caps_sent; /* all the client file descriptors
-                     * that have had caps sent */
-  fd_set streamheader_sent; /* all the client file descriptors that have had
-                             * streamheader sent */
+  GMutex *clientslock;
+  GList *clients;      /* list of clients we are serving */
+  
+  fd_set readfds; /* all the client file descriptors that we can read from */
+  fd_set writefds; /* all the client file descriptors that we can write to */
+
+  int control_sock[2]; /* sockets for controlling the select call */
 
   GList *streamheader; /* GList of GstBuffers to use as streamheader */
   GstTCPProtocolType protocol;
   guint mtu;
   GstClock *clock;
+
+  GArray *bufqueue;
+  GMutex *queuelock;
+  GCond *queuecond;
+
+  gboolean running;
+  GThread *thread;
+
+  gint buffers_max;
+  gint buffers_soft_max;
 };
 
 struct _GstTCPServerSinkClass {