X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst-libs%2Fgst%2Frtsp%2Fgstrtspconnection.c;h=e7b57d91712b626e7282c2072c7f1cb2ecf1a08f;hb=0b0dde7ce11e15bedaf34aea2df843a5253d1e2f;hp=df4c4d3b475abf24ef0641a235fc60abd712930c;hpb=d45b27d92d7bd1885931959e75d3d87de50c678f;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index df4c4d3..fa7ec81 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) <2005,2006> Wim Taymans + * Copyright (C) <2005-2009> Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -45,12 +45,8 @@ * @short_description: manage RTSP connections * @see_also: gstrtspurl * - * - * * This object manages the RTSP connection to the server. It provides function * to receive and send bytes and messages. - * - * * * Last reviewed on 2007-07-24 (0.10.14) */ @@ -65,88 +61,39 @@ #include #include -#ifdef HAVE_UNISTD_H -#include -#endif - - /* we include this here to get the G_OS_* defines */ #include #include -#ifdef G_OS_WIN32 -#include -#include -#define EINPROGRESS WSAEINPROGRESS -#else -#include -#include -#include -#include -#include -#include -#endif +#include "gstrtspconnection.h" -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include -#endif +#include "gst/glib-compat-private.h" -#include "gstrtspconnection.h" -#include "gstrtspbase64.h" -#include "md5.h" +#ifdef IP_TOS +union gst_sockaddr +{ + struct sockaddr sa; + struct sockaddr_in sa_in; + struct sockaddr_in6 sa_in6; + struct sockaddr_storage sa_stor; +}; +#endif typedef struct { gint state; guint save; - gchar in[4]; - guint cin; - gchar out[3]; + guchar out[3]; /* the size must be evenly divisible by 3 */ guint cout; + guint coutl; } DecodeCtx; -static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx, - guint size, DecodeCtx * ctxp); -static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key, - guint keysize, gchar ** value); -static void parse_string (gchar * dest, gint size, gchar ** src); - -#ifdef G_OS_WIN32 -#define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0) -#define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len) -#define CLOSE_SOCKET(sock) closesocket (sock) -#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK) -#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR) -/* According to Microsoft's connect() documentation this one returns - * WSAEWOULDBLOCK and not WSAEINPROGRESS. */ -#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK) +#ifdef MSG_NOSIGNAL +#define SEND_FLAGS MSG_NOSIGNAL #else -#define READ_SOCKET(fd, buf, len) read (fd, buf, len) -#define WRITE_SOCKET(fd, buf, len) write (fd, buf, len) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len) -#define CLOSE_SOCKET(sock) close (sock) -#define ERRNO_IS_EAGAIN (errno == EAGAIN) -#define ERRNO_IS_EINTR (errno == EINTR) -#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS) +#define SEND_FLAGS 0 #endif -#define ADD_POLLFD(fdset, pfd, fd) \ -G_STMT_START { \ - (pfd)->fd = fd; \ - gst_poll_add_fd (fdset, pfd); \ -} G_STMT_END - -#define REMOVE_POLLFD(fdset, pfd) \ -G_STMT_START { \ - if ((pfd)->fd != -1) { \ - GST_DEBUG ("remove fd %d", (pfd)->fd); \ - gst_poll_remove_fd (fdset, pfd); \ - CLOSE_SOCKET ((pfd)->fd); \ - (pfd)->fd = -1; \ - } \ -} G_STMT_END - typedef enum { TUNNEL_STATE_NONE, @@ -164,19 +111,23 @@ struct _GstRTSPConnection GstRTSPUrl *url; /* connection state */ - GstPollFD fd0; - GstPollFD fd1; - - GstPollFD *readfd; - GstPollFD *writefd; + GSocket *read_socket; + GSocket *write_socket; + gboolean manual_http; + GSocket *socket0, *socket1; + GCancellable *cancellable; gchar tunnelid[TUNNELID_LEN]; gboolean tunneled; GstRTSPTunnelState tstate; - GstPoll *fdset; gchar *ip; + gint read_ahead; + + gchar *initial_buffer; + gsize initial_buffer_offset; + /* Session state */ gint cseq; /* sequence number */ gchar session_id[512]; /* session id */ @@ -191,23 +142,10 @@ struct _GstRTSPConnection DecodeCtx ctx; DecodeCtx *ctxp; -}; -#ifdef G_OS_WIN32 -static int -inet_aton (const char *c, struct in_addr *paddr) -{ - /* note that inet_addr is deprecated on unix because - * inet_addr returns -1 (INADDR_NONE) for the valid 255.255.255.255 - * address. */ - paddr->s_addr = inet_addr (c); - - if (paddr->s_addr == INADDR_NONE) - return 0; - - return 1; -} -#endif + gchar *proxy_host; + guint proxy_port; +}; enum { @@ -219,10 +157,18 @@ enum STATE_LAST }; +enum +{ + READ_AHEAD_EOH = -1, /* end of headers */ + READ_AHEAD_CRLF = -2, + READ_AHEAD_CRLFCR = -3 +}; + /* a structure for constructing RTSPMessages */ typedef struct { gint state; + GstRTSPResult status; guint8 buffer[4096]; guint offset; @@ -235,7 +181,7 @@ static void build_reset (GstRTSPBuilder * builder) { g_free (builder->body_data); - memset (builder, 0, sizeof (builder)); + memset (builder, 0, sizeof (GstRTSPBuilder)); } /** @@ -255,33 +201,17 @@ GstRTSPResult gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) { GstRTSPConnection *newconn; -#ifdef G_OS_WIN32 - WSADATA w; - int error; -#endif g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); -#ifdef G_OS_WIN32 - error = WSAStartup (0x0202, &w); - - if (error) - goto startup_error; - - if (w.wVersion != 0x0202) - goto version_error; -#endif - newconn = g_new0 (GstRTSPConnection, 1); - if ((newconn->fdset = gst_poll_new (TRUE)) == NULL) - goto no_fdset; + newconn->cancellable = g_cancellable_new (); newconn->url = gst_rtsp_url_copy (url); - newconn->fd0.fd = -1; - newconn->fd1.fd = -1; newconn->timer = g_timer_new (); newconn->timeout = 60; + newconn->cseq = 1; newconn->auth_method = GST_RTSP_AUTH_NONE; newconn->username = NULL; @@ -291,236 +221,264 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) *conn = newconn; return GST_RTSP_OK; - - /* ERRORS */ -#ifdef G_OS_WIN32 -startup_error: - { - g_warning ("Error %d on WSAStartup", error); - return GST_RTSP_EWSASTART; - } -version_error: - { - g_warning ("Windows sockets are not version 0x202 (current 0x%x)", - w.wVersion); - WSACleanup (); - return GST_RTSP_EWSAVERSION; - } -#endif -no_fdset: - { - g_free (newconn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif - return GST_RTSP_ESYS; - } } /** - * gst_rtsp_connection_accept: - * @sock: a socket + * gst_rtsp_connection_create_from_socket: + * @socket: a #GSocket + * @ip: the IP address of the other end + * @port: the port used by the other end + * @initial_buffer: data already read from @fd * @conn: storage for a #GstRTSPConnection * - * Accept a new connection on @sock and create a new #GstRTSPConnection for - * handling communication on new socket. + * Create a new #GstRTSPConnection for handling communication on the existing + * socket @socket. The @initial_buffer contains any data already read from + * @socket which should be used before starting to read new data. * * Returns: #GST_RTSP_OK when @conn contains a valid connection. * - * Since: 0.10.23 + * Since: 0.10.25 */ GstRTSPResult -gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn) +gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip, + guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn) { - int fd; - unsigned int address_len; GstRTSPConnection *newconn = NULL; - struct sockaddr_in address; GstRTSPUrl *url; -#ifdef G_OS_WIN32 - gulong flags = 1; -#endif - - address_len = sizeof (address); - memset (&address, 0, address_len); + GstRTSPResult res; -#ifndef G_OS_WIN32 - fd = accept (sock, (struct sockaddr *) &address, &address_len); -#else - fd = accept (sock, (struct sockaddr *) &address, (gint *) & address_len); -#endif /* G_OS_WIN32 */ - if (fd == -1) - goto accept_failed; + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); + g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); /* set to non-blocking mode so that we can cancel the communication */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ + g_socket_set_blocking (socket, FALSE); /* create a url for the client address */ url = g_new0 (GstRTSPUrl, 1); - url->host = g_strdup_printf ("%s", inet_ntoa (address.sin_addr)); - url->port = address.sin_port; + url->host = g_strdup (ip); + url->port = port; /* now create the connection object */ - gst_rtsp_connection_create (url, &newconn); + GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed); gst_rtsp_url_free (url); - ADD_POLLFD (newconn->fdset, &newconn->fd0, fd); - /* both read and write initially */ - newconn->readfd = &newconn->fd0; - newconn->writefd = &newconn->fd0; + newconn->socket0 = G_SOCKET (g_object_ref (socket)); + newconn->socket1 = G_SOCKET (g_object_ref (socket)); + newconn->write_socket = newconn->read_socket = newconn->socket0; + + newconn->ip = g_strdup (ip); + + newconn->initial_buffer = g_strdup (initial_buffer); *conn = newconn; return GST_RTSP_OK; /* ERRORS */ +newconn_failed: + { + gst_rtsp_url_free (url); + return res; + } +} + +/** + * gst_rtsp_connection_accept: + * @socket: a socket + * @conn: storage for a #GstRTSPConnection + * @cancellable: a #GCancellable to cancel the operation + * + * Accept a new connection on @socket and create a new #GstRTSPConnection for + * handling communication on new socket. + * + * Returns: #GST_RTSP_OK when @conn contains a valid connection. + * + * Since: 0.10.23 + */ +GstRTSPResult +gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn, + GCancellable * cancellable) +{ + GError *err = NULL; + gchar *ip; + guint16 port; + GSocket *client_sock; + GSocketAddress *addr; + GstRTSPResult ret; + + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + + client_sock = g_socket_accept (socket, cancellable, &err); + if (!client_sock) + goto accept_failed; + + addr = g_socket_get_remote_address (client_sock, &err); + if (!addr) + goto getnameinfo_failed; + + ip = g_inet_address_to_string (g_inet_socket_address_get_address + (G_INET_SOCKET_ADDRESS (addr))); + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); + g_object_unref (addr); + + ret = + gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL, + conn); + g_object_unref (client_sock); + g_free (ip); + + return ret; + + /* ERRORS */ accept_failed: { + GST_DEBUG ("Accepting client failed: %s", err->message); + g_clear_error (&err); return GST_RTSP_ESYS; } +getnameinfo_failed: + { + if (!g_socket_close (client_sock, &err)) { + GST_DEBUG ("Closing socket failed: %s", err->message); + g_clear_error (&err); + } + g_object_unref (client_sock); + return GST_RTSP_ERROR; + } } -static const gchar * -do_resolve (const gchar * host) +static gchar * +do_resolve (const gchar * host, GCancellable * cancellable) { - struct hostent *hostinfo; - struct in_addr addr; - const gchar *ip; -#ifdef G_OS_WIN32 - struct in_addr *addrp; -#else - char **addrs; - gchar ipbuf[INET_ADDRSTRLEN]; -#endif /* G_OS_WIN32 */ + GResolver *resolver; + GInetAddress *addr; + GError *err = NULL; + gchar *ip; - ip = NULL; + addr = g_inet_address_new_from_string (host); + if (!addr) { + GList *results, *l; - /* first check if it already is an IP address */ - if (inet_aton (host, &addr)) { - ip = host; - } else { - hostinfo = gethostbyname (host); - if (!hostinfo) - goto not_resolved; /* h_errno set */ - - if (hostinfo->h_addrtype != AF_INET) - goto not_ip; /* host not an IP host */ -#ifdef G_OS_WIN32 - addrp = (struct in_addr *) hostinfo->h_addr_list[0]; - /* this is not threadsafe */ - ip = inet_ntoa (*addrp); -#else - addrs = hostinfo->h_addr_list; - ip = inet_ntop (AF_INET, (struct in_addr *) addrs[0], ipbuf, - sizeof (ipbuf)); -#endif /* G_OS_WIN32 */ + resolver = g_resolver_get_default (); + + results = g_resolver_lookup_by_name (resolver, host, cancellable, &err); + if (!results) + goto name_resolve; + + for (l = results; l; l = l->next) { + GInetAddress *tmp = l->data; + + if (g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV4 || + g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV6) { + addr = G_INET_ADDRESS (g_object_ref (tmp)); + break; + } + } + + g_resolver_free_addresses (results); + g_object_unref (resolver); } + + if (!addr) + return NULL; + + ip = g_inet_address_to_string (addr); + g_object_unref (addr); + return ip; /* ERRORS */ -not_resolved: +name_resolve: { - GST_ERROR ("could not resolve %s", host); - return NULL; - } -not_ip: - { - GST_ERROR ("not an IP address"); + GST_ERROR ("failed to resolve %s: %s", host, err->message); + g_clear_error (&err); + g_object_unref (resolver); return NULL; } } static GstRTSPResult -do_connect (const gchar * ip, guint16 port, GstPollFD * fdout, - GstPoll * fdset, GTimeVal * timeout) +do_connect (const gchar * ip, guint16 port, GSocket ** socket_out, + GTimeVal * timeout, GCancellable * cancellable) { - gint fd; - struct sockaddr_in sa_in; - gint ret; -#ifdef G_OS_WIN32 - unsigned long flags = 1; -#endif /* G_OS_WIN32 */ + GSocket *socket; GstClockTime to; - gint retval; - - memset (&sa_in, 0, sizeof (sa_in)); - sa_in.sin_family = AF_INET; /* network socket */ - sa_in.sin_port = htons (port); /* on port */ - sa_in.sin_addr.s_addr = inet_addr (ip); /* on host ip */ - - fd = socket (AF_INET, SOCK_STREAM, 0); - if (fd == -1) + GInetAddress *addr; + GSocketAddress *saddr; + GError *err = NULL; + + addr = g_inet_address_new_from_string (ip); + g_assert (addr); + saddr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); + + socket = + g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + if (socket == NULL) goto no_socket; /* set to non-blocking mode so that we can cancel the connect */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ - - /* add the socket to our fdset */ - ADD_POLLFD (fdset, fdout, fd); + g_socket_set_blocking (socket, FALSE); /* we are going to connect ASYNC now */ - ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in)); - if (ret == 0) + if (!g_socket_connect (socket, saddr, cancellable, &err)) { + if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PENDING)) + goto sys_error; + g_clear_error (&err); + } else { goto done; - if (!ERRNO_IS_EINPROGRESS) - goto sys_error; + } /* wait for connect to complete up to the specified timeout or until we got * interrupted. */ - gst_poll_fd_ctl_write (fdset, fdout, TRUE); - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + g_socket_set_timeout (socket, (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (socket, G_IO_OUT, cancellable, &err)) { + g_socket_set_timeout (socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) + goto timeout; + else + goto sys_error; + } + g_socket_set_timeout (socket, 0); - if (retval == 0) - goto timeout; - else if (retval == -1) + if (!g_socket_check_connect_result (socket, &err)) goto sys_error; - /* we can still have an error connecting on windows */ - if (gst_poll_fd_has_error (fdset, fdout)) { - socklen_t len = sizeof (errno); -#ifndef G_OS_WIN32 - getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len); -#else - getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len); -#endif - goto sys_error; - } - - gst_poll_fd_ignored (fdset, fdout); done: + g_object_unref (saddr); + + *socket_out = socket; return GST_RTSP_OK; /* ERRORS */ no_socket: { - GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno)); + GST_ERROR ("no socket: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); return GST_RTSP_ESYS; } sys_error: { - GST_ERROR ("system error %d (%s)", errno, g_strerror (errno)); - REMOVE_POLLFD (fdset, fdout); + GST_ERROR ("system error: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ESYS; } timeout: { GST_ERROR ("timeout"); - REMOVE_POLLFD (fdset, fdout); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ETIMEOUT; } } @@ -530,15 +488,18 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) { gint i; GstRTSPResult res; - gchar *str; - guint idx, line; - gint retval; - GstClockTime to; - const gchar *ip; - guint16 port; - gchar codestr[4], *resultstr; - gint code; + gchar *ip; + gchar *uri; + gchar *value; + guint16 port, url_port; GstRTSPUrl *url; + gchar *hostparam; + GstRTSPMessage *msg; + GstRTSPMessage response; + gboolean old_http; + + memset (&response, 0, sizeof (response)); + gst_rtsp_message_init (&response); /* create a random sessionid */ for (i = 0; i < TUNNELID_LEN; i++) @@ -546,176 +507,153 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) conn->tunnelid[TUNNELID_LEN - 1] = '\0'; url = conn->url; + /* get the port from the url */ + gst_rtsp_url_get_port (url, &url_port); + + if (conn->proxy_host) { + uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port, + url->abspath, url->query ? "?" : "", url->query ? url->query : ""); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); + ip = conn->proxy_host; + port = conn->proxy_port; + } else { + uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", + url->query ? url->query : ""); + hostparam = NULL; + ip = conn->ip; + port = url_port; + } - /* */ - str = g_strdup_printf ("GET %s%s%s HTTP/1.0\r\n" - "x-sessioncookie: %s\r\n" - "Accept: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" "\r\n", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", + /* create the GET request for the read connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; + + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); /* we start by writing to this fd */ - conn->writefd = &conn->fd0; - - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; - - gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE); - gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE); - - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - - line = 0; - while (TRUE) { - guint8 buffer[4096]; - - idx = 0; - while (TRUE) { - res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL); - if (res == GST_RTSP_EEOF) - goto eof; - if (res == GST_RTSP_OK) - break; - if (res != GST_RTSP_EINTR) - goto read_error; - - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (retval == 0) - goto timeout; - - if (retval == -1) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } - } + conn->write_socket = conn->socket0; - /* check for last line */ - if (buffer[0] == '\r') - buffer[0] = '\0'; - if (buffer[0] == '\0') - break; - - if (line == 0) { - /* first line, parse response */ - gchar versionstr[20]; - gchar *bptr; - - bptr = (gchar *) buffer; - - parse_string (versionstr, sizeof (versionstr), &bptr); - parse_string (codestr, sizeof (codestr), &bptr); - code = atoi (codestr); - - while (g_ascii_isspace (*bptr)) - bptr++; - - resultstr = bptr; - - if (code != 200) - goto wrong_result; + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; + + /* receive the response to the GET request */ + /* we need to temporarily set manual_http to TRUE since + * gst_rtsp_connection_receive() will treat the HTTP response as a parsing + * failure otherwise */ + old_http = conn->manual_http; + conn->manual_http = TRUE; + GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, &response, timeout), + read_failed); + conn->manual_http = old_http; + + if (response.type != GST_RTSP_MESSAGE_HTTP_RESPONSE || + response.type_data.response.code != GST_RTSP_STS_OK) + goto wrong_result; + + if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + &value, 0) == GST_RTSP_OK) { + if (conn->proxy_host) { + /* if we use a proxy we need to change the destination url */ + g_free (url->host); + url->host = g_strdup (value); + g_free (hostparam); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); } else { - gchar key[32]; - gchar *value; - - /* other lines, parse key/value */ - res = parse_key_value (buffer, key, sizeof (key), &value); - if (res == GST_RTSP_OK) { - /* we got a new ip address */ - if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) { - g_free (conn->ip); - conn->ip = g_strdup (value); - } - } + /* and resolve the new ip address */ + if (!(ip = do_resolve (value, conn->cancellable))) + goto not_resolved; + g_free (conn->ip); + conn->ip = ip; } - line++; } - if (!(ip = do_resolve (conn->ip))) - goto not_resolved; - - /* get the port from the url */ - gst_rtsp_url_get_port (conn->url, &port); - /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket1, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; /* this is now our writing socket */ - conn->writefd = &conn->fd1; - - /* */ - str = g_strdup_printf ("POST %s%s%s HTTP/1.0\r\n" - "x-sessioncookie: %s\r\n" - "Content-Type: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" - "Content-Length: 32767\r\n" - "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n" - "\r\n", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", - conn->tunnelid); + conn->write_socket = conn->socket1; - /* we start by writing to this fd */ - conn->writefd = &conn->fd1; + /* create the POST request for the write connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, + conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES, + "Sun, 9 Jan 1972 00:00:00 GMT"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767"); + + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; + +exit: + gst_rtsp_message_unset (&response); + g_free (hostparam); + g_free (uri); return res; /* ERRORS */ -write_failed: - { - GST_ERROR ("write failed (%d)", res); - return res; - } -eof: - { - return GST_RTSP_EEOF; - } -read_error: - { - return res; - } -timeout: +no_message: { - return GST_RTSP_ETIMEOUT; + GST_ERROR ("failed to create request (%d)", res); + goto exit; } -select_error: +write_failed: { - return GST_RTSP_ESYS; + GST_ERROR ("write failed (%d)", res); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; + goto exit; } -stopped: +read_failed: { - return GST_RTSP_EINTR; + GST_ERROR ("read failed (%d)", res); + conn->manual_http = FALSE; + goto exit; } wrong_result: { - GST_ERROR ("got failure response %d %s", code, resultstr); - return GST_RTSP_ERROR; + GST_ERROR ("got failure response %d %s", response.type_data.response.code, + response.type_data.response.reason); + res = GST_RTSP_ERROR; + goto exit; } not_resolved: { GST_ERROR ("could not resolve %s", conn->ip); - return GST_RTSP_ENET; + res = GST_RTSP_ENET; + goto exit; } connect_failed: { GST_ERROR ("failed to connect"); - return res; + goto exit; } } @@ -737,49 +675,56 @@ GstRTSPResult gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) { GstRTSPResult res; - const gchar *ip; + gchar *ip; guint16 port; GstRTSPUrl *url; -#ifdef G_OS_WIN32 - unsigned long flags = 1; -#endif /* G_OS_WIN32 */ g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->socket0 == NULL, GST_RTSP_EINVAL); url = conn->url; - if (!(ip = do_resolve (url->host))) - goto not_resolved; + if (conn->proxy_host && conn->tunneled) { + if (!(ip = do_resolve (conn->proxy_host, conn->cancellable))) { + GST_ERROR ("could not resolve %s", conn->proxy_host); + goto not_resolved; + } + port = conn->proxy_port; + g_free (conn->proxy_host); + conn->proxy_host = ip; + } else { + if (!(ip = do_resolve (url->host, conn->cancellable))) { + GST_ERROR ("could not resolve %s", url->host); + goto not_resolved; + } + /* get the port from the url */ + gst_rtsp_url_get_port (url, &port); - /* get the port from the url */ - gst_rtsp_url_get_port (url, &port); + g_free (conn->ip); + conn->ip = ip; + } /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket0, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; - g_free (conn->ip); - conn->ip = g_strdup (ip); - /* this is our read URL */ - conn->readfd = &conn->fd0; + conn->read_socket = conn->socket0; if (conn->tunneled) { res = setup_tunneling (conn, timeout); if (res != GST_RTSP_OK) goto tunneling_failed; } else { - conn->writefd = &conn->fd0; + conn->write_socket = conn->socket0; } return GST_RTSP_OK; not_resolved: { - GST_ERROR ("could not resolve %s", url->host); return GST_RTSP_ENET; } connect_failed: @@ -795,33 +740,23 @@ tunneling_failed: } static void -md5_digest_to_hex_string (unsigned char digest[16], char string[33]) -{ - static const char hexdigits[] = "0123456789abcdef"; - int i; - - for (i = 0; i < 16; i++) { - string[i * 2] = hexdigits[(digest[i] >> 4) & 0x0f]; - string[i * 2 + 1] = hexdigits[digest[i] & 0x0f]; - } - string[32] = 0; -} - -static void auth_digest_compute_hex_urp (const gchar * username, const gchar * realm, const gchar * password, gchar hex_urp[33]) { - struct MD5Context md5_context; - unsigned char digest[16]; + GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5); + const gchar *digest_string; + + g_checksum_update (md5_context, (const guchar *) username, strlen (username)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) realm, strlen (realm)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) password, strlen (password)); + digest_string = g_checksum_get_string (md5_context); + + memset (hex_urp, 0, 33); + memcpy (hex_urp, digest_string, strlen (digest_string)); - MD5Init (&md5_context); - MD5Update (&md5_context, username, strlen (username)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, realm, strlen (realm)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, password, strlen (password)); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, hex_urp); + g_checksum_free (md5_context); } static void @@ -829,28 +764,30 @@ auth_digest_compute_response (const gchar * method, const gchar * uri, const gchar * hex_a1, const gchar * nonce, gchar response[33]) { - char hex_a2[33]; - struct MD5Context md5_context; - unsigned char digest[16]; + char hex_a2[33] = { 0, }; + GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5); + const gchar *digest_string; /* compute A2 */ - MD5Init (&md5_context); - MD5Update (&md5_context, method, strlen (method)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, uri, strlen (uri)); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, hex_a2); + g_checksum_update (md5_context, (const guchar *) method, strlen (method)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) uri, strlen (uri)); + digest_string = g_checksum_get_string (md5_context); + memcpy (hex_a2, digest_string, strlen (digest_string)); /* compute KD */ - MD5Init (&md5_context); - MD5Update (&md5_context, hex_a1, strlen (hex_a1)); - MD5Update (&md5_context, ":", 1); - MD5Update (&md5_context, nonce, strlen (nonce)); - MD5Update (&md5_context, ":", 1); - - MD5Update (&md5_context, hex_a2, 32); - MD5Final (digest, &md5_context); - md5_digest_to_hex_string (digest, response); + g_checksum_reset (md5_context); + g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce)); + g_checksum_update (md5_context, (const guchar *) ":", 1); + + g_checksum_update (md5_context, (const guchar *) hex_a2, 32); + digest_string = g_checksum_get_string (md5_context); + memset (response, 0, 33); + memcpy (response, digest_string, strlen (digest_string)); + + g_checksum_free (md5_context); } static void @@ -858,11 +795,16 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) { switch (conn->auth_method) { case GST_RTSP_AUTH_BASIC:{ - gchar *user_pass = - g_strdup_printf ("%s:%s", conn->username, conn->passwd); - gchar *user_pass64 = - gst_rtsp_base64_encode (user_pass, strlen (user_pass)); - gchar *auth_string = g_strdup_printf ("Basic %s", user_pass64); + gchar *user_pass; + gchar *user_pass64; + gchar *auth_string; + + if (conn->username == NULL || conn->passwd == NULL) + break; + + user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd); + user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass)); + auth_string = g_strdup_printf ("Basic %s", user_pass64); gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION, auth_string); @@ -881,7 +823,8 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) const gchar *method; /* we need to have some params set */ - if (conn->auth_params == NULL) + if (conn->auth_params == NULL || conn->username == NULL || + conn->passwd == NULL) break; /* we need the realm and nonce */ @@ -923,43 +866,54 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) static void gen_date_string (gchar * date_string, guint len) { - GTimeVal tv; + static const char wkdays[7][4] = + { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; + static const char months[12][4] = + { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", + "Nov", "Dec" + }; + struct tm tm; time_t t; -#ifdef HAVE_GMTIME_R - struct tm tm_; -#endif - g_get_current_time (&tv); - t = (time_t) tv.tv_sec; + time (&t); #ifdef HAVE_GMTIME_R - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_)); + gmtime_r (&t, &tm); #else - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t)); + tm = *gmtime (&t); #endif + + g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT", + wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900, + tm.tm_hour, tm.tm_min, tm.tm_sec); } static GstRTSPResult -write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) +write_bytes (GSocket * socket, const guint8 * buffer, guint * idx, guint size, + GCancellable * cancellable) { guint left; - if (*idx > size) + if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; left = size - *idx; while (left) { - gint r; + GError *err = NULL; + gssize r; - r = WRITE_SOCKET (fd, &buffer[*idx], left); - if (r == 0) { + r = g_socket_send (socket, (gchar *) & buffer[*idx], left, cancellable, + &err); + if (G_UNLIKELY (r == 0)) { return GST_RTSP_EINTR; - } else if (r < 0) { - if (ERRNO_IS_EAGAIN) + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -969,54 +923,90 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) } static gint -fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx) +fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) { gint out = 0; - if (ctx) { - gint r; + if (G_UNLIKELY (conn->initial_buffer != NULL)) { + gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]); + + out = MIN (left, size); + memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out); + + if (left == (gsize) out) { + g_free (conn->initial_buffer); + conn->initial_buffer = NULL; + conn->initial_buffer_offset = 0; + } else + conn->initial_buffer_offset += out; + } + + if (G_LIKELY (size > (guint) out)) { + gssize r; + r = g_socket_receive (conn->read_socket, (gchar *) & buffer[out], + size - out, conn->cancellable, err); + if (r <= 0) { + if (out == 0) + out = r; + } else + out += r; + } + + return out; +} + +static gint +fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) +{ + DecodeCtx *ctx = conn->ctxp; + gint out = 0; + + if (ctx) { while (size > 0) { - while (size > 0 && ctx->cout < 3) { + guint8 in[sizeof (ctx->out) * 4 / 3]; + gint r; + + while (size > 0 && ctx->cout < ctx->coutl) { /* we have some leftover bytes */ - *buffer++ = ctx->out[ctx->cout]; - ctx->cout++; + *buffer++ = ctx->out[ctx->cout++]; size--; out++; } - /* nothing in the buffer */ + + /* got what we needed? */ if (size == 0) break; /* try to read more bytes */ - r = READ_SOCKET (fd, &ctx->in[ctx->cin], 4 - ctx->cin); + r = fill_raw_bytes (conn, in, sizeof (in), err); if (r <= 0) { if (out == 0) out = r; break; } - ctx->cin += r; - if (ctx->cin == 4) { - r = g_base64_decode_step ((const gchar *) ctx->in, 4, - (guchar *) ctx->out, &ctx->state, &ctx->save); - ctx->cout = 0; - ctx->cin = 0; - } + ctx->cout = 0; + ctx->coutl = + g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state, + &ctx->save); } } else { - out = READ_SOCKET (fd, buffer, size); + out = fill_raw_bytes (conn, buffer, size, err); } return out; } static GstRTSPResult -read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) +read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { guint left; + GError *err = NULL; - if (*idx > size) + if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; left = size - *idx; @@ -1024,14 +1014,16 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) while (left) { gint r; - r = fill_bytes (fd, &buffer[*idx], left, ctx); - if (r == 0) { + r = fill_bytes (conn, &buffer[*idx], left, &err); + if (G_UNLIKELY (r == 0)) { return GST_RTSP_EEOF; - } else if (r < 0) { - if (ERRNO_IS_EAGAIN) + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -1040,30 +1032,130 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) return GST_RTSP_OK; } +/* The code below tries to handle clients using \r, \n or \r\n to indicate the + * end of a line. It even does its best to handle clients which mix them (even + * though this is a really stupid idea (tm).) It also handles Line White Space + * (LWS), where a line end followed by whitespace is considered LWS. This is + * the method used in RTSP (and HTTP) to break long lines. + */ static GstRTSPResult -read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) +read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { + GError *err = NULL; + while (TRUE) { guint8 c; gint r; - r = fill_bytes (fd, &c, 1, ctx); - if (r == 0) { - return GST_RTSP_EEOF; - } else if (r < 0) { - if (ERRNO_IS_EAGAIN) - return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + if (conn->read_ahead == READ_AHEAD_EOH) { + /* the last call to read_line() already determined that we have reached + * the end of the headers, so convey that information now */ + conn->read_ahead = 0; + break; + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + /* the last call to read_line() left off after having read \r\n */ + c = '\n'; + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* the last call to read_line() left off after having read \r\n\r */ + c = '\r'; + } else if (conn->read_ahead != 0) { + /* the last call to read_line() left us with a character to start with */ + c = (guint8) conn->read_ahead; + conn->read_ahead = 0; } else { - if (c == '\n') /* end on \n */ - break; - if (c == '\r') /* ignore \r */ - continue; + /* read the next character */ + r = fill_bytes (conn, &c, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } + + g_clear_error (&err); + return GST_RTSP_ESYS; + } + } + + /* special treatment of line endings */ + if (c == '\r' || c == '\n') { + guint8 read_ahead; + + retry: + /* need to read ahead one more character to know what to do... */ + r = fill_bytes (conn, &read_ahead, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + /* remember the original character we read and try again next time */ + if (conn->read_ahead == 0) + conn->read_ahead = c; + return GST_RTSP_EINTR; + g_clear_error (&err); + return GST_RTSP_EINTR; + } - if (*idx < size - 1) - buffer[(*idx)++] = c; + g_clear_error (&err); + return GST_RTSP_ESYS; + } + + if (read_ahead == ' ' || read_ahead == '\t') { + if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* got \r\n\r followed by whitespace, treat it as a normal line + * followed by one starting with LWS */ + conn->read_ahead = read_ahead; + break; + } else { + /* got LWS, change the line ending to a space and continue */ + c = ' '; + conn->read_ahead = read_ahead; + } + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + if (read_ahead == '\r' || read_ahead == '\n') { + /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* got \r\n\r followed by something else, this is not really + * supported since we have probably just eaten the first character + * of the body or the next message, so just ignore the second \r + * and live with it... */ + conn->read_ahead = read_ahead; + break; + } + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + if (read_ahead == '\r') { + /* got \r\n\r so far, need one more character... */ + conn->read_ahead = READ_AHEAD_CRLFCR; + goto retry; + } else if (read_ahead == '\n') { + /* got \r\n\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } + } else if (c == read_ahead) { + /* got double \r or \n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else if (c == '\r' && read_ahead == '\n') { + /* got \r\n so far, still need more to know what to do... */ + conn->read_ahead = READ_AHEAD_CRLF; + goto retry; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } } + + if (G_LIKELY (*idx < size - 1)) + buffer[(*idx)++] = c; } buffer[*idx] = '\0'; @@ -1090,20 +1182,13 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); - - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE); - /* clear all previous poll results */ - gst_poll_fd_ignored (conn->fdset, conn->writefd); - gst_poll_fd_ignored (conn->fdset, conn->readfd); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; @@ -1111,26 +1196,31 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, while (TRUE) { /* try to write */ - res = write_bytes (conn->writefd->fd, data, &offset, size); - if (res == GST_RTSP_OK) + res = + write_bytes (conn->write_socket, data, &offset, size, + conn->cancellable); + if (G_LIKELY (res == GST_RTSP_OK)) break; - if (res != GST_RTSP_EINTR) + if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto write_error; /* not all is written, wait until we can write more */ - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - if (retval == 0) - goto timeout; - - if (retval == -1) { - if (errno == EBUSY) + g_socket_set_timeout (conn->write_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) { + g_socket_set_timeout (conn->write_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto timeout; + } + g_clear_error (&err); + goto select_error; } + g_socket_set_timeout (conn->write_socket, 0); } return GST_RTSP_OK; @@ -1169,6 +1259,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) message->type_data.request.uri, conn->cseq++); /* add session id if we have one */ if (conn->session_id[0] != '\0') { + gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION, conn->session_id); } @@ -1180,6 +1271,21 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) g_string_append_printf (str, "RTSP/1.0 %d %s\r\n", message->type_data.response.code, message->type_data.response.reason); break; + case GST_RTSP_MESSAGE_HTTP_REQUEST: + /* create request string */ + g_string_append_printf (str, "%s %s HTTP/%s\r\n", + gst_rtsp_method_as_text (message->type_data.request.method), + message->type_data.request.uri, + gst_rtsp_version_as_text (message->type_data.request.version)); + /* add any authentication headers */ + add_auth_header (conn, message); + break; + case GST_RTSP_MESSAGE_HTTP_RESPONSE: + /* create response string */ + g_string_append_printf (str, "HTTP/%s %d %s\r\n", + gst_rtsp_version_as_text (message->type_data.request.version), + message->type_data.response.code, message->type_data.response.reason); + break; case GST_RTSP_MESSAGE_DATA: { guint8 data_header[4]; @@ -1210,6 +1316,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message) gen_date_string (date_string, sizeof (date_string)); /* add date header */ + gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string); /* append headers */ @@ -1263,7 +1370,7 @@ gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - if (!(string = message_to_string (conn, message))) + if (G_UNLIKELY (!(string = message_to_string (conn, message)))) goto no_message; if (conn->tunneled) { @@ -1290,9 +1397,10 @@ no_message: } } -static void +static GstRTSPResult parse_string (gchar * dest, gint size, gchar ** src) { + GstRTSPResult res = GST_RTSP_OK; gint idx; idx = 0; @@ -1303,31 +1411,64 @@ parse_string (gchar * dest, gint size, gchar ** src) while (!g_ascii_isspace (**src) && **src != '\0') { if (idx < size - 1) dest[idx++] = **src; + else + res = GST_RTSP_EPARSE; (*src)++; } if (size > 0) dest[idx] = '\0'; + + return res; } -static void -parse_key (gchar * dest, gint size, gchar ** src) +static GstRTSPResult +parse_protocol_version (gchar * protocol, GstRTSPMsgType * type, + GstRTSPVersion * version) { - gint idx; + GstRTSPResult res = GST_RTSP_OK; + gchar *ver; - idx = 0; - while (**src != ':' && **src != '\0') { - if (idx < size - 1) - dest[idx++] = **src; - (*src)++; - } - if (size > 0) - dest[idx] = '\0'; + if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) { + guint major; + guint minor; + gchar dummychar; + + *ver++ = '\0'; + + /* the version number must be formatted as X.Y with nothing following */ + if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2) + res = GST_RTSP_EPARSE; + + if (g_ascii_strcasecmp (protocol, "RTSP") == 0) { + if (major != 1 || minor != 0) { + *version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_ERROR; + } + } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) { + if (*type == GST_RTSP_MESSAGE_REQUEST) + *type = GST_RTSP_MESSAGE_HTTP_REQUEST; + else if (*type == GST_RTSP_MESSAGE_RESPONSE) + *type = GST_RTSP_MESSAGE_HTTP_RESPONSE; + + if (major == 1 && minor == 1) { + *version = GST_RTSP_VERSION_1_1; + } else if (major != 1 || minor != 0) { + *version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_ERROR; + } + } else + res = GST_RTSP_EPARSE; + } else + res = GST_RTSP_EPARSE; + + return res; } static GstRTSPResult parse_response_status (guint8 * buffer, GstRTSPMessage * msg) { - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_OK; + GstRTSPResult res2; gchar versionstr[20]; gchar codestr[4]; gint code; @@ -1335,170 +1476,222 @@ parse_response_status (guint8 * buffer, GstRTSPMessage * msg) bptr = (gchar *) buffer; - parse_string (versionstr, sizeof (versionstr), &bptr); - parse_string (codestr, sizeof (codestr), &bptr); + if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; + + if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; code = atoi (codestr); + if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600)) + res = GST_RTSP_EPARSE; while (g_ascii_isspace (*bptr)) bptr++; - if (strcmp (versionstr, "RTSP/1.0") == 0) - GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL), - parse_error); - else if (strncmp (versionstr, "RTSP/", 5) == 0) { - GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL), - parse_error); - msg->type_data.response.version = GST_RTSP_VERSION_INVALID; - } else - goto parse_error; + if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr, + NULL) != GST_RTSP_OK)) + res = GST_RTSP_EPARSE; - return GST_RTSP_OK; + res2 = parse_protocol_version (versionstr, &msg->type, + &msg->type_data.response.version); + if (G_LIKELY (res == GST_RTSP_OK)) + res = res2; -parse_error: - { - return GST_RTSP_EPARSE; - } + return res; } static GstRTSPResult -parse_request_line (GstRTSPConnection * conn, guint8 * buffer, - GstRTSPMessage * msg) +parse_request_line (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res = GST_RTSP_OK; + GstRTSPResult res2; gchar versionstr[20]; gchar methodstr[20]; gchar urlstr[4096]; gchar *bptr; GstRTSPMethod method; - GstRTSPTunnelState tstate = TUNNEL_STATE_NONE; bptr = (gchar *) buffer; - parse_string (methodstr, sizeof (methodstr), &bptr); + if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; method = gst_rtsp_find_method (methodstr); - if (method == GST_RTSP_INVALID) { - /* a tunnel request is allowed when we don't have one yet */ - if (conn->tstate != TUNNEL_STATE_NONE) - goto invalid_method; - /* we need GET or POST for a valid tunnel request */ - if (!strcmp (methodstr, "GET")) - tstate = TUNNEL_STATE_GET; - else if (!strcmp (methodstr, "POST")) - tstate = TUNNEL_STATE_POST; - else - goto invalid_method; - } - parse_string (urlstr, sizeof (urlstr), &bptr); - if (*urlstr == '\0') - goto invalid_url; + if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; + if (G_UNLIKELY (*urlstr == '\0')) + res = GST_RTSP_EPARSE; - parse_string (versionstr, sizeof (versionstr), &bptr); + if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) + res = GST_RTSP_EPARSE; - if (*bptr != '\0') - goto invalid_version; + if (G_UNLIKELY (*bptr != '\0')) + res = GST_RTSP_EPARSE; - if (strcmp (versionstr, "RTSP/1.0") == 0) { - res = gst_rtsp_message_init_request (msg, method, urlstr); - } else if (strncmp (versionstr, "RTSP/", 5) == 0) { - res = gst_rtsp_message_init_request (msg, method, urlstr); - msg->type_data.request.version = GST_RTSP_VERSION_INVALID; - } else if (strcmp (versionstr, "HTTP/1.0") == 0) { - /* tunnel request, we need a tunnel method */ - if (tstate == TUNNEL_STATE_NONE) { - res = GST_RTSP_EPARSE; - } else { - conn->tstate = tstate; - } - } else { + if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method, + urlstr) != GST_RTSP_OK)) res = GST_RTSP_EPARSE; - } - return res; + res2 = parse_protocol_version (versionstr, &msg->type, + &msg->type_data.request.version); + if (G_LIKELY (res == GST_RTSP_OK)) + res = res2; - /* ERRORS */ -invalid_method: - { - GST_ERROR ("invalid method %s", methodstr); - return GST_RTSP_EPARSE; - } -invalid_url: - { - GST_ERROR ("invalid url %s", urlstr); - return GST_RTSP_EPARSE; - } -invalid_version: - { - GST_ERROR ("invalid version"); - return GST_RTSP_EPARSE; + if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) { + /* GET and POST are not allowed as RTSP methods */ + if (msg->type_data.request.method == GST_RTSP_GET || + msg->type_data.request.method == GST_RTSP_POST) { + msg->type_data.request.method = GST_RTSP_INVALID; + if (res == GST_RTSP_OK) + res = GST_RTSP_ERROR; + } + } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + /* only GET and POST are allowed as HTTP methods */ + if (msg->type_data.request.method != GST_RTSP_GET && + msg->type_data.request.method != GST_RTSP_POST) { + msg->type_data.request.method = GST_RTSP_INVALID; + if (res == GST_RTSP_OK) + res = GST_RTSP_ERROR; + } } + + return res; } +/* parsing lines means reading a Key: Value pair */ static GstRTSPResult -parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value) +parse_line (guint8 * buffer, GstRTSPMessage * msg) { - gchar *bptr; + GstRTSPHeaderField field; + gchar *line = (gchar *) buffer; + gchar *value; - bptr = (gchar *) buffer; + if ((value = strchr (line, ':')) == NULL || value == line) + goto parse_error; - /* read key */ - parse_key (key, keysize, &bptr); - if (*bptr != ':') - goto no_column; + /* trim space before the colon */ + if (value[-1] == ' ') + value[-1] = '\0'; - bptr++; - while (g_ascii_isspace (*bptr)) - bptr++; + /* replace the colon with a NUL */ + *value++ = '\0'; - *value = bptr; + /* find the header */ + field = gst_rtsp_find_header_field (line); + if (field == GST_RTSP_HDR_INVALID) + goto done; - return GST_RTSP_OK; + /* split up the value in multiple key:value pairs if it contains comma(s) */ + while (*value != '\0') { + gchar *next_value; + gchar *comma = NULL; + gboolean quoted = FALSE; + guint comment = 0; + + /* trim leading space */ + if (*value == ' ') + value++; + + /* for headers which may not appear multiple times, and thus may not + * contain multiple values on the same line, we can short-circuit the loop + * below and the entire value results in just one key:value pair*/ + if (!gst_rtsp_header_allow_multiple (field)) + next_value = value + strlen (value); + else + next_value = value; + + /* find the next value, taking special care of quotes and comments */ + while (*next_value != '\0') { + if ((quoted || comment != 0) && *next_value == '\\' && + next_value[1] != '\0') + next_value++; + else if (comment == 0 && *next_value == '"') + quoted = !quoted; + else if (!quoted && *next_value == '(') + comment++; + else if (comment != 0 && *next_value == ')') + comment--; + else if (!quoted && comment == 0) { + /* To quote RFC 2068: "User agents MUST take special care in parsing + * the WWW-Authenticate field value if it contains more than one + * challenge, or if more than one WWW-Authenticate header field is + * provided, since the contents of a challenge may itself contain a + * comma-separated list of authentication parameters." + * + * What this means is that we cannot just look for an unquoted comma + * when looking for multiple values in Proxy-Authenticate and + * WWW-Authenticate headers. Instead we need to look for the sequence + * "comma [space] token space token" before we can split after the + * comma... + */ + if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE || + field == GST_RTSP_HDR_WWW_AUTHENTICATE) { + if (*next_value == ',') { + if (next_value[1] == ' ') { + /* skip any space following the comma so we do not mistake it for + * separating between two tokens */ + next_value++; + } + comma = next_value; + } else if (*next_value == ' ' && next_value[1] != ',' && + next_value[1] != '=' && comma != NULL) { + next_value = comma; + comma = NULL; + break; + } + } else if (*next_value == ',') + break; + } - /* ERRORS */ -no_column: - { - return GST_RTSP_EPARSE; - } -} + next_value++; + } -/* parsing lines means reading a Key: Value pair */ -static GstRTSPResult -parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg) -{ - GstRTSPResult res; - gchar key[32]; - gchar *value; - GstRTSPHeaderField field; + /* trim space */ + if (value != next_value && next_value[-1] == ' ') + next_value[-1] = '\0'; - res = parse_key_value (buffer, key, sizeof (key), &value); - if (res != GST_RTSP_OK) - goto parse_error; + if (*next_value != '\0') + *next_value++ = '\0'; - if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) { - /* save the tunnel session in the connection */ - if (!strcmp (key, "x-sessioncookie")) { - strncpy (conn->tunnelid, value, TUNNELID_LEN); - conn->tunnelid[TUNNELID_LEN - 1] = '\0'; - conn->tunneled = TRUE; - } - } else { - field = gst_rtsp_find_header_field (key); - if (field != GST_RTSP_HDR_INVALID) + /* add the key:value pair */ + if (*value != '\0') gst_rtsp_message_add_header (msg, field, value); + + value = next_value; } +done: return GST_RTSP_OK; /* ERRORS */ parse_error: { - return res; + return GST_RTSP_EPARSE; + } +} + +/* convert all consecutive whitespace to a single space */ +static void +normalize_line (guint8 * buffer) +{ + while (*buffer) { + if (g_ascii_isspace (*buffer)) { + guint8 *tmp; + + *buffer++ = ' '; + for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) { + } + if (buffer != tmp) + memmove (buffer, tmp, strlen ((gchar *) tmp) + 1); + } else { + buffer++; + } } } /* returns: * GST_RTSP_OK when a complete message was read. - * GST_RTSP_EEOF: when the socket is closed + * GST_RTSP_EEOF: when the read socket is closed * GST_RTSP_EINTR: when more data is needed. * GST_RTSP_..: some other error occured. */ @@ -1511,28 +1704,35 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, while (TRUE) { switch (builder->state) { case STATE_START: + { + guint8 c; + builder->offset = 0; res = - read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, - &builder->offset, 1, conn->ctxp); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1); if (res != GST_RTSP_OK) goto done; + c = builder->buffer[0]; + /* we have 1 bytes now and we can see if this is a data message or * not */ - if (builder->buffer[0] == '$') { + if (c == '$') { /* data message, prepare for the header */ builder->state = STATE_DATA_HEADER; + } else if (c == '\n' || c == '\r') { + /* skip \n and \r */ + builder->offset = 0; } else { builder->line = 0; builder->state = STATE_READ_LINES; } break; + } case STATE_DATA_HEADER: { res = - read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, - &builder->offset, 4, conn->ctxp); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4); if (res != GST_RTSP_OK) goto done; @@ -1548,13 +1748,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, case STATE_DATA_BODY: { res = - read_bytes (conn->readfd->fd, builder->body_data, &builder->offset, - builder->body_len, conn->ctxp); + read_bytes (conn, builder->body_data, &builder->offset, + builder->body_len); if (res != GST_RTSP_OK) goto done; /* we have the complete body now, store in the message adjusting the - * length to include the traling '\0' */ + * length to include the trailing '\0' */ gst_rtsp_message_take_body (message, (guint8 *) builder->body_data, builder->body_len + 1); builder->body_data = NULL; @@ -1565,26 +1765,33 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } case STATE_READ_LINES: { - res = read_line (conn->readfd->fd, builder->buffer, &builder->offset, - sizeof (builder->buffer), conn->ctxp); + res = read_line (conn, builder->buffer, &builder->offset, + sizeof (builder->buffer)); if (res != GST_RTSP_OK) goto done; /* we have a regular response */ - if (builder->buffer[0] == '\r') { - builder->buffer[0] = '\0'; - } - if (builder->buffer[0] == '\0') { gchar *hdrval; /* empty line, end of message header */ - /* see if there is a Content-Length header */ + /* see if there is a Content-Length header, but ignore it if this + * is a POST request with an x-sessioncookie header */ if (gst_rtsp_message_get_header (message, - GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) { + GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK && + (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST || + message->type_data.request.method != GST_RTSP_POST || + gst_rtsp_message_get_header (message, + GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) { /* there is, prepare to read the body */ builder->body_len = atol (hdrval); - builder->body_data = g_malloc (builder->body_len + 1); + builder->body_data = g_try_malloc (builder->body_len + 1); + /* we can't do much here, we need the length to know how many bytes + * we need to read next and when allocation fails, something is + * probably wrong with the length. */ + if (builder->body_data == NULL) + goto invalid_body_len; + builder->body_data[builder->body_len] = '\0'; builder->offset = 0; builder->state = STATE_DATA_BODY; @@ -1595,19 +1802,20 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } /* we have a line */ + normalize_line (builder->buffer); if (builder->line == 0) { /* first line, check for response status */ - if (memcmp (builder->buffer, "RTSP", 4) == 0) { - res = parse_response_status (builder->buffer, message); + if (memcmp (builder->buffer, "RTSP", 4) == 0 || + memcmp (builder->buffer, "HTTP", 4) == 0) { + builder->status = parse_response_status (builder->buffer, message); } else { - res = parse_request_line (conn, builder->buffer, message); + builder->status = parse_request_line (builder->buffer, message); } - /* the first line must parse without errors */ - if (res != GST_RTSP_OK) - goto done; } else { - /* else just parse the line, ignore errors */ - parse_line (conn, builder->buffer, message); + /* else just parse the line */ + res = parse_line (builder->buffer, message); + if (res != GST_RTSP_OK) + builder->status = res; } builder->line++; builder->offset = 0; @@ -1615,24 +1823,29 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } case STATE_END: { + gchar *session_cookie; gchar *session_id; - if (conn->tstate == TUNNEL_STATE_GET) { - res = GST_RTSP_ETGET; - goto done; - } else if (conn->tstate == TUNNEL_STATE_POST) { - res = GST_RTSP_ETPOST; - goto done; - } - if (message->type == GST_RTSP_MESSAGE_DATA) { /* data messages don't have headers */ res = GST_RTSP_OK; goto done; } + /* save the tunnel session in the connection */ + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST && + !conn->manual_http && + conn->tstate == TUNNEL_STATE_NONE && + gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE, + &session_cookie, 0) == GST_RTSP_OK) { + strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN); + conn->tunnelid[TUNNELID_LEN - 1] = '\0'; + conn->tunneled = TRUE; + } + /* save session id in the connection for further use */ - if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, + if (message->type == GST_RTSP_MESSAGE_RESPONSE && + gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION, &session_id, 0) == GST_RTSP_OK) { gint maxlen, i; @@ -1650,7 +1863,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, gint to; /* if we parsed something valid, configure */ - if ((to = atoi (&session_id[i + 9])) > 0) + if ((to = atoi (&session_id[i + 8])) > 0) conn->timeout = to; } break; @@ -1661,7 +1874,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, strncpy (conn->session_id, session_id, maxlen); conn->session_id[maxlen] = '\0'; } - res = GST_RTSP_OK; + res = builder->status; goto done; } default: @@ -1671,6 +1884,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } done: return res; + + /* ERRORS */ +invalid_body_len: + { + GST_DEBUG ("could not allocate body"); + return GST_RTSP_ERROR; + } } /** @@ -1693,15 +1913,15 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); - if (size == 0) + if (G_UNLIKELY (size == 0)) return GST_RTSP_OK; offset = 0; @@ -1709,34 +1929,32 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - while (TRUE) { - res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp); - if (res == GST_RTSP_EEOF) + res = read_bytes (conn, data, &offset, size); + if (G_UNLIKELY (res == GST_RTSP_EEOF)) goto eof; - if (res == GST_RTSP_OK) + if (G_LIKELY (res == GST_RTSP_OK)) break; - if (res != GST_RTSP_EINTR) + if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (retval == 0) - goto select_timeout; - - if (retval == -1) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } return GST_RTSP_OK; @@ -1763,37 +1981,40 @@ read_error: } } -static GString * -gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code) +static GstRTSPMessage * +gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code, + const GstRTSPMessage * request) { - GString *str; - gchar date_string[100]; - const gchar *status; + GstRTSPMessage *msg; + GstRTSPResult res; - gen_date_string (date_string, sizeof (date_string)); + if (gst_rtsp_status_as_text (code) == NULL) + code = GST_RTSP_STS_INTERNAL_SERVER_ERROR; - status = gst_rtsp_status_as_text (code); - if (status == NULL) { - code = 500; - status = "Internal Server Error"; - } + GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request), + no_message); - str = g_string_new (""); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER, + "GStreamer RTSP Server"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); - /* */ - g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status); - g_string_append_printf (str, - "Server: GStreamer RTSP Server\r\n" - "Date: %s\r\n" - "Connection: close\r\n" - "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string); - if (code == 200) { + if (code == GST_RTSP_STS_OK) { if (conn->ip) - g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip); - g_string_append_printf (str, - "Content-Type: application/x-rtsp-tunnelled\r\n" "\r\n"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + conn->ip); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE, + "application/x-rtsp-tunnelled"); + } + + return msg; + + /* ERRORS */ +no_message: + { + return NULL; } - return str; } /** @@ -1815,58 +2036,77 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) { GstRTSPResult res; - GstRTSPBuilder builder = { 0 }; - gint retval; + GstRTSPBuilder builder; GstClockTime to; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - + memset (&builder, 0, sizeof (GstRTSPBuilder)); while (TRUE) { res = build_next (&builder, message, conn); - if (res == GST_RTSP_EEOF) + if (G_UNLIKELY (res == GST_RTSP_EEOF)) goto eof; - if (res == GST_RTSP_OK) - break; - if (res == GST_RTSP_ETGET) { - GString *str; - - /* tunnel GET request, we can reply now */ - str = gen_tunnel_reply (conn, GST_RTSP_STS_OK); - res = - gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, - timeout); - g_string_free (str, TRUE); - } else if (res == GST_RTSP_ETPOST) { - /* tunnel POST request, return the value, the caller now has to link the - * two connections. */ + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!conn->manual_http) { + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + + conn->tstate = TUNNEL_STATE_GET; + + /* tunnel GET request, we can reply now */ + response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message); + res = gst_rtsp_connection_send (conn, response, timeout); + gst_rtsp_message_free (response); + if (res == GST_RTSP_OK) + res = GST_RTSP_ETGET; + goto cleanup; + } else if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_POST) { + conn->tstate = TUNNEL_STATE_POST; + + /* tunnel POST request, the caller now has to link the two + * connections. */ + res = GST_RTSP_ETPOST; + goto cleanup; + } else { + res = GST_RTSP_EPARSE; + goto cleanup; + } + } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + res = GST_RTSP_EPARSE; + goto cleanup; + } + } + break; - } else if (res != GST_RTSP_EINTR) + } else if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (retval == 0) - goto select_timeout; - - if (retval == -1) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } /* we have a message here */ @@ -1918,13 +2158,28 @@ gst_rtsp_connection_close (GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + /* last unref closes the connection we don't want to explicitly close here + * because these sockets might have been provided at construction */ + if (conn->socket0) { + g_object_unref (conn->socket0); + conn->socket0 = NULL; + } + if (conn->socket1) { + g_object_unref (conn->socket1); + conn->socket1 = NULL; + } + g_free (conn->ip); conn->ip = NULL; - REMOVE_POLLFD (conn->fdset, &conn->fd0); - REMOVE_POLLFD (conn->fdset, &conn->fd1); - conn->writefd = NULL; - conn->readfd = NULL; + conn->read_ahead = 0; + + g_free (conn->initial_buffer); + conn->initial_buffer = NULL; + conn->initial_buffer_offset = 0; + + conn->write_socket = NULL; + conn->read_socket = NULL; conn->tunneled = FALSE; conn->tstate = TUNNEL_STATE_NONE; conn->ctxp = NULL; @@ -1956,13 +2211,14 @@ gst_rtsp_connection_free (GstRTSPConnection * conn) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); res = gst_rtsp_connection_close (conn); - gst_poll_free (conn->fdset); + + if (conn->cancellable) + g_object_unref (conn->cancellable); + g_timer_destroy (conn->timer); gst_rtsp_url_free (conn->url); + g_free (conn->proxy_host); g_free (conn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif return res; } @@ -1992,64 +2248,66 @@ gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events, GstRTSPEvent * revents, GTimeVal * timeout) { GstClockTime to; - gint retval; + GMainContext *ctx; + GSource *rs, *ws, *ts; + GIOCondition condition; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (events != 0, GST_RTSP_EINVAL); g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - gst_poll_set_controllable (conn->fdset, TRUE); - - /* add fd to writer set when asked to */ - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, - events & GST_RTSP_EV_WRITE); - - /* add fd to reader set when asked to */ - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ); + ctx = g_main_context_new (); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + if (timeout) { + ts = g_timeout_source_new (to / GST_MSECOND); + g_source_set_dummy_callback (ts); + g_source_attach (ts, ctx); + g_source_unref (ts); + } - if (retval == 0) - goto select_timeout; + rs = g_socket_create_source (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (rs); + g_source_attach (rs, ctx); + g_source_unref (rs); - if (retval == -1) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } + ws = g_socket_create_source (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (ws); + g_source_attach (ws, ctx); + g_source_unref (ws); + + /* Returns after handling all pending events */ + g_main_context_iteration (ctx, TRUE); + + g_main_context_unref (ctx); + + condition = + g_socket_condition_check (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + condition |= + g_socket_condition_check (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP); *revents = 0; if (events & GST_RTSP_EV_READ) { - if (gst_poll_fd_can_read (conn->fdset, conn->readfd)) + if ((condition & G_IO_IN) || (condition & G_IO_PRI)) *revents |= GST_RTSP_EV_READ; } if (events & GST_RTSP_EV_WRITE) { - if (gst_poll_fd_can_write (conn->fdset, conn->writefd)) + if ((condition & G_IO_OUT)) *revents |= GST_RTSP_EV_WRITE; } - return GST_RTSP_OK; - /* ERRORS */ -select_timeout: - { + if (*revents == 0) return GST_RTSP_ETIMEOUT; - } -select_error: - { - return GST_RTSP_ESYS; - } -stopped: - { - return GST_RTSP_EINTR; - } + + return GST_RTSP_OK; } /** @@ -2058,7 +2316,7 @@ stopped: * @timeout: a timeout * * Calculate the next timeout for @conn, storing the result in @timeout. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2067,16 +2325,34 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) gdouble elapsed; glong sec; gulong usec; + gint ctimeout; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL); + ctimeout = conn->timeout; + if (ctimeout >= 20) { + /* Because we should act before the timeout we timeout 5 + * seconds in advance. */ + ctimeout -= 5; + } else if (ctimeout >= 5) { + /* else timeout 20% earlier */ + ctimeout -= ctimeout / 5; + } else if (ctimeout >= 1) { + /* else timeout 1 second earlier */ + ctimeout -= 1; + } + elapsed = g_timer_elapsed (conn->timer, &usec); - if (elapsed >= conn->timeout) { + if (elapsed >= ctimeout) { sec = 0; usec = 0; } else { - sec = conn->timeout - elapsed; + sec = ctimeout - elapsed; + if (usec <= G_USEC_PER_SEC) + usec = G_USEC_PER_SEC - usec; + else + usec = 0; } timeout->tv_sec = sec; @@ -2090,7 +2366,7 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) * @conn: a #GstRTSPConnection * * Reset the timeout of @conn. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2119,7 +2395,35 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - gst_poll_set_flushing (conn->fdset, flush); + if (flush) + g_cancellable_cancel (conn->cancellable); + else + g_cancellable_reset (conn->cancellable); + + return GST_RTSP_OK; +} + +/** + * gst_rtsp_connection_set_proxy: + * @conn: a #GstRTSPConnection + * @host: the proxy host + * @port: the proxy port + * + * Set the proxy host and port. + * + * Returns: #GST_RTSP_OK. + * + * Since: 0.10.23 + */ +GstRTSPResult +gst_rtsp_connection_set_proxy (GstRTSPConnection * conn, + const gchar * host, guint port) +{ + g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + + g_free (conn->proxy_host); + conn->proxy_host = g_strdup (host); + conn->proxy_port = port; return GST_RTSP_OK; } @@ -2251,21 +2555,21 @@ gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn) } static GstRTSPResult -set_qos_dscp (gint fd, guint qos_dscp) +set_qos_dscp (GSocket * socket, guint qos_dscp) { - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; +#ifndef IP_TOS + return GST_RTSP_OK; +#else + gint fd; + union gst_sockaddr sa; socklen_t slen = sizeof (sa); gint af; gint tos; - if (fd == -1) + if (!socket) return GST_RTSP_OK; + fd = g_socket_get_fd (socket); if (getsockname (fd, &sa.sa, &slen) < 0) goto no_getsockname; @@ -2282,12 +2586,12 @@ set_qos_dscp (gint fd, guint qos_dscp) switch (af) { case AF_INET: - if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; case AF_INET6: #ifdef IPV6_TCLASS - if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; #endif @@ -2303,11 +2607,11 @@ no_setsockopt: { return GST_RTSP_ESYS; } - wrong_family: { return GST_RTSP_ERROR; } +#endif } /** @@ -2327,12 +2631,12 @@ gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp) GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - res = set_qos_dscp (conn->fd0.fd, qos_dscp); + res = set_qos_dscp (conn->socket0, qos_dscp); if (res == GST_RTSP_OK) - res = set_qos_dscp (conn->fd1.fd, qos_dscp); + res = set_qos_dscp (conn->socket1, qos_dscp); return res; } @@ -2395,6 +2699,65 @@ gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip) } /** + * gst_rtsp_connection_get_readfd: + * @conn: a #GstRTSPConnection + * + * Get the file descriptor for reading. + * + * Returns: the file descriptor used for reading or %NULL on error. The file + * descriptor remains valid until the connection is closed. + * + * Since: 0.10.23 + */ +GSocket * +gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn) +{ + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); + + return conn->read_socket; +} + +/** + * gst_rtsp_connection_get_write_socket: + * @conn: a #GstRTSPConnection + * + * Get the file descriptor for writing. + * + * Returns: the file descriptor used for writing or NULL on error. The file + * descriptor remains valid until the connection is closed. + * + * Since: 0.10.23 + */ +GSocket * +gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn) +{ + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); + + return conn->write_socket; +} + +/** + * gst_rtsp_connection_set_http_mode: + * @conn: a #GstRTSPConnection + * @enable: %TRUE to enable manual HTTP mode + * + * By setting the HTTP mode to %TRUE the message parsing will support HTTP + * messages in addition to the RTSP messages. It will also disable the + * automatic handling of setting up an HTTP tunnel. + * + * Since: 0.10.25 + */ +void +gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable) +{ + g_return_if_fail (conn != NULL); + + conn->manual_http = enable; +} + +/** * gst_rtsp_connection_set_tunneled: * @conn: a #GstRTSPConnection * @tunneled: the new state @@ -2408,8 +2771,8 @@ void gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled) { g_return_if_fail (conn != NULL); - g_return_if_fail (conn->readfd == NULL); - g_return_if_fail (conn->writefd == NULL); + g_return_if_fail (conn->read_socket == NULL); + g_return_if_fail (conn->write_socket == NULL); conn->tunneled = tunneled; } @@ -2456,7 +2819,7 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) /** * gst_rtsp_connection_do_tunnel: * @conn: a #GstRTSPConnection - * @conn2: a #GstRTSPConnection + * @conn2: a #GstRTSPConnection or %NULL * * If @conn received the first tunnel connection and @conn2 received * the second tunnel connection, link the two connections together so that @@ -2465,6 +2828,9 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) * After this call, @conn2 cannot be used anymore and must be freed with * gst_rtsp_connection_free(). * + * If @conn2 is %NULL then only the base64 decoding context will be setup for + * @conn. + * * Returns: return GST_RTSP_OK on success. * * Since: 0.10.23 @@ -2474,49 +2840,54 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, GstRTSPConnection * conn2) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); - g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN), - GST_RTSP_EINVAL); - /* both connections have fd0 as the read/write socket. start by taking the - * socket from conn2 and set it as the socket in conn */ - conn->fd1 = conn2->fd0; + if (conn2 != NULL) { + g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); + g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); + g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, + TUNNELID_LEN), GST_RTSP_EINVAL); + + /* both connections have socket0 as the read/write socket. start by taking the + * socket from conn2 and set it as the socket in conn */ + conn->socket1 = conn2->socket0; - /* clean up some of the state of conn2 */ - gst_poll_remove_fd (conn2->fdset, &conn2->fd0); - conn2->fd0.fd = -1; - conn2->readfd = conn2->writefd = NULL; + /* clean up some of the state of conn2 */ + g_cancellable_cancel (conn2->cancellable); + conn2->socket0 = 0; + g_object_unref (conn2->socket1); + conn2->socket1 = NULL; + conn2->write_socket = conn2->read_socket = NULL; + g_cancellable_reset (conn2->cancellable); - /* We make fd0 the write socket and fd1 the read socket. */ - conn->writefd = &conn->fd0; - conn->readfd = &conn->fd1; + /* We make socket0 the write socket and socket1 the read socket. */ + conn->write_socket = conn->socket0; + conn->read_socket = conn->socket1; - conn->tstate = TUNNEL_STATE_COMPLETE; + conn->tstate = TUNNEL_STATE_COMPLETE; + } /* we need base64 decoding for the readfd */ conn->ctx.state = 0; - conn->ctx.cin = 0; - conn->ctx.cout = 3; conn->ctx.save = 0; + conn->ctx.cout = 0; + conn->ctx.coutl = 0; conn->ctxp = &conn->ctx; return GST_RTSP_OK; } -#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) -#define WRITE_COND (G_IO_OUT | G_IO_ERR) +#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define READ_COND (G_IO_IN | READ_ERR) +#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define WRITE_COND (G_IO_OUT | WRITE_ERR) typedef struct { - GString *str; - guint cseq; + guint8 *data; + guint size; + guint id; } GstRTSPRec; -static GstRTSPRec *queue_response (GstRTSPWatch * watch, GString * str, - guint cseq); - /* async functions */ struct _GstRTSPWatch { @@ -2529,14 +2900,15 @@ struct _GstRTSPWatch GPollFD readfd; GPollFD writefd; - gboolean write_added; /* queued message for transmission */ - GList *messages; + guint id; + GMutex *mutex; + GQueue *messages; guint8 *write_data; guint write_off; - guint write_len; - guint write_cseq; + guint write_size; + guint write_id; GstRTSPWatchFuncs funcs; @@ -2549,6 +2921,9 @@ gst_rtsp_source_prepare (GSource * source, gint * timeout) { GstRTSPWatch *watch = (GstRTSPWatch *) source; + if (watch->conn->initial_buffer != NULL) + return TRUE; + *timeout = (watch->conn->timeout * 1000); return FALSE; @@ -2569,126 +2944,226 @@ gst_rtsp_source_check (GSource * source) } static gboolean -gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback, - gpointer user_data) +gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, + gpointer user_data G_GNUC_UNUSED) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_ERROR; + gboolean keep_running = TRUE; /* first read as much as we can */ - if (watch->readfd.revents & READ_COND) { + if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { do { + if (watch->readfd.revents & READ_ERR) + goto read_error; + res = build_next (&watch->builder, &watch->message, watch->conn); if (res == GST_RTSP_EINTR) break; - if (res == GST_RTSP_EEOF) - goto eof; - if (res == GST_RTSP_ETGET) { - GString *str; - GstRTSPStatusCode code; - - if (watch->funcs.tunnel_start) - code = watch->funcs.tunnel_start (watch, watch->user_data); - else - code = GST_RTSP_STS_OK; - - /* queue the response string */ - str = gen_tunnel_reply (watch->conn, code); - queue_response (watch, str, -1); - } else if (res == GST_RTSP_ETPOST) { - /* in the callback the connection should be tunneled with the - * GET connection */ - if (watch->funcs.tunnel_complete) - watch->funcs.tunnel_complete (watch, watch->user_data); - } else if (res != GST_RTSP_OK) - goto error; + else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + /* When we are in tunnelled mode, the read socket can be closed and we + * should be prepared for a new POST method to reopen it */ + if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { + /* remove the read connection for the tunnel */ + /* we accept a new POST request */ + watch->conn->tstate = TUNNEL_STATE_GET; + /* and signal that we lost our tunnel */ + if (watch->funcs.tunnel_lost) + res = watch->funcs.tunnel_lost (watch, watch->user_data); + goto read_done; + } else + goto eof; + } else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!watch->conn->manual_http && + watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + GstRTSPStatusCode code; + + watch->conn->tstate = TUNNEL_STATE_GET; + + if (watch->funcs.tunnel_start) + code = watch->funcs.tunnel_start (watch, watch->user_data); + else + code = GST_RTSP_STS_OK; + + /* queue the response */ + response = gen_tunnel_reply (watch->conn, code, &watch->message); + gst_rtsp_watch_send_message (watch, response, NULL); + gst_rtsp_message_free (response); + goto read_done; + } else if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_POST) { + watch->conn->tstate = TUNNEL_STATE_POST; + + /* in the callback the connection should be tunneled with the + * GET connection */ + if (watch->funcs.tunnel_complete) + watch->funcs.tunnel_complete (watch, watch->user_data); + goto read_done; + } + } + } - if (res == GST_RTSP_OK) { + if (!watch->conn->manual_http) { + /* if manual HTTP support is not enabled, then restore the message to + * what it would have looked like without the support for parsing HTTP + * messages being present */ + if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + watch->message.type = GST_RTSP_MESSAGE_REQUEST; + watch->message.type_data.request.method = GST_RTSP_INVALID; + if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + watch->message.type = GST_RTSP_MESSAGE_RESPONSE; + if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.response.version = + GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } + } + + if (G_LIKELY (res == GST_RTSP_OK)) { if (watch->funcs.message_received) watch->funcs.message_received (watch, &watch->message, watch->user_data); - - gst_rtsp_message_unset (&watch->message); + } else { + goto read_error; } + + read_done: + gst_rtsp_message_unset (&watch->message); build_reset (&watch->builder); } while (FALSE); } if (watch->writefd.revents & WRITE_COND) { + if (watch->writefd.revents & WRITE_ERR) + goto write_error; + + g_mutex_lock (watch->mutex); do { if (watch->write_data == NULL) { - GstRTSPRec *data; + GstRTSPRec *rec; - if (!watch->messages) - goto done; - - /* no data, get a new message from the queue */ - data = watch->messages->data; - watch->messages = g_list_delete_link (watch->messages, watch->messages); + /* get a new message from the queue */ + rec = g_queue_pop_tail (watch->messages); + if (rec == NULL) + break; watch->write_off = 0; - watch->write_len = data->str->len; - watch->write_data = (guint8 *) g_string_free (data->str, FALSE); - watch->write_cseq = data->cseq; + watch->write_data = rec->data; + watch->write_size = rec->size; + watch->write_id = rec->id; - g_slice_free (GstRTSPRec, data); + g_slice_free (GstRTSPRec, rec); } - res = write_bytes (watch->writefd.fd, watch->write_data, - &watch->write_off, watch->write_len); - if (res == GST_RTSP_EINTR) - break; - if (res != GST_RTSP_OK) - goto error; - - if (watch->funcs.message_sent && watch->write_cseq != -1) - watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data); + res = write_bytes (watch->conn->write_socket, watch->write_data, + &watch->write_off, watch->write_size, watch->conn->cancellable); + g_mutex_unlock (watch->mutex); - done: - if (watch->messages == NULL && watch->write_added) { - g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->write_added = FALSE; - watch->writefd.revents = 0; + if (res == GST_RTSP_EINTR) + goto write_blocked; + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (watch->funcs.message_sent) + watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + } else { + goto write_error; } + g_mutex_lock (watch->mutex); + g_free (watch->write_data); watch->write_data = NULL; - } while (FALSE); + } while (TRUE); + + watch->writefd.events = WRITE_ERR; + + g_mutex_unlock (watch->mutex); } - return TRUE; +write_blocked: + return keep_running; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); + + /* always stop when the readfd returns EOF in non-tunneled mode */ return FALSE; } +read_error: + { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + keep_running = (watch->writefd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, + 0, watch->user_data), error); + else + goto error; + } else + goto eof; + } +write_error: + { + watch->writefd.events = 0; + watch->writefd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->writefd); + keep_running = (watch->readfd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, + watch->write_id, watch->user_data), error); + else + goto error; + } else + goto eof; + } error: { if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); - return FALSE; + + return keep_running; } } static void +gst_rtsp_rec_free (gpointer data) +{ + GstRTSPRec *rec = data; + + g_free (rec->data); + g_slice_free (GstRTSPRec, rec); +} + +static void gst_rtsp_source_finalize (GSource * source) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GList *walk; build_reset (&watch->builder); + gst_rtsp_message_unset (&watch->message); - for (walk = watch->messages; walk; walk = g_list_next (walk)) { - GstRTSPRec *data = walk->data; - - g_string_free (data->str, TRUE); - g_slice_free (GstRTSPRec, data); - } - g_list_free (watch->messages); + g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); + g_queue_free (watch->messages); + watch->messages = NULL; g_free (watch->write_data); + g_mutex_free (watch->mutex); + if (watch->notify) watch->notify (watch->user_data); } @@ -2697,7 +3172,9 @@ static GSourceFuncs gst_rtsp_source_funcs = { gst_rtsp_source_prepare, gst_rtsp_source_check, gst_rtsp_source_dispatch, - gst_rtsp_source_finalize + gst_rtsp_source_finalize, + NULL, + NULL }; /** @@ -2728,8 +3205,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (funcs != NULL, NULL); - g_return_val_if_fail (conn->readfd != NULL, NULL); - g_return_val_if_fail (conn->writefd != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs, sizeof (GstRTSPWatch)); @@ -2737,6 +3214,9 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->conn = conn; result->builder.state = STATE_START; + result->mutex = g_mutex_new (); + result->messages = g_queue_new (); + result->readfd.fd = -1; result->writefd.fd = -1; @@ -2746,10 +3226,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->user_data = user_data; result->notify = notify; - /* only add the read fd, the write fd is only added when we have data - * to send. */ - g_source_add_poll ((GSource *) result, &result->readfd); - return result; } @@ -2770,16 +3246,18 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) if (watch->writefd.fd != -1) g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->readfd.fd = watch->conn->readfd->fd; + watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket); watch->readfd.events = READ_COND; watch->readfd.revents = 0; - watch->writefd.fd = watch->conn->writefd->fd; - watch->writefd.events = WRITE_COND; + watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket); + watch->writefd.events = WRITE_ERR; watch->writefd.revents = 0; - watch->write_added = FALSE; - g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->readfd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->writefd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->writefd); } /** @@ -2818,67 +3296,123 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) g_source_unref ((GSource *) watch); } -static GstRTSPRec * -queue_response (GstRTSPWatch * watch, GString * str, guint cseq) +/** + * gst_rtsp_watch_write_data: + * @watch: a #GstRTSPWatch + * @data: the data to queue + * @size: the size of @data + * @id: location for a message ID or %NULL + * + * Write @data using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. + * + * This function will take ownership of @data and g_free() it after use. + * + * Returns: #GST_RTSP_OK on success. + * + * Since: 0.10.25 + */ +GstRTSPResult +gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, + guint size, guint * id) { - GstRTSPRec *data; + GstRTSPResult res; + GstRTSPRec *rec; + guint off = 0; + GMainContext *context = NULL; + + g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); + + g_mutex_lock (watch->mutex); + + /* try to send the message synchronously first */ + if (watch->messages->length == 0 && watch->write_data == NULL) { + res = + write_bytes (watch->conn->write_socket, data, &off, size, + watch->conn->cancellable); + if (res != GST_RTSP_EINTR) { + if (id != NULL) + *id = 0; + g_free ((gpointer) data); + goto done; + } + } - /* make a record with the message as a string ans cseq */ - data = g_slice_new (GstRTSPRec); - data->str = str; - data->cseq = cseq; + /* make a record with the data and id for sending async */ + rec = g_slice_new (GstRTSPRec); + if (off == 0) { + rec->data = (guint8 *) data; + rec->size = size; + } else { + rec->data = g_memdup (data + off, size - off); + rec->size = size - off; + g_free ((gpointer) data); + } + + do { + /* make sure rec->id is never 0 */ + rec->id = ++watch->id; + } while (G_UNLIKELY (rec->id == 0)); - /* add the record to a queue */ - watch->messages = g_list_append (watch->messages, data); + /* add the record to a queue. FIXME we would like to have an upper limit here */ + g_queue_push_head (watch->messages, rec); /* make sure the main context will now also check for writability on the * socket */ - if (!watch->write_added) { - g_source_add_poll ((GSource *) watch, &watch->writefd); - watch->write_added = TRUE; + if (watch->writefd.events != WRITE_COND) { + watch->writefd.events = WRITE_COND; + context = ((GSource *) watch)->context; } - return data; + if (id != NULL) + *id = rec->id; + res = GST_RTSP_OK; + +done: + g_mutex_unlock (watch->mutex); + + if (context) + g_main_context_wakeup (context); + + return res; } /** - * gst_rtsp_watch_queue_message: + * gst_rtsp_watch_send_message: * @watch: a #GstRTSPWatch * @message: a #GstRTSPMessage + * @id: location for a message ID or %NULL * - * Queue a @message for transmission in @watch. The contents of this - * message will be serialized and transmitted when the connection of the - * watch becomes writable. + * Send a @message using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. * - * The return value of this function will be returned as the cseq argument in - * the message_sent callback. - * - * Returns: the sequence number of the message or -1 if the cseq could not be - * determined. + * Returns: #GST_RTSP_OK on success. * - * Since: 0.10.23 + * Since: 0.10.25 */ -guint -gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) +GstRTSPResult +gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, + guint * id) { - GstRTSPRec *data; - gchar *header; - guint cseq; + GString *str; + guint size; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - /* get the cseq from the message, when we finish writing this message on the - * socket we will have to pass the cseq to the callback. */ - if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header, - 0) == GST_RTSP_OK) { - cseq = atoi (header); - } else { - cseq = -1; - } - - /* make a record with the message as a string ans cseq */ - data = queue_response (watch, message_to_string (watch->conn, message), cseq); - - return cseq; + /* make a record with the message as a string and id */ + str = message_to_string (watch->conn, message); + size = str->len; + return gst_rtsp_watch_write_data (watch, + (guint8 *) g_string_free (str, FALSE), size, id); }