Use a more stateful response parser.
authorColin Guthrie <pulse@colin.guthr.ie>
Wed, 7 May 2008 00:35:10 +0000 (00:35 +0000)
committerColin Guthrie <pulse@colin.guthr.ie>
Wed, 8 Oct 2008 19:32:07 +0000 (20:32 +0100)
This makes things fully asyncronous.
Some of the continuation headerlist stuff could be moved to headerlist for neatness, but this is OK for now.

git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/coling@2373 fefdeb5f-60dc-0310-8127-8f9354f1896f

src/modules/rtp/rtsp.c

index 4f2411a..44cd80b 100644 (file)
 #include <pulsecore/macro.h>
 #include <pulsecore/strbuf.h>
 #include <pulsecore/poll.h>
+#include <pulsecore/ioline.h>
 
 #include "rtsp.h"
 
 struct pa_rtsp_context {
     pa_socket_client *sc;
     pa_iochannel *io;
-    pa_rtsp_cb_t callback;
-    void* userdata;
-    const char* useragent;
-    pa_headerlist* headers;
-    char* localip;
-    char* url;
-    uint32_t port;
-    uint32_t cseq;
-    char* session;
-    char* transport;
-    pa_rtsp_state state;
-};
+    pa_ioline *ioline;
 
-/*
- * read one line from the file descriptor
- * timeout: msec unit, -1 for infinite
- * if CR comes then following LF is expected
- * returned string in line is always null terminated, maxlen-1 is maximum string length
- */
-static int pa_read_line(pa_iochannel* io, char *line, int maxlen, int timeout)
-{
-    int i, rval;
-    int count;
-    int fd;
-    char ch;
-    struct pollfd pfds;
-
-    pa_assert(io);
-    fd = pa_iochannel_get_recv_fd(io);
-
-    count = 0;
-    *line = 0;
-    pfds.events = POLLIN;
-    pfds.fd = fd;
-
-    for (i=0; i<maxlen; ++i) {
-        if (!poll(&pfds, 1, timeout))
-            return 0;
-
-        rval = read(fd, &ch, 1);
-
-        if (-1 == rval) {
-            if (EAGAIN == errno)
-                return 0;
-            /*ERRMSG("%s:read error: %s\n", __func__, strerror(errno));*/
-            return -1;
-        }
-
-        if (0 == rval) {
-            /*INFMSG("%s:disconnected on the other end\n", __func__);*/
-            return -1;
-        }
-
-        if ('\n' == ch) {
-            *line = 0;
-            return count;
-        }
-
-        if ('\r' == ch)
-            continue;
+    pa_rtsp_cb_t callback;
 
-        *line++ = ch;
-        count++;
+    void *userdata;
+    const char *useragent;
 
-        if (count >= maxlen-1)
-            break;
-    }
+    pa_rtsp_state state;
+    uint8_t waiting;
 
-    *line = 0;
-    return count;
-}
+    pa_headerlist* headers;
+    char *last_header;
+    pa_strbuf *header_buffer;
+    pa_headerlist* response_headers;
 
+    char *localip;
+    char *url;
+    uint32_t port;
+    uint32_t cseq;
+    char *session;
+    char *transport;
+};
 
 static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,
                         const char* content_type, const char* content,
@@ -172,8 +123,8 @@ static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,
 
     /* Our packet is created... now we can send it :) */
     hdrs = pa_strbuf_tostring_free(buf);
-    pa_log_debug("Submitting request:");
-    pa_log_debug(hdrs);
+    /*pa_log_debug("Submitting request:");
+    pa_log_debug(hdrs);*/
     l = pa_iochannel_write(c->io, hdrs, strlen(hdrs));
     pa_xfree(hdrs);
 
@@ -205,125 +156,39 @@ void pa_rtsp_context_free(pa_rtsp_context* c) {
         pa_xfree(c->localip);
         pa_xfree(c->session);
         pa_xfree(c->transport);
+        pa_xfree(c->last_header);
+        if (c->header_buffer)
+            pa_strbuf_free(c->header_buffer);
+        if (c->response_headers)
+            pa_headerlist_free(c->response_headers);
         pa_headerlist_free(c->headers);
     }
     pa_xfree(c);
 }
 
 
-static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) {
-    pa_strbuf* buf;
-    pa_headerlist* response_headers = NULL;
-    char response[1024];
-    int timeout;
+static void headers_read(pa_rtsp_context *c) {
     char* token;
-    char* header;
-    char* delimpos;
-    char delimiters[] = " ";
-    pa_rtsp_context *c = userdata;
-    pa_assert(c);
-    pa_assert(c->io == io);
-
-    if (!pa_iochannel_is_readable(c->io)) {
-        if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) return;
-        goto do_callback;
-    }
-
-    /* TODO: convert this to a pa_ioline based reader */
-    if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) {
-        response_headers = pa_headerlist_new();
-    }
-    timeout = 5000;
-    /* read in any response headers */
-    if (pa_read_line(c->io, response, sizeof(response), timeout) > 0) {
-        const char* token_state = NULL;
-        pa_log_debug("Response Line: %s", response);
-
-        timeout = 1000;
-        pa_xfree(pa_split(response, delimiters, &token_state));
-        token = pa_split(response, delimiters, &token_state);
-        if (!token || strcmp(token, "200")) {
-            pa_xfree(token);
-            pa_log("Invalid Response");
-            /* TODO: Bail out completely */
-            return;
-        }
-        pa_xfree(token);
-
-        /* We want to return the headers? */
-        if (!response_headers) {
-            /* We have no storage, so just clear out the response. */
-            while (pa_read_line(c->io, response, sizeof(response), timeout) > 0){
-                pa_log_debug("Response Line: %s", response);
-            }
-        } else {
-            /* TODO: Move header reading into the headerlist. */
-            header = NULL;
-            buf = pa_strbuf_new();
-            while (pa_read_line(c->io, response, sizeof(response), timeout) > 0) {
-                pa_log_debug("Response Line: %s", response);
-                /* If the first character is a space, it's a continuation header */
-                if (header && ' ' == response[0]) {
-                    /* Add this line to the buffer (sans the space. */
-                    pa_strbuf_puts(buf, &(response[1]));
-                    continue;
-                }
-
-                if (header) {
-                    /* This is not a continuation header so let's dump the full
-                      header/value into our proplist */
-                    pa_headerlist_puts(response_headers, header, pa_strbuf_tostring_free(buf));
-                    pa_xfree(header);
-                    buf = pa_strbuf_new();
-                }
-
-                delimpos = strstr(response, ":");
-                if (!delimpos) {
-                    pa_log("Invalid response header");
-                    return;
-                }
-
-                if (strlen(delimpos) > 1) {
-                    /* Cut our line off so we can copy the header name out */
-                    *delimpos++ = '\0';
-
-                    /* Trim the front of any spaces */
-                    while (' ' == *delimpos)
-                        ++delimpos;
+    char delimiters[] = ";";
 
-                    pa_strbuf_puts(buf, delimpos);
-                } else {
-                    /* Cut our line off so we can copy the header name out */
-                    *delimpos = '\0';
-                }
-
-                /* Save the header name */
-                header = pa_xstrdup(response);
-            }
-            /* We will have a header left from our looping itteration, so add it in :) */
-            if (header) {
-                /* This is not a continuation header so let's dump it into our proplist */
-                pa_headerlist_puts(response_headers, header, pa_strbuf_tostring(buf));
-            }
-            pa_strbuf_free(buf);
-        }
-    }
+    pa_assert(c);
+    pa_assert(c->response_headers);
 
     /* Deal with a SETUP response */
     if (STATE_SETUP == c->state) {
         const char* token_state = NULL;
         const char* pc = NULL;
-        c->session = pa_xstrdup(pa_headerlist_gets(response_headers, "Session"));
-        c->transport = pa_xstrdup(pa_headerlist_gets(response_headers, "Transport"));
+        c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session"));
+        c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport"));
 
         if (!c->session || !c->transport) {
-            pa_headerlist_free(response_headers);
+            pa_headerlist_free(c->response_headers);
+            c->response_headers = NULL;
+            pa_log("Invalid SETUP response.");
             return;
         }
 
         /* Now parse out the server port component of the response. */
-        c->port = 0;
-        delimiters[0] = ';';
         while ((token = pa_split(c->transport, delimiters, &token_state))) {
             if ((pc = strstr(token, "="))) {
                 if (0 == strncmp(token, "server_port", 11)) {
@@ -336,33 +201,117 @@ static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) {
         }
         if (0 == c->port) {
             /* Error no server_port in response */
-            pa_headerlist_free(response_headers);
+            pa_headerlist_free(c->response_headers);
+            c->response_headers = NULL;
+            pa_log("Invalid SETUP response (no port number).");
             return;
         }
     }
 
     /* Call our callback */
-do_callback:
     if (c->callback)
-        c->callback(c, c->state, response_headers, c->userdata);
+        c->callback(c, c->state, c->response_headers, c->userdata);
 
+    pa_headerlist_free(c->response_headers);
+    c->response_headers = NULL;
+}
 
-    if (response_headers)
-        pa_headerlist_free(response_headers);
 
-    /*
-    if (do_read(u) < 0 || do_write(u) < 0) {
+static void line_callback(pa_ioline *line, const char *s, void *userdata) {
+    char *delimpos;
+    char *s2, *s2p;
 
-        if (u->io) {
-            pa_iochannel_free(u->io);
-            u->io = NULL;
+    pa_rtsp_context *c = userdata;
+    pa_assert(line);
+    pa_assert(c);
+    pa_assert(s);
+
+    s2 = pa_xstrdup(s);
+    /* Trim trailing carriage returns */
+    s2p = s2 + strlen(s2) - 1;
+    while (s2p >= s2 && '\r' == *s2p) {
+        *s2p = '\0';
+        s2p -= 1;
+    }
+    if (c->waiting && 0 == strcmp("RTSP/1.0 200 OK", s2)) {
+        c->waiting = 0;
+        pa_assert(!c->response_headers);
+        c->response_headers = pa_headerlist_new();
+        goto exit;
+    }
+    if (c->waiting) {
+        pa_log_warn("Unexpected response: %s", s2);
+        goto exit;;
+    }
+    if (!strlen(s2)) {
+        /* End of headers */
+        /* We will have a header left from our looping itteration, so add it in :) */
+        if (c->last_header) {
+            /* This is not a continuation header so let's dump it into our proplist */
+            pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer));
+            pa_xfree(c->last_header);
+            c->last_header = NULL;
+            c->header_buffer= NULL;
         }
 
-       pa_module_unload_request(u->module);
+        pa_log_debug("Full response received. Dispatching");
+        headers_read(c);
+        c->waiting = 1;
+        goto exit;
     }
-    */
+
+    /* Read and parse a header (we know it's not empty) */
+    /* TODO: Move header reading into the headerlist. */
+
+    /* If the first character is a space, it's a continuation header */
+    if (c->last_header && ' ' == s2[0]) {
+        pa_assert(c->header_buffer);
+
+        /* Add this line to the buffer (sans the space. */
+        pa_strbuf_puts(c->header_buffer, &(s2[1]));
+        goto exit;
+    }
+
+    if (c->last_header) {
+        /* This is not a continuation header so let's dump the full
+          header/value into our proplist */
+        pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer));
+        pa_xfree(c->last_header);
+        c->last_header = NULL;
+        c->header_buffer = NULL;
+    }
+
+    delimpos = strstr(s2, ":");
+    if (!delimpos) {
+        pa_log_warn("Unexpected response when expecting header: %s", s);
+        goto exit;
+    }
+
+    pa_assert(!c->header_buffer);
+    pa_assert(!c->last_header);
+
+    c->header_buffer = pa_strbuf_new();
+    if (strlen(delimpos) > 1) {
+        /* Cut our line off so we can copy the header name out */
+        *delimpos++ = '\0';
+
+        /* Trim the front of any spaces */
+        while (' ' == *delimpos)
+            ++delimpos;
+
+        pa_strbuf_puts(c->header_buffer, delimpos);
+    } else {
+        /* Cut our line off so we can copy the header name out */
+        *delimpos = '\0';
+    }
+
+    /* Save the header name */
+    c->last_header = pa_xstrdup(s2);
+  exit:
+    pa_xfree(s2);
 }
 
+
 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
     pa_rtsp_context *c = userdata;
     union {
@@ -385,7 +334,9 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
     }
     pa_assert(!c->io);
     c->io = io;
-    pa_iochannel_set_callback(c->io, io_callback, c);
+
+    c->ioline = pa_ioline_new(io);
+    pa_ioline_set_callback(c->ioline, line_callback, c);
 
     /* Get the local IP address for use externally */
     if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) {
@@ -401,6 +352,11 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
             c->localip = pa_xstrdup(res);
     }
     pa_log_debug("Established RTSP connection from local ip %s", c->localip);
+
+    c->waiting = 1;
+    c->state = STATE_CONNECT;
+    if (c->callback)
+        c->callback(c, c->state, NULL, c->userdata);
 }
 
 int pa_rtsp_connect(pa_rtsp_context *c, pa_mainloop_api *mainloop, const char* hostname, uint16_t port) {