X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst-libs%2Fgst%2Frtsp%2Fgstrtspconnection.c;h=fa7ec81642ef05744138ce8d0bec8e89ff4abe2c;hb=0b0dde7ce11e15bedaf34aea2df843a5253d1e2f;hp=6f716b63887bd0d4bd317f55d1d35ce15e1f0113;hpb=e1a4c8871abb22275ceca03a0530e6b19ce8be6d;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 6f716b6..fa7ec81 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -45,12 +45,8 @@ * @short_description: manage RTSP connections * @see_also: gstrtspurl * - * - * * This object manages the RTSP connection to the server. It provides function * to receive and send bytes and messages. - * - * * * Last reviewed on 2007-07-24 (0.10.14) */ @@ -65,35 +61,23 @@ #include #include -#ifdef HAVE_UNISTD_H -#include -#endif - - /* we include this here to get the G_OS_* defines */ #include #include -#ifdef G_OS_WIN32 -#include -#include -#define EINPROGRESS WSAEINPROGRESS -#else -#include -#include -#include -#include -#include -#include -#endif +#include "gstrtspconnection.h" -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include -#endif +#include "gst/glib-compat-private.h" -#include "gstrtspconnection.h" -#include "gstrtspbase64.h" -#include "md5.h" +#ifdef IP_TOS +union gst_sockaddr +{ + struct sockaddr sa; + struct sockaddr_in sa_in; + struct sockaddr_in6 sa_in6; + struct sockaddr_storage sa_stor; +}; +#endif typedef struct { @@ -104,48 +88,12 @@ typedef struct guint coutl; } DecodeCtx; -static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx, - guint size, DecodeCtx * ctxp); -static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key, - guint keysize, gchar ** value); -static void parse_string (gchar * dest, gint size, gchar ** src); - -#ifdef G_OS_WIN32 -#define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0) -#define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len) -#define CLOSE_SOCKET(sock) closesocket (sock) -#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK) -#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR) -/* According to Microsoft's connect() documentation this one returns - * WSAEWOULDBLOCK and not WSAEINPROGRESS. */ -#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK) +#ifdef MSG_NOSIGNAL +#define SEND_FLAGS MSG_NOSIGNAL #else -#define READ_SOCKET(fd, buf, len) read (fd, buf, len) -#define WRITE_SOCKET(fd, buf, len) write (fd, buf, len) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len) -#define CLOSE_SOCKET(sock) close (sock) -#define ERRNO_IS_EAGAIN (errno == EAGAIN) -#define ERRNO_IS_EINTR (errno == EINTR) -#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS) +#define SEND_FLAGS 0 #endif -#define ADD_POLLFD(fdset, pfd, fd) \ -G_STMT_START { \ - (pfd)->fd = fd; \ - gst_poll_add_fd (fdset, pfd); \ -} G_STMT_END - -#define REMOVE_POLLFD(fdset, pfd) \ -G_STMT_START { \ - if ((pfd)->fd != -1) { \ - GST_DEBUG ("remove fd %d", (pfd)->fd); \ - gst_poll_remove_fd (fdset, pfd); \ - CLOSE_SOCKET ((pfd)->fd); \ - (pfd)->fd = -1; \ - } \ -} G_STMT_END - typedef enum { TUNNEL_STATE_NONE, @@ -163,19 +111,23 @@ struct _GstRTSPConnection GstRTSPUrl *url; /* connection state */ - GstPollFD fd0; - GstPollFD fd1; - - GstPollFD *readfd; - GstPollFD *writefd; + GSocket *read_socket; + GSocket *write_socket; + gboolean manual_http; + GSocket *socket0, *socket1; + GCancellable *cancellable; gchar tunnelid[TUNNELID_LEN]; gboolean tunneled; GstRTSPTunnelState tstate; - GstPoll *fdset; gchar *ip; + gint read_ahead; + + gchar *initial_buffer; + gsize initial_buffer_offset; + /* Session state */ gint cseq; /* sequence number */ gchar session_id[512]; /* session id */ @@ -195,22 +147,6 @@ struct _GstRTSPConnection guint proxy_port; }; -#ifdef G_OS_WIN32 -static int -inet_aton (const char *c, struct in_addr *paddr) -{ - /* note that inet_addr is deprecated on unix because - * inet_addr returns -1 (INADDR_NONE) for the valid 255.255.255.255 - * address. */ - paddr->s_addr = inet_addr (c); - - if (paddr->s_addr == INADDR_NONE) - return 0; - - return 1; -} -#endif - enum { STATE_START = 0, @@ -221,10 +157,18 @@ enum STATE_LAST }; +enum +{ + READ_AHEAD_EOH = -1, /* end of headers */ + READ_AHEAD_CRLF = -2, + READ_AHEAD_CRLFCR = -3 +}; + /* a structure for constructing RTSPMessages */ typedef struct { gint state; + GstRTSPResult status; guint8 buffer[4096]; guint offset; @@ -257,31 +201,14 @@ GstRTSPResult gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) { GstRTSPConnection *newconn; -#ifdef G_OS_WIN32 - WSADATA w; - int error; -#endif g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); -#ifdef G_OS_WIN32 - error = WSAStartup (0x0202, &w); - - if (error) - goto startup_error; - - if (w.wVersion != 0x0202) - goto version_error; -#endif - newconn = g_new0 (GstRTSPConnection, 1); - if ((newconn->fdset = gst_poll_new (TRUE)) == NULL) - goto no_fdset; + newconn->cancellable = g_cancellable_new (); newconn->url = gst_rtsp_url_copy (url); - newconn->fd0.fd = -1; - newconn->fd1.fd = -1; newconn->timer = g_timer_new (); newconn->timeout = 60; newconn->cseq = 1; @@ -294,236 +221,264 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) *conn = newconn; return GST_RTSP_OK; - - /* ERRORS */ -#ifdef G_OS_WIN32 -startup_error: - { - g_warning ("Error %d on WSAStartup", error); - return GST_RTSP_EWSASTART; - } -version_error: - { - g_warning ("Windows sockets are not version 0x202 (current 0x%x)", - w.wVersion); - WSACleanup (); - return GST_RTSP_EWSAVERSION; - } -#endif -no_fdset: - { - g_free (newconn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif - return GST_RTSP_ESYS; - } } /** - * gst_rtsp_connection_accept: - * @sock: a socket + * gst_rtsp_connection_create_from_socket: + * @socket: a #GSocket + * @ip: the IP address of the other end + * @port: the port used by the other end + * @initial_buffer: data already read from @fd * @conn: storage for a #GstRTSPConnection * - * Accept a new connection on @sock and create a new #GstRTSPConnection for - * handling communication on new socket. + * Create a new #GstRTSPConnection for handling communication on the existing + * socket @socket. The @initial_buffer contains any data already read from + * @socket which should be used before starting to read new data. * * Returns: #GST_RTSP_OK when @conn contains a valid connection. * - * Since: 0.10.23 + * Since: 0.10.25 */ GstRTSPResult -gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn) +gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip, + guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn) { - int fd; - unsigned int address_len; GstRTSPConnection *newconn = NULL; - struct sockaddr_in address; GstRTSPUrl *url; -#ifdef G_OS_WIN32 - gulong flags = 1; -#endif - - address_len = sizeof (address); - memset (&address, 0, address_len); + GstRTSPResult res; -#ifndef G_OS_WIN32 - fd = accept (sock, (struct sockaddr *) &address, &address_len); -#else - fd = accept (sock, (struct sockaddr *) &address, (gint *) & address_len); -#endif /* G_OS_WIN32 */ - if (fd == -1) - goto accept_failed; + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); + g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); /* set to non-blocking mode so that we can cancel the communication */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ + g_socket_set_blocking (socket, FALSE); /* create a url for the client address */ url = g_new0 (GstRTSPUrl, 1); - url->host = g_strdup_printf ("%s", inet_ntoa (address.sin_addr)); - url->port = address.sin_port; + url->host = g_strdup (ip); + url->port = port; /* now create the connection object */ - gst_rtsp_connection_create (url, &newconn); + GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed); gst_rtsp_url_free (url); - ADD_POLLFD (newconn->fdset, &newconn->fd0, fd); - /* both read and write initially */ - newconn->readfd = &newconn->fd0; - newconn->writefd = &newconn->fd0; + newconn->socket0 = G_SOCKET (g_object_ref (socket)); + newconn->socket1 = G_SOCKET (g_object_ref (socket)); + newconn->write_socket = newconn->read_socket = newconn->socket0; + + newconn->ip = g_strdup (ip); + + newconn->initial_buffer = g_strdup (initial_buffer); *conn = newconn; return GST_RTSP_OK; /* ERRORS */ +newconn_failed: + { + gst_rtsp_url_free (url); + return res; + } +} + +/** + * gst_rtsp_connection_accept: + * @socket: a socket + * @conn: storage for a #GstRTSPConnection + * @cancellable: a #GCancellable to cancel the operation + * + * Accept a new connection on @socket and create a new #GstRTSPConnection for + * handling communication on new socket. + * + * Returns: #GST_RTSP_OK when @conn contains a valid connection. + * + * Since: 0.10.23 + */ +GstRTSPResult +gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn, + GCancellable * cancellable) +{ + GError *err = NULL; + gchar *ip; + guint16 port; + GSocket *client_sock; + GSocketAddress *addr; + GstRTSPResult ret; + + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + + client_sock = g_socket_accept (socket, cancellable, &err); + if (!client_sock) + goto accept_failed; + + addr = g_socket_get_remote_address (client_sock, &err); + if (!addr) + goto getnameinfo_failed; + + ip = g_inet_address_to_string (g_inet_socket_address_get_address + (G_INET_SOCKET_ADDRESS (addr))); + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); + g_object_unref (addr); + + ret = + gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL, + conn); + g_object_unref (client_sock); + g_free (ip); + + return ret; + + /* ERRORS */ accept_failed: { + GST_DEBUG ("Accepting client failed: %s", err->message); + g_clear_error (&err); return GST_RTSP_ESYS; } +getnameinfo_failed: + { + if (!g_socket_close (client_sock, &err)) { + GST_DEBUG ("Closing socket failed: %s", err->message); + g_clear_error (&err); + } + g_object_unref (client_sock); + return GST_RTSP_ERROR; + } } static gchar * -do_resolve (const gchar * host) +do_resolve (const gchar * host, GCancellable * cancellable) { - struct hostent *hostinfo; - struct in_addr addr; - const gchar *ip; -#ifdef G_OS_WIN32 - struct in_addr *addrp; -#else - char **addrs; - gchar ipbuf[INET_ADDRSTRLEN]; -#endif /* G_OS_WIN32 */ + GResolver *resolver; + GInetAddress *addr; + GError *err = NULL; + gchar *ip; - ip = NULL; + addr = g_inet_address_new_from_string (host); + if (!addr) { + GList *results, *l; - /* first check if it already is an IP address */ - if (inet_aton (host, &addr)) { - ip = host; - } else { - hostinfo = gethostbyname (host); - if (!hostinfo) - goto not_resolved; /* h_errno set */ - - if (hostinfo->h_addrtype != AF_INET) - goto not_ip; /* host not an IP host */ -#ifdef G_OS_WIN32 - addrp = (struct in_addr *) hostinfo->h_addr_list[0]; - /* this is not threadsafe */ - ip = inet_ntoa (*addrp); -#else - addrs = hostinfo->h_addr_list; - ip = inet_ntop (AF_INET, (struct in_addr *) addrs[0], ipbuf, - sizeof (ipbuf)); -#endif /* G_OS_WIN32 */ + resolver = g_resolver_get_default (); + + results = g_resolver_lookup_by_name (resolver, host, cancellable, &err); + if (!results) + goto name_resolve; + + for (l = results; l; l = l->next) { + GInetAddress *tmp = l->data; + + if (g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV4 || + g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV6) { + addr = G_INET_ADDRESS (g_object_ref (tmp)); + break; + } + } + + g_resolver_free_addresses (results); + g_object_unref (resolver); } - return g_strdup (ip); - /* ERRORS */ -not_resolved: - { - GST_ERROR ("could not resolve %s", host); + if (!addr) return NULL; - } -not_ip: + + ip = g_inet_address_to_string (addr); + g_object_unref (addr); + + return ip; + + /* ERRORS */ +name_resolve: { - GST_ERROR ("not an IP address"); + GST_ERROR ("failed to resolve %s: %s", host, err->message); + g_clear_error (&err); + g_object_unref (resolver); return NULL; } } static GstRTSPResult -do_connect (const gchar * ip, guint16 port, GstPollFD * fdout, - GstPoll * fdset, GTimeVal * timeout) +do_connect (const gchar * ip, guint16 port, GSocket ** socket_out, + GTimeVal * timeout, GCancellable * cancellable) { - gint fd; - struct sockaddr_in sa_in; - gint ret; -#ifdef G_OS_WIN32 - unsigned long flags = 1; -#endif /* G_OS_WIN32 */ + GSocket *socket; GstClockTime to; - gint retval; - - memset (&sa_in, 0, sizeof (sa_in)); - sa_in.sin_family = AF_INET; /* network socket */ - sa_in.sin_port = htons (port); /* on port */ - sa_in.sin_addr.s_addr = inet_addr (ip); /* on host ip */ - - fd = socket (AF_INET, SOCK_STREAM, 0); - if (fd == -1) + GInetAddress *addr; + GSocketAddress *saddr; + GError *err = NULL; + + addr = g_inet_address_new_from_string (ip); + g_assert (addr); + saddr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); + + socket = + g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + if (socket == NULL) goto no_socket; /* set to non-blocking mode so that we can cancel the connect */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ - - /* add the socket to our fdset */ - ADD_POLLFD (fdset, fdout, fd); + g_socket_set_blocking (socket, FALSE); /* we are going to connect ASYNC now */ - ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in)); - if (ret == 0) + if (!g_socket_connect (socket, saddr, cancellable, &err)) { + if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PENDING)) + goto sys_error; + g_clear_error (&err); + } else { goto done; - if (!ERRNO_IS_EINPROGRESS) - goto sys_error; + } /* wait for connect to complete up to the specified timeout or until we got * interrupted. */ - gst_poll_fd_ctl_write (fdset, fdout, TRUE); - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - if (retval == 0) - goto timeout; - else if (retval == -1) - goto sys_error; + g_socket_set_timeout (socket, (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (socket, G_IO_OUT, cancellable, &err)) { + g_socket_set_timeout (socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) + goto timeout; + else + goto sys_error; + } + g_socket_set_timeout (socket, 0); - /* we can still have an error connecting on windows */ - if (gst_poll_fd_has_error (fdset, fdout)) { - socklen_t len = sizeof (errno); -#ifndef G_OS_WIN32 - getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len); -#else - getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len); -#endif + if (!g_socket_check_connect_result (socket, &err)) goto sys_error; - } - gst_poll_fd_ignored (fdset, fdout); done: + g_object_unref (saddr); + + *socket_out = socket; return GST_RTSP_OK; /* ERRORS */ no_socket: { - GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno)); + GST_ERROR ("no socket: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); return GST_RTSP_ESYS; } sys_error: { - GST_ERROR ("system error %d (%s)", errno, g_strerror (errno)); - REMOVE_POLLFD (fdset, fdout); + GST_ERROR ("system error: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ESYS; } timeout: { GST_ERROR ("timeout"); - REMOVE_POLLFD (fdset, fdout); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ETIMEOUT; } } @@ -533,16 +488,18 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) { gint i; GstRTSPResult res; - gchar *str; - guint idx, line; - gint retval; - GstClockTime to; - gchar *ip, *url_port_str; + gchar *ip; + gchar *uri; + gchar *value; guint16 port, url_port; - gchar codestr[4], *resultstr; - gint code; GstRTSPUrl *url; gchar *hostparam; + GstRTSPMessage *msg; + GstRTSPMessage response; + gboolean old_http; + + memset (&response, 0, sizeof (response)); + gst_rtsp_message_init (&response); /* create a random sessionid */ for (i = 0; i < TUNNELID_LEN; i++) @@ -554,196 +511,136 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) gst_rtsp_url_get_port (url, &url_port); if (conn->proxy_host) { - hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port); - url_port_str = g_strdup_printf (":%d", url_port); + uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port, + url->abspath, url->query ? "?" : "", url->query ? url->query : ""); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); ip = conn->proxy_host; port = conn->proxy_port; } else { + uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", + url->query ? url->query : ""); hostparam = NULL; - url_port_str = NULL; ip = conn->ip; port = url_port; } - /* */ - str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n" - "%s" - "x-sessioncookie: %s\r\n" - "Accept: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" "\r\n", - conn->proxy_host ? "http://" : "", - conn->proxy_host ? url->host : "", - conn->proxy_host ? url_port_str : "", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", - hostparam ? hostparam : "", conn->tunnelid); - - /* we start by writing to this fd */ - conn->writefd = &conn->fd0; - - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; - - gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE); - gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE); - - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - - line = 0; - while (TRUE) { - guint8 buffer[4096]; - - idx = 0; - while (TRUE) { - res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL); - if (res == GST_RTSP_EEOF) - goto eof; - if (res == GST_RTSP_OK) - break; - if (res != GST_RTSP_EINTR) - goto read_error; - - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (retval == 0) - goto timeout; - - if (retval == -1) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } - } - - /* check for last line */ - if (buffer[0] == '\r') - buffer[0] = '\0'; - if (buffer[0] == '\0') - break; - - if (line == 0) { - /* first line, parse response */ - gchar versionstr[20]; - gchar *bptr; - - bptr = (gchar *) buffer; + /* create the GET request for the read connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; - parse_string (versionstr, sizeof (versionstr), &bptr); - parse_string (codestr, sizeof (codestr), &bptr); - code = atoi (codestr); + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, + conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); - while (g_ascii_isspace (*bptr)) - bptr++; - - resultstr = bptr; + /* we start by writing to this fd */ + conn->write_socket = conn->socket0; - if (code != GST_RTSP_STS_OK) - goto wrong_result; + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; + + /* receive the response to the GET request */ + /* we need to temporarily set manual_http to TRUE since + * gst_rtsp_connection_receive() will treat the HTTP response as a parsing + * failure otherwise */ + old_http = conn->manual_http; + conn->manual_http = TRUE; + GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, &response, timeout), + read_failed); + conn->manual_http = old_http; + + if (response.type != GST_RTSP_MESSAGE_HTTP_RESPONSE || + response.type_data.response.code != GST_RTSP_STS_OK) + goto wrong_result; + + if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + &value, 0) == GST_RTSP_OK) { + if (conn->proxy_host) { + /* if we use a proxy we need to change the destination url */ + g_free (url->host); + url->host = g_strdup (value); + g_free (hostparam); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); } else { - gchar key[32]; - gchar *value; - - /* other lines, parse key/value */ - res = parse_key_value (buffer, key, sizeof (key), &value); - if (res == GST_RTSP_OK) { - /* we got a new ip address */ - if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) { - if (conn->proxy_host) { - /* if we use a proxy we need to change the destination url */ - g_free (url->host); - url->host = g_strdup (value); - g_free (hostparam); - g_free (url_port_str); - hostparam = - g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port); - url_port_str = g_strdup_printf (":%d", url_port); - } else { - /* and resolve the new ip address */ - if (!(ip = do_resolve (conn->ip))) - goto not_resolved; - g_free (conn->ip); - conn->ip = ip; - } - } - } + /* and resolve the new ip address */ + if (!(ip = do_resolve (value, conn->cancellable))) + goto not_resolved; + g_free (conn->ip); + conn->ip = ip; } - line++; } /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket1, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; /* this is now our writing socket */ - conn->writefd = &conn->fd1; - - /* */ - str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n" - "%s" - "x-sessioncookie: %s\r\n" - "Content-Type: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" - "Content-Length: 32767\r\n" - "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n" - "\r\n", - conn->proxy_host ? "http://" : "", - conn->proxy_host ? url->host : "", - conn->proxy_host ? url_port_str : "", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", - hostparam ? hostparam : "", conn->tunnelid); - - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; + conn->write_socket = conn->socket1; + + /* create the POST request for the write connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; + + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, + conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES, + "Sun, 9 Jan 1972 00:00:00 GMT"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767"); + + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; exit: + gst_rtsp_message_unset (&response); g_free (hostparam); - g_free (url_port_str); + g_free (uri); return res; /* ERRORS */ -write_failed: - { - GST_ERROR ("write failed (%d)", res); - goto exit; - } -eof: - { - res = GST_RTSP_EEOF; - goto exit; - } -read_error: - { - goto exit; - } -timeout: +no_message: { - res = GST_RTSP_ETIMEOUT; + GST_ERROR ("failed to create request (%d)", res); goto exit; } -select_error: +write_failed: { - res = GST_RTSP_ESYS; + GST_ERROR ("write failed (%d)", res); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; goto exit; } -stopped: +read_failed: { - res = GST_RTSP_EINTR; + GST_ERROR ("read failed (%d)", res); + conn->manual_http = FALSE; goto exit; } wrong_result: { - GST_ERROR ("got failure response %d %s", code, resultstr); + GST_ERROR ("got failure response %d %s", response.type_data.response.code, + response.type_data.response.reason); res = GST_RTSP_ERROR; goto exit; } @@ -784,12 +681,12 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->socket0 == NULL, GST_RTSP_EINVAL); url = conn->url; if (conn->proxy_host && conn->tunneled) { - if (!(ip = do_resolve (conn->proxy_host))) { + if (!(ip = do_resolve (conn->proxy_host, conn->cancellable))) { GST_ERROR ("could not resolve %s", conn->proxy_host); goto not_resolved; } @@ -797,7 +694,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) g_free (conn->proxy_host); conn->proxy_host = ip; } else { - if (!(ip = do_resolve (url->host))) { + if (!(ip = do_resolve (url->host, conn->cancellable))) { GST_ERROR ("could not resolve %s", url->host); goto not_resolved; } @@ -809,19 +706,19 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) } /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket0, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; /* this is our read URL */ - conn->readfd = &conn->fd0; + conn->read_socket = conn->socket0; if (conn->tunneled) { res = setup_tunneling (conn, timeout); if (res != GST_RTSP_OK) goto tunneling_failed; } else { - conn->writefd = &conn->fd0; + conn->write_socket = conn->socket0; } return GST_RTSP_OK; @@ -843,33 +740,23 @@ tunneling_failed: } static void -md5_digest_to_hex_string (unsigned char digest[16], char string[33]) -{ - static const char hexdigits[] = "0123456789abcdef"; - int i; - - for (i = 0; i < 16; i++) { - string[i * 2] = hexdigits[(digest[i] >> 4) & 0x0f]; - string[i * 2 + 1] = hexdigits[digest[i] & 0x0f]; - } - string[32] = 0; -} - -static void auth_digest_compute_hex_urp (const gchar * username, const gchar * realm, const gchar * password, gchar hex_urp[33]) { - struct MD5Context md5_context; - unsigned char digest[16]; + GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5); + const gchar *digest_string; + + g_checksum_update (md5_context, (const guchar *) username, strlen (username)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) realm, strlen (realm)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) password, strlen (password)); + digest_string = g_checksum_get_string (md5_context); + + memset (hex_urp, 0, 33); + memcpy (hex_urp, digest_string, strlen (digest_string)); - MD5Init (&md5_context); - MD5Update (&md5_context, username, strlen (username)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, realm, strlen (realm)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, password, strlen (password)); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, hex_urp); + g_checksum_free (md5_context); } static void @@ -877,28 +764,30 @@ auth_digest_compute_response (const gchar * method, const gchar * uri, const gchar * hex_a1, const gchar * nonce, gchar response[33]) { - char hex_a2[33]; - struct MD5Context md5_context; - unsigned char digest[16]; + char hex_a2[33] = { 0, }; + GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5); + const gchar *digest_string; /* compute A2 */ - MD5Init (&md5_context); - MD5Update (&md5_context, method, strlen (method)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, uri, strlen (uri)); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, hex_a2); + g_checksum_update (md5_context, (const guchar *) method, strlen (method)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) uri, strlen (uri)); + digest_string = g_checksum_get_string (md5_context); + memcpy (hex_a2, digest_string, strlen (digest_string)); /* compute KD */ - MD5Init (&md5_context); - MD5Update (&md5_context, hex_a1, strlen (hex_a1)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, nonce, strlen (nonce)); - MD5Update (&md5_context, ":", 1); + g_checksum_reset (md5_context); + g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce)); + g_checksum_update (md5_context, (const guchar *) ":", 1); - MD5Update (&md5_context, hex_a2, 32); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, response); + g_checksum_update (md5_context, (const guchar *) hex_a2, 32); + digest_string = g_checksum_get_string (md5_context); + memset (response, 0, 33); + memcpy (response, digest_string, strlen (digest_string)); + + g_checksum_free (md5_context); } static void @@ -910,6 +799,9 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) gchar *user_pass64; gchar *auth_string; + if (conn->username == NULL || conn->passwd == NULL) + break; + user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd); user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass)); auth_string = g_strdup_printf ("Basic %s", user_pass64); @@ -931,7 +823,8 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) const gchar *method; /* we need to have some params set */ - if (conn->auth_params == NULL) + if (conn->auth_params == NULL || conn->username == NULL || + conn->passwd == NULL) break; /* we need the realm and nonce */ @@ -973,24 +866,31 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) static void gen_date_string (gchar * date_string, guint len) { - GTimeVal tv; + static const char wkdays[7][4] = + { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; + static const char months[12][4] = + { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", + "Nov", "Dec" + }; + struct tm tm; time_t t; -#ifdef HAVE_GMTIME_R - struct tm tm_; -#endif - g_get_current_time (&tv); - t = (time_t) tv.tv_sec; + time (&t); #ifdef HAVE_GMTIME_R - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_)); + gmtime_r (&t, &tm); #else - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t)); + tm = *gmtime (&t); #endif + + g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT", + wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900, + tm.tm_hour, tm.tm_min, tm.tm_sec); } static GstRTSPResult -write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) +write_bytes (GSocket * socket, const guint8 * buffer, guint * idx, guint size, + GCancellable * cancellable) { guint left; @@ -1000,16 +900,20 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) left = size - *idx; while (left) { - gint r; + GError *err = NULL; + gssize r; - r = WRITE_SOCKET (fd, &buffer[*idx], left); + r = g_socket_send (socket, (gchar *) & buffer[*idx], left, cancellable, + &err); if (G_UNLIKELY (r == 0)) { return GST_RTSP_EINTR; } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -1019,8 +923,45 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) } static gint -fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx) +fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) +{ + gint out = 0; + + if (G_UNLIKELY (conn->initial_buffer != NULL)) { + gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]); + + out = MIN (left, size); + memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out); + + if (left == (gsize) out) { + g_free (conn->initial_buffer); + conn->initial_buffer = NULL; + conn->initial_buffer_offset = 0; + } else + conn->initial_buffer_offset += out; + } + + if (G_LIKELY (size > (guint) out)) { + gssize r; + + r = g_socket_receive (conn->read_socket, (gchar *) & buffer[out], + size - out, conn->cancellable, err); + if (r <= 0) { + if (out == 0) + out = r; + } else + out += r; + } + + return out; +} + +static gint +fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) { + DecodeCtx *ctx = conn->ctxp; gint out = 0; if (ctx) { @@ -1040,7 +981,7 @@ fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx) break; /* try to read more bytes */ - r = READ_SOCKET (fd, in, sizeof (in)); + r = fill_raw_bytes (conn, in, sizeof (in), err); if (r <= 0) { if (out == 0) out = r; @@ -1053,16 +994,17 @@ fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx) &ctx->save); } } else { - out = READ_SOCKET (fd, buffer, size); + out = fill_raw_bytes (conn, buffer, size, err); } return out; } static GstRTSPResult -read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) +read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { guint left; + GError *err = NULL; if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; @@ -1072,14 +1014,16 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) while (left) { gint r; - r = fill_bytes (fd, &buffer[*idx], left, ctx); + r = fill_bytes (conn, &buffer[*idx], left, &err); if (G_UNLIKELY (r == 0)) { return GST_RTSP_EEOF; } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -1088,30 +1032,130 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) return GST_RTSP_OK; } +/* The code below tries to handle clients using \r, \n or \r\n to indicate the + * end of a line. It even does its best to handle clients which mix them (even + * though this is a really stupid idea (tm).) It also handles Line White Space + * (LWS), where a line end followed by whitespace is considered LWS. This is + * the method used in RTSP (and HTTP) to break long lines. + */ static GstRTSPResult -read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) +read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { + GError *err = NULL; + while (TRUE) { guint8 c; gint r; - r = fill_bytes (fd, &c, 1, ctx); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EEOF; - } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) - return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + if (conn->read_ahead == READ_AHEAD_EOH) { + /* the last call to read_line() already determined that we have reached + * the end of the headers, so convey that information now */ + conn->read_ahead = 0; + break; + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + /* the last call to read_line() left off after having read \r\n */ + c = '\n'; + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* the last call to read_line() left off after having read \r\n\r */ + c = '\r'; + } else if (conn->read_ahead != 0) { + /* the last call to read_line() left us with a character to start with */ + c = (guint8) conn->read_ahead; + conn->read_ahead = 0; } else { - if (c == '\n') /* end on \n */ - break; - if (c == '\r') /* ignore \r */ - continue; + /* read the next character */ + r = fill_bytes (conn, &c, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } + + g_clear_error (&err); + return GST_RTSP_ESYS; + } + } + + /* special treatment of line endings */ + if (c == '\r' || c == '\n') { + guint8 read_ahead; + + retry: + /* need to read ahead one more character to know what to do... */ + r = fill_bytes (conn, &read_ahead, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + /* remember the original character we read and try again next time */ + if (conn->read_ahead == 0) + conn->read_ahead = c; + return GST_RTSP_EINTR; + g_clear_error (&err); + return GST_RTSP_EINTR; + } + + g_clear_error (&err); + return GST_RTSP_ESYS; + } - if (G_LIKELY (*idx < size - 1)) - buffer[(*idx)++] = c; + if (read_ahead == ' ' || read_ahead == '\t') { + if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* got \r\n\r followed by whitespace, treat it as a normal line + * followed by one starting with LWS */ + conn->read_ahead = read_ahead; + break; + } else { + /* got LWS, change the line ending to a space and continue */ + c = ' '; + conn->read_ahead = read_ahead; + } + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + if (read_ahead == '\r' || read_ahead == '\n') { + /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* got \r\n\r followed by something else, this is not really + * supported since we have probably just eaten the first character + * of the body or the next message, so just ignore the second \r + * and live with it... */ + conn->read_ahead = read_ahead; + break; + } + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + if (read_ahead == '\r') { + /* got \r\n\r so far, need one more character... */ + conn->read_ahead = READ_AHEAD_CRLFCR; + goto retry; + } else if (read_ahead == '\n') { + /* got \r\n\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } + } else if (c == read_ahead) { + /* got double \r or \n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else if (c == '\r' && read_ahead == '\n') { + /* got \r\n so far, still need more to know what to do... */ + conn->read_ahead = READ_AHEAD_CRLF; + goto retry; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } } + + if (G_LIKELY (*idx < size - 1)) + buffer[(*idx)++] = c; } buffer[*idx] = '\0'; @@ -1138,20 +1182,13 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); - - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE); - /* clear all previous poll results */ - gst_poll_fd_ignored (conn->fdset, conn->writefd); - gst_poll_fd_ignored (conn->fdset, conn->readfd); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; @@ -1159,26 +1196,31 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, while (TRUE) { /* try to write */ - res = write_bytes (conn->writefd->fd, data, &offset, size); + res = + write_bytes (conn->write_socket, data, &offset, size, + conn->cancellable); if (G_LIKELY (res == GST_RTSP_OK)) break; if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto write_error; /* not all is written, wait until we can write more */ - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - if (G_UNLIKELY (retval == 0)) - goto timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->write_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) { + g_socket_set_timeout (conn->write_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto timeout; + } + g_clear_error (&err); + goto select_error; } + g_socket_set_timeout (conn->write_socket, 0); } return GST_RTSP_OK; @@ -1217,6 +1259,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) message->type_data.request.uri, conn->cseq++); /* add session id if we have one */ if (conn->session_id[0] != '\0') { + gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION, conn->session_id); } @@ -1228,6 +1271,21 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) g_string_append_printf (str, "RTSP/1.0 %d %s\r\n", message->type_data.response.code, message->type_data.response.reason); break; + case GST_RTSP_MESSAGE_HTTP_REQUEST: + /* create request string */ + g_string_append_printf (str, "%s %s HTTP/%s\r\n", + gst_rtsp_method_as_text (message->type_data.request.method), + message->type_data.request.uri, + gst_rtsp_version_as_text (message->type_data.request.version)); + /* add any authentication headers */ + add_auth_header (conn, message); + break; + case GST_RTSP_MESSAGE_HTTP_RESPONSE: + /* create response string */ + g_string_append_printf (str, "HTTP/%s %d %s\r\n", + gst_rtsp_version_as_text (message->type_data.request.version), + message->type_data.response.code, message->type_data.response.reason); + break; case GST_RTSP_MESSAGE_DATA: { guint8 data_header[4]; @@ -1258,6 +1316,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) gen_date_string (date_string, sizeof (date_string)); /* add date header */ + gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string); /* append headers */ @@ -1338,9 +1397,10 @@ no_message: } } -static void +static GstRTSPResult parse_string (gchar * dest, gint size, gchar ** src) { + GstRTSPResult res = GST_RTSP_OK; gint idx; idx = 0; @@ -1351,31 +1411,64 @@ parse_string (gchar * dest, gint size, gchar ** src) while (!g_ascii_isspace (**src) && **src != '\0') { if (idx < size - 1) dest[idx++] = **src; + else + res = GST_RTSP_EPARSE; (*src)++; } if (size > 0) dest[idx] = '\0'; + + return res; } -static void -parse_key (gchar * dest, gint size, gchar ** src) +static GstRTSPResult +parse_protocol_version (gchar * protocol, GstRTSPMsgType * type, + GstRTSPVersion * version) { - gint idx; + GstRTSPResult res = GST_RTSP_OK; + gchar *ver; - idx = 0; - while (**src != ':' && **src != '\0') { - if (idx < size - 1) - dest[idx++] = **src; - (*src)++; - } - if (size > 0) - dest[idx] = '\0'; + if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) { + guint major; + guint minor; + gchar dummychar; + + *ver++ = '\0'; + + /* the version number must be formatted as X.Y with nothing following */ + if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2) + res = GST_RTSP_EPARSE; + + if (g_ascii_strcasecmp (protocol, "RTSP") == 0) { + if (major != 1 || minor != 0) { + *version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_ERROR; + } + } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) { + if (*type == GST_RTSP_MESSAGE_REQUEST) + *type = GST_RTSP_MESSAGE_HTTP_REQUEST; + else if (*type == GST_RTSP_MESSAGE_RESPONSE) + *type = GST_RTSP_MESSAGE_HTTP_RESPONSE; + + if (major == 1 && minor == 1) { + *version = GST_RTSP_VERSION_1_1; + } else if (major != 1 || minor != 0) { + *version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_ERROR; + } + } else + res = GST_RTSP_EPARSE; + } else + res = GST_RTSP_EPARSE; + + return res; } static GstRTSPResult parse_response_status (guint8 * buffer, GstRTSPMessage * msg) { - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_OK; + GstRTSPResult res2; gchar versionstr[20]; gchar codestr[4]; gint code; @@ -1383,170 +1476,222 @@ parse_response_status (guint8 * buffer, GstRTSPMessage * msg) bptr = (gchar *) buffer; - parse_string (versionstr, sizeof (versionstr), &bptr); - parse_string (codestr, sizeof (codestr), &bptr); + if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; + + if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; code = atoi (codestr); + if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600)) + res = GST_RTSP_EPARSE; while (g_ascii_isspace (*bptr)) bptr++; - if (strcmp (versionstr, "RTSP/1.0") == 0) - GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL), - parse_error); - else if (strncmp (versionstr, "RTSP/", 5) == 0) { - GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL), - parse_error); - msg->type_data.response.version = GST_RTSP_VERSION_INVALID; - } else - goto parse_error; + if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr, + NULL) != GST_RTSP_OK)) + res = GST_RTSP_EPARSE; - return GST_RTSP_OK; + res2 = parse_protocol_version (versionstr, &msg->type, + &msg->type_data.response.version); + if (G_LIKELY (res == GST_RTSP_OK)) + res = res2; -parse_error: - { - return GST_RTSP_EPARSE; - } + return res; } static GstRTSPResult -parse_request_line (GstRTSPConnection * conn, guint8 * buffer, - GstRTSPMessage * msg) +parse_request_line (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res = GST_RTSP_OK; + GstRTSPResult res2; gchar versionstr[20]; gchar methodstr[20]; gchar urlstr[4096]; gchar *bptr; GstRTSPMethod method; - GstRTSPTunnelState tstate = TUNNEL_STATE_NONE; bptr = (gchar *) buffer; - parse_string (methodstr, sizeof (methodstr), &bptr); + if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; method = gst_rtsp_find_method (methodstr); - if (method == GST_RTSP_INVALID) { - /* a tunnel request is allowed when we don't have one yet */ - if (conn->tstate != TUNNEL_STATE_NONE) - goto invalid_method; - /* we need GET or POST for a valid tunnel request */ - if (!strcmp (methodstr, "GET")) - tstate = TUNNEL_STATE_GET; - else if (!strcmp (methodstr, "POST")) - tstate = TUNNEL_STATE_POST; - else - goto invalid_method; - } - parse_string (urlstr, sizeof (urlstr), &bptr); + if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; if (G_UNLIKELY (*urlstr == '\0')) - goto invalid_url; - - parse_string (versionstr, sizeof (versionstr), &bptr); - - if (G_UNLIKELY (*bptr != '\0')) - goto invalid_version; - - if (strcmp (versionstr, "RTSP/1.0") == 0) { - res = gst_rtsp_message_init_request (msg, method, urlstr); - } else if (strncmp (versionstr, "RTSP/", 5) == 0) { - res = gst_rtsp_message_init_request (msg, method, urlstr); - msg->type_data.request.version = GST_RTSP_VERSION_INVALID; - } else if (strcmp (versionstr, "HTTP/1.0") == 0) { - /* tunnel request, we need a tunnel method */ - if (tstate == TUNNEL_STATE_NONE) { - res = GST_RTSP_EPARSE; - } else { - conn->tstate = tstate; - } - } else { res = GST_RTSP_EPARSE; - } - - return res; - - /* ERRORS */ -invalid_method: - { - GST_ERROR ("invalid method %s", methodstr); - return GST_RTSP_EPARSE; - } -invalid_url: - { - GST_ERROR ("invalid url %s", urlstr); - return GST_RTSP_EPARSE; - } -invalid_version: - { - GST_ERROR ("invalid version"); - return GST_RTSP_EPARSE; - } -} -static GstRTSPResult -parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value) -{ - gchar *bptr; - - bptr = (gchar *) buffer; - - /* read key */ - parse_key (key, keysize, &bptr); - if (G_UNLIKELY (*bptr != ':')) - goto no_column; + if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; - bptr++; - while (g_ascii_isspace (*bptr)) - bptr++; + if (G_UNLIKELY (*bptr != '\0')) + res = GST_RTSP_EPARSE; - *value = bptr; + if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method, + urlstr) != GST_RTSP_OK)) + res = GST_RTSP_EPARSE; - return GST_RTSP_OK; + res2 = parse_protocol_version (versionstr, &msg->type, + &msg->type_data.request.version); + if (G_LIKELY (res == GST_RTSP_OK)) + res = res2; - /* ERRORS */ -no_column: - { - return GST_RTSP_EPARSE; + if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) { + /* GET and POST are not allowed as RTSP methods */ + if (msg->type_data.request.method == GST_RTSP_GET || + msg->type_data.request.method == GST_RTSP_POST) { + msg->type_data.request.method = GST_RTSP_INVALID; + if (res == GST_RTSP_OK) + res = GST_RTSP_ERROR; + } + } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + /* only GET and POST are allowed as HTTP methods */ + if (msg->type_data.request.method != GST_RTSP_GET && + msg->type_data.request.method != GST_RTSP_POST) { + msg->type_data.request.method = GST_RTSP_INVALID; + if (res == GST_RTSP_OK) + res = GST_RTSP_ERROR; + } } + + return res; } /* parsing lines means reading a Key: Value pair */ static GstRTSPResult -parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg) +parse_line (guint8 * buffer, GstRTSPMessage * msg) { - GstRTSPResult res; - gchar key[32]; - gchar *value; GstRTSPHeaderField field; + gchar *line = (gchar *) buffer; + gchar *value; - res = parse_key_value (buffer, key, sizeof (key), &value); - if (G_UNLIKELY (res != GST_RTSP_OK)) + if ((value = strchr (line, ':')) == NULL || value == line) goto parse_error; - if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) { - /* save the tunnel session in the connection */ - if (!strcmp (key, "x-sessioncookie")) { - strncpy (conn->tunnelid, value, TUNNELID_LEN); - conn->tunnelid[TUNNELID_LEN - 1] = '\0'; - conn->tunneled = TRUE; + /* trim space before the colon */ + if (value[-1] == ' ') + value[-1] = '\0'; + + /* replace the colon with a NUL */ + *value++ = '\0'; + + /* find the header */ + field = gst_rtsp_find_header_field (line); + if (field == GST_RTSP_HDR_INVALID) + goto done; + + /* split up the value in multiple key:value pairs if it contains comma(s) */ + while (*value != '\0') { + gchar *next_value; + gchar *comma = NULL; + gboolean quoted = FALSE; + guint comment = 0; + + /* trim leading space */ + if (*value == ' ') + value++; + + /* for headers which may not appear multiple times, and thus may not + * contain multiple values on the same line, we can short-circuit the loop + * below and the entire value results in just one key:value pair*/ + if (!gst_rtsp_header_allow_multiple (field)) + next_value = value + strlen (value); + else + next_value = value; + + /* find the next value, taking special care of quotes and comments */ + while (*next_value != '\0') { + if ((quoted || comment != 0) && *next_value == '\\' && + next_value[1] != '\0') + next_value++; + else if (comment == 0 && *next_value == '"') + quoted = !quoted; + else if (!quoted && *next_value == '(') + comment++; + else if (comment != 0 && *next_value == ')') + comment--; + else if (!quoted && comment == 0) { + /* To quote RFC 2068: "User agents MUST take special care in parsing + * the WWW-Authenticate field value if it contains more than one + * challenge, or if more than one WWW-Authenticate header field is + * provided, since the contents of a challenge may itself contain a + * comma-separated list of authentication parameters." + * + * What this means is that we cannot just look for an unquoted comma + * when looking for multiple values in Proxy-Authenticate and + * WWW-Authenticate headers. Instead we need to look for the sequence + * "comma [space] token space token" before we can split after the + * comma... + */ + if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE || + field == GST_RTSP_HDR_WWW_AUTHENTICATE) { + if (*next_value == ',') { + if (next_value[1] == ' ') { + /* skip any space following the comma so we do not mistake it for + * separating between two tokens */ + next_value++; + } + comma = next_value; + } else if (*next_value == ' ' && next_value[1] != ',' && + next_value[1] != '=' && comma != NULL) { + next_value = comma; + comma = NULL; + break; + } + } else if (*next_value == ',') + break; + } + + next_value++; } - } else { - field = gst_rtsp_find_header_field (key); - if (field != GST_RTSP_HDR_INVALID) + + /* trim space */ + if (value != next_value && next_value[-1] == ' ') + next_value[-1] = '\0'; + + if (*next_value != '\0') + *next_value++ = '\0'; + + /* add the key:value pair */ + if (*value != '\0') gst_rtsp_message_add_header (msg, field, value); + + value = next_value; } +done: return GST_RTSP_OK; /* ERRORS */ parse_error: { - return res; + return GST_RTSP_EPARSE; + } +} + +/* convert all consecutive whitespace to a single space */ +static void +normalize_line (guint8 * buffer) +{ + while (*buffer) { + if (g_ascii_isspace (*buffer)) { + guint8 *tmp; + + *buffer++ = ' '; + for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) { + } + if (buffer != tmp) + memmove (buffer, tmp, strlen ((gchar *) tmp) + 1); + } else { + buffer++; + } } } /* returns: * GST_RTSP_OK when a complete message was read. - * GST_RTSP_EEOF: when the socket is closed + * GST_RTSP_EEOF: when the read socket is closed * GST_RTSP_EINTR: when more data is needed. * GST_RTSP_..: some other error occured. */ @@ -1559,28 +1704,35 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, while (TRUE) { switch (builder->state) { case STATE_START: + { + guint8 c; + builder->offset = 0; res = - read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, - &builder->offset, 1, conn->ctxp); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1); if (res != GST_RTSP_OK) goto done; + c = builder->buffer[0]; + /* we have 1 bytes now and we can see if this is a data message or * not */ - if (builder->buffer[0] == '$') { + if (c == '$') { /* data message, prepare for the header */ builder->state = STATE_DATA_HEADER; + } else if (c == '\n' || c == '\r') { + /* skip \n and \r */ + builder->offset = 0; } else { builder->line = 0; builder->state = STATE_READ_LINES; } break; + } case STATE_DATA_HEADER: { res = - read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, - &builder->offset, 4, conn->ctxp); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4); if (res != GST_RTSP_OK) goto done; @@ -1596,13 +1748,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, case STATE_DATA_BODY: { res = - read_bytes (conn->readfd->fd, builder->body_data, &builder->offset, - builder->body_len, conn->ctxp); + read_bytes (conn, builder->body_data, &builder->offset, + builder->body_len); if (res != GST_RTSP_OK) goto done; /* we have the complete body now, store in the message adjusting the - * length to include the traling '\0' */ + * length to include the trailing '\0' */ gst_rtsp_message_take_body (message, (guint8 *) builder->body_data, builder->body_len + 1); builder->body_data = NULL; @@ -1613,26 +1765,33 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } case STATE_READ_LINES: { - res = read_line (conn->readfd->fd, builder->buffer, &builder->offset, - sizeof (builder->buffer), conn->ctxp); + res = read_line (conn, builder->buffer, &builder->offset, + sizeof (builder->buffer)); if (res != GST_RTSP_OK) goto done; /* we have a regular response */ - if (builder->buffer[0] == '\r') { - builder->buffer[0] = '\0'; - } - if (builder->buffer[0] == '\0') { gchar *hdrval; /* empty line, end of message header */ - /* see if there is a Content-Length header */ + /* see if there is a Content-Length header, but ignore it if this + * is a POST request with an x-sessioncookie header */ if (gst_rtsp_message_get_header (message, - GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) { + GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK && + (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST || + message->type_data.request.method != GST_RTSP_POST || + gst_rtsp_message_get_header (message, + GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) { /* there is, prepare to read the body */ builder->body_len = atol (hdrval); - builder->body_data = g_malloc (builder->body_len + 1); + builder->body_data = g_try_malloc (builder->body_len + 1); + /* we can't do much here, we need the length to know how many bytes + * we need to read next and when allocation fails, something is + * probably wrong with the length. */ + if (builder->body_data == NULL) + goto invalid_body_len; + builder->body_data[builder->body_len] = '\0'; builder->offset = 0; builder->state = STATE_DATA_BODY; @@ -1643,19 +1802,20 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } /* we have a line */ + normalize_line (builder->buffer); if (builder->line == 0) { /* first line, check for response status */ - if (memcmp (builder->buffer, "RTSP", 4) == 0) { - res = parse_response_status (builder->buffer, message); + if (memcmp (builder->buffer, "RTSP", 4) == 0 || + memcmp (builder->buffer, "HTTP", 4) == 0) { + builder->status = parse_response_status (builder->buffer, message); } else { - res = parse_request_line (conn, builder->buffer, message); + builder->status = parse_request_line (builder->buffer, message); } - /* the first line must parse without errors */ - if (res != GST_RTSP_OK) - goto done; } else { - /* else just parse the line, ignore errors */ - parse_line (conn, builder->buffer, message); + /* else just parse the line */ + res = parse_line (builder->buffer, message); + if (res != GST_RTSP_OK) + builder->status = res; } builder->line++; builder->offset = 0; @@ -1663,24 +1823,29 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } case STATE_END: { + gchar *session_cookie; gchar *session_id; - if (conn->tstate == TUNNEL_STATE_GET) { - res = GST_RTSP_ETGET; - goto done; - } else if (conn->tstate == TUNNEL_STATE_POST) { - res = GST_RTSP_ETPOST; - goto done; - } - if (message->type == GST_RTSP_MESSAGE_DATA) { /* data messages don't have headers */ res = GST_RTSP_OK; goto done; } + /* save the tunnel session in the connection */ + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST && + !conn->manual_http && + conn->tstate == TUNNEL_STATE_NONE && + gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE, + &session_cookie, 0) == GST_RTSP_OK) { + strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN); + conn->tunnelid[TUNNELID_LEN - 1] = '\0'; + conn->tunneled = TRUE; + } + /* save session id in the connection for further use */ - if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, + if (message->type == GST_RTSP_MESSAGE_RESPONSE && + gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, &session_id, 0) == GST_RTSP_OK) { gint maxlen, i; @@ -1709,7 +1874,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, strncpy (conn->session_id, session_id, maxlen); conn->session_id[maxlen] = '\0'; } - res = GST_RTSP_OK; + res = builder->status; goto done; } default: @@ -1719,6 +1884,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } done: return res; + + /* ERRORS */ +invalid_body_len: + { + GST_DEBUG ("could not allocate body"); + return GST_RTSP_ERROR; + } } /** @@ -1741,13 +1913,13 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); if (G_UNLIKELY (size == 0)) return GST_RTSP_OK; @@ -1757,12 +1929,8 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - while (TRUE) { - res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp); + res = read_bytes (conn, data, &offset, size); if (G_UNLIKELY (res == GST_RTSP_EEOF)) goto eof; if (G_LIKELY (res == GST_RTSP_OK)) @@ -1770,21 +1938,23 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (G_UNLIKELY (retval == 0)) - goto select_timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } return GST_RTSP_OK; @@ -1811,38 +1981,40 @@ read_error: } } -static GString * -gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code) +static GstRTSPMessage * +gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code, + const GstRTSPMessage * request) { - GString *str; - gchar date_string[100]; - const gchar *status; - - gen_date_string (date_string, sizeof (date_string)); + GstRTSPMessage *msg; + GstRTSPResult res; - status = gst_rtsp_status_as_text (code); - if (status == NULL) { + if (gst_rtsp_status_as_text (code) == NULL) code = GST_RTSP_STS_INTERNAL_SERVER_ERROR; - status = "Internal Server Error"; - } - str = g_string_new (""); + GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request), + no_message); + + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER, + "GStreamer RTSP Server"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); - /* */ - g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status); - g_string_append_printf (str, - "Server: GStreamer RTSP Server\r\n" - "Date: %s\r\n" - "Connection: close\r\n" - "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string); if (code == GST_RTSP_STS_OK) { if (conn->ip) - g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip); - g_string_append_printf (str, - "Content-Type: application/x-rtsp-tunnelled\r\n"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + conn->ip); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE, + "application/x-rtsp-tunnelled"); + } + + return msg; + + /* ERRORS */ +no_message: + { + return NULL; } - g_string_append_printf (str, "\r\n"); - return str; } /** @@ -1865,58 +2037,76 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, { GstRTSPResult res; GstRTSPBuilder builder; - gint retval; GstClockTime to; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - memset (&builder, 0, sizeof (GstRTSPBuilder)); while (TRUE) { res = build_next (&builder, message, conn); if (G_UNLIKELY (res == GST_RTSP_EEOF)) goto eof; - if (G_LIKELY (res == GST_RTSP_OK)) - break; - if (res == GST_RTSP_ETGET) { - GString *str; - - /* tunnel GET request, we can reply now */ - str = gen_tunnel_reply (conn, GST_RTSP_STS_OK); - res = - gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, - timeout); - g_string_free (str, TRUE); - } else if (res == GST_RTSP_ETPOST) { - /* tunnel POST request, return the value, the caller now has to link the - * two connections. */ + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!conn->manual_http) { + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + + conn->tstate = TUNNEL_STATE_GET; + + /* tunnel GET request, we can reply now */ + response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message); + res = gst_rtsp_connection_send (conn, response, timeout); + gst_rtsp_message_free (response); + if (res == GST_RTSP_OK) + res = GST_RTSP_ETGET; + goto cleanup; + } else if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_POST) { + conn->tstate = TUNNEL_STATE_POST; + + /* tunnel POST request, the caller now has to link the two + * connections. */ + res = GST_RTSP_ETPOST; + goto cleanup; + } else { + res = GST_RTSP_EPARSE; + goto cleanup; + } + } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + res = GST_RTSP_EPARSE; + goto cleanup; + } + } + break; } else if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (G_UNLIKELY (retval == 0)) - goto select_timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } /* we have a message here */ @@ -1968,13 +2158,28 @@ gst_rtsp_connection_close (GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + /* last unref closes the connection we don't want to explicitly close here + * because these sockets might have been provided at construction */ + if (conn->socket0) { + g_object_unref (conn->socket0); + conn->socket0 = NULL; + } + if (conn->socket1) { + g_object_unref (conn->socket1); + conn->socket1 = NULL; + } + g_free (conn->ip); conn->ip = NULL; - REMOVE_POLLFD (conn->fdset, &conn->fd0); - REMOVE_POLLFD (conn->fdset, &conn->fd1); - conn->writefd = NULL; - conn->readfd = NULL; + conn->read_ahead = 0; + + g_free (conn->initial_buffer); + conn->initial_buffer = NULL; + conn->initial_buffer_offset = 0; + + conn->write_socket = NULL; + conn->read_socket = NULL; conn->tunneled = FALSE; conn->tstate = TUNNEL_STATE_NONE; conn->ctxp = NULL; @@ -2006,14 +2211,14 @@ gst_rtsp_connection_free (GstRTSPConnection * conn) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); res = gst_rtsp_connection_close (conn); - gst_poll_free (conn->fdset); + + if (conn->cancellable) + g_object_unref (conn->cancellable); + g_timer_destroy (conn->timer); gst_rtsp_url_free (conn->url); g_free (conn->proxy_host); g_free (conn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif return res; } @@ -2043,64 +2248,66 @@ gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events, GstRTSPEvent * revents, GTimeVal * timeout) { GstClockTime to; - gint retval; + GMainContext *ctx; + GSource *rs, *ws, *ts; + GIOCondition condition; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (events != 0, GST_RTSP_EINVAL); g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - gst_poll_set_controllable (conn->fdset, TRUE); - - /* add fd to writer set when asked to */ - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, - events & GST_RTSP_EV_WRITE); - - /* add fd to reader set when asked to */ - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ); + ctx = g_main_context_new (); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + if (timeout) { + ts = g_timeout_source_new (to / GST_MSECOND); + g_source_set_dummy_callback (ts); + g_source_attach (ts, ctx); + g_source_unref (ts); + } - if (G_UNLIKELY (retval == 0)) - goto select_timeout; + rs = g_socket_create_source (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (rs); + g_source_attach (rs, ctx); + g_source_unref (rs); - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } + ws = g_socket_create_source (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (ws); + g_source_attach (ws, ctx); + g_source_unref (ws); + + /* Returns after handling all pending events */ + g_main_context_iteration (ctx, TRUE); + + g_main_context_unref (ctx); + + condition = + g_socket_condition_check (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + condition |= + g_socket_condition_check (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP); *revents = 0; if (events & GST_RTSP_EV_READ) { - if (gst_poll_fd_can_read (conn->fdset, conn->readfd)) + if ((condition & G_IO_IN) || (condition & G_IO_PRI)) *revents |= GST_RTSP_EV_READ; } if (events & GST_RTSP_EV_WRITE) { - if (gst_poll_fd_can_write (conn->fdset, conn->writefd)) + if ((condition & G_IO_OUT)) *revents |= GST_RTSP_EV_WRITE; } - return GST_RTSP_OK; - /* ERRORS */ -select_timeout: - { + if (*revents == 0) return GST_RTSP_ETIMEOUT; - } -select_error: - { - return GST_RTSP_ESYS; - } -stopped: - { - return GST_RTSP_EINTR; - } + + return GST_RTSP_OK; } /** @@ -2109,7 +2316,7 @@ stopped: * @timeout: a timeout * * Calculate the next timeout for @conn, storing the result in @timeout. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2118,16 +2325,34 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) gdouble elapsed; glong sec; gulong usec; + gint ctimeout; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL); + ctimeout = conn->timeout; + if (ctimeout >= 20) { + /* Because we should act before the timeout we timeout 5 + * seconds in advance. */ + ctimeout -= 5; + } else if (ctimeout >= 5) { + /* else timeout 20% earlier */ + ctimeout -= ctimeout / 5; + } else if (ctimeout >= 1) { + /* else timeout 1 second earlier */ + ctimeout -= 1; + } + elapsed = g_timer_elapsed (conn->timer, &usec); - if (elapsed >= conn->timeout) { + if (elapsed >= ctimeout) { sec = 0; usec = 0; } else { - sec = conn->timeout - elapsed; + sec = ctimeout - elapsed; + if (usec <= G_USEC_PER_SEC) + usec = G_USEC_PER_SEC - usec; + else + usec = 0; } timeout->tv_sec = sec; @@ -2141,7 +2366,7 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) * @conn: a #GstRTSPConnection * * Reset the timeout of @conn. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2170,7 +2395,10 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - gst_poll_set_flushing (conn->fdset, flush); + if (flush) + g_cancellable_cancel (conn->cancellable); + else + g_cancellable_reset (conn->cancellable); return GST_RTSP_OK; } @@ -2327,21 +2555,21 @@ gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn) } static GstRTSPResult -set_qos_dscp (gint fd, guint qos_dscp) +set_qos_dscp (GSocket * socket, guint qos_dscp) { - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; +#ifndef IP_TOS + return GST_RTSP_OK; +#else + gint fd; + union gst_sockaddr sa; socklen_t slen = sizeof (sa); gint af; gint tos; - if (fd == -1) + if (!socket) return GST_RTSP_OK; + fd = g_socket_get_fd (socket); if (getsockname (fd, &sa.sa, &slen) < 0) goto no_getsockname; @@ -2358,12 +2586,12 @@ set_qos_dscp (gint fd, guint qos_dscp) switch (af) { case AF_INET: - if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; case AF_INET6: #ifdef IPV6_TCLASS - if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; #endif @@ -2379,11 +2607,11 @@ no_setsockopt: { return GST_RTSP_ESYS; } - wrong_family: { return GST_RTSP_ERROR; } +#endif } /** @@ -2403,12 +2631,12 @@ gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp) GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - res = set_qos_dscp (conn->fd0.fd, qos_dscp); + res = set_qos_dscp (conn->socket0, qos_dscp); if (res == GST_RTSP_OK) - res = set_qos_dscp (conn->fd1.fd, qos_dscp); + res = set_qos_dscp (conn->socket1, qos_dscp); return res; } @@ -2476,40 +2704,58 @@ gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip) * * Get the file descriptor for reading. * - * Returns: the file descriptor used for reading or -1 on error. The file + * Returns: the file descriptor used for reading or %NULL on error. The file * descriptor remains valid until the connection is closed. * * Since: 0.10.23 */ -gint -gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn) +GSocket * +gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn) { - g_return_val_if_fail (conn != NULL, -1); - g_return_val_if_fail (conn->readfd != NULL, -1); + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); - return conn->readfd->fd; + return conn->read_socket; } /** - * gst_rtsp_connection_get_writefd: + * gst_rtsp_connection_get_write_socket: * @conn: a #GstRTSPConnection * * Get the file descriptor for writing. * - * Returns: the file descriptor used for writing or -1 on error. The file + * Returns: the file descriptor used for writing or NULL on error. The file * descriptor remains valid until the connection is closed. * * Since: 0.10.23 */ -gint -gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn) +GSocket * +gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn) { - g_return_val_if_fail (conn != NULL, -1); - g_return_val_if_fail (conn->writefd != NULL, -1); + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); - return conn->writefd->fd; + return conn->write_socket; } +/** + * gst_rtsp_connection_set_http_mode: + * @conn: a #GstRTSPConnection + * @enable: %TRUE to enable manual HTTP mode + * + * By setting the HTTP mode to %TRUE the message parsing will support HTTP + * messages in addition to the RTSP messages. It will also disable the + * automatic handling of setting up an HTTP tunnel. + * + * Since: 0.10.25 + */ +void +gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable) +{ + g_return_if_fail (conn != NULL); + + conn->manual_http = enable; +} /** * gst_rtsp_connection_set_tunneled: @@ -2525,8 +2771,8 @@ void gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled) { g_return_if_fail (conn != NULL); - g_return_if_fail (conn->readfd == NULL); - g_return_if_fail (conn->writefd == NULL); + g_return_if_fail (conn->read_socket == NULL); + g_return_if_fail (conn->write_socket == NULL); conn->tunneled = tunneled; } @@ -2573,7 +2819,7 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) /** * gst_rtsp_connection_do_tunnel: * @conn: a #GstRTSPConnection - * @conn2: a #GstRTSPConnection + * @conn2: a #GstRTSPConnection or %NULL * * If @conn received the first tunnel connection and @conn2 received * the second tunnel connection, link the two connections together so that @@ -2582,6 +2828,9 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) * After this call, @conn2 cannot be used anymore and must be freed with * gst_rtsp_connection_free(). * + * If @conn2 is %NULL then only the base64 decoding context will be setup for + * @conn. + * * Returns: return GST_RTSP_OK on success. * * Since: 0.10.23 @@ -2591,26 +2840,31 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, GstRTSPConnection * conn2) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); - g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN), - GST_RTSP_EINVAL); - /* both connections have fd0 as the read/write socket. start by taking the - * socket from conn2 and set it as the socket in conn */ - conn->fd1 = conn2->fd0; + if (conn2 != NULL) { + g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); + g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); + g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, + TUNNELID_LEN), GST_RTSP_EINVAL); - /* clean up some of the state of conn2 */ - gst_poll_remove_fd (conn2->fdset, &conn2->fd0); - conn2->fd0.fd = -1; - conn2->readfd = conn2->writefd = NULL; + /* both connections have socket0 as the read/write socket. start by taking the + * socket from conn2 and set it as the socket in conn */ + conn->socket1 = conn2->socket0; - /* We make fd0 the write socket and fd1 the read socket. */ - conn->writefd = &conn->fd0; - conn->readfd = &conn->fd1; + /* clean up some of the state of conn2 */ + g_cancellable_cancel (conn2->cancellable); + conn2->socket0 = 0; + g_object_unref (conn2->socket1); + conn2->socket1 = NULL; + conn2->write_socket = conn2->read_socket = NULL; + g_cancellable_reset (conn2->cancellable); - conn->tstate = TUNNEL_STATE_COMPLETE; + /* We make socket0 the write socket and socket1 the read socket. */ + conn->write_socket = conn->socket0; + conn->read_socket = conn->socket1; + + conn->tstate = TUNNEL_STATE_COMPLETE; + } /* we need base64 decoding for the readfd */ conn->ctx.state = 0; @@ -2622,17 +2876,18 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, return GST_RTSP_OK; } -#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) -#define WRITE_COND (G_IO_OUT | G_IO_ERR) +#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define READ_COND (G_IO_IN | READ_ERR) +#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define WRITE_COND (G_IO_OUT | WRITE_ERR) typedef struct { - GString *str; + guint8 *data; + guint size; guint id; } GstRTSPRec; -static guint queue_response (GstRTSPWatch * watch, GString * str); - /* async functions */ struct _GstRTSPWatch { @@ -2645,14 +2900,14 @@ struct _GstRTSPWatch GPollFD readfd; GPollFD writefd; - gboolean write_added; /* queued message for transmission */ guint id; - GAsyncQueue *messages; + GMutex *mutex; + GQueue *messages; guint8 *write_data; guint write_off; - guint write_len; + guint write_size; guint write_id; GstRTSPWatchFuncs funcs; @@ -2666,6 +2921,9 @@ gst_rtsp_source_prepare (GSource * source, gint * timeout) { GstRTSPWatch *watch = (GstRTSPWatch *) source; + if (watch->conn->initial_buffer != NULL) + return TRUE; + *timeout = (watch->conn->timeout * 1000); return FALSE; @@ -2690,101 +2948,195 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_ERROR; + gboolean keep_running = TRUE; /* first read as much as we can */ - if (watch->readfd.revents & READ_COND) { + if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { do { + if (watch->readfd.revents & READ_ERR) + goto read_error; + res = build_next (&watch->builder, &watch->message, watch->conn); if (res == GST_RTSP_EINTR) break; - if (G_UNLIKELY (res == GST_RTSP_EEOF)) - goto eof; - if (res == GST_RTSP_ETGET) { - GString *str; - GstRTSPStatusCode code; - - if (watch->funcs.tunnel_start) - code = watch->funcs.tunnel_start (watch, watch->user_data); - else - code = GST_RTSP_STS_OK; - - /* queue the response string */ - str = gen_tunnel_reply (watch->conn, code); - queue_response (watch, str); - } else if (res == GST_RTSP_ETPOST) { - /* in the callback the connection should be tunneled with the - * GET connection */ - if (watch->funcs.tunnel_complete) - watch->funcs.tunnel_complete (watch, watch->user_data); - } else if (G_UNLIKELY (res != GST_RTSP_OK)) - goto error; + else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + /* When we are in tunnelled mode, the read socket can be closed and we + * should be prepared for a new POST method to reopen it */ + if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { + /* remove the read connection for the tunnel */ + /* we accept a new POST request */ + watch->conn->tstate = TUNNEL_STATE_GET; + /* and signal that we lost our tunnel */ + if (watch->funcs.tunnel_lost) + res = watch->funcs.tunnel_lost (watch, watch->user_data); + goto read_done; + } else + goto eof; + } else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!watch->conn->manual_http && + watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + GstRTSPStatusCode code; + + watch->conn->tstate = TUNNEL_STATE_GET; + + if (watch->funcs.tunnel_start) + code = watch->funcs.tunnel_start (watch, watch->user_data); + else + code = GST_RTSP_STS_OK; + + /* queue the response */ + response = gen_tunnel_reply (watch->conn, code, &watch->message); + gst_rtsp_watch_send_message (watch, response, NULL); + gst_rtsp_message_free (response); + goto read_done; + } else if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_POST) { + watch->conn->tstate = TUNNEL_STATE_POST; + + /* in the callback the connection should be tunneled with the + * GET connection */ + if (watch->funcs.tunnel_complete) + watch->funcs.tunnel_complete (watch, watch->user_data); + goto read_done; + } + } + } + + if (!watch->conn->manual_http) { + /* if manual HTTP support is not enabled, then restore the message to + * what it would have looked like without the support for parsing HTTP + * messages being present */ + if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + watch->message.type = GST_RTSP_MESSAGE_REQUEST; + watch->message.type_data.request.method = GST_RTSP_INVALID; + if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + watch->message.type = GST_RTSP_MESSAGE_RESPONSE; + if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.response.version = + GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } + } if (G_LIKELY (res == GST_RTSP_OK)) { if (watch->funcs.message_received) watch->funcs.message_received (watch, &watch->message, watch->user_data); - - gst_rtsp_message_unset (&watch->message); + } else { + goto read_error; } + + read_done: + gst_rtsp_message_unset (&watch->message); build_reset (&watch->builder); } while (FALSE); } if (watch->writefd.revents & WRITE_COND) { + if (watch->writefd.revents & WRITE_ERR) + goto write_error; + + g_mutex_lock (watch->mutex); do { if (watch->write_data == NULL) { GstRTSPRec *rec; /* get a new message from the queue */ - rec = g_async_queue_try_pop (watch->messages); + rec = g_queue_pop_tail (watch->messages); if (rec == NULL) - goto done; + break; watch->write_off = 0; - watch->write_len = rec->str->len; - watch->write_data = (guint8 *) g_string_free (rec->str, FALSE); + watch->write_data = rec->data; + watch->write_size = rec->size; watch->write_id = rec->id; - rec->str = NULL; g_slice_free (GstRTSPRec, rec); } - res = write_bytes (watch->writefd.fd, watch->write_data, - &watch->write_off, watch->write_len); - if (res == GST_RTSP_EINTR) - break; - if (G_UNLIKELY (res != GST_RTSP_OK)) - goto error; - - if (watch->funcs.message_sent) - watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + res = write_bytes (watch->conn->write_socket, watch->write_data, + &watch->write_off, watch->write_size, watch->conn->cancellable); + g_mutex_unlock (watch->mutex); - done: - if (g_async_queue_length (watch->messages) == 0 && watch->write_added) { - g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->write_added = FALSE; - watch->writefd.revents = 0; + if (res == GST_RTSP_EINTR) + goto write_blocked; + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (watch->funcs.message_sent) + watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + } else { + goto write_error; } + g_mutex_lock (watch->mutex); + g_free (watch->write_data); watch->write_data = NULL; - } while (FALSE); + } while (TRUE); + + watch->writefd.events = WRITE_ERR; + + g_mutex_unlock (watch->mutex); } - return TRUE; +write_blocked: + return keep_running; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); + + /* always stop when the readfd returns EOF in non-tunneled mode */ return FALSE; } +read_error: + { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + keep_running = (watch->writefd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, + 0, watch->user_data), error); + else + goto error; + } else + goto eof; + } +write_error: + { + watch->writefd.events = 0; + watch->writefd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->writefd); + keep_running = (watch->readfd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, + watch->write_id, watch->user_data), error); + else + goto error; + } else + goto eof; + } error: { if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); - return FALSE; + + return keep_running; } } @@ -2793,8 +3145,7 @@ gst_rtsp_rec_free (gpointer data) { GstRTSPRec *rec = data; - g_string_free (rec->str, TRUE); - rec->str = NULL; + g_free (rec->data); g_slice_free (GstRTSPRec, rec); } @@ -2806,11 +3157,13 @@ gst_rtsp_source_finalize (GSource * source) build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); - g_async_queue_unref (watch->messages); + g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); + g_queue_free (watch->messages); watch->messages = NULL; - g_free (watch->write_data); + g_mutex_free (watch->mutex); + if (watch->notify) watch->notify (watch->user_data); } @@ -2852,8 +3205,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (funcs != NULL, NULL); - g_return_val_if_fail (conn->readfd != NULL, NULL); - g_return_val_if_fail (conn->writefd != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs, sizeof (GstRTSPWatch)); @@ -2861,7 +3214,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->conn = conn; result->builder.state = STATE_START; - result->messages = g_async_queue_new_full (gst_rtsp_rec_free); + result->mutex = g_mutex_new (); + result->messages = g_queue_new (); result->readfd.fd = -1; result->writefd.fd = -1; @@ -2872,10 +3226,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->user_data = user_data; result->notify = notify; - /* only add the read fd, the write fd is only added when we have data - * to send. */ - g_source_add_poll ((GSource *) result, &result->readfd); - return result; } @@ -2896,16 +3246,18 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) if (watch->writefd.fd != -1) g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->readfd.fd = watch->conn->readfd->fd; + watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket); watch->readfd.events = READ_COND; watch->readfd.revents = 0; - watch->writefd.fd = watch->conn->writefd->fd; - watch->writefd.events = WRITE_COND; + watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket); + watch->writefd.events = WRITE_ERR; watch->writefd.revents = 0; - watch->write_added = FALSE; - g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->readfd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->writefd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->writefd); } /** @@ -2944,53 +3296,123 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) g_source_unref ((GSource *) watch); } -static guint -queue_response (GstRTSPWatch * watch, GString * str) +/** + * gst_rtsp_watch_write_data: + * @watch: a #GstRTSPWatch + * @data: the data to queue + * @size: the size of @data + * @id: location for a message ID or %NULL + * + * Write @data using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. + * + * This function will take ownership of @data and g_free() it after use. + * + * Returns: #GST_RTSP_OK on success. + * + * Since: 0.10.25 + */ +GstRTSPResult +gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, + guint size, guint * id) { + GstRTSPResult res; GstRTSPRec *rec; + guint off = 0; + GMainContext *context = NULL; - /* make a record with the message as a string and id */ + g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); + + g_mutex_lock (watch->mutex); + + /* try to send the message synchronously first */ + if (watch->messages->length == 0 && watch->write_data == NULL) { + res = + write_bytes (watch->conn->write_socket, data, &off, size, + watch->conn->cancellable); + if (res != GST_RTSP_EINTR) { + if (id != NULL) + *id = 0; + g_free ((gpointer) data); + goto done; + } + } + + /* make a record with the data and id for sending async */ rec = g_slice_new (GstRTSPRec); - rec->str = str; - rec->id = ++watch->id; + if (off == 0) { + rec->data = (guint8 *) data; + rec->size = size; + } else { + rec->data = g_memdup (data + off, size - off); + rec->size = size - off; + g_free ((gpointer) data); + } + + do { + /* make sure rec->id is never 0 */ + rec->id = ++watch->id; + } while (G_UNLIKELY (rec->id == 0)); /* add the record to a queue. FIXME we would like to have an upper limit here */ - g_async_queue_push (watch->messages, rec); + g_queue_push_head (watch->messages, rec); - /* FIXME: does the following need to be made thread-safe? (this might be - * called from a streaming thread, like appsink's render function) */ /* make sure the main context will now also check for writability on the * socket */ - if (!watch->write_added) { - g_source_add_poll ((GSource *) watch, &watch->writefd); - watch->write_added = TRUE; + if (watch->writefd.events != WRITE_COND) { + watch->writefd.events = WRITE_COND; + context = ((GSource *) watch)->context; } - return rec->id; + if (id != NULL) + *id = rec->id; + res = GST_RTSP_OK; + +done: + g_mutex_unlock (watch->mutex); + + if (context) + g_main_context_wakeup (context); + + return res; } /** - * gst_rtsp_watch_queue_message: + * gst_rtsp_watch_send_message: * @watch: a #GstRTSPWatch * @message: a #GstRTSPMessage + * @id: location for a message ID or %NULL * - * Queue a @message for transmission in @watch. The contents of this - * message will be serialized and transmitted when the connection of the - * @watch becomes writable. - * - * The return value of this function will be used as the id argument in the - * message_sent callback. + * Send a @message using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. * - * Returns: an id. + * Returns: #GST_RTSP_OK on success. * - * Since: 0.10.23 + * Since: 0.10.25 */ -guint -gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) +GstRTSPResult +gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, + guint * id) { + GString *str; + guint size; + g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); /* make a record with the message as a string and id */ - return queue_response (watch, message_to_string (watch->conn, message)); + str = message_to_string (watch->conn, message); + size = str->len; + return gst_rtsp_watch_write_data (watch, + (guint8 *) g_string_free (str, FALSE), size, id); }