Add RTSP channel object for async io
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Feb 2009 16:42:59 +0000 (17:42 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Feb 2009 16:42:59 +0000 (17:42 +0100)
Add a GstRTSPChannel object that wraps a GSource around the RTSP connection so
that the connection can be monitored from a maincontext. This allows us to
operate in ASYNC mode, which is handy when building a server.

Rework the old code to use the async code under the hood.

API: gst_rtsp_channel_new()
API: gst_rtsp_channel_unref()
API: gst_rtsp_channel_attach()
API: gst_rtsp_channel_queue_message()

docs/libs/gst-plugins-base-libs-sections.txt
gst-libs/gst/rtsp/gstrtspconnection.c
gst-libs/gst/rtsp/gstrtspconnection.h

index 36713376bc4eeb0d452badb158769976831b9638..468038016531fd59225b2c8a03e02f3ad8bd50cc 100644 (file)
@@ -1205,6 +1205,13 @@ gst_rtsp_connection_next_timeout
 gst_rtsp_connection_reset_timeout
 gst_rtsp_connection_flush
 gst_rtsp_connection_set_auth
+
+GstRTSPChannel
+GstRTSPChannelFuncs
+gst_rtsp_channel_new
+gst_rtsp_channel_unref
+gst_rtsp_channel_attach
+gst_rtsp_channel_queue_message
 </SECTION>
 
 <SECTION>
index fbf2db09efa0f2fed1e82462175ad0a37fed4616..33cec5ba79b26264eaba9b5e856b1214ff7bb7a7 100644 (file)
 #include "md5.h"
 
 #ifdef G_OS_WIN32
-#define FIONREAD_TYPE gulong
-#define IOCTL_SOCKET ioctlsocket
 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
 #define CLOSE_SOCKET(sock) closesocket (sock)
-#define ERRNO_IS_NOT_EAGAIN (WSAGetLastError () != WSAEWOULDBLOCK)
-#define ERRNO_IS_NOT_EINTR (WSAGetLastError () != WSAEINTR)
+#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
+#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
 /* According to Microsoft's connect() documentation this one returns
  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
-#define ERRNO_IS_NOT_EINPROGRESS (WSAGetLastError () != WSAEWOULDBLOCK)
+#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
 #else
-#define FIONREAD_TYPE gint
-#define IOCTL_SOCKET ioctl
 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
 #define CLOSE_SOCKET(sock) close (sock)
-#define ERRNO_IS_NOT_EAGAIN (errno != EAGAIN)
-#define ERRNO_IS_NOT_EINTR (errno != EINTR)
-#define ERRNO_IS_NOT_EINPROGRESS (errno != EINPROGRESS)
+#define ERRNO_IS_EAGAIN (errno == EAGAIN)
+#define ERRNO_IS_EINTR (errno == EINTR)
+#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
 #endif
 
 #ifdef G_OS_WIN32
@@ -135,6 +131,35 @@ inet_aton (const char *c, struct in_addr *paddr)
 }
 #endif
 
+enum
+{
+  STATE_START = 0,
+  STATE_DATA_HEADER,
+  STATE_DATA_BODY,
+  STATE_READ_LINES,
+  STATE_END,
+  STATE_LAST
+};
+
+/* a structure for constructing RTSPMessages */
+typedef struct
+{
+  gint state;
+  guint8 buffer[4096];
+  guint offset;
+
+  guint line;
+  guint8 *body_data;
+  glong body_len;
+} GstRTSPBuilder;
+
+static void
+build_reset (GstRTSPBuilder * builder)
+{
+  g_free (builder->body_data);
+  memset (builder, 0, sizeof (builder));
+}
+
 /**
  * gst_rtsp_connection_create:
  * @url: a #GstRTSPUrl 
@@ -161,6 +186,7 @@ gst_rtsp_connection_create (GstRTSPUrl * url, GstRTSPConnection ** conn)
   newconn->url = url;
   newconn->fd.fd = -1;
   newconn->timer = g_timer_new ();
+  newconn->timeout = 60;
 
   newconn->auth_method = GST_RTSP_AUTH_NONE;
   newconn->username = NULL;
@@ -269,7 +295,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
   ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in));
   if (ret == 0)
     goto done;
-  if (ERRNO_IS_NOT_EINPROGRESS)
+  if (!ERRNO_IS_EINPROGRESS)
     goto sys_error;
 
   /* wait for connect to complete up to the specified timeout or until we got
@@ -288,8 +314,11 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
     goto sys_error;
 
   /* we can still have an error connecting on windows */
-  if (gst_poll_fd_has_error (conn->fdset, &conn->fd))
+  if (gst_poll_fd_has_error (conn->fdset, &conn->fd)) {
+    socklen_t len = sizeof (errno);
+    getsockopt (conn->fd.fd, SOL_SOCKET, SO_ERROR, &errno, &len);
     goto sys_error;
+  }
 
   gst_poll_fd_ignored (conn->fdset, &conn->fd);
 
@@ -485,6 +514,94 @@ add_date_header (GstRTSPMessage * message)
   gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
 }
 
+static GstRTSPResult
+write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
+{
+  guint left;
+
+  if (*idx > size)
+    return GST_RTSP_ERROR;
+
+  left = size - *idx;
+
+  while (left) {
+    gint r;
+
+    r = WRITE_SOCKET (fd, &buffer[*idx], left);
+    if (r == 0) {
+      return GST_RTSP_EINTR;
+    } else if (r < 0) {
+      if (ERRNO_IS_EAGAIN)
+        return GST_RTSP_EINTR;
+      if (!ERRNO_IS_EINTR)
+        return GST_RTSP_ESYS;
+    } else {
+      left -= r;
+      *idx += r;
+    }
+  }
+  return GST_RTSP_OK;
+}
+
+static GstRTSPResult
+read_bytes (gint fd, guint8 * buffer, guint * idx, guint size)
+{
+  guint left;
+
+  if (*idx > size)
+    return GST_RTSP_ERROR;
+
+  left = size - *idx;
+
+  while (left) {
+    gint r;
+
+    r = READ_SOCKET (fd, &buffer[*idx], left);
+    if (r == 0) {
+      return GST_RTSP_EEOF;
+    } else if (r < 0) {
+      if (ERRNO_IS_EAGAIN)
+        return GST_RTSP_EINTR;
+      if (!ERRNO_IS_EINTR)
+        return GST_RTSP_ESYS;
+    } else {
+      left -= r;
+      *idx += r;
+    }
+  }
+  return GST_RTSP_OK;
+}
+
+static GstRTSPResult
+read_line (gint fd, guint8 * buffer, guint * idx, guint size)
+{
+  while (TRUE) {
+    guint8 c;
+    gint r;
+
+    r = READ_SOCKET (fd, &c, 1);
+    if (r == 0) {
+      return GST_RTSP_EEOF;
+    } else if (r < 0) {
+      if (ERRNO_IS_EAGAIN)
+        return GST_RTSP_EINTR;
+      if (!ERRNO_IS_EINTR)
+        return GST_RTSP_ESYS;
+    } else {
+      if (c == '\n')            /* end on \n */
+        break;
+      if (c == '\r')            /* ignore \r */
+        continue;
+
+      if (*idx < size - 1)
+        buffer[(*idx)++] = c;
+    }
+  }
+  buffer[*idx] = '\0';
+
+  return GST_RTSP_OK;
+}
+
 /**
  * gst_rtsp_connection_write:
  * @conn: a #GstRTSPConnection
@@ -504,9 +621,10 @@ GstRTSPResult
 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
     guint size, GTimeVal * timeout)
 {
-  guint towrite;
+  guint offset;
   gint retval;
   GstClockTime to;
+  GstRTSPResult res;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
@@ -520,11 +638,17 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
 
   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
 
-  towrite = size;
+  offset = 0;
 
-  while (towrite > 0) {
-    gint written;
+  while (TRUE) {
+    /* try to write */
+    res = write_bytes (conn->fd.fd, data, &offset, size);
+    if (res == GST_RTSP_OK)
+      break;
+    if (res != GST_RTSP_EINTR)
+      goto write_error;
 
+    /* not all is written, wait until we can write more */
     do {
       retval = gst_poll_wait (conn->fdset, to);
     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
@@ -538,16 +662,6 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
       else
         goto select_error;
     }
-
-    /* now we can write */
-    written = WRITE_SOCKET (conn->fd.fd, data, towrite);
-    if (written < 0) {
-      if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR)
-        goto write_error;
-    } else {
-      towrite -= written;
-      data += written;
-    }
   }
   return GST_RTSP_OK;
 
@@ -566,48 +680,14 @@ stopped:
   }
 write_error:
   {
-    return GST_RTSP_ESYS;
+    return res;
   }
 }
 
-/**
- * gst_rtsp_connection_send:
- * @conn: a #GstRTSPConnection
- * @message: the message to send
- * @timeout: a timeout value or #NULL
- *
- * Attempt to send @message to the connected @conn, blocking up to
- * the specified @timeout. @timeout can be #NULL, in which case this function
- * might block forever.
- * 
- * This function can be cancelled with gst_rtsp_connection_flush().
- *
- * Returns: #GST_RTSP_OK on success.
- */
-GstRTSPResult
-gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
-    GTimeVal * timeout)
+static GString *
+message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
 {
   GString *str = NULL;
-  GstRTSPResult res;
-
-#ifdef G_OS_WIN32
-  WSADATA w;
-  int error;
-#endif
-
-  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
-  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
-
-#ifdef G_OS_WIN32
-  error = WSAStartup (0x0202, &w);
-
-  if (error)
-    goto startup_error;
-
-  if (w.wVersion != 0x0202)
-    goto version_error;
-#endif
 
   str = g_string_new ("");
 
@@ -649,7 +729,8 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
       break;
     }
     default:
-      g_return_val_if_reached (GST_RTSP_EINVAL);
+      g_string_free (str, TRUE);
+      g_return_val_if_reached (NULL);
       break;
   }
 
@@ -680,6 +761,51 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
     }
   }
 
+  return str;
+}
+
+/**
+ * gst_rtsp_connection_send:
+ * @conn: a #GstRTSPConnection
+ * @message: the message to send
+ * @timeout: a timeout value or #NULL
+ *
+ * Attempt to send @message to the connected @conn, blocking up to
+ * the specified @timeout. @timeout can be #NULL, in which case this function
+ * might block forever.
+ * 
+ * This function can be cancelled with gst_rtsp_connection_flush().
+ *
+ * Returns: #GST_RTSP_OK on success.
+ */
+GstRTSPResult
+gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
+    GTimeVal * timeout)
+{
+  GString *str = NULL;
+  GstRTSPResult res;
+
+#ifdef G_OS_WIN32
+  WSADATA w;
+  int error;
+#endif
+
+  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+#ifdef G_OS_WIN32
+  error = WSAStartup (0x0202, &w);
+
+  if (error)
+    goto startup_error;
+
+  if (w.wVersion != 0x0202)
+    goto version_error;
+#endif
+
+  if (!(str = message_to_string (conn, message)))
+    goto no_message;
+
   /* write request */
   res =
       gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, timeout);
@@ -688,6 +814,11 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
 
   return res;
 
+no_message:
+  {
+    g_warning ("Wrong message");
+    return GST_RTSP_EINVAL;
+  }
 #ifdef G_OS_WIN32
 startup_error:
   {
@@ -704,47 +835,8 @@ version_error:
 #endif
 }
 
-static GstRTSPResult
-read_line (gint fd, gchar * buffer, guint size)
-{
-  guint idx;
-  gchar c;
-  gint r;
-
-  idx = 0;
-  while (TRUE) {
-    r = READ_SOCKET (fd, &c, 1);
-    if (r == 0) {
-      goto eof;
-    } else if (r < 0) {
-      if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR)
-        goto read_error;
-    } else {
-      if (c == '\n')            /* end on \n */
-        break;
-      if (c == '\r')            /* ignore \r */
-        continue;
-
-      if (idx < size - 1)
-        buffer[idx++] = c;
-    }
-  }
-  buffer[idx] = '\0';
-
-  return GST_RTSP_OK;
-
-eof:
-  {
-    return GST_RTSP_EEOF;
-  }
-read_error:
-  {
-    return GST_RTSP_ESYS;
-  }
-}
-
 static void
-read_string (gchar * dest, gint size, gchar ** src)
+parse_string (gchar * dest, gint size, gchar ** src)
 {
   gint idx;
 
@@ -763,7 +855,7 @@ read_string (gchar * dest, gint size, gchar ** src)
 }
 
 static void
-read_key (gchar * dest, gint size, gchar ** src)
+parse_key (gchar * dest, gint size, gchar ** src)
 {
   gint idx;
 
@@ -778,7 +870,7 @@ read_key (gchar * dest, gint size, gchar ** src)
 }
 
 static GstRTSPResult
-parse_response_status (gchar * buffer, GstRTSPMessage * msg)
+parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
 {
   GstRTSPResult res;
   gchar versionstr[20];
@@ -786,10 +878,10 @@ parse_response_status (gchar * buffer, GstRTSPMessage * msg)
   gint code;
   gchar *bptr;
 
-  bptr = buffer;
+  bptr = (gchar *) buffer;
 
-  read_string (versionstr, sizeof (versionstr), &bptr);
-  read_string (codestr, sizeof (codestr), &bptr);
+  parse_string (versionstr, sizeof (versionstr), &bptr);
+  parse_string (codestr, sizeof (codestr), &bptr);
   code = atoi (codestr);
 
   while (g_ascii_isspace (*bptr))
@@ -814,7 +906,7 @@ parse_error:
 }
 
 static GstRTSPResult
-parse_request_line (gchar * buffer, GstRTSPMessage * msg)
+parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
 {
   GstRTSPResult res = GST_RTSP_OK;
   gchar versionstr[20];
@@ -823,16 +915,16 @@ parse_request_line (gchar * buffer, GstRTSPMessage * msg)
   gchar *bptr;
   GstRTSPMethod method;
 
-  bptr = buffer;
+  bptr = (gchar *) buffer;
 
-  read_string (methodstr, sizeof (methodstr), &bptr);
+  parse_string (methodstr, sizeof (methodstr), &bptr);
   method = gst_rtsp_find_method (methodstr);
 
-  read_string (urlstr, sizeof (urlstr), &bptr);
+  parse_string (urlstr, sizeof (urlstr), &bptr);
   if (*urlstr == '\0')
     res = GST_RTSP_EPARSE;
 
-  read_string (versionstr, sizeof (versionstr), &bptr);
+  parse_string (versionstr, sizeof (versionstr), &bptr);
 
   if (*bptr != '\0')
     res = GST_RTSP_EPARSE;
@@ -855,16 +947,16 @@ parse_request_line (gchar * buffer, GstRTSPMessage * msg)
 
 /* parsing lines means reading a Key: Value pair */
 static GstRTSPResult
-parse_line (gchar * buffer, GstRTSPMessage * msg)
+parse_line (guint8 * buffer, GstRTSPMessage * msg)
 {
   gchar key[32];
   gchar *bptr;
   GstRTSPHeaderField field;
 
-  bptr = buffer;
+  bptr = (gchar *) buffer;
 
   /* read key */
-  read_key (key, sizeof (key), &bptr);
+  parse_key (key, sizeof (key), &bptr);
   if (*bptr != ':')
     goto no_column;
 
@@ -885,31 +977,187 @@ no_column:
   }
 }
 
+/* returns:
+ *  GST_RTSP_OK when a complete message was read.
+ *  GST_RTSP_EEOF: when the socket is closed
+ *  GST_RTSP_EINTR: when more data is needed.
+ *  GST_RTSP_..: some other error occured.
+ */
+static GstRTSPResult
+build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
+    GstRTSPConnection * conn)
+{
+  GstRTSPResult res;
+
+  while (TRUE) {
+    switch (builder->state) {
+      case STATE_START:
+        builder->offset = 0;
+        res =
+            read_bytes (conn->fd.fd, (guint8 *) builder->buffer,
+            &builder->offset, 1);
+        if (res != GST_RTSP_OK)
+          goto done;
+
+        /* we have 1 bytes now and we can see if this is a data message or
+         * not */
+        if (builder->buffer[0] == '$') {
+          /* data message, prepare for the header */
+          builder->state = STATE_DATA_HEADER;
+        } else {
+          builder->line = 0;
+          builder->state = STATE_READ_LINES;
+        }
+        break;
+      case STATE_DATA_HEADER:
+      {
+        res =
+            read_bytes (conn->fd.fd, (guint8 *) builder->buffer,
+            &builder->offset, 4);
+        if (res != GST_RTSP_OK)
+          goto done;
+
+        gst_rtsp_message_init_data (message, builder->buffer[1]);
+
+        builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
+        builder->body_data = g_malloc (builder->body_len + 1);
+        builder->body_data[builder->body_len] = '\0';
+        builder->offset = 0;
+        builder->state = STATE_DATA_BODY;
+        break;
+      }
+      case STATE_DATA_BODY:
+      {
+        res = read_bytes (conn->fd.fd, builder->body_data, &builder->offset,
+            builder->body_len);
+        if (res != GST_RTSP_OK)
+          goto done;
+
+        /* we have the complete body now */
+        gst_rtsp_message_take_body (message,
+            (guint8 *) builder->body_data, builder->body_len);
+        builder->body_data = NULL;
+        builder->body_len = 0;
+
+        builder->state = STATE_END;
+        break;
+      }
+      case STATE_READ_LINES:
+      {
+        res = read_line (conn->fd.fd, builder->buffer, &builder->offset,
+            sizeof (builder->buffer));
+        if (res != GST_RTSP_OK)
+          goto done;
+
+        /* we have a regular response */
+        if (builder->buffer[0] == '\r') {
+          builder->buffer[0] = '\0';
+        }
+
+        if (builder->buffer[0] == '\0') {
+          gchar *hdrval;
+
+          /* empty line, end of message header */
+          /* see if there is a Content-Length header */
+          if (gst_rtsp_message_get_header (message,
+                  GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
+            /* there is, prepare to read the body */
+            builder->body_len = atol (hdrval);
+            builder->body_data = g_malloc (builder->body_len + 1);
+            builder->body_data[builder->body_len] = '\0';
+            builder->offset = 0;
+            builder->state = STATE_DATA_BODY;
+          } else {
+            builder->state = STATE_END;
+          }
+          break;
+        }
+
+        /* we have a line */
+        if (builder->line == 0) {
+          /* first line, check for response status */
+          if (memcmp (builder->buffer, "RTSP", 4) == 0) {
+            res = parse_response_status (builder->buffer, message);
+          } else {
+            res = parse_request_line (builder->buffer, message);
+          }
+        } else {
+          /* else just parse the line */
+          parse_line (builder->buffer, message);
+        }
+        builder->line++;
+        builder->offset = 0;
+        break;
+      }
+      case STATE_END:
+      {
+        gchar *session_id;
+
+        /* save session id in the connection for further use */
+        if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
+                &session_id, 0) == GST_RTSP_OK) {
+          gint maxlen, i;
+
+          maxlen = sizeof (conn->session_id) - 1;
+          /* the sessionid can have attributes marked with ;
+           * Make sure we strip them */
+          for (i = 0; session_id[i] != '\0'; i++) {
+            if (session_id[i] == ';') {
+              maxlen = i;
+              /* parse timeout */
+              do {
+                i++;
+              } while (g_ascii_isspace (session_id[i]));
+              if (g_str_has_prefix (&session_id[i], "timeout=")) {
+                gint to;
+
+                /* if we parsed something valid, configure */
+                if ((to = atoi (&session_id[i + 9])) > 0)
+                  conn->timeout = to;
+              }
+              break;
+            }
+          }
+
+          /* make sure to not overflow */
+          strncpy (conn->session_id, session_id, maxlen);
+          conn->session_id[maxlen] = '\0';
+        }
+        res = GST_RTSP_OK;
+        goto done;
+      }
+      default:
+        res = GST_RTSP_ERROR;
+        break;
+    }
+  }
+done:
+  return res;
+}
+
 /**
- * gst_rtsp_connection_read_internal:
+ * gst_rtsp_connection_read:
  * @conn: a #GstRTSPConnection
  * @data: the data to read
  * @size: the size of @data
  * @timeout: a timeout value or #NULL
- * @allow_interrupt: can the pending read be interrupted
  *
  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
  * the specified @timeout. @timeout can be #NULL, in which case this function
  * might block forever.
- * 
- * This function can be cancelled with gst_rtsp_connection_flush() only if
- * @allow_interrupt is set.
+ *
+ * This function can be cancelled with gst_rtsp_connection_flush().
  *
  * Returns: #GST_RTSP_OK on success.
  */
-static GstRTSPResult
-gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data,
-    guint size, GTimeVal * timeout, gboolean allow_interrupt)
+GstRTSPResult
+gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
+    GTimeVal * timeout)
 {
-  guint toread;
+  guint offset;
   gint retval;
   GstClockTime to;
-  FIONREAD_TYPE avail;
+  GstRTSPResult res;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -918,53 +1166,39 @@ gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data,
   if (size == 0)
     return GST_RTSP_OK;
 
-  toread = size;
+  offset = 0;
 
   /* configure timeout if any */
   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
 
-  /* if the call fails, just go in the select.. it should not fail. Else if
-   * there is enough data to read, skip the select call al together.*/
-  if (IOCTL_SOCKET (conn->fd.fd, FIONREAD, &avail) < 0)
-    avail = 0;
-  else if (avail >= toread)
-    goto do_read;
-
-  gst_poll_set_controllable (conn->fdset, allow_interrupt);
+  gst_poll_set_controllable (conn->fdset, TRUE);
   gst_poll_fd_ctl_write (conn->fdset, &conn->fd, FALSE);
   gst_poll_fd_ctl_read (conn->fdset, &conn->fd, TRUE);
 
-  while (toread > 0) {
-    gint bytes;
+  while (TRUE) {
+    res = read_bytes (conn->fd.fd, data, &offset, size);
+    if (res == GST_RTSP_EEOF)
+      goto eof;
+    if (res == GST_RTSP_OK)
+      break;
+    if (res != GST_RTSP_EINTR)
+      goto read_error;
 
     do {
       retval = gst_poll_wait (conn->fdset, to);
     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
 
+    /* check for timeout */
+    if (retval == 0)
+      goto select_timeout;
+
     if (retval == -1) {
       if (errno == EBUSY)
         goto stopped;
       else
         goto select_error;
     }
-
-    /* check for timeout */
-    if (retval == 0)
-      goto select_timeout;
-
-  do_read:
-    /* if we get here there is activity on the real fd since the select
-     * completed and the control socket was not readable. */
-    bytes = READ_SOCKET (conn->fd.fd, data, toread);
-    if (bytes == 0) {
-      goto eof;
-    } else if (bytes < 0) {
-      if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR)
-        goto read_error;
-    } else {
-      toread -= bytes;
-      data += bytes;
-    }
+    gst_poll_set_controllable (conn->fdset, FALSE);
   }
   return GST_RTSP_OK;
 
@@ -987,67 +1221,11 @@ eof:
   }
 read_error:
   {
-    return GST_RTSP_ESYS;
-  }
-}
-
-/**
- * gst_rtsp_connection_read:
- * @conn: a #GstRTSPConnection
- * @data: the data to read
- * @size: the size of @data
- * @timeout: a timeout value or #NULL
- *
- * Attempt to read @size bytes into @data from the connected @conn, blocking up to
- * the specified @timeout. @timeout can be #NULL, in which case this function
- * might block forever.
- *
- * This function can be cancelled with gst_rtsp_connection_flush().
- *
- * Returns: #GST_RTSP_OK on success.
- */
-GstRTSPResult
-gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
-    GTimeVal * timeout)
-{
-  return gst_rtsp_connection_read_internal (conn, data, size, timeout, TRUE);
-}
-
-
-static GstRTSPResult
-read_body (GstRTSPConnection * conn, glong content_length, GstRTSPMessage * msg,
-    GTimeVal * timeout)
-{
-  guint8 *body;
-  GstRTSPResult res;
-
-  if (content_length <= 0) {
-    body = NULL;
-    content_length = 0;
-    goto done;
-  }
-
-  body = g_malloc (content_length + 1);
-  body[content_length] = '\0';
-
-  GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, body, content_length,
-          timeout, FALSE), read_error);
-
-  content_length += 1;
-
-done:
-  gst_rtsp_message_take_body (msg, (guint8 *) body, content_length);
-
-  return GST_RTSP_OK;
-
-  /* ERRORS */
-read_error:
-  {
-    g_free (body);
     return res;
   }
 }
 
+
 /**
  * gst_rtsp_connection_receive:
  * @conn: a #GstRTSPConnection
@@ -1066,138 +1244,78 @@ GstRTSPResult
 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
     GTimeVal * timeout)
 {
-  gchar buffer[4096];
-  gint line;
-  glong content_length;
   GstRTSPResult res;
-  gboolean need_body;
+  GstRTSPBuilder builder = { 0 };
+  gint retval;
+  GstClockTime to;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
 
-  line = 0;
-
-  need_body = TRUE;
-
-  res = GST_RTSP_OK;
-  /* parse first line and headers */
-  while (res == GST_RTSP_OK) {
-    guint8 c;
-
-    /* read first character, this identifies data messages */
-    /* This is the only read() that we allow to be interrupted */
-    GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, &c, 1, timeout,
-            TRUE), read_error);
-
-    /* check for data packet, first character is $ */
-    if (c == '$') {
-      guint16 size;
-
-      /* data packets are $<1 byte channel><2 bytes length,BE><data bytes> */
-
-      /* read channel, which is the next char */
-      GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, &c, 1, timeout,
-              FALSE), read_error);
-
-      /* now we create a data message */
-      gst_rtsp_message_init_data (message, c);
-
-      /* next two bytes are the length of the data */
-      GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn,
-              (guint8 *) & size, 2, timeout, FALSE), read_error);
+  /* configure timeout if any */
+  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
 
-      size = GUINT16_FROM_BE (size);
+  gst_poll_set_controllable (conn->fdset, TRUE);
+  gst_poll_fd_ctl_write (conn->fdset, &conn->fd, FALSE);
+  gst_poll_fd_ctl_read (conn->fdset, &conn->fd, TRUE);
 
-      /* and read the body */
-      res = read_body (conn, size, message, timeout);
-      need_body = FALSE;
+  while (TRUE) {
+    res = build_next (&builder, message, conn);
+    if (res == GST_RTSP_EEOF)
+      goto eof;
+    if (res == GST_RTSP_OK)
       break;
-    } else {
-      gint offset = 0;
+    if (res != GST_RTSP_EINTR)
+      goto read_error;
 
-      /* we have a regular response */
-      if (c != '\r') {
-        buffer[0] = c;
-        offset = 1;
-      }
-      /* should not happen */
-      if (c == '\n')
-        break;
-
-      /* read lines */
-      GST_RTSP_CHECK (read_line (conn->fd.fd, buffer + offset,
-              sizeof (buffer) - offset), read_error);
+    do {
+      retval = gst_poll_wait (conn->fdset, to);
+    } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
 
-      if (buffer[0] == '\0')
-        break;
+    /* check for timeout */
+    if (retval == 0)
+      goto select_timeout;
 
-      if (line == 0) {
-        /* first line, check for response status */
-        if (g_str_has_prefix (buffer, "RTSP")) {
-          res = parse_response_status (buffer, message);
-        } else {
-          res = parse_request_line (buffer, message);
-        }
-      } else {
-        /* else just parse the line */
-        parse_line (buffer, message);
-      }
+    if (retval == -1) {
+      if (errno == EBUSY)
+        goto stopped;
+      else
+        goto select_error;
     }
-    line++;
+    gst_poll_set_controllable (conn->fdset, FALSE);
   }
 
-  /* read the rest of the body if needed */
-  if (need_body) {
-    gchar *session_id;
-    gchar *hdrval;
-
-    /* see if there is a Content-Length header */
-    if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CONTENT_LENGTH,
-            &hdrval, 0) == GST_RTSP_OK) {
-      /* there is, read the body */
-      content_length = atol (hdrval);
-      GST_RTSP_CHECK (read_body (conn, content_length, message, timeout),
-          read_error);
-    }
+  /* we have a message here */
+  build_reset (&builder);
 
-    /* save session id in the connection for further use */
-    if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
-            &session_id, 0) == GST_RTSP_OK) {
-      gint maxlen, i;
-
-      /* default session timeout */
-      conn->timeout = 60;
-
-      maxlen = sizeof (conn->session_id) - 1;
-      /* the sessionid can have attributes marked with ;
-       * Make sure we strip them */
-      for (i = 0; session_id[i] != '\0'; i++) {
-        if (session_id[i] == ';') {
-          maxlen = i;
-          /* parse timeout */
-          do {
-            i++;
-          } while (g_ascii_isspace (session_id[i]));
-          if (g_str_has_prefix (&session_id[i], "timeout=")) {
-            gint to;
-
-            /* if we parsed something valid, configure */
-            if ((to = atoi (&session_id[i + 9])) > 0)
-              conn->timeout = to;
-          }
-          break;
-        }
-      }
+  return GST_RTSP_OK;
 
-      /* make sure to not overflow */
-      strncpy (conn->session_id, session_id, maxlen);
-      conn->session_id[maxlen] = '\0';
-    }
+  /* ERRORS */
+select_error:
+  {
+    res = GST_RTSP_ESYS;
+    goto cleanup;
+  }
+select_timeout:
+  {
+    res = GST_RTSP_ETIMEOUT;
+    goto cleanup;
+  }
+stopped:
+  {
+    res = GST_RTSP_EINTR;
+    goto cleanup;
+  }
+eof:
+  {
+    res = GST_RTSP_EEOF;
+    goto cleanup;
   }
-  return res;
-
 read_error:
+cleanup:
   {
+    build_reset (&builder);
+    gst_rtsp_message_unset (message);
     return res;
   }
 }
@@ -1636,3 +1754,322 @@ gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
 
   return conn->ip;
 }
+
+#define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
+#define WRITE_COND  (G_IO_OUT | G_IO_ERR)
+
+typedef struct
+{
+  GString *str;
+  guint cseq;
+} GstRTSPRec;
+
+/* async functions */
+struct _GstRTSPChannel
+{
+  GSource source;
+
+  GstRTSPConnection *conn;
+
+  GstRTSPBuilder builder;
+  GstRTSPMessage message;
+
+  GPollFD readfd;
+  GPollFD writefd;
+  gboolean write_added;
+
+  /* queued message for transmission */
+  GList *messages;
+  guint8 *write_data;
+  guint write_off;
+  guint write_len;
+  guint write_cseq;
+
+  GstRTSPChannelFuncs funcs;
+
+  gpointer user_data;
+  GDestroyNotify notify;
+};
+
+static gboolean
+gst_rtsp_source_prepare (GSource * source, gint * timeout)
+{
+  GstRTSPChannel *channel = (GstRTSPChannel *) source;
+
+  *timeout = (channel->conn->timeout * 1000);
+
+  return FALSE;
+}
+
+static gboolean
+gst_rtsp_source_check (GSource * source)
+{
+  GstRTSPChannel *channel = (GstRTSPChannel *) source;
+
+  if (channel->readfd.revents & READ_COND)
+    return TRUE;
+
+  if (channel->writefd.revents & WRITE_COND)
+    return TRUE;
+
+  return FALSE;
+}
+
+static gboolean
+gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
+    gpointer user_data)
+{
+  GstRTSPChannel *channel = (GstRTSPChannel *) source;
+  GstRTSPResult res;
+
+  /* first read as much as we can */
+  if (channel->readfd.revents & READ_COND) {
+    do {
+      res = build_next (&channel->builder, &channel->message, channel->conn);
+      if (res == GST_RTSP_EINTR)
+        break;
+      if (res == GST_RTSP_EEOF)
+        goto eof;
+      if (res != GST_RTSP_OK)
+        goto error;
+
+      if (channel->funcs.message_received)
+        channel->funcs.message_received (channel, &channel->message,
+            channel->user_data);
+
+      gst_rtsp_message_unset (&channel->message);
+      build_reset (&channel->builder);
+    } while (FALSE);
+  }
+
+  if (channel->writefd.revents & WRITE_COND) {
+    do {
+      if (channel->write_data == NULL) {
+        GstRTSPRec *data;
+
+        if (!channel->messages)
+          goto done;
+
+        /* no data, get a new message from the queue */
+        data = channel->messages->data;
+        channel->messages =
+            g_list_delete_link (channel->messages, channel->messages);
+
+        channel->write_off = 0;
+        channel->write_len = data->str->len;
+        channel->write_data = (guint8 *) g_string_free (data->str, FALSE);
+        channel->write_cseq = data->cseq;
+
+        g_slice_free (GstRTSPRec, data);
+      }
+
+      res = write_bytes (channel->writefd.fd, channel->write_data,
+          &channel->write_off, channel->write_len);
+      if (res == GST_RTSP_EINTR)
+        break;
+      if (res != GST_RTSP_OK)
+        goto error;
+
+      if (channel->funcs.message_sent)
+        channel->funcs.message_sent (channel, channel->write_cseq,
+            channel->user_data);
+
+    done:
+      if (channel->messages == NULL && channel->write_added) {
+        g_source_remove_poll ((GSource *) channel, &channel->writefd);
+        channel->write_added = FALSE;
+        channel->writefd.revents = 0;
+      }
+      g_free (channel->write_data);
+      channel->write_data = NULL;
+    } while (FALSE);
+  }
+
+  return TRUE;
+
+  /* ERRORS */
+eof:
+  {
+    if (channel->funcs.closed)
+      channel->funcs.closed (channel, channel->user_data);
+    return FALSE;
+  }
+error:
+  {
+    if (channel->funcs.error)
+      channel->funcs.error (channel, res, channel->user_data);
+    return FALSE;
+  }
+}
+
+static void
+gst_rtsp_source_finalize (GSource * source)
+{
+  GstRTSPChannel *channel = (GstRTSPChannel *) source;
+  GList *walk;
+
+  build_reset (&channel->builder);
+
+  for (walk = channel->messages; walk; walk = g_list_next (walk)) {
+    GstRTSPRec *data = walk->data;
+
+    g_string_free (data->str, TRUE);
+    g_slice_free (GstRTSPRec, data);
+  }
+  g_list_free (channel->messages);
+  g_free (channel->write_data);
+
+  if (channel->notify)
+    channel->notify (channel->user_data);
+}
+
+static GSourceFuncs gst_rtsp_source_funcs = {
+  gst_rtsp_source_prepare,
+  gst_rtsp_source_check,
+  gst_rtsp_source_dispatch,
+  gst_rtsp_source_finalize
+};
+
+/**
+ * gst_rtsp_channel_new:
+ * @conn: a #GstRTSPConnection
+ * @funcs: channel functions
+ * @user_data: user data to pass to @funcs
+ *
+ * Create a channel object for @conn. The functions provided in @funcs will be
+ * called with @user_data when activity happened on the channel.
+ *
+ * The new channel is usually created so that it can be attached to a
+ * maincontext with gst_rtsp_channel_attach(). 
+ *
+ * @conn must exist for the entire lifetime of the channel.
+ *
+ * Returns: a #GstRTSPChannel that can be used for asynchronous RTSP
+ * communication. Free with gst_rtsp_channel_unref () after usage.
+ *
+ * Since: 0.10.23
+ */
+GstRTSPChannel *
+gst_rtsp_channel_new (GstRTSPConnection * conn,
+    GstRTSPChannelFuncs * funcs, gpointer user_data, GDestroyNotify notify)
+{
+  GstRTSPChannel *result;
+
+  g_return_val_if_fail (conn != NULL, NULL);
+  g_return_val_if_fail (funcs != NULL, NULL);
+
+  result = (GstRTSPChannel *) g_source_new (&gst_rtsp_source_funcs,
+      sizeof (GstRTSPChannel));
+
+  result->conn = conn;
+  result->builder.state = STATE_START;
+
+  result->readfd.fd = conn->fd.fd;
+  result->readfd.events = READ_COND;
+  result->readfd.revents = 0;
+
+  result->writefd.fd = conn->fd.fd;
+  result->writefd.events = WRITE_COND;
+  result->writefd.revents = 0;
+  result->write_added = FALSE;
+
+  result->funcs = *funcs;
+  result->user_data = user_data;
+  result->notify = notify;
+
+  /* only add the read fd, the write fd is only added when we have data
+   * to send. */
+  g_source_add_poll ((GSource *) result, &result->readfd);
+
+  return result;
+}
+
+/**
+ * gst_rtsp_channel_attach:
+ * @channel: a #GstRTSPChannel
+ * @context: a GMainContext (if NULL, the default context will be used)
+ *
+ * Adds a #GstRTSPChannel to a context so that it will be executed within that context.
+ *
+ * Returns: the ID (greater than 0) for the channel within the GMainContext. 
+ *
+ * Since: 0.10.23
+ */
+guint
+gst_rtsp_channel_attach (GstRTSPChannel * channel, GMainContext * context)
+{
+  g_return_val_if_fail (channel != NULL, 0);
+
+  return g_source_attach ((GSource *) channel, context);
+}
+
+/**
+ * gst_rtsp_channel_free:
+ * @channel: a #GstRTSPChannel
+ *
+ * Decreases the reference count of @channel by one. If the resulting reference
+ * count is zero the channel and associated memory will be destroyed.
+ *
+ * Since: 0.10.23
+ */
+void
+gst_rtsp_channel_unref (GstRTSPChannel * channel)
+{
+  g_return_if_fail (channel != NULL);
+
+  g_source_unref ((GSource *) channel);
+}
+
+/**
+ * gst_rtsp_channel_queue_message:
+ * @channel: a #GstRTSPChannel
+ * @message: a #GstRTSPMessage
+ *
+ * Queue a @message for transmission in @channel. The contents of this 
+ * message will be serialized and transmitted when the connection of the
+ * channel becomes writable.
+ *
+ * The return value of this function will be returned as the cseq argument in
+ * the message_sent callback.
+ *
+ * Returns: the sequence number of the message or -1 if the cseq could not be
+ * determined.
+ *
+ * Since: 0.10.23
+ */
+guint
+gst_rtsp_channel_queue_message (GstRTSPChannel * channel,
+    GstRTSPMessage * message)
+{
+  GstRTSPRec *data;
+  gchar *header;
+  guint cseq;
+
+  g_return_val_if_fail (channel != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+  /* get the cseq from the message, when we finish writing this message on the
+   * socket we will have to pass the cseq to the callback. */
+  if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header,
+          0) == GST_RTSP_OK) {
+    cseq = atoi (header);
+  } else {
+    cseq = -1;
+  }
+
+  /* make a record with the message as a string ans cseq */
+  data = g_slice_new (GstRTSPRec);
+  data->str = message_to_string (channel->conn, message);
+  data->cseq = cseq;
+
+  /* add the record to a queue */
+  channel->messages = g_list_append (channel->messages, data);
+
+  /* make sure the main context will now also check for writability on the
+   * socket */
+  if (!channel->write_added) {
+    g_source_add_poll ((GSource *) channel, &channel->writefd);
+    channel->write_added = TRUE;
+  }
+  return cseq;
+}
index 80312f35965cb9ee76476008f7ff1476f32d85c3..33bee8f8f9019bd4290545b179b0b773c8e0723f 100644 (file)
@@ -127,6 +127,48 @@ GstRTSPResult      gst_rtsp_connection_set_qos_dscp  (GstRTSPConnection *conn,
 /* accessors */
 const gchar *      gst_rtsp_connection_get_ip        (const GstRTSPConnection *conn);
 
+/* async IO */
+
+/**
+ * GstRTSPChannel:
+ *
+ * Opaque RTSP channel object that can be used for asynchronous RTSP
+ * operations.
+ */
+typedef struct _GstRTSPChannel GstRTSPChannel;
+
+/**
+ * GstRTSPChannelFuncs:
+ * @message_received: callback when a message was received
+ * @message_sent: callback when a message was sent
+ * @closed: callback when the connection is closed
+ * @error: callback when an error occured
+ *
+ * Callback functions from a #GstRTSPChannel.
+ */
+typedef struct {
+  GstRTSPResult (*message_received) (GstRTSPChannel *channel, GstRTSPMessage *message,
+                                     gpointer user_data);
+  GstRTSPResult (*message_sent)     (GstRTSPChannel *channel, guint cseq, 
+                                     gpointer user_data);
+  GstRTSPResult (*closed)           (GstRTSPChannel *channel, gpointer user_data);
+  GstRTSPResult (*error)            (GstRTSPChannel *channel, GstRTSPResult result,
+                                     gpointer user_data);
+} GstRTSPChannelFuncs;
+
+GstRTSPChannel *   gst_rtsp_channel_new              (GstRTSPConnection *conn,
+                                                      GstRTSPChannelFuncs *funcs,
+                                                     gpointer user_data,
+                                                     GDestroyNotify notify);
+void               gst_rtsp_channel_unref            (GstRTSPChannel *channel);
+
+guint              gst_rtsp_channel_attach           (GstRTSPChannel *channel,
+                                                      GMainContext *context);
+
+guint              gst_rtsp_channel_queue_message    (GstRTSPChannel *channel,
+                                                      GstRTSPMessage *message);
+
+
 G_END_DECLS
 
 #endif /* __GST_RTSP_CONNECTION_H__ */