From a6d75bd33cd94f4cfef1bf888a5950853a5ad95c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 18 Feb 2009 17:42:59 +0100 Subject: [PATCH] Add RTSP channel object for async io Add a GstRTSPChannel object that wraps a GSource around the RTSP connection so that the connection can be monitored from a maincontext. This allows us to operate in ASYNC mode, which is handy when building a server. Rework the old code to use the async code under the hood. API: gst_rtsp_channel_new() API: gst_rtsp_channel_unref() API: gst_rtsp_channel_attach() API: gst_rtsp_channel_queue_message() --- docs/libs/gst-plugins-base-libs-sections.txt | 7 + gst-libs/gst/rtsp/gstrtspconnection.c | 1089 ++++++++++++++++++-------- gst-libs/gst/rtsp/gstrtspconnection.h | 42 + 3 files changed, 812 insertions(+), 326 deletions(-) diff --git a/docs/libs/gst-plugins-base-libs-sections.txt b/docs/libs/gst-plugins-base-libs-sections.txt index 3671337..4680380 100644 --- a/docs/libs/gst-plugins-base-libs-sections.txt +++ b/docs/libs/gst-plugins-base-libs-sections.txt @@ -1205,6 +1205,13 @@ gst_rtsp_connection_next_timeout gst_rtsp_connection_reset_timeout gst_rtsp_connection_flush gst_rtsp_connection_set_auth + +GstRTSPChannel +GstRTSPChannelFuncs +gst_rtsp_channel_new +gst_rtsp_channel_unref +gst_rtsp_channel_attach +gst_rtsp_channel_queue_message
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index fbf2db0..33cec5b 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -96,27 +96,23 @@ #include "md5.h" #ifdef G_OS_WIN32 -#define FIONREAD_TYPE gulong -#define IOCTL_SOCKET ioctlsocket #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0) #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0) #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len) #define CLOSE_SOCKET(sock) closesocket (sock) -#define ERRNO_IS_NOT_EAGAIN (WSAGetLastError () != WSAEWOULDBLOCK) -#define ERRNO_IS_NOT_EINTR (WSAGetLastError () != WSAEINTR) +#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK) +#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR) /* According to Microsoft's connect() documentation this one returns * WSAEWOULDBLOCK and not WSAEINPROGRESS. */ -#define ERRNO_IS_NOT_EINPROGRESS (WSAGetLastError () != WSAEWOULDBLOCK) +#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK) #else -#define FIONREAD_TYPE gint -#define IOCTL_SOCKET ioctl #define READ_SOCKET(fd, buf, len) read (fd, buf, len) #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len) #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len) #define CLOSE_SOCKET(sock) close (sock) -#define ERRNO_IS_NOT_EAGAIN (errno != EAGAIN) -#define ERRNO_IS_NOT_EINTR (errno != EINTR) -#define ERRNO_IS_NOT_EINPROGRESS (errno != EINPROGRESS) +#define ERRNO_IS_EAGAIN (errno == EAGAIN) +#define ERRNO_IS_EINTR (errno == EINTR) +#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS) #endif #ifdef G_OS_WIN32 @@ -135,6 +131,35 @@ inet_aton (const char *c, struct in_addr *paddr) } #endif +enum +{ + STATE_START = 0, + STATE_DATA_HEADER, + STATE_DATA_BODY, + STATE_READ_LINES, + STATE_END, + STATE_LAST +}; + +/* a structure for constructing RTSPMessages */ +typedef struct +{ + gint state; + guint8 buffer[4096]; + guint offset; + + guint line; + guint8 *body_data; + glong body_len; +} GstRTSPBuilder; + +static void +build_reset (GstRTSPBuilder * builder) +{ + g_free (builder->body_data); + memset (builder, 0, sizeof (builder)); +} + /** * gst_rtsp_connection_create: * @url: a #GstRTSPUrl @@ -161,6 +186,7 @@ gst_rtsp_connection_create (GstRTSPUrl * url, GstRTSPConnection ** conn) newconn->url = url; newconn->fd.fd = -1; newconn->timer = g_timer_new (); + newconn->timeout = 60; newconn->auth_method = GST_RTSP_AUTH_NONE; newconn->username = NULL; @@ -269,7 +295,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in)); if (ret == 0) goto done; - if (ERRNO_IS_NOT_EINPROGRESS) + if (!ERRNO_IS_EINPROGRESS) goto sys_error; /* wait for connect to complete up to the specified timeout or until we got @@ -288,8 +314,11 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) goto sys_error; /* we can still have an error connecting on windows */ - if (gst_poll_fd_has_error (conn->fdset, &conn->fd)) + if (gst_poll_fd_has_error (conn->fdset, &conn->fd)) { + socklen_t len = sizeof (errno); + getsockopt (conn->fd.fd, SOL_SOCKET, SO_ERROR, &errno, &len); goto sys_error; + } gst_poll_fd_ignored (conn->fdset, &conn->fd); @@ -485,6 +514,94 @@ add_date_header (GstRTSPMessage * message) gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string); } +static GstRTSPResult +write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) +{ + guint left; + + if (*idx > size) + return GST_RTSP_ERROR; + + left = size - *idx; + + while (left) { + gint r; + + r = WRITE_SOCKET (fd, &buffer[*idx], left); + if (r == 0) { + return GST_RTSP_EINTR; + } else if (r < 0) { + if (ERRNO_IS_EAGAIN) + return GST_RTSP_EINTR; + if (!ERRNO_IS_EINTR) + return GST_RTSP_ESYS; + } else { + left -= r; + *idx += r; + } + } + return GST_RTSP_OK; +} + +static GstRTSPResult +read_bytes (gint fd, guint8 * buffer, guint * idx, guint size) +{ + guint left; + + if (*idx > size) + return GST_RTSP_ERROR; + + left = size - *idx; + + while (left) { + gint r; + + r = READ_SOCKET (fd, &buffer[*idx], left); + if (r == 0) { + return GST_RTSP_EEOF; + } else if (r < 0) { + if (ERRNO_IS_EAGAIN) + return GST_RTSP_EINTR; + if (!ERRNO_IS_EINTR) + return GST_RTSP_ESYS; + } else { + left -= r; + *idx += r; + } + } + return GST_RTSP_OK; +} + +static GstRTSPResult +read_line (gint fd, guint8 * buffer, guint * idx, guint size) +{ + while (TRUE) { + guint8 c; + gint r; + + r = READ_SOCKET (fd, &c, 1); + if (r == 0) { + return GST_RTSP_EEOF; + } else if (r < 0) { + if (ERRNO_IS_EAGAIN) + return GST_RTSP_EINTR; + if (!ERRNO_IS_EINTR) + return GST_RTSP_ESYS; + } else { + if (c == '\n') /* end on \n */ + break; + if (c == '\r') /* ignore \r */ + continue; + + if (*idx < size - 1) + buffer[(*idx)++] = c; + } + } + buffer[*idx] = '\0'; + + return GST_RTSP_OK; +} + /** * gst_rtsp_connection_write: * @conn: a #GstRTSPConnection @@ -504,9 +621,10 @@ GstRTSPResult gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { - guint towrite; + guint offset; gint retval; GstClockTime to; + GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); @@ -520,11 +638,17 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - towrite = size; + offset = 0; - while (towrite > 0) { - gint written; + while (TRUE) { + /* try to write */ + res = write_bytes (conn->fd.fd, data, &offset, size); + if (res == GST_RTSP_OK) + break; + if (res != GST_RTSP_EINTR) + goto write_error; + /* not all is written, wait until we can write more */ do { retval = gst_poll_wait (conn->fdset, to); } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); @@ -538,16 +662,6 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, else goto select_error; } - - /* now we can write */ - written = WRITE_SOCKET (conn->fd.fd, data, towrite); - if (written < 0) { - if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR) - goto write_error; - } else { - towrite -= written; - data += written; - } } return GST_RTSP_OK; @@ -566,48 +680,14 @@ stopped: } write_error: { - return GST_RTSP_ESYS; + return res; } } -/** - * gst_rtsp_connection_send: - * @conn: a #GstRTSPConnection - * @message: the message to send - * @timeout: a timeout value or #NULL - * - * Attempt to send @message to the connected @conn, blocking up to - * the specified @timeout. @timeout can be #NULL, in which case this function - * might block forever. - * - * This function can be cancelled with gst_rtsp_connection_flush(). - * - * Returns: #GST_RTSP_OK on success. - */ -GstRTSPResult -gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, - GTimeVal * timeout) +static GString * +message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) { GString *str = NULL; - GstRTSPResult res; - -#ifdef G_OS_WIN32 - WSADATA w; - int error; -#endif - - g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - -#ifdef G_OS_WIN32 - error = WSAStartup (0x0202, &w); - - if (error) - goto startup_error; - - if (w.wVersion != 0x0202) - goto version_error; -#endif str = g_string_new (""); @@ -649,7 +729,8 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, break; } default: - g_return_val_if_reached (GST_RTSP_EINVAL); + g_string_free (str, TRUE); + g_return_val_if_reached (NULL); break; } @@ -680,6 +761,51 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, } } + return str; +} + +/** + * gst_rtsp_connection_send: + * @conn: a #GstRTSPConnection + * @message: the message to send + * @timeout: a timeout value or #NULL + * + * Attempt to send @message to the connected @conn, blocking up to + * the specified @timeout. @timeout can be #NULL, in which case this function + * might block forever. + * + * This function can be cancelled with gst_rtsp_connection_flush(). + * + * Returns: #GST_RTSP_OK on success. + */ +GstRTSPResult +gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, + GTimeVal * timeout) +{ + GString *str = NULL; + GstRTSPResult res; + +#ifdef G_OS_WIN32 + WSADATA w; + int error; +#endif + + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); + +#ifdef G_OS_WIN32 + error = WSAStartup (0x0202, &w); + + if (error) + goto startup_error; + + if (w.wVersion != 0x0202) + goto version_error; +#endif + + if (!(str = message_to_string (conn, message))) + goto no_message; + /* write request */ res = gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, timeout); @@ -688,6 +814,11 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, return res; +no_message: + { + g_warning ("Wrong message"); + return GST_RTSP_EINVAL; + } #ifdef G_OS_WIN32 startup_error: { @@ -704,47 +835,8 @@ version_error: #endif } -static GstRTSPResult -read_line (gint fd, gchar * buffer, guint size) -{ - guint idx; - gchar c; - gint r; - - idx = 0; - while (TRUE) { - r = READ_SOCKET (fd, &c, 1); - if (r == 0) { - goto eof; - } else if (r < 0) { - if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR) - goto read_error; - } else { - if (c == '\n') /* end on \n */ - break; - if (c == '\r') /* ignore \r */ - continue; - - if (idx < size - 1) - buffer[idx++] = c; - } - } - buffer[idx] = '\0'; - - return GST_RTSP_OK; - -eof: - { - return GST_RTSP_EEOF; - } -read_error: - { - return GST_RTSP_ESYS; - } -} - static void -read_string (gchar * dest, gint size, gchar ** src) +parse_string (gchar * dest, gint size, gchar ** src) { gint idx; @@ -763,7 +855,7 @@ read_string (gchar * dest, gint size, gchar ** src) } static void -read_key (gchar * dest, gint size, gchar ** src) +parse_key (gchar * dest, gint size, gchar ** src) { gint idx; @@ -778,7 +870,7 @@ read_key (gchar * dest, gint size, gchar ** src) } static GstRTSPResult -parse_response_status (gchar * buffer, GstRTSPMessage * msg) +parse_response_status (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res; gchar versionstr[20]; @@ -786,10 +878,10 @@ parse_response_status (gchar * buffer, GstRTSPMessage * msg) gint code; gchar *bptr; - bptr = buffer; + bptr = (gchar *) buffer; - read_string (versionstr, sizeof (versionstr), &bptr); - read_string (codestr, sizeof (codestr), &bptr); + parse_string (versionstr, sizeof (versionstr), &bptr); + parse_string (codestr, sizeof (codestr), &bptr); code = atoi (codestr); while (g_ascii_isspace (*bptr)) @@ -814,7 +906,7 @@ parse_error: } static GstRTSPResult -parse_request_line (gchar * buffer, GstRTSPMessage * msg) +parse_request_line (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res = GST_RTSP_OK; gchar versionstr[20]; @@ -823,16 +915,16 @@ parse_request_line (gchar * buffer, GstRTSPMessage * msg) gchar *bptr; GstRTSPMethod method; - bptr = buffer; + bptr = (gchar *) buffer; - read_string (methodstr, sizeof (methodstr), &bptr); + parse_string (methodstr, sizeof (methodstr), &bptr); method = gst_rtsp_find_method (methodstr); - read_string (urlstr, sizeof (urlstr), &bptr); + parse_string (urlstr, sizeof (urlstr), &bptr); if (*urlstr == '\0') res = GST_RTSP_EPARSE; - read_string (versionstr, sizeof (versionstr), &bptr); + parse_string (versionstr, sizeof (versionstr), &bptr); if (*bptr != '\0') res = GST_RTSP_EPARSE; @@ -855,16 +947,16 @@ parse_request_line (gchar * buffer, GstRTSPMessage * msg) /* parsing lines means reading a Key: Value pair */ static GstRTSPResult -parse_line (gchar * buffer, GstRTSPMessage * msg) +parse_line (guint8 * buffer, GstRTSPMessage * msg) { gchar key[32]; gchar *bptr; GstRTSPHeaderField field; - bptr = buffer; + bptr = (gchar *) buffer; /* read key */ - read_key (key, sizeof (key), &bptr); + parse_key (key, sizeof (key), &bptr); if (*bptr != ':') goto no_column; @@ -885,31 +977,187 @@ no_column: } } +/* returns: + * GST_RTSP_OK when a complete message was read. + * GST_RTSP_EEOF: when the socket is closed + * GST_RTSP_EINTR: when more data is needed. + * GST_RTSP_..: some other error occured. + */ +static GstRTSPResult +build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, + GstRTSPConnection * conn) +{ + GstRTSPResult res; + + while (TRUE) { + switch (builder->state) { + case STATE_START: + builder->offset = 0; + res = + read_bytes (conn->fd.fd, (guint8 *) builder->buffer, + &builder->offset, 1); + if (res != GST_RTSP_OK) + goto done; + + /* we have 1 bytes now and we can see if this is a data message or + * not */ + if (builder->buffer[0] == '$') { + /* data message, prepare for the header */ + builder->state = STATE_DATA_HEADER; + } else { + builder->line = 0; + builder->state = STATE_READ_LINES; + } + break; + case STATE_DATA_HEADER: + { + res = + read_bytes (conn->fd.fd, (guint8 *) builder->buffer, + &builder->offset, 4); + if (res != GST_RTSP_OK) + goto done; + + gst_rtsp_message_init_data (message, builder->buffer[1]); + + builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3]; + builder->body_data = g_malloc (builder->body_len + 1); + builder->body_data[builder->body_len] = '\0'; + builder->offset = 0; + builder->state = STATE_DATA_BODY; + break; + } + case STATE_DATA_BODY: + { + res = read_bytes (conn->fd.fd, builder->body_data, &builder->offset, + builder->body_len); + if (res != GST_RTSP_OK) + goto done; + + /* we have the complete body now */ + gst_rtsp_message_take_body (message, + (guint8 *) builder->body_data, builder->body_len); + builder->body_data = NULL; + builder->body_len = 0; + + builder->state = STATE_END; + break; + } + case STATE_READ_LINES: + { + res = read_line (conn->fd.fd, builder->buffer, &builder->offset, + sizeof (builder->buffer)); + if (res != GST_RTSP_OK) + goto done; + + /* we have a regular response */ + if (builder->buffer[0] == '\r') { + builder->buffer[0] = '\0'; + } + + if (builder->buffer[0] == '\0') { + gchar *hdrval; + + /* empty line, end of message header */ + /* see if there is a Content-Length header */ + if (gst_rtsp_message_get_header (message, + GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) { + /* there is, prepare to read the body */ + builder->body_len = atol (hdrval); + builder->body_data = g_malloc (builder->body_len + 1); + builder->body_data[builder->body_len] = '\0'; + builder->offset = 0; + builder->state = STATE_DATA_BODY; + } else { + builder->state = STATE_END; + } + break; + } + + /* we have a line */ + if (builder->line == 0) { + /* first line, check for response status */ + if (memcmp (builder->buffer, "RTSP", 4) == 0) { + res = parse_response_status (builder->buffer, message); + } else { + res = parse_request_line (builder->buffer, message); + } + } else { + /* else just parse the line */ + parse_line (builder->buffer, message); + } + builder->line++; + builder->offset = 0; + break; + } + case STATE_END: + { + gchar *session_id; + + /* save session id in the connection for further use */ + if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, + &session_id, 0) == GST_RTSP_OK) { + gint maxlen, i; + + maxlen = sizeof (conn->session_id) - 1; + /* the sessionid can have attributes marked with ; + * Make sure we strip them */ + for (i = 0; session_id[i] != '\0'; i++) { + if (session_id[i] == ';') { + maxlen = i; + /* parse timeout */ + do { + i++; + } while (g_ascii_isspace (session_id[i])); + if (g_str_has_prefix (&session_id[i], "timeout=")) { + gint to; + + /* if we parsed something valid, configure */ + if ((to = atoi (&session_id[i + 9])) > 0) + conn->timeout = to; + } + break; + } + } + + /* make sure to not overflow */ + strncpy (conn->session_id, session_id, maxlen); + conn->session_id[maxlen] = '\0'; + } + res = GST_RTSP_OK; + goto done; + } + default: + res = GST_RTSP_ERROR; + break; + } + } +done: + return res; +} + /** - * gst_rtsp_connection_read_internal: + * gst_rtsp_connection_read: * @conn: a #GstRTSPConnection * @data: the data to read * @size: the size of @data * @timeout: a timeout value or #NULL - * @allow_interrupt: can the pending read be interrupted * * Attempt to read @size bytes into @data from the connected @conn, blocking up to * the specified @timeout. @timeout can be #NULL, in which case this function * might block forever. - * - * This function can be cancelled with gst_rtsp_connection_flush() only if - * @allow_interrupt is set. + * + * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK on success. */ -static GstRTSPResult -gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data, - guint size, GTimeVal * timeout, gboolean allow_interrupt) +GstRTSPResult +gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, + GTimeVal * timeout) { - guint toread; + guint offset; gint retval; GstClockTime to; - FIONREAD_TYPE avail; + GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); @@ -918,53 +1166,39 @@ gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data, if (size == 0) return GST_RTSP_OK; - toread = size; + offset = 0; /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - /* if the call fails, just go in the select.. it should not fail. Else if - * there is enough data to read, skip the select call al together.*/ - if (IOCTL_SOCKET (conn->fd.fd, FIONREAD, &avail) < 0) - avail = 0; - else if (avail >= toread) - goto do_read; - - gst_poll_set_controllable (conn->fdset, allow_interrupt); + gst_poll_set_controllable (conn->fdset, TRUE); gst_poll_fd_ctl_write (conn->fdset, &conn->fd, FALSE); gst_poll_fd_ctl_read (conn->fdset, &conn->fd, TRUE); - while (toread > 0) { - gint bytes; + while (TRUE) { + res = read_bytes (conn->fd.fd, data, &offset, size); + if (res == GST_RTSP_EEOF) + goto eof; + if (res == GST_RTSP_OK) + break; + if (res != GST_RTSP_EINTR) + goto read_error; do { retval = gst_poll_wait (conn->fdset, to); } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + /* check for timeout */ + if (retval == 0) + goto select_timeout; + if (retval == -1) { if (errno == EBUSY) goto stopped; else goto select_error; } - - /* check for timeout */ - if (retval == 0) - goto select_timeout; - - do_read: - /* if we get here there is activity on the real fd since the select - * completed and the control socket was not readable. */ - bytes = READ_SOCKET (conn->fd.fd, data, toread); - if (bytes == 0) { - goto eof; - } else if (bytes < 0) { - if (ERRNO_IS_NOT_EAGAIN && ERRNO_IS_NOT_EINTR) - goto read_error; - } else { - toread -= bytes; - data += bytes; - } + gst_poll_set_controllable (conn->fdset, FALSE); } return GST_RTSP_OK; @@ -987,67 +1221,11 @@ eof: } read_error: { - return GST_RTSP_ESYS; - } -} - -/** - * gst_rtsp_connection_read: - * @conn: a #GstRTSPConnection - * @data: the data to read - * @size: the size of @data - * @timeout: a timeout value or #NULL - * - * Attempt to read @size bytes into @data from the connected @conn, blocking up to - * the specified @timeout. @timeout can be #NULL, in which case this function - * might block forever. - * - * This function can be cancelled with gst_rtsp_connection_flush(). - * - * Returns: #GST_RTSP_OK on success. - */ -GstRTSPResult -gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, - GTimeVal * timeout) -{ - return gst_rtsp_connection_read_internal (conn, data, size, timeout, TRUE); -} - - -static GstRTSPResult -read_body (GstRTSPConnection * conn, glong content_length, GstRTSPMessage * msg, - GTimeVal * timeout) -{ - guint8 *body; - GstRTSPResult res; - - if (content_length <= 0) { - body = NULL; - content_length = 0; - goto done; - } - - body = g_malloc (content_length + 1); - body[content_length] = '\0'; - - GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, body, content_length, - timeout, FALSE), read_error); - - content_length += 1; - -done: - gst_rtsp_message_take_body (msg, (guint8 *) body, content_length); - - return GST_RTSP_OK; - - /* ERRORS */ -read_error: - { - g_free (body); return res; } } + /** * gst_rtsp_connection_receive: * @conn: a #GstRTSPConnection @@ -1066,138 +1244,78 @@ GstRTSPResult gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) { - gchar buffer[4096]; - gint line; - glong content_length; GstRTSPResult res; - gboolean need_body; + GstRTSPBuilder builder = { 0 }; + gint retval; + GstClockTime to; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - line = 0; - - need_body = TRUE; - - res = GST_RTSP_OK; - /* parse first line and headers */ - while (res == GST_RTSP_OK) { - guint8 c; - - /* read first character, this identifies data messages */ - /* This is the only read() that we allow to be interrupted */ - GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, &c, 1, timeout, - TRUE), read_error); - - /* check for data packet, first character is $ */ - if (c == '$') { - guint16 size; - - /* data packets are $<1 byte channel><2 bytes length,BE> */ - - /* read channel, which is the next char */ - GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, &c, 1, timeout, - FALSE), read_error); - - /* now we create a data message */ - gst_rtsp_message_init_data (message, c); - - /* next two bytes are the length of the data */ - GST_RTSP_CHECK (gst_rtsp_connection_read_internal (conn, - (guint8 *) & size, 2, timeout, FALSE), read_error); + /* configure timeout if any */ + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - size = GUINT16_FROM_BE (size); + gst_poll_set_controllable (conn->fdset, TRUE); + gst_poll_fd_ctl_write (conn->fdset, &conn->fd, FALSE); + gst_poll_fd_ctl_read (conn->fdset, &conn->fd, TRUE); - /* and read the body */ - res = read_body (conn, size, message, timeout); - need_body = FALSE; + while (TRUE) { + res = build_next (&builder, message, conn); + if (res == GST_RTSP_EEOF) + goto eof; + if (res == GST_RTSP_OK) break; - } else { - gint offset = 0; + if (res != GST_RTSP_EINTR) + goto read_error; - /* we have a regular response */ - if (c != '\r') { - buffer[0] = c; - offset = 1; - } - /* should not happen */ - if (c == '\n') - break; - - /* read lines */ - GST_RTSP_CHECK (read_line (conn->fd.fd, buffer + offset, - sizeof (buffer) - offset), read_error); + do { + retval = gst_poll_wait (conn->fdset, to); + } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - if (buffer[0] == '\0') - break; + /* check for timeout */ + if (retval == 0) + goto select_timeout; - if (line == 0) { - /* first line, check for response status */ - if (g_str_has_prefix (buffer, "RTSP")) { - res = parse_response_status (buffer, message); - } else { - res = parse_request_line (buffer, message); - } - } else { - /* else just parse the line */ - parse_line (buffer, message); - } + if (retval == -1) { + if (errno == EBUSY) + goto stopped; + else + goto select_error; } - line++; + gst_poll_set_controllable (conn->fdset, FALSE); } - /* read the rest of the body if needed */ - if (need_body) { - gchar *session_id; - gchar *hdrval; - - /* see if there is a Content-Length header */ - if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CONTENT_LENGTH, - &hdrval, 0) == GST_RTSP_OK) { - /* there is, read the body */ - content_length = atol (hdrval); - GST_RTSP_CHECK (read_body (conn, content_length, message, timeout), - read_error); - } + /* we have a message here */ + build_reset (&builder); - /* save session id in the connection for further use */ - if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, - &session_id, 0) == GST_RTSP_OK) { - gint maxlen, i; - - /* default session timeout */ - conn->timeout = 60; - - maxlen = sizeof (conn->session_id) - 1; - /* the sessionid can have attributes marked with ; - * Make sure we strip them */ - for (i = 0; session_id[i] != '\0'; i++) { - if (session_id[i] == ';') { - maxlen = i; - /* parse timeout */ - do { - i++; - } while (g_ascii_isspace (session_id[i])); - if (g_str_has_prefix (&session_id[i], "timeout=")) { - gint to; - - /* if we parsed something valid, configure */ - if ((to = atoi (&session_id[i + 9])) > 0) - conn->timeout = to; - } - break; - } - } + return GST_RTSP_OK; - /* make sure to not overflow */ - strncpy (conn->session_id, session_id, maxlen); - conn->session_id[maxlen] = '\0'; - } + /* ERRORS */ +select_error: + { + res = GST_RTSP_ESYS; + goto cleanup; + } +select_timeout: + { + res = GST_RTSP_ETIMEOUT; + goto cleanup; + } +stopped: + { + res = GST_RTSP_EINTR; + goto cleanup; + } +eof: + { + res = GST_RTSP_EEOF; + goto cleanup; } - return res; - read_error: +cleanup: { + build_reset (&builder); + gst_rtsp_message_unset (message); return res; } } @@ -1636,3 +1754,322 @@ gst_rtsp_connection_get_ip (const GstRTSPConnection * conn) return conn->ip; } + +#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) +#define WRITE_COND (G_IO_OUT | G_IO_ERR) + +typedef struct +{ + GString *str; + guint cseq; +} GstRTSPRec; + +/* async functions */ +struct _GstRTSPChannel +{ + GSource source; + + GstRTSPConnection *conn; + + GstRTSPBuilder builder; + GstRTSPMessage message; + + GPollFD readfd; + GPollFD writefd; + gboolean write_added; + + /* queued message for transmission */ + GList *messages; + guint8 *write_data; + guint write_off; + guint write_len; + guint write_cseq; + + GstRTSPChannelFuncs funcs; + + gpointer user_data; + GDestroyNotify notify; +}; + +static gboolean +gst_rtsp_source_prepare (GSource * source, gint * timeout) +{ + GstRTSPChannel *channel = (GstRTSPChannel *) source; + + *timeout = (channel->conn->timeout * 1000); + + return FALSE; +} + +static gboolean +gst_rtsp_source_check (GSource * source) +{ + GstRTSPChannel *channel = (GstRTSPChannel *) source; + + if (channel->readfd.revents & READ_COND) + return TRUE; + + if (channel->writefd.revents & WRITE_COND) + return TRUE; + + return FALSE; +} + +static gboolean +gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback, + gpointer user_data) +{ + GstRTSPChannel *channel = (GstRTSPChannel *) source; + GstRTSPResult res; + + /* first read as much as we can */ + if (channel->readfd.revents & READ_COND) { + do { + res = build_next (&channel->builder, &channel->message, channel->conn); + if (res == GST_RTSP_EINTR) + break; + if (res == GST_RTSP_EEOF) + goto eof; + if (res != GST_RTSP_OK) + goto error; + + if (channel->funcs.message_received) + channel->funcs.message_received (channel, &channel->message, + channel->user_data); + + gst_rtsp_message_unset (&channel->message); + build_reset (&channel->builder); + } while (FALSE); + } + + if (channel->writefd.revents & WRITE_COND) { + do { + if (channel->write_data == NULL) { + GstRTSPRec *data; + + if (!channel->messages) + goto done; + + /* no data, get a new message from the queue */ + data = channel->messages->data; + channel->messages = + g_list_delete_link (channel->messages, channel->messages); + + channel->write_off = 0; + channel->write_len = data->str->len; + channel->write_data = (guint8 *) g_string_free (data->str, FALSE); + channel->write_cseq = data->cseq; + + g_slice_free (GstRTSPRec, data); + } + + res = write_bytes (channel->writefd.fd, channel->write_data, + &channel->write_off, channel->write_len); + if (res == GST_RTSP_EINTR) + break; + if (res != GST_RTSP_OK) + goto error; + + if (channel->funcs.message_sent) + channel->funcs.message_sent (channel, channel->write_cseq, + channel->user_data); + + done: + if (channel->messages == NULL && channel->write_added) { + g_source_remove_poll ((GSource *) channel, &channel->writefd); + channel->write_added = FALSE; + channel->writefd.revents = 0; + } + g_free (channel->write_data); + channel->write_data = NULL; + } while (FALSE); + } + + return TRUE; + + /* ERRORS */ +eof: + { + if (channel->funcs.closed) + channel->funcs.closed (channel, channel->user_data); + return FALSE; + } +error: + { + if (channel->funcs.error) + channel->funcs.error (channel, res, channel->user_data); + return FALSE; + } +} + +static void +gst_rtsp_source_finalize (GSource * source) +{ + GstRTSPChannel *channel = (GstRTSPChannel *) source; + GList *walk; + + build_reset (&channel->builder); + + for (walk = channel->messages; walk; walk = g_list_next (walk)) { + GstRTSPRec *data = walk->data; + + g_string_free (data->str, TRUE); + g_slice_free (GstRTSPRec, data); + } + g_list_free (channel->messages); + g_free (channel->write_data); + + if (channel->notify) + channel->notify (channel->user_data); +} + +static GSourceFuncs gst_rtsp_source_funcs = { + gst_rtsp_source_prepare, + gst_rtsp_source_check, + gst_rtsp_source_dispatch, + gst_rtsp_source_finalize +}; + +/** + * gst_rtsp_channel_new: + * @conn: a #GstRTSPConnection + * @funcs: channel functions + * @user_data: user data to pass to @funcs + * + * Create a channel object for @conn. The functions provided in @funcs will be + * called with @user_data when activity happened on the channel. + * + * The new channel is usually created so that it can be attached to a + * maincontext with gst_rtsp_channel_attach(). + * + * @conn must exist for the entire lifetime of the channel. + * + * Returns: a #GstRTSPChannel that can be used for asynchronous RTSP + * communication. Free with gst_rtsp_channel_unref () after usage. + * + * Since: 0.10.23 + */ +GstRTSPChannel * +gst_rtsp_channel_new (GstRTSPConnection * conn, + GstRTSPChannelFuncs * funcs, gpointer user_data, GDestroyNotify notify) +{ + GstRTSPChannel *result; + + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (funcs != NULL, NULL); + + result = (GstRTSPChannel *) g_source_new (&gst_rtsp_source_funcs, + sizeof (GstRTSPChannel)); + + result->conn = conn; + result->builder.state = STATE_START; + + result->readfd.fd = conn->fd.fd; + result->readfd.events = READ_COND; + result->readfd.revents = 0; + + result->writefd.fd = conn->fd.fd; + result->writefd.events = WRITE_COND; + result->writefd.revents = 0; + result->write_added = FALSE; + + result->funcs = *funcs; + result->user_data = user_data; + result->notify = notify; + + /* only add the read fd, the write fd is only added when we have data + * to send. */ + g_source_add_poll ((GSource *) result, &result->readfd); + + return result; +} + +/** + * gst_rtsp_channel_attach: + * @channel: a #GstRTSPChannel + * @context: a GMainContext (if NULL, the default context will be used) + * + * Adds a #GstRTSPChannel to a context so that it will be executed within that context. + * + * Returns: the ID (greater than 0) for the channel within the GMainContext. + * + * Since: 0.10.23 + */ +guint +gst_rtsp_channel_attach (GstRTSPChannel * channel, GMainContext * context) +{ + g_return_val_if_fail (channel != NULL, 0); + + return g_source_attach ((GSource *) channel, context); +} + +/** + * gst_rtsp_channel_free: + * @channel: a #GstRTSPChannel + * + * Decreases the reference count of @channel by one. If the resulting reference + * count is zero the channel and associated memory will be destroyed. + * + * Since: 0.10.23 + */ +void +gst_rtsp_channel_unref (GstRTSPChannel * channel) +{ + g_return_if_fail (channel != NULL); + + g_source_unref ((GSource *) channel); +} + +/** + * gst_rtsp_channel_queue_message: + * @channel: a #GstRTSPChannel + * @message: a #GstRTSPMessage + * + * Queue a @message for transmission in @channel. The contents of this + * message will be serialized and transmitted when the connection of the + * channel becomes writable. + * + * The return value of this function will be returned as the cseq argument in + * the message_sent callback. + * + * Returns: the sequence number of the message or -1 if the cseq could not be + * determined. + * + * Since: 0.10.23 + */ +guint +gst_rtsp_channel_queue_message (GstRTSPChannel * channel, + GstRTSPMessage * message) +{ + GstRTSPRec *data; + gchar *header; + guint cseq; + + g_return_val_if_fail (channel != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); + + /* get the cseq from the message, when we finish writing this message on the + * socket we will have to pass the cseq to the callback. */ + if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header, + 0) == GST_RTSP_OK) { + cseq = atoi (header); + } else { + cseq = -1; + } + + /* make a record with the message as a string ans cseq */ + data = g_slice_new (GstRTSPRec); + data->str = message_to_string (channel->conn, message); + data->cseq = cseq; + + /* add the record to a queue */ + channel->messages = g_list_append (channel->messages, data); + + /* make sure the main context will now also check for writability on the + * socket */ + if (!channel->write_added) { + g_source_add_poll ((GSource *) channel, &channel->writefd); + channel->write_added = TRUE; + } + return cseq; +} diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index 80312f3..33bee8f 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -127,6 +127,48 @@ GstRTSPResult gst_rtsp_connection_set_qos_dscp (GstRTSPConnection *conn, /* accessors */ const gchar * gst_rtsp_connection_get_ip (const GstRTSPConnection *conn); +/* async IO */ + +/** + * GstRTSPChannel: + * + * Opaque RTSP channel object that can be used for asynchronous RTSP + * operations. + */ +typedef struct _GstRTSPChannel GstRTSPChannel; + +/** + * GstRTSPChannelFuncs: + * @message_received: callback when a message was received + * @message_sent: callback when a message was sent + * @closed: callback when the connection is closed + * @error: callback when an error occured + * + * Callback functions from a #GstRTSPChannel. + */ +typedef struct { + GstRTSPResult (*message_received) (GstRTSPChannel *channel, GstRTSPMessage *message, + gpointer user_data); + GstRTSPResult (*message_sent) (GstRTSPChannel *channel, guint cseq, + gpointer user_data); + GstRTSPResult (*closed) (GstRTSPChannel *channel, gpointer user_data); + GstRTSPResult (*error) (GstRTSPChannel *channel, GstRTSPResult result, + gpointer user_data); +} GstRTSPChannelFuncs; + +GstRTSPChannel * gst_rtsp_channel_new (GstRTSPConnection *conn, + GstRTSPChannelFuncs *funcs, + gpointer user_data, + GDestroyNotify notify); +void gst_rtsp_channel_unref (GstRTSPChannel *channel); + +guint gst_rtsp_channel_attach (GstRTSPChannel *channel, + GMainContext *context); + +guint gst_rtsp_channel_queue_message (GstRTSPChannel *channel, + GstRTSPMessage *message); + + G_END_DECLS #endif /* __GST_RTSP_CONNECTION_H__ */ -- 2.7.4