gst/: Make UDP and TCP elements use PushSrc.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 12 May 2005 10:45:25 +0000 (10:45 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 12 May 2005 10:45:25 +0000 (10:45 +0000)
Original commit message from CVS:
* gst/rtsp/README:
* gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
(gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
(gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
(gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos),
(gst_tcpclientsrc_create), (gst_tcpclientsrc_start):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
(gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
(gst_tcpserversrc_init), (gst_tcpserversrc_create),
(gst_tcpserversrc_start), (gst_tcpserversrc_stop):
* gst/tcp/gsttcpserversrc.h:
* gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type),
(gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init),
(gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop):
* gst/tcp/gsttcpsrc.h:
* gst/udp/gstudpsink.c: (gst_udpsink_base_init),
(gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render),
(gst_udpsink_set_property), (gst_udpsink_get_property),
(gst_udpsink_change_state):
* gst/udp/gstudpsink.h:
* gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
(gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
(gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Make UDP and TCP elements use PushSrc.

ChangeLog
gst/rtsp/README
gst/udp/gstudpsink.c
gst/udp/gstudpsink.h
gst/udp/gstudpsrc.c
gst/udp/gstudpsrc.h

index 9589d81..0be548b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,34 @@
+2005-05-12  Wim Taymans  <wim@fluendo.com>
+
+       * gst/rtsp/README:
+       * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
+       (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
+       (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
+       (gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos),
+       (gst_tcpclientsrc_create), (gst_tcpclientsrc_start):
+       * gst/tcp/gsttcpclientsrc.h:
+       * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
+       (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
+       (gst_tcpserversrc_init), (gst_tcpserversrc_create),
+       (gst_tcpserversrc_start), (gst_tcpserversrc_stop):
+       * gst/tcp/gsttcpserversrc.h:
+       * gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type),
+       (gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init),
+       (gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop):
+       * gst/tcp/gsttcpsrc.h:
+       * gst/udp/gstudpsink.c: (gst_udpsink_base_init),
+       (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render),
+       (gst_udpsink_set_property), (gst_udpsink_get_property),
+       (gst_udpsink_change_state):
+       * gst/udp/gstudpsink.h:
+       * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
+       (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init),
+       (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
+       (gst_udpsrc_stop):
+       * gst/udp/gstudpsrc.h:
+       Make UDP and TCP elements use PushSrc.
+
+
 2005-05-11  Tim-Philipp Müller  <tim at centricular dot net>
 
        * ext/mad/gstmad.c: (gst_mad_init), (gst_mad_src_query),
index 62793a6..285f4f4 100644 (file)
@@ -137,7 +137,7 @@ An RTSP session is created as follows:
     +---------------------------------------------+
     | +------------+                              |
     | | _loop()    |   +--------+                 |
-    | |            ----- rtpdec --------------------
+    | |            ----- rtpses --------------------
     | |            |   |        |                 |
     | |            |   |        |  +------------+ |
     | |            ----- RTCP   ---- udpsink    | |
index 93fb654..0d57cd6 100644 (file)
@@ -50,8 +50,7 @@ enum
   ARG_0,
   ARG_HOST,
   ARG_PORT,
-  ARG_MTU
-      /* FILL ME */
+  /* FILL ME */
 };
 
 static void gst_udpsink_base_init (gpointer g_class);
@@ -146,7 +145,6 @@ gst_udpsink_init (GstUDPSink * udpsink)
 {
   udpsink->host = g_strdup (UDP_DEFAULT_HOST);
   udpsink->port = UDP_DEFAULT_PORT;
-  udpsink->mtu = 1024;
 }
 
 static void
@@ -161,30 +159,31 @@ static GstFlowReturn
 gst_udpsink_render (GstBaseSink * sink, GstBuffer * buffer)
 {
   GstUDPSink *udpsink;
-  gint tosend;
+  gint ret, size;
   guint8 *data;
 
   udpsink = GST_UDPSINK (sink);
 
-  tosend = GST_BUFFER_SIZE (buffer);
+  size = GST_BUFFER_SIZE (buffer);
   data = GST_BUFFER_DATA (buffer);
 
-  /* send in chunks of MTU */
-  while (tosend > 0) {
-    gint psize;
-
-    psize = MIN (udpsink->mtu, tosend);
-    if (sendto (udpsink->sock, data, psize, 0,
-            (struct sockaddr *) &udpsink->theiraddr,
-            sizeof (udpsink->theiraddr)) == -1) {
-      perror ("sending");
-    }
+  while (TRUE) {
+    ret = sendto (udpsink->sock, data, size, 0,
+        (struct sockaddr *) &udpsink->theiraddr, sizeof (udpsink->theiraddr));
 
-    data += psize;
-    tosend -= psize;
+    if (ret < 0) {
+      if (errno != EINTR && errno != EAGAIN)
+        goto send_error;
+    } else
+      break;
   }
-
   return GST_FLOW_OK;
+
+send_error:
+  {
+    GST_DEBUG ("got send error");
+    return GST_FLOW_ERROR;
+  }
 }
 
 static void
@@ -207,9 +206,6 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value,
     case ARG_PORT:
       udpsink->port = g_value_get_int (value);
       break;
-    case ARG_MTU:
-      udpsink->mtu = g_value_get_int (value);
-      break;
     default:
       break;
   }
@@ -230,9 +226,6 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value,
     case ARG_PORT:
       g_value_set_int (value, udpsink->port);
       break;
-    case ARG_MTU:
-      g_value_set_int (value, udpsink->mtu);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 0f65a25..e5480c3 100644 (file)
@@ -59,7 +59,6 @@ struct _GstUDPSink {
 
   gint port;
   gchar *host;
-  guint mtu;
 };
 
 struct _GstUDPSinkClass {
index 120d0c5..76c4b73 100644 (file)
@@ -63,9 +63,9 @@ static void gst_udpsrc_init (GstUDPSrc * udpsrc);
 
 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
 
-static void gst_udpsrc_loop (GstPad * pad);
-static GstElementStateReturn gst_udpsrc_change_state (GstElement * element);
-static gboolean gst_udpsrc_activate (GstPad * pad, GstActivateMode mode);
+static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
+static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
+static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
 
 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -101,7 +101,7 @@ gst_udpsrc_get_type (void)
     };
 
     udpsrc_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstUDPSrc", &udpsrc_info, 0);
+        g_type_register_static (GST_TYPE_PUSHSRC, "GstUDPSrc", &udpsrc_info, 0);
 
     g_type_add_interface_static (udpsrc_type, GST_TYPE_URI_HANDLER,
         &urihandler_info);
@@ -125,11 +125,15 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstBaseSrcClass *gstbasesrc_class;
+  GstPushSrcClass *gstpushsrc_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstbasesrc_class = (GstBaseSrcClass *) klass;
+  gstpushsrc_class = (GstPushSrcClass *) klass;
 
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+  parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
 
   gobject_class->set_property = gst_udpsrc_set_property;
   gobject_class->get_property = gst_udpsrc_get_property;
@@ -147,27 +151,22 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
           "URI in the form of udp://hostname:port", UDP_DEFAULT_URI,
           G_PARAM_READWRITE));
 
-  gstelement_class->change_state = gst_udpsrc_change_state;
+  gstbasesrc_class->start = gst_udpsrc_start;
+  gstbasesrc_class->stop = gst_udpsrc_stop;
+  gstpushsrc_class->create = gst_udpsrc_create;
 }
 
 static void
 gst_udpsrc_init (GstUDPSrc * udpsrc)
 {
-  /* create the src and src pads */
-  udpsrc->srcpad = gst_pad_new_from_template
-      (gst_static_pad_template_get (&src_template), "src");
-  gst_pad_set_activate_function (udpsrc->srcpad, gst_udpsrc_activate);
-  gst_pad_set_loop_function (udpsrc->srcpad, gst_udpsrc_loop);
-  gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad);
-
   udpsrc->port = UDP_DEFAULT_PORT;
   udpsrc->sock = -1;
   udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
   udpsrc->uri = g_strdup (UDP_DEFAULT_URI);
 }
 
-static void
-gst_udpsrc_loop (GstPad * pad)
+static GstFlowReturn
+gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 {
   GstUDPSrc *udpsrc;
   GstBuffer *outbuf;
@@ -176,55 +175,51 @@ gst_udpsrc_loop (GstPad * pad)
   gint numbytes;
   fd_set read_fds;
   guint max_sock;
+  gchar *pktdata;
+  gint pktsize;
 
-  udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad));
+  udpsrc = GST_UDPSRC (psrc);
 
   FD_ZERO (&read_fds);
   FD_SET (udpsrc->sock, &read_fds);
   max_sock = udpsrc->sock;
 
-  GST_STREAM_LOCK (pad);
-
   /* FIXME, add another socket to unblock */
   if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0)
     goto select_error;
 
-  outbuf = gst_buffer_new ();
-  GST_BUFFER_DATA (outbuf) = g_malloc (24000);
-  GST_BUFFER_SIZE (outbuf) = 24000;
+  pktdata = g_malloc (24000);
+  pktsize = 24000;
 
   len = sizeof (struct sockaddr);
-  if ((numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf),
-              GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr,
-              &len)) == -1)
-    goto receive_error;
+  while (TRUE) {
+    numbytes = recvfrom (udpsrc->sock, pktdata, pktsize,
+        0, (struct sockaddr *) &tmpaddr, &len);
+    if (numbytes < 0) {
+      if (errno != EAGAIN && errno != EINTR)
+        goto receive_error;
+    } else
+      break;
+  }
 
+  outbuf = gst_buffer_new ();
+  GST_BUFFER_DATA (outbuf) = pktdata;
   GST_BUFFER_SIZE (outbuf) = numbytes;
-  if (gst_pad_push (udpsrc->srcpad, outbuf) != GST_FLOW_OK)
-    goto need_pause;
 
-  GST_STREAM_UNLOCK (pad);
+  *buf = outbuf;
 
-  return;
+  return GST_FLOW_OK;
 
 select_error:
   {
-    GST_STREAM_UNLOCK (pad);
     GST_DEBUG ("got select error");
-    return;
+    return GST_FLOW_ERROR;
   }
 receive_error:
   {
-    GST_STREAM_UNLOCK (pad);
     gst_buffer_unref (outbuf);
     GST_DEBUG ("got receive error");
-    return;
-  }
-need_pause:
-  {
-    gst_task_pause (GST_RPAD_TASK (pad));
-    GST_STREAM_UNLOCK (pad);
-    return;
+    return GST_FLOW_ERROR;
   }
 }
 
@@ -315,12 +310,15 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
 
 /* create a socket for sending to remote machine */
 static gboolean
-gst_udpsrc_init_receive (GstUDPSrc * src)
+gst_udpsrc_start (GstBaseSrc * bsrc)
 {
   guint bc_val;
   gint reuse = 1;
   struct sockaddr_in my_addr;
   int len, port;
+  GstUDPSrc *src;
+
+  src = GST_UDPSRC (bsrc);
 
   memset (&src->myaddr, 0, sizeof (src->myaddr));
   src->myaddr.sin_family = AF_INET;     /* host byte order */
@@ -328,15 +326,15 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
   src->myaddr.sin_addr.s_addr = INADDR_ANY;
 
   if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
-    goto error;
+    goto no_socket;
 
   if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
           sizeof (reuse)) < 0)
-    goto error;
+    goto setsockopt_error;
 
   if (bind (src->sock, (struct sockaddr *) &src->myaddr,
           sizeof (src->myaddr)) < 0)
-    goto error;
+    goto bind_error;
 
   if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) {
     if (src->multi_addr.imr_multiaddr.s_addr) {
@@ -347,7 +345,9 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
   }
 
   len = sizeof (my_addr);
-  getsockname (src->sock, (struct sockaddr *) &my_addr, &len);
+  if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0)
+    goto getsockname_error;
+
   port = ntohs (my_addr.sin_port);
   if (port != src->port) {
     src->port = port;
@@ -360,102 +360,41 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
 
   return TRUE;
 
-error:
+  /* ERRORS */
+no_socket:
   {
-    perror ("open");
+    GST_DEBUG ("no_socket");
     return FALSE;
   }
-}
-
-static void
-gst_udpsrc_close (GstUDPSrc * src)
-{
-  if (src->sock != -1) {
-    close (src->sock);
-    src->sock = -1;
+setsockopt_error:
+  {
+    GST_DEBUG ("setsockopt failed");
+    return FALSE;
   }
-}
-
-static gboolean
-gst_udpsrc_activate (GstPad * pad, GstActivateMode mode)
-{
-  gboolean result;
-  GstUDPSrc *udpsrc;
-
-  udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad));
-
-  switch (mode) {
-    case GST_ACTIVATE_PUSH:
-      /* if we have a scheduler we can start the task */
-      if (GST_ELEMENT_SCHEDULER (udpsrc)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (udpsrc),
-            (GstTaskFunction) gst_udpsrc_loop, pad);
-
-        gst_task_start (GST_RPAD_TASK (pad));
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
-      break;
-    case GST_ACTIVATE_PULL:
-      result = FALSE;
-      break;
-    case GST_ACTIVATE_NONE:
-      /* step 1, unblock clock sync (if any) */
-
-      /* step 2, make sure streaming finishes */
-      GST_STREAM_LOCK (pad);
-      gst_udpsrc_close (udpsrc);
-
-      /* step 3, stop the task */
-      if (GST_RPAD_TASK (pad)) {
-        gst_task_stop (GST_RPAD_TASK (pad));
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
-      break;
+bind_error:
+  {
+    GST_DEBUG ("bind failed");
+    return FALSE;
+  }
+getsockname_error:
+  {
+    GST_DEBUG ("getsockname failed");
+    return FALSE;
   }
-  return result;
 }
 
-static GstElementStateReturn
-gst_udpsrc_change_state (GstElement * element)
+static gboolean
+gst_udpsrc_stop (GstBaseSrc * bsrc)
 {
-  GstElementStateReturn ret;
   GstUDPSrc *src;
-  gint transition;
 
-  src = GST_UDPSRC (element);
+  src = GST_UDPSRC (bsrc);
 
-  transition = GST_STATE_TRANSITION (element);
-
-  switch (transition) {
-    case GST_STATE_READY_TO_PAUSED:
-      if (!gst_udpsrc_init_receive (src))
-        goto no_init;
-      break;
-    default:
-      break;
-  }
-
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
-  switch (transition) {
-    default:
-      break;
-  }
-
-  return ret;
-
-no_init:
-  {
-    GST_DEBUG ("could not init udp socket");
-    return GST_STATE_FAILURE;
+  if (src->sock != -1) {
+    close (src->sock);
+    src->sock = -1;
   }
+  return TRUE;
 }
 
 /*** GSTURIHANDLER INTERFACE *************************************************/
index a347eb2..c39cbe7 100644 (file)
 #define __GST_UDPSRC_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #include <errno.h>
 #include <string.h>
@@ -53,14 +52,9 @@ typedef struct _GstUDPSrc GstUDPSrc;
 typedef struct _GstUDPSrcClass GstUDPSrcClass;
 
 struct _GstUDPSrc {
-  GstElement element;
-
-  /* pads */
-  GstPad *sinkpad,
-        *srcpad;
+  GstPushSrc parent;
 
   gchar *uri;
-
   int port;
   int sock;
   gchar *multi_group;
@@ -74,15 +68,12 @@ struct _GstUDPSrc {
 };
 
 struct _GstUDPSrcClass {
-  GstElementClass parent_class;
+  GstPushSrcClass parent_class;
 };
 
 GType gst_udpsrc_get_type(void);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_END_DECLS
 
 
 #endif /* __GST_UDPSRC_H__ */