use streamheader
authorThomas Vander Stichele <thomas@apestaart.org>
Tue, 8 Jun 2004 10:44:59 +0000 (10:44 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Tue, 8 Jun 2004 10:44:59 +0000 (10:44 +0000)
Original commit message from CVS:
use streamheader

ChangeLog
gst/tcp/gsttcpclientsrc.c
gst/tcp/gsttcpserversink.c
gst/tcp/gsttcpserversink.h

index 0b0fc38..4fc6440 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,15 @@
 2004-06-08  Thomas Vander Stichele  <thomas at apestaart dot org>
 
+       * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get):
+       * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_client_remove),
+       (gst_tcpserversink_handle_client_read), (gst_tcp_buffer_write),
+       (gst_tcpserversink_handle_client_write), (gst_tcpserversink_chain),
+       (gst_tcpserversink_init_send), (gst_tcpserversink_close):
+       * gst/tcp/gsttcpserversink.h:
+          take streamheader into account
+
+2004-06-08  Thomas Vander Stichele  <thomas at apestaart dot org>
+
        * gst/level/Makefile.am:
        * gst/level/gstlevel.c: (gst_level_class_init):
          clean up marshal generation
index 45bd4f5..1431c49 100644 (file)
@@ -231,6 +231,7 @@ gst_tcpclientsrc_get (GstPad * pad)
             ("ioctl failed: %s", g_strerror (errno)));
         return NULL;
       }
+      GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
       buf = gst_buffer_new_and_alloc (readsize);
       break;
     case GST_TCP_PROTOCOL_TYPE_GDP:
index cc6d268..d920186 100644 (file)
@@ -229,8 +229,24 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
   return TRUE;
 }
 
+static void
+gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd)
+{
+  /* FIXME: if we keep track of ip we can log it here and signal */
+  GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
+  if (close (fd) != 0) {
+    GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno));
+  }
+  FD_CLR (fd, &sink->clientfds);
+  FD_CLR (fd, &sink->caps_sent);
+  FD_CLR (fd, &sink->streamheader_sent);
+  g_signal_emit (G_OBJECT (sink),
+      gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+}
+
 /* handle a read on a client fd,
- * which either indicates a close or should be ignored */
+ * which either indicates a close or should be ignored
+ * returns FALSE if the client has been closed. */
 static gboolean
 gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
 {
@@ -241,17 +257,9 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
   ioctl (fd, FIONREAD, &nread);
   if (nread == 0) {
     /* client sent close, so remove it */
-    GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
-    if (close (fd) != 0) {
-      GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
-          ("error closing fd %d: %s", fd, g_strerror (errno)));
-      return FALSE;
-    }
-    FD_CLR (fd, &sink->clientfds);
-    FD_CLR (fd, &sink->caps_sent);
-    /* FIXME: we need to keep track of IP info so we can signal it here */
-    g_signal_emit (G_OBJECT (sink),
-        gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
+    GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd);
+    gst_tcpserversink_client_remove (sink, fd);
+    return FALSE;
   } else {
     /* FIXME: we should probably just Read 'n' Drop */
     g_warning ("Don't know what to do with %d bytes to read", nread);
@@ -259,40 +267,25 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
   return TRUE;
 }
 
-/* handle a write on a client fd,
- * which indicates a read request from a client */
-static gboolean
-gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
-    GstPad * pad, GstBuffer * buf)
+/* Write a buffer to the given fd for the given element using the given
+ * protocol.
+ * Return number of buffer bytes written.
+ */
+static gint
+gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element,
+    GstTCPProtocolType protocol)
 {
+  gint wrote = 0;
+
   /* write the buffer header if we have one */
-  switch (sink->protocol) {
+  switch (protocol) {
     case GST_TCP_PROTOCOL_TYPE_NONE:
       break;
 
     case GST_TCP_PROTOCOL_TYPE_GDP:
-      /* if we haven't sent caps yet, send them first */
-      if (!FD_ISSET (fd, &(sink->caps_sent))) {
-        const GstCaps *caps;
-        gchar *string;
-
-        caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
-        string = gst_caps_to_string (caps);
-        GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
-            fd);
-        /* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */
-        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
-                "unknown", 0)) {
-          g_free (string);
-          return FALSE;
-        }
-        g_free (string);
-        FD_SET (fd, &(sink->caps_sent));
-      }
-      GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
-      if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE,
-              "unknown", 0))
-        return FALSE;
+      GST_LOG_OBJECT (element, "Sending buffer header through GDP");
+      if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0))
+        return 0;
       break;
     default:
       g_warning ("Unhandled protocol type");
@@ -300,11 +293,9 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
   }
 
   /* serve data to client */
-  GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d",
+  GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d",
       GST_BUFFER_SIZE (buf), fd);
 
-  int wrote = 0;
-
   wrote =
       gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
 
@@ -312,24 +303,78 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
 /* FIXME: keep track of client ip and port and so on */
 /*
               GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
-                (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
+                (_("Error while sending data to \"%s:%d\"."),
+                    sink->host, sink->port),
                 ("Only %d of %d bytes written: %s",
                   bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
 */
     /* FIXME: there should be a better way to report problems, since we
        want to continue for other clients and just drop this particular one */
-    GST_DEBUG_OBJECT (sink, "Write failed: %d of %d bytes written", wrote,
+    GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote,
         GST_BUFFER_SIZE (buf));
+  }
+
+  return wrote;
+}
+
+/* handle a write on a client fd,
+ * which indicates a read request from a client */
+static gboolean
+gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
+    GstPad * pad, GstBuffer * buf)
+{
+  gint wrote = 0;
+
+  /* when using GDP, first check if we have sent caps yet */
+  if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+    if (!FD_ISSET (fd, &(sink->caps_sent))) {
+      const GstCaps *caps;
+      gchar *string;
+
+      caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+      string = gst_caps_to_string (caps);
+      GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
+          fd);
+      if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
+              "unknown", 0)) {
+        GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client");
+        gst_tcpserversink_client_remove (sink, fd);
+        g_free (string);
+        return FALSE;
+      }
+      g_free (string);
+      FD_SET (fd, &(sink->caps_sent));
+    }
+  }
+  /* if we have streamheader buffers, and haven't sent them to this client
+   * yet, send them out one by one */
+  if (!FD_ISSET (fd, &(sink->streamheader_sent))) {
+    if (sink->streamheader) {
+      GList *l;
+
+      for (l = sink->streamheader; l; l = l->next) {
+        wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink),
+            sink->protocol);
+        if (wrote < GST_BUFFER_SIZE (l->data)) {
+          GST_DEBUG_OBJECT (sink,
+              "Failed sending streamheader, removing client");
+          gst_tcpserversink_client_remove (sink, fd);
+        }
+      }
+    }
+    FD_SET (fd, &(sink->streamheader_sent));
+  }
+
+  /* now we write the data buffer */
+  wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol);
+  if (wrote < GST_BUFFER_SIZE (buf)) {
+    gst_tcpserversink_client_remove (sink, fd);
     /* write failed, so drop the client */
     GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
     if (close (fd) != 0) {
       GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s",
           fd, g_strerror (errno));
     }
-    FD_CLR (fd, &sink->clientfds);
-    FD_CLR (fd, &sink->caps_sent);
-    g_signal_emit (G_OBJECT (sink),
-        gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
     return FALSE;
   }
   return TRUE;
@@ -354,10 +399,23 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data)
   g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN));
 
   if (GST_IS_EVENT (buf)) {
-    g_warning ("FIXME: handl events");
+    g_warning ("FIXME: handle events");
     return;
   }
 
+  /* if the incoming buffer is marked as IN CAPS, then we assume for now
+   * it's a streamheader that needs to be sent to each new client, so we
+   * put it on our internal list of streamheader buffers.
+   * After that we return, since we only send these out when we get
+   * non IN_CAPS buffers so we properly keep track of clients that got
+   * streamheaders. */
+  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
+    GST_DEBUG_OBJECT (sink,
+        "appending IN_CAPS buffer with length %d to streamheader",
+        GST_BUFFER_SIZE (buf));
+    sink->streamheader = g_list_append (sink->streamheader, buf);
+    return;
+  }
   /* if the incoming buffer has a duration, we can use that as the timeout
    * value; otherwise, we block */
   timeout.tv_sec = 0;
@@ -428,7 +486,6 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data)
 
   gst_buffer_unref (buf);
 
-  /* FIXME: emit signal ? */
 }
 
 static void
@@ -551,8 +608,10 @@ gst_tcpserversink_init_send (GstTCPServerSink * this)
 
   FD_ZERO (&this->clientfds);
   FD_ZERO (&this->caps_sent);
+  FD_ZERO (&this->streamheader_sent);
   FD_SET (this->server_sock_fd, &this->clientfds);
   GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
+  this->streamheader = NULL;
 
   this->data_written = 0;
 
@@ -567,6 +626,14 @@ gst_tcpserversink_close (GstTCPServerSink * this)
     this->server_sock_fd = -1;
   }
 
+  if (this->streamheader) {
+    GList *l;
+
+    for (l = sink->streamheader; l; l = l->next) {
+      gst_buffer_unref (l->data);
+    }
+    g_list_free (this->streamheader);
+  }
   GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
 }
 
index fb496a6..4499623 100644 (file)
@@ -86,6 +86,7 @@ struct _GstTCPServerSink {
   fd_set streamheader_sent; /* all the client file descriptors that have had
                              * streamheader sent */
 
+  GList *streamheader; /* GList of GstBuffers to use as streamheader */
   GstTCPProtocolType protocol;
   guint mtu;
   GstClock *clock;