X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst-libs%2Fgst%2Frtsp%2Fgstrtspconnection.c;h=fa7ec81642ef05744138ce8d0bec8e89ff4abe2c;hb=0b0dde7ce11e15bedaf34aea2df843a5253d1e2f;hp=7c8c7808758f400147d27c3ef2059426638a3d91;hpb=11c8b811f3dee0a0e4c816226946e7a11b80629f;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 7c8c780..fa7ec81 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -45,12 +45,8 @@ * @short_description: manage RTSP connections * @see_also: gstrtspurl * - * - * * This object manages the RTSP connection to the server. It provides function * to receive and send bytes and messages. - * - * * * Last reviewed on 2007-07-24 (0.10.14) */ @@ -65,38 +61,15 @@ #include #include -#ifdef HAVE_UNISTD_H -#include -#endif - /* we include this here to get the G_OS_* defines */ #include #include -#ifdef G_OS_WIN32 -/* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later. - * minwg32 headers check WINVER before allowing the use of these */ -#ifndef WINVER -#define WINVER 0x0501 -#endif -#include -#include -#define EINPROGRESS WSAEINPROGRESS -#else -#include -#include -#include -#include -#include -#endif - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include -#endif - #include "gstrtspconnection.h" -#include "gstrtspbase64.h" +#include "gst/glib-compat-private.h" + +#ifdef IP_TOS union gst_sockaddr { struct sockaddr sa; @@ -104,6 +77,7 @@ union gst_sockaddr struct sockaddr_in6 sa_in6; struct sockaddr_storage sa_stor; }; +#endif typedef struct { @@ -114,48 +88,12 @@ typedef struct guint coutl; } DecodeCtx; -static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer, - guint * idx, guint size); -static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key, - guint keysize, gchar ** value); -static GstRTSPResult parse_string (gchar * dest, gint size, gchar ** src); - -#ifdef G_OS_WIN32 -#define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0) -#define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len) -#define CLOSE_SOCKET(sock) closesocket (sock) -#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK) -#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR) -/* According to Microsoft's connect() documentation this one returns - * WSAEWOULDBLOCK and not WSAEINPROGRESS. */ -#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK) +#ifdef MSG_NOSIGNAL +#define SEND_FLAGS MSG_NOSIGNAL #else -#define READ_SOCKET(fd, buf, len) read (fd, buf, len) -#define WRITE_SOCKET(fd, buf, len) write (fd, buf, len) -#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len) -#define CLOSE_SOCKET(sock) close (sock) -#define ERRNO_IS_EAGAIN (errno == EAGAIN) -#define ERRNO_IS_EINTR (errno == EINTR) -#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS) +#define SEND_FLAGS 0 #endif -#define ADD_POLLFD(fdset, pfd, fd) \ -G_STMT_START { \ - (pfd)->fd = fd; \ - gst_poll_add_fd (fdset, pfd); \ -} G_STMT_END - -#define REMOVE_POLLFD(fdset, pfd) \ -G_STMT_START { \ - if ((pfd)->fd != -1) { \ - GST_DEBUG ("remove fd %d", (pfd)->fd); \ - gst_poll_remove_fd (fdset, pfd); \ - CLOSE_SOCKET ((pfd)->fd); \ - (pfd)->fd = -1; \ - } \ -} G_STMT_END - typedef enum { TUNNEL_STATE_NONE, @@ -173,19 +111,20 @@ struct _GstRTSPConnection GstRTSPUrl *url; /* connection state */ - GstPollFD fd0; - GstPollFD fd1; - - GstPollFD *readfd; - GstPollFD *writefd; + GSocket *read_socket; + GSocket *write_socket; + gboolean manual_http; + GSocket *socket0, *socket1; + GCancellable *cancellable; gchar tunnelid[TUNNELID_LEN]; gboolean tunneled; GstRTSPTunnelState tstate; - GstPoll *fdset; gchar *ip; + gint read_ahead; + gchar *initial_buffer; gsize initial_buffer_offset; @@ -218,6 +157,13 @@ enum STATE_LAST }; +enum +{ + READ_AHEAD_EOH = -1, /* end of headers */ + READ_AHEAD_CRLF = -2, + READ_AHEAD_CRLFCR = -3 +}; + /* a structure for constructing RTSPMessages */ typedef struct { @@ -255,31 +201,14 @@ GstRTSPResult gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) { GstRTSPConnection *newconn; -#ifdef G_OS_WIN32 - WSADATA w; - int error; -#endif g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); -#ifdef G_OS_WIN32 - error = WSAStartup (0x0202, &w); - - if (error) - goto startup_error; - - if (w.wVersion != 0x0202) - goto version_error; -#endif - newconn = g_new0 (GstRTSPConnection, 1); - if ((newconn->fdset = gst_poll_new (TRUE)) == NULL) - goto no_fdset; + newconn->cancellable = g_cancellable_new (); newconn->url = gst_rtsp_url_copy (url); - newconn->fd0.fd = -1; - newconn->fd1.fd = -1; newconn->timer = g_timer_new (); newconn->timeout = 60; newconn->cseq = 1; @@ -292,69 +221,38 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) *conn = newconn; return GST_RTSP_OK; - - /* ERRORS */ -#ifdef G_OS_WIN32 -startup_error: - { - g_warning ("Error %d on WSAStartup", error); - return GST_RTSP_EWSASTART; - } -version_error: - { - g_warning ("Windows sockets are not version 0x202 (current 0x%x)", - w.wVersion); - WSACleanup (); - return GST_RTSP_EWSAVERSION; - } -#endif -no_fdset: - { - g_free (newconn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif - return GST_RTSP_ESYS; - } } /** - * gst_rtsp_connection_create_from_fd: - * @fd: a file descriptor + * gst_rtsp_connection_create_from_socket: + * @socket: a #GSocket * @ip: the IP address of the other end * @port: the port used by the other end * @initial_buffer: data already read from @fd * @conn: storage for a #GstRTSPConnection * * Create a new #GstRTSPConnection for handling communication on the existing - * file descriptor @fd. The @initial_buffer contains any data already read from - * @fd which should be used before starting to read new data. + * socket @socket. The @initial_buffer contains any data already read from + * @socket which should be used before starting to read new data. * * Returns: #GST_RTSP_OK when @conn contains a valid connection. * * Since: 0.10.25 */ GstRTSPResult -gst_rtsp_connection_create_from_fd (gint fd, const gchar * ip, guint16 port, - const gchar * initial_buffer, GstRTSPConnection ** conn) +gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip, + guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn) { GstRTSPConnection *newconn = NULL; GstRTSPUrl *url; -#ifdef G_OS_WIN32 - gulong flags = 1; -#endif GstRTSPResult res; - g_return_val_if_fail (fd >= 0, GST_RTSP_EINVAL); + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); /* set to non-blocking mode so that we can cancel the communication */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ + g_socket_set_blocking (socket, FALSE); /* create a url for the client address */ url = g_new0 (GstRTSPUrl, 1); @@ -365,11 +263,10 @@ gst_rtsp_connection_create_from_fd (gint fd, const gchar * ip, guint16 port, GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed); gst_rtsp_url_free (url); - ADD_POLLFD (newconn->fdset, &newconn->fd0, fd); - /* both read and write initially */ - newconn->readfd = &newconn->fd0; - newconn->writefd = &newconn->fd0; + newconn->socket0 = G_SOCKET (g_object_ref (socket)); + newconn->socket1 = G_SOCKET (g_object_ref (socket)); + newconn->write_socket = newconn->read_socket = newconn->socket0; newconn->ip = g_strdup (ip); @@ -389,10 +286,11 @@ newconn_failed: /** * gst_rtsp_connection_accept: - * @sock: a socket + * @socket: a socket * @conn: storage for a #GstRTSPConnection + * @cancellable: a #GCancellable to cancel the operation * - * Accept a new connection on @sock and create a new #GstRTSPConnection for + * Accept a new connection on @socket and create a new #GstRTSPConnection for * handling communication on new socket. * * Returns: #GST_RTSP_OK when @conn contains a valid connection. @@ -400,221 +298,187 @@ newconn_failed: * Since: 0.10.23 */ GstRTSPResult -gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn) +gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn, + GCancellable * cancellable) { - int fd; - union gst_sockaddr sa; - socklen_t slen = sizeof (sa); - gchar ip[INET6_ADDRSTRLEN]; + GError *err = NULL; + gchar *ip; guint16 port; + GSocket *client_sock; + GSocketAddress *addr; + GstRTSPResult ret; - g_return_val_if_fail (sock >= 0, GST_RTSP_EINVAL); + g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - memset (&sa, 0, slen); - -#ifndef G_OS_WIN32 - fd = accept (sock, &sa.sa, &slen); -#else - fd = accept (sock, &sa.sa, (gint *) & slen); -#endif /* G_OS_WIN32 */ - if (fd == -1) + client_sock = g_socket_accept (socket, cancellable, &err); + if (!client_sock) goto accept_failed; - if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0) + addr = g_socket_get_remote_address (client_sock, &err); + if (!addr) goto getnameinfo_failed; - if (sa.sa.sa_family == AF_INET) - port = sa.sa_in.sin_port; - else if (sa.sa.sa_family == AF_INET6) - port = sa.sa_in6.sin6_port; - else - goto wrong_family; + ip = g_inet_address_to_string (g_inet_socket_address_get_address + (G_INET_SOCKET_ADDRESS (addr))); + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); + g_object_unref (addr); - return gst_rtsp_connection_create_from_fd (fd, ip, port, NULL, conn); + ret = + gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL, + conn); + g_object_unref (client_sock); + g_free (ip); + + return ret; /* ERRORS */ accept_failed: { + GST_DEBUG ("Accepting client failed: %s", err->message); + g_clear_error (&err); return GST_RTSP_ESYS; } getnameinfo_failed: -wrong_family: { - close (fd); + if (!g_socket_close (client_sock, &err)) { + GST_DEBUG ("Closing socket failed: %s", err->message); + g_clear_error (&err); + } + g_object_unref (client_sock); return GST_RTSP_ERROR; } } static gchar * -do_resolve (const gchar * host) +do_resolve (const gchar * host, GCancellable * cancellable) { - static gchar ip[INET6_ADDRSTRLEN]; - struct addrinfo *aires; - struct addrinfo *ai; - gint aierr; + GResolver *resolver; + GInetAddress *addr; + GError *err = NULL; + gchar *ip; - aierr = getaddrinfo (host, NULL, NULL, &aires); - if (aierr != 0) - goto no_addrinfo; + addr = g_inet_address_new_from_string (host); + if (!addr) { + GList *results, *l; - for (ai = aires; ai; ai = ai->ai_next) { - if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) { - break; + resolver = g_resolver_get_default (); + + results = g_resolver_lookup_by_name (resolver, host, cancellable, &err); + if (!results) + goto name_resolve; + + for (l = results; l; l = l->next) { + GInetAddress *tmp = l->data; + + if (g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV4 || + g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV6) { + addr = G_INET_ADDRESS (g_object_ref (tmp)); + break; + } } + + g_resolver_free_addresses (results); + g_object_unref (resolver); } - if (ai == NULL) - goto no_family; - aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0, - NI_NUMERICHOST | NI_NUMERICSERV); - if (aierr != 0) - goto no_address; + if (!addr) + return NULL; - freeaddrinfo (aires); + ip = g_inet_address_to_string (addr); + g_object_unref (addr); - return g_strdup (ip); + return ip; /* ERRORS */ -no_addrinfo: - { - GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr)); - return NULL; - } -no_family: - { - GST_ERROR ("no family found for %s", host); - freeaddrinfo (aires); - return NULL; - } -no_address: +name_resolve: { - GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr)); - freeaddrinfo (aires); + GST_ERROR ("failed to resolve %s: %s", host, err->message); + g_clear_error (&err); + g_object_unref (resolver); return NULL; } } static GstRTSPResult -do_connect (const gchar * ip, guint16 port, GstPollFD * fdout, - GstPoll * fdset, GTimeVal * timeout) +do_connect (const gchar * ip, guint16 port, GSocket ** socket_out, + GTimeVal * timeout, GCancellable * cancellable) { - gint fd; - struct addrinfo hints; - struct addrinfo *aires; - struct addrinfo *ai; - gint aierr; - gchar service[NI_MAXSERV]; - gint ret; -#ifdef G_OS_WIN32 - unsigned long flags = 1; -#endif /* G_OS_WIN32 */ + GSocket *socket; GstClockTime to; - gint retval; - - memset (&hints, 0, sizeof hints); - hints.ai_flags = AI_NUMERICHOST; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - g_snprintf (service, sizeof (service) - 1, "%hu", port); - service[sizeof (service) - 1] = '\0'; - - aierr = getaddrinfo (ip, service, &hints, &aires); - if (aierr != 0) - goto no_addrinfo; - - for (ai = aires; ai; ai = ai->ai_next) { - if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) { - break; - } - } - if (ai == NULL) - goto no_family; - - fd = socket (ai->ai_family, SOCK_STREAM, 0); - if (fd == -1) + GInetAddress *addr; + GSocketAddress *saddr; + GError *err = NULL; + + addr = g_inet_address_new_from_string (ip); + g_assert (addr); + saddr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); + + socket = + g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + if (socket == NULL) goto no_socket; /* set to non-blocking mode so that we can cancel the connect */ -#ifndef G_OS_WIN32 - fcntl (fd, F_SETFL, O_NONBLOCK); -#else - ioctlsocket (fd, FIONBIO, &flags); -#endif /* G_OS_WIN32 */ - - /* add the socket to our fdset */ - ADD_POLLFD (fdset, fdout, fd); + g_socket_set_blocking (socket, FALSE); /* we are going to connect ASYNC now */ - ret = connect (fd, ai->ai_addr, ai->ai_addrlen); - if (ret == 0) + if (!g_socket_connect (socket, saddr, cancellable, &err)) { + if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PENDING)) + goto sys_error; + g_clear_error (&err); + } else { goto done; - if (!ERRNO_IS_EINPROGRESS) - goto sys_error; + } /* wait for connect to complete up to the specified timeout or until we got * interrupted. */ - gst_poll_fd_ctl_write (fdset, fdout, TRUE); - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - if (retval == 0) - goto timeout; - else if (retval == -1) - goto sys_error; - - /* we can still have an error connecting on windows */ - if (gst_poll_fd_has_error (fdset, fdout)) { - socklen_t len = sizeof (errno); -#ifndef G_OS_WIN32 - getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len); -#else - getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len); -#endif - goto sys_error; + g_socket_set_timeout (socket, (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (socket, G_IO_OUT, cancellable, &err)) { + g_socket_set_timeout (socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) + goto timeout; + else + goto sys_error; } + g_socket_set_timeout (socket, 0); - gst_poll_fd_ignored (fdset, fdout); + if (!g_socket_check_connect_result (socket, &err)) + goto sys_error; done: - freeaddrinfo (aires); + g_object_unref (saddr); + + *socket_out = socket; return GST_RTSP_OK; /* ERRORS */ -no_addrinfo: - { - GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr)); - return GST_RTSP_ERROR; - } -no_family: - { - GST_ERROR ("no family found for %s", ip); - freeaddrinfo (aires); - return GST_RTSP_ERROR; - } no_socket: { - GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno)); - freeaddrinfo (aires); + GST_ERROR ("no socket: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); return GST_RTSP_ESYS; } sys_error: { - GST_ERROR ("system error %d (%s)", errno, g_strerror (errno)); - REMOVE_POLLFD (fdset, fdout); - freeaddrinfo (aires); + GST_ERROR ("system error: %s", err->message); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ESYS; } timeout: { GST_ERROR ("timeout"); - REMOVE_POLLFD (fdset, fdout); - freeaddrinfo (aires); + g_clear_error (&err); + g_object_unref (saddr); + g_object_unref (socket); return GST_RTSP_ETIMEOUT; } } @@ -624,16 +488,18 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) { gint i; GstRTSPResult res; - gchar *str; - guint idx, line; - gint retval; - GstClockTime to; - gchar *ip, *url_port_str; + gchar *ip; + gchar *uri; + gchar *value; guint16 port, url_port; - gchar codestr[4], *resultstr; - gint code; GstRTSPUrl *url; gchar *hostparam; + GstRTSPMessage *msg; + GstRTSPMessage response; + gboolean old_http; + + memset (&response, 0, sizeof (response)); + gst_rtsp_message_init (&response); /* create a random sessionid */ for (i = 0; i < TUNNELID_LEN; i++) @@ -645,196 +511,136 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) gst_rtsp_url_get_port (url, &url_port); if (conn->proxy_host) { - hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port); - url_port_str = g_strdup_printf (":%d", url_port); + uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port, + url->abspath, url->query ? "?" : "", url->query ? url->query : ""); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); ip = conn->proxy_host; port = conn->proxy_port; } else { + uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", + url->query ? url->query : ""); hostparam = NULL; - url_port_str = NULL; ip = conn->ip; port = url_port; } - /* */ - str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n" - "%s" - "x-sessioncookie: %s\r\n" - "Accept: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" "\r\n", - conn->proxy_host ? "http://" : "", - conn->proxy_host ? url->host : "", - conn->proxy_host ? url_port_str : "", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", - hostparam ? hostparam : "", conn->tunnelid); - - /* we start by writing to this fd */ - conn->writefd = &conn->fd0; - - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; - - gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE); - gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE); - - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - - line = 0; - while (TRUE) { - guint8 buffer[4096]; - - idx = 0; - while (TRUE) { - res = read_line (conn, buffer, &idx, sizeof (buffer)); - if (res == GST_RTSP_EEOF) - goto eof; - if (res == GST_RTSP_OK) - break; - if (res != GST_RTSP_EINTR) - goto read_error; - - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + /* create the GET request for the read connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; - /* check for timeout */ - if (retval == 0) - goto timeout; - - if (retval == -1) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } - } - - /* check for last line */ - if (buffer[0] == '\r') - buffer[0] = '\0'; - if (buffer[0] == '\0') - break; - - if (line == 0) { - /* first line, parse response */ - gchar versionstr[20]; - gchar *bptr; + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, + conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); - bptr = (gchar *) buffer; - - parse_string (versionstr, sizeof (versionstr), &bptr); - parse_string (codestr, sizeof (codestr), &bptr); - code = atoi (codestr); - - while (g_ascii_isspace (*bptr)) - bptr++; - - resultstr = bptr; + /* we start by writing to this fd */ + conn->write_socket = conn->socket0; - if (code != GST_RTSP_STS_OK) - goto wrong_result; + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; + + /* receive the response to the GET request */ + /* we need to temporarily set manual_http to TRUE since + * gst_rtsp_connection_receive() will treat the HTTP response as a parsing + * failure otherwise */ + old_http = conn->manual_http; + conn->manual_http = TRUE; + GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, &response, timeout), + read_failed); + conn->manual_http = old_http; + + if (response.type != GST_RTSP_MESSAGE_HTTP_RESPONSE || + response.type_data.response.code != GST_RTSP_STS_OK) + goto wrong_result; + + if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + &value, 0) == GST_RTSP_OK) { + if (conn->proxy_host) { + /* if we use a proxy we need to change the destination url */ + g_free (url->host); + url->host = g_strdup (value); + g_free (hostparam); + hostparam = g_strdup_printf ("%s:%d", url->host, url_port); } else { - gchar key[32]; - gchar *value; - - /* other lines, parse key/value */ - res = parse_key_value (buffer, key, sizeof (key), &value); - if (res == GST_RTSP_OK) { - /* we got a new ip address */ - if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) { - if (conn->proxy_host) { - /* if we use a proxy we need to change the destination url */ - g_free (url->host); - url->host = g_strdup (value); - g_free (hostparam); - g_free (url_port_str); - hostparam = - g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port); - url_port_str = g_strdup_printf (":%d", url_port); - } else { - /* and resolve the new ip address */ - if (!(ip = do_resolve (conn->ip))) - goto not_resolved; - g_free (conn->ip); - conn->ip = ip; - } - } - } + /* and resolve the new ip address */ + if (!(ip = do_resolve (value, conn->cancellable))) + goto not_resolved; + g_free (conn->ip); + conn->ip = ip; } - line++; } /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket1, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; /* this is now our writing socket */ - conn->writefd = &conn->fd1; - - /* */ - str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n" - "%s" - "x-sessioncookie: %s\r\n" - "Content-Type: application/x-rtsp-tunnelled\r\n" - "Pragma: no-cache\r\n" - "Cache-Control: no-cache\r\n" - "Content-Length: 32767\r\n" - "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n" - "\r\n", - conn->proxy_host ? "http://" : "", - conn->proxy_host ? url->host : "", - conn->proxy_host ? url_port_str : "", - url->abspath, url->query ? "?" : "", url->query ? url->query : "", - hostparam ? hostparam : "", conn->tunnelid); - - res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout); - g_free (str); - if (res != GST_RTSP_OK) - goto write_failed; + conn->write_socket = conn->socket1; + + /* create the POST request for the write connection */ + GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri), + no_message); + msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; + + if (hostparam != NULL) + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, + conn->tunnelid); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, + "application/x-rtsp-tunnelled"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES, + "Sun, 9 Jan 1972 00:00:00 GMT"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767"); + + /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP + * request from being base64 encoded */ + conn->tunneled = FALSE; + GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; exit: + gst_rtsp_message_unset (&response); g_free (hostparam); - g_free (url_port_str); + g_free (uri); return res; /* ERRORS */ -write_failed: - { - GST_ERROR ("write failed (%d)", res); - goto exit; - } -eof: - { - res = GST_RTSP_EEOF; - goto exit; - } -read_error: - { - goto exit; - } -timeout: +no_message: { - res = GST_RTSP_ETIMEOUT; + GST_ERROR ("failed to create request (%d)", res); goto exit; } -select_error: +write_failed: { - res = GST_RTSP_ESYS; + GST_ERROR ("write failed (%d)", res); + gst_rtsp_message_free (msg); + conn->tunneled = TRUE; goto exit; } -stopped: +read_failed: { - res = GST_RTSP_EINTR; + GST_ERROR ("read failed (%d)", res); + conn->manual_http = FALSE; goto exit; } wrong_result: { - GST_ERROR ("got failure response %d %s", code, resultstr); + GST_ERROR ("got failure response %d %s", response.type_data.response.code, + response.type_data.response.reason); res = GST_RTSP_ERROR; goto exit; } @@ -875,12 +681,12 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->socket0 == NULL, GST_RTSP_EINVAL); url = conn->url; if (conn->proxy_host && conn->tunneled) { - if (!(ip = do_resolve (conn->proxy_host))) { + if (!(ip = do_resolve (conn->proxy_host, conn->cancellable))) { GST_ERROR ("could not resolve %s", conn->proxy_host); goto not_resolved; } @@ -888,7 +694,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) g_free (conn->proxy_host); conn->proxy_host = ip; } else { - if (!(ip = do_resolve (url->host))) { + if (!(ip = do_resolve (url->host, conn->cancellable))) { GST_ERROR ("could not resolve %s", url->host); goto not_resolved; } @@ -900,19 +706,19 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) } /* connect to the host/port */ - res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout); + res = do_connect (ip, port, &conn->socket0, timeout, conn->cancellable); if (res != GST_RTSP_OK) goto connect_failed; /* this is our read URL */ - conn->readfd = &conn->fd0; + conn->read_socket = conn->socket0; if (conn->tunneled) { res = setup_tunneling (conn, timeout); if (res != GST_RTSP_OK) goto tunneling_failed; } else { - conn->writefd = &conn->fd0; + conn->write_socket = conn->socket0; } return GST_RTSP_OK; @@ -970,12 +776,7 @@ auth_digest_compute_response (const gchar * method, memcpy (hex_a2, digest_string, strlen (digest_string)); /* compute KD */ -#if GLIB_CHECK_VERSION (2, 18, 0) g_checksum_reset (md5_context); -#else - g_checksum_free (md5_context); - md5_context = g_checksum_new (G_CHECKSUM_MD5); -#endif g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1)); g_checksum_update (md5_context, (const guchar *) ":", 1); g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce)); @@ -998,6 +799,9 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) gchar *user_pass64; gchar *auth_string; + if (conn->username == NULL || conn->passwd == NULL) + break; + user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd); user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass)); auth_string = g_strdup_printf ("Basic %s", user_pass64); @@ -1019,7 +823,8 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) const gchar *method; /* we need to have some params set */ - if (conn->auth_params == NULL) + if (conn->auth_params == NULL || conn->username == NULL || + conn->passwd == NULL) break; /* we need the realm and nonce */ @@ -1061,24 +866,31 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) static void gen_date_string (gchar * date_string, guint len) { - GTimeVal tv; + static const char wkdays[7][4] = + { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; + static const char months[12][4] = + { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", + "Nov", "Dec" + }; + struct tm tm; time_t t; -#ifdef HAVE_GMTIME_R - struct tm tm_; -#endif - g_get_current_time (&tv); - t = (time_t) tv.tv_sec; + time (&t); #ifdef HAVE_GMTIME_R - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_)); + gmtime_r (&t, &tm); #else - strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t)); + tm = *gmtime (&t); #endif + + g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT", + wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900, + tm.tm_hour, tm.tm_min, tm.tm_sec); } static GstRTSPResult -write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) +write_bytes (GSocket * socket, const guint8 * buffer, guint * idx, guint size, + GCancellable * cancellable) { guint left; @@ -1088,16 +900,20 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) left = size - *idx; while (left) { - gint r; + GError *err = NULL; + gssize r; - r = WRITE_SOCKET (fd, &buffer[*idx], left); + r = g_socket_send (socket, (gchar *) & buffer[*idx], left, cancellable, + &err); if (G_UNLIKELY (r == 0)) { return GST_RTSP_EINTR; } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -1107,7 +923,8 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size) } static gint -fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) +fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) { gint out = 0; @@ -1126,9 +943,10 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) } if (G_LIKELY (size > (guint) out)) { - gint r; + gssize r; - r = READ_SOCKET (conn->readfd->fd, &buffer[out], size - out); + r = g_socket_receive (conn->read_socket, (gchar *) & buffer[out], + size - out, conn->cancellable, err); if (r <= 0) { if (out == 0) out = r; @@ -1140,7 +958,8 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) } static gint -fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) +fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, + GError ** err) { DecodeCtx *ctx = conn->ctxp; gint out = 0; @@ -1162,7 +981,7 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) break; /* try to read more bytes */ - r = fill_raw_bytes (conn, in, sizeof (in)); + r = fill_raw_bytes (conn, in, sizeof (in), err); if (r <= 0) { if (out == 0) out = r; @@ -1175,7 +994,7 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size) &ctx->save); } } else { - out = fill_raw_bytes (conn, buffer, size); + out = fill_raw_bytes (conn, buffer, size, err); } return out; @@ -1185,6 +1004,7 @@ static GstRTSPResult read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { guint left; + GError *err = NULL; if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; @@ -1194,14 +1014,16 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) while (left) { gint r; - r = fill_bytes (conn, &buffer[*idx], left); + r = fill_bytes (conn, &buffer[*idx], left, &err); if (G_UNLIKELY (r == 0)) { return GST_RTSP_EEOF; } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + } + g_clear_error (&err); + return GST_RTSP_ESYS; } else { left -= r; *idx += r; @@ -1210,30 +1032,130 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) return GST_RTSP_OK; } +/* The code below tries to handle clients using \r, \n or \r\n to indicate the + * end of a line. It even does its best to handle clients which mix them (even + * though this is a really stupid idea (tm).) It also handles Line White Space + * (LWS), where a line end followed by whitespace is considered LWS. This is + * the method used in RTSP (and HTTP) to break long lines. + */ static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) { + GError *err = NULL; + while (TRUE) { guint8 c; gint r; - r = fill_bytes (conn, &c, 1); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EEOF; - } else if (G_UNLIKELY (r < 0)) { - if (ERRNO_IS_EAGAIN) - return GST_RTSP_EINTR; - if (!ERRNO_IS_EINTR) - return GST_RTSP_ESYS; + if (conn->read_ahead == READ_AHEAD_EOH) { + /* the last call to read_line() already determined that we have reached + * the end of the headers, so convey that information now */ + conn->read_ahead = 0; + break; + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + /* the last call to read_line() left off after having read \r\n */ + c = '\n'; + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* the last call to read_line() left off after having read \r\n\r */ + c = '\r'; + } else if (conn->read_ahead != 0) { + /* the last call to read_line() left us with a character to start with */ + c = (guint8) conn->read_ahead; + conn->read_ahead = 0; } else { - if (c == '\n') /* end on \n */ - break; - if (c == '\r') /* ignore \r */ - continue; + /* read the next character */ + r = fill_bytes (conn, &c, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } + + g_clear_error (&err); + return GST_RTSP_ESYS; + } + } - if (G_LIKELY (*idx < size - 1)) - buffer[(*idx)++] = c; + /* special treatment of line endings */ + if (c == '\r' || c == '\n') { + guint8 read_ahead; + + retry: + /* need to read ahead one more character to know what to do... */ + r = fill_bytes (conn, &read_ahead, 1, &err); + if (G_UNLIKELY (r == 0)) { + return GST_RTSP_EEOF; + } else if (G_UNLIKELY (r < 0)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + /* remember the original character we read and try again next time */ + if (conn->read_ahead == 0) + conn->read_ahead = c; + return GST_RTSP_EINTR; + g_clear_error (&err); + return GST_RTSP_EINTR; + } + + g_clear_error (&err); + return GST_RTSP_ESYS; + } + + if (read_ahead == ' ' || read_ahead == '\t') { + if (conn->read_ahead == READ_AHEAD_CRLFCR) { + /* got \r\n\r followed by whitespace, treat it as a normal line + * followed by one starting with LWS */ + conn->read_ahead = read_ahead; + break; + } else { + /* got LWS, change the line ending to a space and continue */ + c = ' '; + conn->read_ahead = read_ahead; + } + } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { + if (read_ahead == '\r' || read_ahead == '\n') { + /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* got \r\n\r followed by something else, this is not really + * supported since we have probably just eaten the first character + * of the body or the next message, so just ignore the second \r + * and live with it... */ + conn->read_ahead = read_ahead; + break; + } + } else if (conn->read_ahead == READ_AHEAD_CRLF) { + if (read_ahead == '\r') { + /* got \r\n\r so far, need one more character... */ + conn->read_ahead = READ_AHEAD_CRLFCR; + goto retry; + } else if (read_ahead == '\n') { + /* got \r\n\n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } + } else if (c == read_ahead) { + /* got double \r or \n, treat it as the end of the headers */ + conn->read_ahead = READ_AHEAD_EOH; + break; + } else if (c == '\r' && read_ahead == '\n') { + /* got \r\n so far, still need more to know what to do... */ + conn->read_ahead = READ_AHEAD_CRLF; + goto retry; + } else { + /* found the end of a line, keep read_ahead for the next line */ + conn->read_ahead = read_ahead; + break; + } } + + if (G_LIKELY (*idx < size - 1)) + buffer[(*idx)++] = c; } buffer[*idx] = '\0'; @@ -1260,20 +1182,13 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); - - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE); - /* clear all previous poll results */ - gst_poll_fd_ignored (conn->fdset, conn->writefd); - gst_poll_fd_ignored (conn->fdset, conn->readfd); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; @@ -1281,26 +1196,31 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, while (TRUE) { /* try to write */ - res = write_bytes (conn->writefd->fd, data, &offset, size); + res = + write_bytes (conn->write_socket, data, &offset, size, + conn->cancellable); if (G_LIKELY (res == GST_RTSP_OK)) break; if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto write_error; /* not all is written, wait until we can write more */ - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - if (G_UNLIKELY (retval == 0)) - goto timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->write_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) { + g_socket_set_timeout (conn->write_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto timeout; + } + g_clear_error (&err); + goto select_error; } + g_socket_set_timeout (conn->write_socket, 0); } return GST_RTSP_OK; @@ -1501,21 +1421,6 @@ parse_string (gchar * dest, gint size, gchar ** src) return res; } -static void -parse_key (gchar * dest, gint size, gchar ** src) -{ - gint idx; - - idx = 0; - while (**src != ':' && **src != '\0') { - if (idx < size - 1) - dest[idx++] = **src; - (*src)++; - } - if (size > 0) - dest[idx] = '\0'; -} - static GstRTSPResult parse_protocol_version (gchar * protocol, GstRTSPMsgType * type, GstRTSPVersion * version) @@ -1653,56 +1558,115 @@ parse_request_line (guint8 * buffer, GstRTSPMessage * msg) return res; } +/* parsing lines means reading a Key: Value pair */ static GstRTSPResult -parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value) +parse_line (guint8 * buffer, GstRTSPMessage * msg) { - gchar *bptr; + GstRTSPHeaderField field; + gchar *line = (gchar *) buffer; + gchar *value; - bptr = (gchar *) buffer; + if ((value = strchr (line, ':')) == NULL || value == line) + goto parse_error; - /* read key */ - parse_key (key, keysize, &bptr); - if (G_UNLIKELY (*bptr != ':')) - goto no_column; + /* trim space before the colon */ + if (value[-1] == ' ') + value[-1] = '\0'; - bptr++; - while (g_ascii_isspace (*bptr)) - bptr++; + /* replace the colon with a NUL */ + *value++ = '\0'; - *value = bptr; + /* find the header */ + field = gst_rtsp_find_header_field (line); + if (field == GST_RTSP_HDR_INVALID) + goto done; - return GST_RTSP_OK; + /* split up the value in multiple key:value pairs if it contains comma(s) */ + while (*value != '\0') { + gchar *next_value; + gchar *comma = NULL; + gboolean quoted = FALSE; + guint comment = 0; + + /* trim leading space */ + if (*value == ' ') + value++; + + /* for headers which may not appear multiple times, and thus may not + * contain multiple values on the same line, we can short-circuit the loop + * below and the entire value results in just one key:value pair*/ + if (!gst_rtsp_header_allow_multiple (field)) + next_value = value + strlen (value); + else + next_value = value; + + /* find the next value, taking special care of quotes and comments */ + while (*next_value != '\0') { + if ((quoted || comment != 0) && *next_value == '\\' && + next_value[1] != '\0') + next_value++; + else if (comment == 0 && *next_value == '"') + quoted = !quoted; + else if (!quoted && *next_value == '(') + comment++; + else if (comment != 0 && *next_value == ')') + comment--; + else if (!quoted && comment == 0) { + /* To quote RFC 2068: "User agents MUST take special care in parsing + * the WWW-Authenticate field value if it contains more than one + * challenge, or if more than one WWW-Authenticate header field is + * provided, since the contents of a challenge may itself contain a + * comma-separated list of authentication parameters." + * + * What this means is that we cannot just look for an unquoted comma + * when looking for multiple values in Proxy-Authenticate and + * WWW-Authenticate headers. Instead we need to look for the sequence + * "comma [space] token space token" before we can split after the + * comma... + */ + if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE || + field == GST_RTSP_HDR_WWW_AUTHENTICATE) { + if (*next_value == ',') { + if (next_value[1] == ' ') { + /* skip any space following the comma so we do not mistake it for + * separating between two tokens */ + next_value++; + } + comma = next_value; + } else if (*next_value == ' ' && next_value[1] != ',' && + next_value[1] != '=' && comma != NULL) { + next_value = comma; + comma = NULL; + break; + } + } else if (*next_value == ',') + break; + } - /* ERRORS */ -no_column: - { - return GST_RTSP_EPARSE; - } -} + next_value++; + } -/* parsing lines means reading a Key: Value pair */ -static GstRTSPResult -parse_line (guint8 * buffer, GstRTSPMessage * msg) -{ - GstRTSPResult res; - gchar key[32]; - gchar *value; - GstRTSPHeaderField field; + /* trim space */ + if (value != next_value && next_value[-1] == ' ') + next_value[-1] = '\0'; - res = parse_key_value (buffer, key, sizeof (key), &value); - if (G_UNLIKELY (res != GST_RTSP_OK)) - goto parse_error; + if (*next_value != '\0') + *next_value++ = '\0'; - field = gst_rtsp_find_header_field (key); - if (field != GST_RTSP_HDR_INVALID) - gst_rtsp_message_add_header (msg, field, value); + /* add the key:value pair */ + if (*value != '\0') + gst_rtsp_message_add_header (msg, field, value); + + value = next_value; + } +done: return GST_RTSP_OK; /* ERRORS */ parse_error: { - return res; + return GST_RTSP_EPARSE; } } @@ -1727,7 +1691,7 @@ normalize_line (guint8 * buffer) /* returns: * GST_RTSP_OK when a complete message was read. - * GST_RTSP_EEOF: when the socket is closed + * GST_RTSP_EEOF: when the read socket is closed * GST_RTSP_EINTR: when more data is needed. * GST_RTSP_..: some other error occured. */ @@ -1740,22 +1704,31 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, while (TRUE) { switch (builder->state) { case STATE_START: + { + guint8 c; + builder->offset = 0; res = read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1); if (res != GST_RTSP_OK) goto done; + c = builder->buffer[0]; + /* we have 1 bytes now and we can see if this is a data message or * not */ - if (builder->buffer[0] == '$') { + if (c == '$') { /* data message, prepare for the header */ builder->state = STATE_DATA_HEADER; + } else if (c == '\n' || c == '\r') { + /* skip \n and \r */ + builder->offset = 0; } else { builder->line = 0; builder->state = STATE_READ_LINES; } break; + } case STATE_DATA_HEADER: { res = @@ -1781,7 +1754,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, goto done; /* we have the complete body now, store in the message adjusting the - * length to include the traling '\0' */ + * length to include the trailing '\0' */ gst_rtsp_message_take_body (message, (guint8 *) builder->body_data, builder->body_len + 1); builder->body_data = NULL; @@ -1798,20 +1771,27 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, goto done; /* we have a regular response */ - if (builder->buffer[0] == '\r') { - builder->buffer[0] = '\0'; - } - if (builder->buffer[0] == '\0') { gchar *hdrval; /* empty line, end of message header */ - /* see if there is a Content-Length header */ + /* see if there is a Content-Length header, but ignore it if this + * is a POST request with an x-sessioncookie header */ if (gst_rtsp_message_get_header (message, - GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) { + GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK && + (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST || + message->type_data.request.method != GST_RTSP_POST || + gst_rtsp_message_get_header (message, + GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) { /* there is, prepare to read the body */ builder->body_len = atol (hdrval); - builder->body_data = g_malloc (builder->body_len + 1); + builder->body_data = g_try_malloc (builder->body_len + 1); + /* we can't do much here, we need the length to know how many bytes + * we need to read next and when allocation fails, something is + * probably wrong with the length. */ + if (builder->body_data == NULL) + goto invalid_body_len; + builder->body_data[builder->body_len] = '\0'; builder->offset = 0; builder->state = STATE_DATA_BODY; @@ -1904,6 +1884,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, } done: return res; + + /* ERRORS */ +invalid_body_len: + { + GST_DEBUG ("could not allocate body"); + return GST_RTSP_ERROR; + } } /** @@ -1926,13 +1913,13 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, GTimeVal * timeout) { guint offset; - gint retval; GstClockTime to; GstRTSPResult res; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); if (G_UNLIKELY (size == 0)) return GST_RTSP_OK; @@ -1942,10 +1929,6 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - while (TRUE) { res = read_bytes (conn, data, &offset, size); if (G_UNLIKELY (res == GST_RTSP_EEOF)) @@ -1955,21 +1938,23 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (G_UNLIKELY (retval == 0)) - goto select_timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } return GST_RTSP_OK; @@ -1996,38 +1981,40 @@ read_error: } } -static GString * -gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code) +static GstRTSPMessage * +gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code, + const GstRTSPMessage * request) { - GString *str; - gchar date_string[100]; - const gchar *status; - - gen_date_string (date_string, sizeof (date_string)); + GstRTSPMessage *msg; + GstRTSPResult res; - status = gst_rtsp_status_as_text (code); - if (status == NULL) { + if (gst_rtsp_status_as_text (code) == NULL) code = GST_RTSP_STS_INTERNAL_SERVER_ERROR; - status = "Internal Server Error"; - } - str = g_string_new (""); + GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request), + no_message); + + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER, + "GStreamer RTSP Server"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); - /* */ - g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status); - g_string_append_printf (str, - "Server: GStreamer RTSP Server\r\n" - "Date: %s\r\n" - "Connection: close\r\n" - "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string); if (code == GST_RTSP_STS_OK) { if (conn->ip) - g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip); - g_string_append_printf (str, - "Content-Type: application/x-rtsp-tunnelled\r\n"); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, + conn->ip); + gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE, + "application/x-rtsp-tunnelled"); + } + + return msg; + + /* ERRORS */ +no_message: + { + return NULL; } - g_string_append_printf (str, "\r\n"); - return str; } /** @@ -2050,50 +2037,50 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, { GstRTSPResult res; GstRTSPBuilder builder; - gint retval; GstClockTime to; + GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - gst_poll_set_controllable (conn->fdset, TRUE); - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE); - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); - memset (&builder, 0, sizeof (GstRTSPBuilder)); while (TRUE) { res = build_next (&builder, message, conn); if (G_UNLIKELY (res == GST_RTSP_EEOF)) goto eof; else if (G_LIKELY (res == GST_RTSP_OK)) { - if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { - if (conn->tstate == TUNNEL_STATE_NONE && - message->type_data.request.method == GST_RTSP_GET) { - GString *str; - - conn->tstate = TUNNEL_STATE_GET; - - /* tunnel GET request, we can reply now */ - str = gen_tunnel_reply (conn, GST_RTSP_STS_OK); - res = - gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, - timeout); - g_string_free (str, TRUE); - res = GST_RTSP_ETGET; - goto cleanup; - } else if (conn->tstate == TUNNEL_STATE_NONE && - message->type_data.request.method == GST_RTSP_POST) { - conn->tstate = TUNNEL_STATE_POST; - - /* tunnel POST request, the caller now has to link the two - * connections. */ - res = GST_RTSP_ETPOST; - goto cleanup; - } else { + if (!conn->manual_http) { + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + + conn->tstate = TUNNEL_STATE_GET; + + /* tunnel GET request, we can reply now */ + response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message); + res = gst_rtsp_connection_send (conn, response, timeout); + gst_rtsp_message_free (response); + if (res == GST_RTSP_OK) + res = GST_RTSP_ETGET; + goto cleanup; + } else if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_POST) { + conn->tstate = TUNNEL_STATE_POST; + + /* tunnel POST request, the caller now has to link the two + * connections. */ + res = GST_RTSP_ETPOST; + goto cleanup; + } else { + res = GST_RTSP_EPARSE; + goto cleanup; + } + } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { res = GST_RTSP_EPARSE; goto cleanup; } @@ -2103,21 +2090,23 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, } else if (G_UNLIKELY (res != GST_RTSP_EINTR)) goto read_error; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - - /* check for timeout */ - if (G_UNLIKELY (retval == 0)) - goto select_timeout; - - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) + g_socket_set_timeout (conn->read_socket, + (to + GST_SECOND - 1) / GST_SECOND); + if (!g_socket_condition_wait (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, + &err)) { + g_socket_set_timeout (conn->read_socket, 0); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { + g_clear_error (&err); goto stopped; - else - goto select_error; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + goto select_timeout; + } + g_clear_error (&err); + goto select_error; } - gst_poll_set_controllable (conn->fdset, FALSE); + g_socket_set_timeout (conn->read_socket, 0); } /* we have a message here */ @@ -2169,17 +2158,28 @@ gst_rtsp_connection_close (GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); + /* last unref closes the connection we don't want to explicitly close here + * because these sockets might have been provided at construction */ + if (conn->socket0) { + g_object_unref (conn->socket0); + conn->socket0 = NULL; + } + if (conn->socket1) { + g_object_unref (conn->socket1); + conn->socket1 = NULL; + } + g_free (conn->ip); conn->ip = NULL; + conn->read_ahead = 0; + g_free (conn->initial_buffer); conn->initial_buffer = NULL; conn->initial_buffer_offset = 0; - REMOVE_POLLFD (conn->fdset, &conn->fd0); - REMOVE_POLLFD (conn->fdset, &conn->fd1); - conn->writefd = NULL; - conn->readfd = NULL; + conn->write_socket = NULL; + conn->read_socket = NULL; conn->tunneled = FALSE; conn->tstate = TUNNEL_STATE_NONE; conn->ctxp = NULL; @@ -2211,14 +2211,14 @@ gst_rtsp_connection_free (GstRTSPConnection * conn) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); res = gst_rtsp_connection_close (conn); - gst_poll_free (conn->fdset); + + if (conn->cancellable) + g_object_unref (conn->cancellable); + g_timer_destroy (conn->timer); gst_rtsp_url_free (conn->url); g_free (conn->proxy_host); g_free (conn); -#ifdef G_OS_WIN32 - WSACleanup (); -#endif return res; } @@ -2248,64 +2248,66 @@ gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events, GstRTSPEvent * revents, GTimeVal * timeout) { GstClockTime to; - gint retval; + GMainContext *ctx; + GSource *rs, *ws, *ts; + GIOCondition condition; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (events != 0, GST_RTSP_EINVAL); g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); - - gst_poll_set_controllable (conn->fdset, TRUE); - - /* add fd to writer set when asked to */ - gst_poll_fd_ctl_write (conn->fdset, conn->writefd, - events & GST_RTSP_EV_WRITE); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - /* add fd to reader set when asked to */ - gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ); + ctx = g_main_context_new (); /* configure timeout if any */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - do { - retval = gst_poll_wait (conn->fdset, to); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + if (timeout) { + ts = g_timeout_source_new (to / GST_MSECOND); + g_source_set_dummy_callback (ts); + g_source_attach (ts, ctx); + g_source_unref (ts); + } - if (G_UNLIKELY (retval == 0)) - goto select_timeout; + rs = g_socket_create_source (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (rs); + g_source_attach (rs, ctx); + g_source_unref (rs); - if (G_UNLIKELY (retval == -1)) { - if (errno == EBUSY) - goto stopped; - else - goto select_error; - } + ws = g_socket_create_source (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable); + g_source_set_dummy_callback (ws); + g_source_attach (ws, ctx); + g_source_unref (ws); + + /* Returns after handling all pending events */ + g_main_context_iteration (ctx, TRUE); + + g_main_context_unref (ctx); + + condition = + g_socket_condition_check (conn->read_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + condition |= + g_socket_condition_check (conn->write_socket, + G_IO_OUT | G_IO_ERR | G_IO_HUP); *revents = 0; if (events & GST_RTSP_EV_READ) { - if (gst_poll_fd_can_read (conn->fdset, conn->readfd)) + if ((condition & G_IO_IN) || (condition & G_IO_PRI)) *revents |= GST_RTSP_EV_READ; } if (events & GST_RTSP_EV_WRITE) { - if (gst_poll_fd_can_write (conn->fdset, conn->writefd)) + if ((condition & G_IO_OUT)) *revents |= GST_RTSP_EV_WRITE; } - return GST_RTSP_OK; - /* ERRORS */ -select_timeout: - { + if (*revents == 0) return GST_RTSP_ETIMEOUT; - } -select_error: - { - return GST_RTSP_ESYS; - } -stopped: - { - return GST_RTSP_EINTR; - } + + return GST_RTSP_OK; } /** @@ -2314,7 +2316,7 @@ stopped: * @timeout: a timeout * * Calculate the next timeout for @conn, storing the result in @timeout. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2323,16 +2325,34 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) gdouble elapsed; glong sec; gulong usec; + gint ctimeout; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL); + ctimeout = conn->timeout; + if (ctimeout >= 20) { + /* Because we should act before the timeout we timeout 5 + * seconds in advance. */ + ctimeout -= 5; + } else if (ctimeout >= 5) { + /* else timeout 20% earlier */ + ctimeout -= ctimeout / 5; + } else if (ctimeout >= 1) { + /* else timeout 1 second earlier */ + ctimeout -= 1; + } + elapsed = g_timer_elapsed (conn->timer, &usec); - if (elapsed >= conn->timeout) { + if (elapsed >= ctimeout) { sec = 0; usec = 0; } else { - sec = conn->timeout - elapsed; + sec = ctimeout - elapsed; + if (usec <= G_USEC_PER_SEC) + usec = G_USEC_PER_SEC - usec; + else + usec = 0; } timeout->tv_sec = sec; @@ -2346,7 +2366,7 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) * @conn: a #GstRTSPConnection * * Reset the timeout of @conn. - * + * * Returns: #GST_RTSP_OK. */ GstRTSPResult @@ -2375,7 +2395,10 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - gst_poll_set_flushing (conn->fdset, flush); + if (flush) + g_cancellable_cancel (conn->cancellable); + else + g_cancellable_reset (conn->cancellable); return GST_RTSP_OK; } @@ -2532,16 +2555,21 @@ gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn) } static GstRTSPResult -set_qos_dscp (gint fd, guint qos_dscp) +set_qos_dscp (GSocket * socket, guint qos_dscp) { +#ifndef IP_TOS + return GST_RTSP_OK; +#else + gint fd; union gst_sockaddr sa; socklen_t slen = sizeof (sa); gint af; gint tos; - if (fd == -1) + if (!socket) return GST_RTSP_OK; + fd = g_socket_get_fd (socket); if (getsockname (fd, &sa.sa, &slen) < 0) goto no_getsockname; @@ -2558,12 +2586,12 @@ set_qos_dscp (gint fd, guint qos_dscp) switch (af) { case AF_INET: - if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; case AF_INET6: #ifdef IPV6_TCLASS - if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) + if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) goto no_setsockopt; break; #endif @@ -2579,11 +2607,11 @@ no_setsockopt: { return GST_RTSP_ESYS; } - wrong_family: { return GST_RTSP_ERROR; } +#endif } /** @@ -2603,12 +2631,12 @@ gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp) GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); + g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); - res = set_qos_dscp (conn->fd0.fd, qos_dscp); + res = set_qos_dscp (conn->socket0, qos_dscp); if (res == GST_RTSP_OK) - res = set_qos_dscp (conn->fd1.fd, qos_dscp); + res = set_qos_dscp (conn->socket1, qos_dscp); return res; } @@ -2676,40 +2704,58 @@ gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip) * * Get the file descriptor for reading. * - * Returns: the file descriptor used for reading or -1 on error. The file + * Returns: the file descriptor used for reading or %NULL on error. The file * descriptor remains valid until the connection is closed. * * Since: 0.10.23 */ -gint -gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn) +GSocket * +gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn) { - g_return_val_if_fail (conn != NULL, -1); - g_return_val_if_fail (conn->readfd != NULL, -1); + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); - return conn->readfd->fd; + return conn->read_socket; } /** - * gst_rtsp_connection_get_writefd: + * gst_rtsp_connection_get_write_socket: * @conn: a #GstRTSPConnection * * Get the file descriptor for writing. * - * Returns: the file descriptor used for writing or -1 on error. The file + * Returns: the file descriptor used for writing or NULL on error. The file * descriptor remains valid until the connection is closed. * * Since: 0.10.23 */ -gint -gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn) +GSocket * +gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn) { - g_return_val_if_fail (conn != NULL, -1); - g_return_val_if_fail (conn->writefd != NULL, -1); + g_return_val_if_fail (conn != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); - return conn->writefd->fd; + return conn->write_socket; } +/** + * gst_rtsp_connection_set_http_mode: + * @conn: a #GstRTSPConnection + * @enable: %TRUE to enable manual HTTP mode + * + * By setting the HTTP mode to %TRUE the message parsing will support HTTP + * messages in addition to the RTSP messages. It will also disable the + * automatic handling of setting up an HTTP tunnel. + * + * Since: 0.10.25 + */ +void +gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable) +{ + g_return_if_fail (conn != NULL); + + conn->manual_http = enable; +} /** * gst_rtsp_connection_set_tunneled: @@ -2725,8 +2771,8 @@ void gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled) { g_return_if_fail (conn != NULL); - g_return_if_fail (conn->readfd == NULL); - g_return_if_fail (conn->writefd == NULL); + g_return_if_fail (conn->read_socket == NULL); + g_return_if_fail (conn->write_socket == NULL); conn->tunneled = tunneled; } @@ -2773,7 +2819,7 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) /** * gst_rtsp_connection_do_tunnel: * @conn: a #GstRTSPConnection - * @conn2: a #GstRTSPConnection + * @conn2: a #GstRTSPConnection or %NULL * * If @conn received the first tunnel connection and @conn2 received * the second tunnel connection, link the two connections together so that @@ -2782,6 +2828,9 @@ gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) * After this call, @conn2 cannot be used anymore and must be freed with * gst_rtsp_connection_free(). * + * If @conn2 is %NULL then only the base64 decoding context will be setup for + * @conn. + * * Returns: return GST_RTSP_OK on success. * * Since: 0.10.23 @@ -2791,26 +2840,31 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, GstRTSPConnection * conn2) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL); - g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); - g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); - g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN), - GST_RTSP_EINVAL); - /* both connections have fd0 as the read/write socket. start by taking the - * socket from conn2 and set it as the socket in conn */ - conn->fd1 = conn2->fd0; + if (conn2 != NULL) { + g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL); + g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL); + g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, + TUNNELID_LEN), GST_RTSP_EINVAL); - /* clean up some of the state of conn2 */ - gst_poll_remove_fd (conn2->fdset, &conn2->fd0); - conn2->fd0.fd = -1; - conn2->readfd = conn2->writefd = NULL; + /* both connections have socket0 as the read/write socket. start by taking the + * socket from conn2 and set it as the socket in conn */ + conn->socket1 = conn2->socket0; - /* We make fd0 the write socket and fd1 the read socket. */ - conn->writefd = &conn->fd0; - conn->readfd = &conn->fd1; + /* clean up some of the state of conn2 */ + g_cancellable_cancel (conn2->cancellable); + conn2->socket0 = 0; + g_object_unref (conn2->socket1); + conn2->socket1 = NULL; + conn2->write_socket = conn2->read_socket = NULL; + g_cancellable_reset (conn2->cancellable); - conn->tstate = TUNNEL_STATE_COMPLETE; + /* We make socket0 the write socket and socket1 the read socket. */ + conn->write_socket = conn->socket0; + conn->read_socket = conn->socket1; + + conn->tstate = TUNNEL_STATE_COMPLETE; + } /* we need base64 decoding for the readfd */ conn->ctx.state = 0; @@ -2822,8 +2876,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, return GST_RTSP_OK; } -#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) -#define WRITE_COND (G_IO_OUT | G_IO_ERR) +#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define READ_COND (G_IO_IN | READ_ERR) +#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define WRITE_COND (G_IO_OUT | WRITE_ERR) typedef struct { @@ -2844,11 +2900,11 @@ struct _GstRTSPWatch GPollFD readfd; GPollFD writefd; - gboolean write_added; /* queued message for transmission */ guint id; - GAsyncQueue *messages; + GMutex *mutex; + GQueue *messages; guint8 *write_data; guint write_off; guint write_size; @@ -2892,23 +2948,41 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_ERROR; + gboolean keep_running = TRUE; /* first read as much as we can */ if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { do { + if (watch->readfd.revents & READ_ERR) + goto read_error; + res = build_next (&watch->builder, &watch->message, watch->conn); if (res == GST_RTSP_EINTR) break; - else if (G_UNLIKELY (res == GST_RTSP_EEOF)) - goto eof; - else if (G_LIKELY (res == GST_RTSP_OK)) { - if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + /* When we are in tunnelled mode, the read socket can be closed and we + * should be prepared for a new POST method to reopen it */ + if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { + /* remove the read connection for the tunnel */ + /* we accept a new POST request */ + watch->conn->tstate = TUNNEL_STATE_GET; + /* and signal that we lost our tunnel */ + if (watch->funcs.tunnel_lost) + res = watch->funcs.tunnel_lost (watch, watch->user_data); + goto read_done; + } else + goto eof; + } else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!watch->conn->manual_http && + watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { if (watch->conn->tstate == TUNNEL_STATE_NONE && watch->message.type_data.request.method == GST_RTSP_GET) { - GString *str; + GstRTSPMessage *response; GstRTSPStatusCode code; - guint size; watch->conn->tstate = TUNNEL_STATE_GET; @@ -2917,11 +2991,10 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, else code = GST_RTSP_STS_OK; - /* queue the response string */ - str = gen_tunnel_reply (watch->conn, code); - size = str->len; - gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, - FALSE), size); + /* queue the response */ + response = gen_tunnel_reply (watch->conn, code, &watch->message); + gst_rtsp_watch_send_message (watch, response, NULL); + gst_rtsp_message_free (response); goto read_done; } else if (watch->conn->tstate == TUNNEL_STATE_NONE && watch->message.type_data.request.method == GST_RTSP_POST) { @@ -2932,18 +3005,36 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, if (watch->funcs.tunnel_complete) watch->funcs.tunnel_complete (watch, watch->user_data); goto read_done; - } else { - res = GST_RTSP_ERROR; } } } + if (!watch->conn->manual_http) { + /* if manual HTTP support is not enabled, then restore the message to + * what it would have looked like without the support for parsing HTTP + * messages being present */ + if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + watch->message.type = GST_RTSP_MESSAGE_REQUEST; + watch->message.type_data.request.method = GST_RTSP_INVALID; + if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + watch->message.type = GST_RTSP_MESSAGE_RESPONSE; + if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.response.version = + GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } + } + if (G_LIKELY (res == GST_RTSP_OK)) { if (watch->funcs.message_received) watch->funcs.message_received (watch, &watch->message, watch->user_data); - } else - goto error; + } else { + goto read_error; + } read_done: gst_rtsp_message_unset (&watch->message); @@ -2952,14 +3043,18 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, } if (watch->writefd.revents & WRITE_COND) { + if (watch->writefd.revents & WRITE_ERR) + goto write_error; + + g_mutex_lock (watch->mutex); do { if (watch->write_data == NULL) { GstRTSPRec *rec; /* get a new message from the queue */ - rec = g_async_queue_try_pop (watch->messages); + rec = g_queue_pop_tail (watch->messages); if (rec == NULL) - goto done; + break; watch->write_off = 0; watch->write_data = rec->data; @@ -2969,41 +3064,79 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, g_slice_free (GstRTSPRec, rec); } - res = write_bytes (watch->writefd.fd, watch->write_data, - &watch->write_off, watch->write_size); - if (res == GST_RTSP_EINTR) - break; - if (G_UNLIKELY (res != GST_RTSP_OK)) - goto error; - - if (watch->funcs.message_sent) - watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + res = write_bytes (watch->conn->write_socket, watch->write_data, + &watch->write_off, watch->write_size, watch->conn->cancellable); + g_mutex_unlock (watch->mutex); - done: - if (g_async_queue_length (watch->messages) == 0 && watch->write_added) { - g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->write_added = FALSE; - watch->writefd.revents = 0; + if (res == GST_RTSP_EINTR) + goto write_blocked; + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (watch->funcs.message_sent) + watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + } else { + goto write_error; } + g_mutex_lock (watch->mutex); + g_free (watch->write_data); watch->write_data = NULL; - } while (FALSE); + } while (TRUE); + + watch->writefd.events = WRITE_ERR; + + g_mutex_unlock (watch->mutex); } - return TRUE; +write_blocked: + return keep_running; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); + + /* always stop when the readfd returns EOF in non-tunneled mode */ return FALSE; } +read_error: + { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + keep_running = (watch->writefd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, + 0, watch->user_data), error); + else + goto error; + } else + goto eof; + } +write_error: + { + watch->writefd.events = 0; + watch->writefd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->writefd); + keep_running = (watch->readfd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, + watch->write_id, watch->user_data), error); + else + goto error; + } else + goto eof; + } error: { if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); - return FALSE; + + return keep_running; } } @@ -3024,11 +3157,13 @@ gst_rtsp_source_finalize (GSource * source) build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); - g_async_queue_unref (watch->messages); + g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); + g_queue_free (watch->messages); watch->messages = NULL; - g_free (watch->write_data); + g_mutex_free (watch->mutex); + if (watch->notify) watch->notify (watch->user_data); } @@ -3070,8 +3205,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (funcs != NULL, NULL); - g_return_val_if_fail (conn->readfd != NULL, NULL); - g_return_val_if_fail (conn->writefd != NULL, NULL); + g_return_val_if_fail (conn->read_socket != NULL, NULL); + g_return_val_if_fail (conn->write_socket != NULL, NULL); result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs, sizeof (GstRTSPWatch)); @@ -3079,7 +3214,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->conn = conn; result->builder.state = STATE_START; - result->messages = g_async_queue_new_full (gst_rtsp_rec_free); + result->mutex = g_mutex_new (); + result->messages = g_queue_new (); result->readfd.fd = -1; result->writefd.fd = -1; @@ -3090,10 +3226,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->user_data = user_data; result->notify = notify; - /* only add the read fd, the write fd is only added when we have data - * to send. */ - g_source_add_poll ((GSource *) result, &result->readfd); - return result; } @@ -3114,16 +3246,18 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) if (watch->writefd.fd != -1) g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->readfd.fd = watch->conn->readfd->fd; + watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket); watch->readfd.events = READ_COND; watch->readfd.revents = 0; - watch->writefd.fd = watch->conn->writefd->fd; - watch->writefd.events = WRITE_COND; + watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket); + watch->writefd.events = WRITE_ERR; watch->writefd.revents = 0; - watch->write_added = FALSE; - g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->readfd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->writefd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->writefd); } /** @@ -3163,75 +3297,112 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) } /** - * gst_rtsp_watch_queue_data: + * gst_rtsp_watch_write_data: * @watch: a #GstRTSPWatch * @data: the data to queue * @size: the size of @data + * @id: location for a message ID or %NULL * - * Queue @data for transmission in @watch. It will be transmitted when the - * connection of the @watch becomes writable. + * Write @data using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. * * This function will take ownership of @data and g_free() it after use. * - * The return value of this function will be used as the id argument in the - * message_sent callback. - * - * Returns: an id. + * Returns: #GST_RTSP_OK on success. * - * Since: 0.10.24 + * Since: 0.10.25 */ -guint -gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, - guint size) +GstRTSPResult +gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, + guint size, guint * id) { + GstRTSPResult res; GstRTSPRec *rec; + guint off = 0; + GMainContext *context = NULL; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (size != 0, GST_RTSP_EINVAL); - /* make a record with the data and id */ + g_mutex_lock (watch->mutex); + + /* try to send the message synchronously first */ + if (watch->messages->length == 0 && watch->write_data == NULL) { + res = + write_bytes (watch->conn->write_socket, data, &off, size, + watch->conn->cancellable); + if (res != GST_RTSP_EINTR) { + if (id != NULL) + *id = 0; + g_free ((gpointer) data); + goto done; + } + } + + /* make a record with the data and id for sending async */ rec = g_slice_new (GstRTSPRec); - rec->data = (guint8 *) data; - rec->size = size; + if (off == 0) { + rec->data = (guint8 *) data; + rec->size = size; + } else { + rec->data = g_memdup (data + off, size - off); + rec->size = size - off; + g_free ((gpointer) data); + } + do { /* make sure rec->id is never 0 */ rec->id = ++watch->id; } while (G_UNLIKELY (rec->id == 0)); /* add the record to a queue. FIXME we would like to have an upper limit here */ - g_async_queue_push (watch->messages, rec); + g_queue_push_head (watch->messages, rec); - /* FIXME: does the following need to be made thread-safe? (this might be - * called from a streaming thread, like appsink's render function) */ /* make sure the main context will now also check for writability on the * socket */ - if (!watch->write_added) { - g_source_add_poll ((GSource *) watch, &watch->writefd); - watch->write_added = TRUE; + if (watch->writefd.events != WRITE_COND) { + watch->writefd.events = WRITE_COND; + context = ((GSource *) watch)->context; } - return rec->id; + if (id != NULL) + *id = rec->id; + res = GST_RTSP_OK; + +done: + g_mutex_unlock (watch->mutex); + + if (context) + g_main_context_wakeup (context); + + return res; } /** - * gst_rtsp_watch_queue_message: + * gst_rtsp_watch_send_message: * @watch: a #GstRTSPWatch * @message: a #GstRTSPMessage + * @id: location for a message ID or %NULL * - * Queue a @message for transmission in @watch. The contents of this - * message will be serialized and transmitted when the connection of the - * @watch becomes writable. - * - * The return value of this function will be used as the id argument in the - * message_sent callback. + * Send a @message using the connection of the @watch. If it cannot be sent + * immediately, it will be queued for transmission in @watch. The contents of + * @message will then be serialized and transmitted when the connection of the + * @watch becomes writable. In case the @message is queued, the ID returned in + * @id will be non-zero and used as the ID argument in the message_sent + * callback. * - * Returns: an id. + * Returns: #GST_RTSP_OK on success. * - * Since: 0.10.23 + * Since: 0.10.25 */ -guint -gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) +GstRTSPResult +gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, + guint * id) { GString *str; guint size; @@ -3242,6 +3413,6 @@ gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) /* make a record with the message as a string and id */ str = message_to_string (watch->conn, message); size = str->len; - return gst_rtsp_watch_queue_data (watch, - (guint8 *) g_string_free (str, FALSE), size); + return gst_rtsp_watch_write_data (watch, + (guint8 *) g_string_free (str, FALSE), size, id); }