rtsp: Add initial buffer support.
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72 /* we include this here to get the G_OS_* defines */
73 #include <glib.h>
74 #include <gst/gst.h>
75
76 #ifdef G_OS_WIN32
77 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
78  * minwg32 headers check WINVER before allowing the use of these */
79 #ifndef WINVER
80 #define WINVER 0x0501
81 #endif
82 #include <winsock2.h>
83 #include <ws2tcpip.h>
84 #define EINPROGRESS WSAEINPROGRESS
85 #else
86 #include <sys/ioctl.h>
87 #include <netdb.h>
88 #include <sys/socket.h>
89 #include <fcntl.h>
90 #include <netinet/in.h>
91 #endif
92
93 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
94 #include <sys/filio.h>
95 #endif
96
97 #include "gstrtspconnection.h"
98 #include "gstrtspbase64.h"
99
100 union gst_sockaddr
101 {
102   struct sockaddr sa;
103   struct sockaddr_in sa_in;
104   struct sockaddr_in6 sa_in6;
105   struct sockaddr_storage sa_stor;
106 };
107
108 typedef struct
109 {
110   gint state;
111   guint save;
112   guchar out[3];                /* the size must be evenly divisible by 3 */
113   guint cout;
114   guint coutl;
115 } DecodeCtx;
116
117 static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer,
118     guint * idx, guint size);
119 static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
120     guint keysize, gchar ** value);
121 static void parse_string (gchar * dest, gint size, gchar ** src);
122
123 #ifdef G_OS_WIN32
124 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
125 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
126 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
127 #define CLOSE_SOCKET(sock) closesocket (sock)
128 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
129 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
130 /* According to Microsoft's connect() documentation this one returns
131  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
132 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
133 #else
134 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
135 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
136 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
137 #define CLOSE_SOCKET(sock) close (sock)
138 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
139 #define ERRNO_IS_EINTR (errno == EINTR)
140 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
141 #endif
142
143 #define ADD_POLLFD(fdset, pfd, fd)        \
144 G_STMT_START {                            \
145   (pfd)->fd = fd;                         \
146   gst_poll_add_fd (fdset, pfd);           \
147 } G_STMT_END
148
149 #define REMOVE_POLLFD(fdset, pfd)          \
150 G_STMT_START {                             \
151   if ((pfd)->fd != -1) {                   \
152     GST_DEBUG ("remove fd %d", (pfd)->fd); \
153     gst_poll_remove_fd (fdset, pfd);       \
154     CLOSE_SOCKET ((pfd)->fd);              \
155     (pfd)->fd = -1;                        \
156   }                                        \
157 } G_STMT_END
158
159 typedef enum
160 {
161   TUNNEL_STATE_NONE,
162   TUNNEL_STATE_GET,
163   TUNNEL_STATE_POST,
164   TUNNEL_STATE_COMPLETE
165 } GstRTSPTunnelState;
166
167 #define TUNNELID_LEN   24
168
169 struct _GstRTSPConnection
170 {
171   /*< private > */
172   /* URL for the connection */
173   GstRTSPUrl *url;
174
175   /* connection state */
176   GstPollFD fd0;
177   GstPollFD fd1;
178
179   GstPollFD *readfd;
180   GstPollFD *writefd;
181
182   gchar tunnelid[TUNNELID_LEN];
183   gboolean tunneled;
184   GstRTSPTunnelState tstate;
185
186   GstPoll *fdset;
187   gchar *ip;
188
189   gchar *initial_buffer;
190   gsize initial_buffer_offset;
191
192   /* Session state */
193   gint cseq;                    /* sequence number */
194   gchar session_id[512];        /* session id */
195   gint timeout;                 /* session timeout in seconds */
196   GTimer *timer;                /* timeout timer */
197
198   /* Authentication */
199   GstRTSPAuthMethod auth_method;
200   gchar *username;
201   gchar *passwd;
202   GHashTable *auth_params;
203
204   DecodeCtx ctx;
205   DecodeCtx *ctxp;
206
207   gchar *proxy_host;
208   guint proxy_port;
209 };
210
211 enum
212 {
213   STATE_START = 0,
214   STATE_DATA_HEADER,
215   STATE_DATA_BODY,
216   STATE_READ_LINES,
217   STATE_END,
218   STATE_LAST
219 };
220
221 /* a structure for constructing RTSPMessages */
222 typedef struct
223 {
224   gint state;
225   guint8 buffer[4096];
226   guint offset;
227
228   guint line;
229   guint8 *body_data;
230   glong body_len;
231 } GstRTSPBuilder;
232
233 static void
234 build_reset (GstRTSPBuilder * builder)
235 {
236   g_free (builder->body_data);
237   memset (builder, 0, sizeof (GstRTSPBuilder));
238 }
239
240 /**
241  * gst_rtsp_connection_create:
242  * @url: a #GstRTSPUrl 
243  * @conn: storage for a #GstRTSPConnection
244  *
245  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
246  * The connection will not yet attempt to connect to @url, use
247  * gst_rtsp_connection_connect().
248  *
249  * A copy of @url will be made.
250  *
251  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
252  */
253 GstRTSPResult
254 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
255 {
256   GstRTSPConnection *newconn;
257 #ifdef G_OS_WIN32
258   WSADATA w;
259   int error;
260 #endif
261
262   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
263
264 #ifdef G_OS_WIN32
265   error = WSAStartup (0x0202, &w);
266
267   if (error)
268     goto startup_error;
269
270   if (w.wVersion != 0x0202)
271     goto version_error;
272 #endif
273
274   newconn = g_new0 (GstRTSPConnection, 1);
275
276   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
277     goto no_fdset;
278
279   newconn->url = gst_rtsp_url_copy (url);
280   newconn->fd0.fd = -1;
281   newconn->fd1.fd = -1;
282   newconn->timer = g_timer_new ();
283   newconn->timeout = 60;
284   newconn->cseq = 1;
285
286   newconn->auth_method = GST_RTSP_AUTH_NONE;
287   newconn->username = NULL;
288   newconn->passwd = NULL;
289   newconn->auth_params = NULL;
290
291   *conn = newconn;
292
293   return GST_RTSP_OK;
294
295   /* ERRORS */
296 #ifdef G_OS_WIN32
297 startup_error:
298   {
299     g_warning ("Error %d on WSAStartup", error);
300     return GST_RTSP_EWSASTART;
301   }
302 version_error:
303   {
304     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
305         w.wVersion);
306     WSACleanup ();
307     return GST_RTSP_EWSAVERSION;
308   }
309 #endif
310 no_fdset:
311   {
312     g_free (newconn);
313 #ifdef G_OS_WIN32
314     WSACleanup ();
315 #endif
316     return GST_RTSP_ESYS;
317   }
318 }
319
320 /**
321  * gst_rtsp_connection_accept:
322  * @sock: a socket
323  * @conn: storage for a #GstRTSPConnection
324  *
325  * Accept a new connection on @sock and create a new #GstRTSPConnection for
326  * handling communication on new socket.
327  *
328  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
329  *
330  * Since: 0.10.23
331  */
332 GstRTSPResult
333 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
334 {
335   int fd;
336   GstRTSPConnection *newconn = NULL;
337   union gst_sockaddr sa;
338   socklen_t slen = sizeof (sa);
339   gchar ip[INET6_ADDRSTRLEN];
340   GstRTSPUrl *url;
341 #ifdef G_OS_WIN32
342   gulong flags = 1;
343 #endif
344
345   memset (&sa, 0, slen);
346
347 #ifndef G_OS_WIN32
348   fd = accept (sock, &sa.sa, &slen);
349 #else
350   fd = accept (sock, &sa.sa, (gint *) & slen);
351 #endif /* G_OS_WIN32 */
352   if (fd == -1)
353     goto accept_failed;
354
355   if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0)
356     goto getnameinfo_failed;
357   if (sa.sa.sa_family != AF_INET && sa.sa.sa_family != AF_INET6)
358     goto wrong_family;
359
360   /* set to non-blocking mode so that we can cancel the communication */
361 #ifndef G_OS_WIN32
362   fcntl (fd, F_SETFL, O_NONBLOCK);
363 #else
364   ioctlsocket (fd, FIONBIO, &flags);
365 #endif /* G_OS_WIN32 */
366
367   /* create a url for the client address */
368   url = g_new0 (GstRTSPUrl, 1);
369   url->host = g_strdup (ip);
370   if (sa.sa.sa_family == AF_INET)
371     url->port = sa.sa_in.sin_port;
372   else
373     url->port = sa.sa_in6.sin6_port;
374
375   /* now create the connection object */
376   gst_rtsp_connection_create (url, &newconn);
377   gst_rtsp_url_free (url);
378
379   ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
380
381   /* both read and write initially */
382   newconn->readfd = &newconn->fd0;
383   newconn->writefd = &newconn->fd0;
384
385   *conn = newconn;
386
387   return GST_RTSP_OK;
388
389   /* ERRORS */
390 accept_failed:
391   {
392     return GST_RTSP_ESYS;
393   }
394 getnameinfo_failed:
395 wrong_family:
396   {
397     close (fd);
398     return GST_RTSP_ERROR;
399   }
400 }
401
402 static gchar *
403 do_resolve (const gchar * host)
404 {
405   static gchar ip[INET6_ADDRSTRLEN];
406   struct addrinfo *aires;
407   struct addrinfo *ai;
408   gint aierr;
409
410   aierr = getaddrinfo (host, NULL, NULL, &aires);
411   if (aierr != 0)
412     goto no_addrinfo;
413
414   for (ai = aires; ai; ai = ai->ai_next) {
415     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
416       break;
417     }
418   }
419   if (ai == NULL)
420     goto no_family;
421
422   aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0,
423       NI_NUMERICHOST | NI_NUMERICSERV);
424   if (aierr != 0)
425     goto no_address;
426
427   freeaddrinfo (aires);
428
429   return g_strdup (ip);
430
431   /* ERRORS */
432 no_addrinfo:
433   {
434     GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr));
435     return NULL;
436   }
437 no_family:
438   {
439     GST_ERROR ("no family found for %s", host);
440     freeaddrinfo (aires);
441     return NULL;
442   }
443 no_address:
444   {
445     GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr));
446     freeaddrinfo (aires);
447     return NULL;
448   }
449 }
450
451 static GstRTSPResult
452 do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
453     GstPoll * fdset, GTimeVal * timeout)
454 {
455   gint fd;
456   struct addrinfo hints;
457   struct addrinfo *aires;
458   struct addrinfo *ai;
459   gint aierr;
460   gchar service[NI_MAXSERV];
461   gint ret;
462 #ifdef G_OS_WIN32
463   unsigned long flags = 1;
464 #endif /* G_OS_WIN32 */
465   GstClockTime to;
466   gint retval;
467
468   memset (&hints, 0, sizeof hints);
469   hints.ai_flags = AI_NUMERICHOST;
470   hints.ai_family = AF_UNSPEC;
471   hints.ai_socktype = SOCK_STREAM;
472   g_snprintf (service, sizeof (service) - 1, "%hu", port);
473   service[sizeof (service) - 1] = '\0';
474
475   aierr = getaddrinfo (ip, service, &hints, &aires);
476   if (aierr != 0)
477     goto no_addrinfo;
478
479   for (ai = aires; ai; ai = ai->ai_next) {
480     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
481       break;
482     }
483   }
484   if (ai == NULL)
485     goto no_family;
486
487   fd = socket (ai->ai_family, SOCK_STREAM, 0);
488   if (fd == -1)
489     goto no_socket;
490
491   /* set to non-blocking mode so that we can cancel the connect */
492 #ifndef G_OS_WIN32
493   fcntl (fd, F_SETFL, O_NONBLOCK);
494 #else
495   ioctlsocket (fd, FIONBIO, &flags);
496 #endif /* G_OS_WIN32 */
497
498   /* add the socket to our fdset */
499   ADD_POLLFD (fdset, fdout, fd);
500
501   /* we are going to connect ASYNC now */
502   ret = connect (fd, ai->ai_addr, ai->ai_addrlen);
503   if (ret == 0)
504     goto done;
505   if (!ERRNO_IS_EINPROGRESS)
506     goto sys_error;
507
508   /* wait for connect to complete up to the specified timeout or until we got
509    * interrupted. */
510   gst_poll_fd_ctl_write (fdset, fdout, TRUE);
511
512   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
513
514   do {
515     retval = gst_poll_wait (fdset, to);
516   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
517
518   if (retval == 0)
519     goto timeout;
520   else if (retval == -1)
521     goto sys_error;
522
523   /* we can still have an error connecting on windows */
524   if (gst_poll_fd_has_error (fdset, fdout)) {
525     socklen_t len = sizeof (errno);
526 #ifndef G_OS_WIN32
527     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
528 #else
529     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
530 #endif
531     goto sys_error;
532   }
533
534   gst_poll_fd_ignored (fdset, fdout);
535
536 done:
537   freeaddrinfo (aires);
538
539   return GST_RTSP_OK;
540
541   /* ERRORS */
542 no_addrinfo:
543   {
544     GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr));
545     return GST_RTSP_ERROR;
546   }
547 no_family:
548   {
549     GST_ERROR ("no family found for %s", ip);
550     freeaddrinfo (aires);
551     return GST_RTSP_ERROR;
552   }
553 no_socket:
554   {
555     GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
556     freeaddrinfo (aires);
557     return GST_RTSP_ESYS;
558   }
559 sys_error:
560   {
561     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
562     REMOVE_POLLFD (fdset, fdout);
563     freeaddrinfo (aires);
564     return GST_RTSP_ESYS;
565   }
566 timeout:
567   {
568     GST_ERROR ("timeout");
569     REMOVE_POLLFD (fdset, fdout);
570     freeaddrinfo (aires);
571     return GST_RTSP_ETIMEOUT;
572   }
573 }
574
575 static GstRTSPResult
576 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
577 {
578   gint i;
579   GstRTSPResult res;
580   gchar *str;
581   guint idx, line;
582   gint retval;
583   GstClockTime to;
584   gchar *ip, *url_port_str;
585   guint16 port, url_port;
586   gchar codestr[4], *resultstr;
587   gint code;
588   GstRTSPUrl *url;
589   gchar *hostparam;
590
591   /* create a random sessionid */
592   for (i = 0; i < TUNNELID_LEN; i++)
593     conn->tunnelid[i] = g_random_int_range ('a', 'z');
594   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
595
596   url = conn->url;
597   /* get the port from the url */
598   gst_rtsp_url_get_port (url, &url_port);
599
600   if (conn->proxy_host) {
601     hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
602     url_port_str = g_strdup_printf (":%d", url_port);
603     ip = conn->proxy_host;
604     port = conn->proxy_port;
605   } else {
606     hostparam = NULL;
607     url_port_str = NULL;
608     ip = conn->ip;
609     port = url_port;
610   }
611
612   /* */
613   str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
614       "%s"
615       "x-sessioncookie: %s\r\n"
616       "Accept: application/x-rtsp-tunnelled\r\n"
617       "Pragma: no-cache\r\n"
618       "Cache-Control: no-cache\r\n" "\r\n",
619       conn->proxy_host ? "http://" : "",
620       conn->proxy_host ? url->host : "",
621       conn->proxy_host ? url_port_str : "",
622       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
623       hostparam ? hostparam : "", conn->tunnelid);
624
625   /* we start by writing to this fd */
626   conn->writefd = &conn->fd0;
627
628   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
629   g_free (str);
630   if (res != GST_RTSP_OK)
631     goto write_failed;
632
633   gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
634   gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);
635
636   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
637
638   line = 0;
639   while (TRUE) {
640     guint8 buffer[4096];
641
642     idx = 0;
643     while (TRUE) {
644       res = read_line (conn, buffer, &idx, sizeof (buffer));
645       if (res == GST_RTSP_EEOF)
646         goto eof;
647       if (res == GST_RTSP_OK)
648         break;
649       if (res != GST_RTSP_EINTR)
650         goto read_error;
651
652       do {
653         retval = gst_poll_wait (conn->fdset, to);
654       } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
655
656       /* check for timeout */
657       if (retval == 0)
658         goto timeout;
659
660       if (retval == -1) {
661         if (errno == EBUSY)
662           goto stopped;
663         else
664           goto select_error;
665       }
666     }
667
668     /* check for last line */
669     if (buffer[0] == '\r')
670       buffer[0] = '\0';
671     if (buffer[0] == '\0')
672       break;
673
674     if (line == 0) {
675       /* first line, parse response */
676       gchar versionstr[20];
677       gchar *bptr;
678
679       bptr = (gchar *) buffer;
680
681       parse_string (versionstr, sizeof (versionstr), &bptr);
682       parse_string (codestr, sizeof (codestr), &bptr);
683       code = atoi (codestr);
684
685       while (g_ascii_isspace (*bptr))
686         bptr++;
687
688       resultstr = bptr;
689
690       if (code != GST_RTSP_STS_OK)
691         goto wrong_result;
692     } else {
693       gchar key[32];
694       gchar *value;
695
696       /* other lines, parse key/value */
697       res = parse_key_value (buffer, key, sizeof (key), &value);
698       if (res == GST_RTSP_OK) {
699         /* we got a new ip address */
700         if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
701           if (conn->proxy_host) {
702             /* if we use a proxy we need to change the destination url */
703             g_free (url->host);
704             url->host = g_strdup (value);
705             g_free (hostparam);
706             g_free (url_port_str);
707             hostparam =
708                 g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
709             url_port_str = g_strdup_printf (":%d", url_port);
710           } else {
711             /* and resolve the new ip address */
712             if (!(ip = do_resolve (conn->ip)))
713               goto not_resolved;
714             g_free (conn->ip);
715             conn->ip = ip;
716           }
717         }
718       }
719     }
720     line++;
721   }
722
723   /* connect to the host/port */
724   res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
725   if (res != GST_RTSP_OK)
726     goto connect_failed;
727
728   /* this is now our writing socket */
729   conn->writefd = &conn->fd1;
730
731   /* */
732   str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
733       "%s"
734       "x-sessioncookie: %s\r\n"
735       "Content-Type: application/x-rtsp-tunnelled\r\n"
736       "Pragma: no-cache\r\n"
737       "Cache-Control: no-cache\r\n"
738       "Content-Length: 32767\r\n"
739       "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
740       "\r\n",
741       conn->proxy_host ? "http://" : "",
742       conn->proxy_host ? url->host : "",
743       conn->proxy_host ? url_port_str : "",
744       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
745       hostparam ? hostparam : "", conn->tunnelid);
746
747   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
748   g_free (str);
749   if (res != GST_RTSP_OK)
750     goto write_failed;
751
752 exit:
753   g_free (hostparam);
754   g_free (url_port_str);
755
756   return res;
757
758   /* ERRORS */
759 write_failed:
760   {
761     GST_ERROR ("write failed (%d)", res);
762     goto exit;
763   }
764 eof:
765   {
766     res = GST_RTSP_EEOF;
767     goto exit;
768   }
769 read_error:
770   {
771     goto exit;
772   }
773 timeout:
774   {
775     res = GST_RTSP_ETIMEOUT;
776     goto exit;
777   }
778 select_error:
779   {
780     res = GST_RTSP_ESYS;
781     goto exit;
782   }
783 stopped:
784   {
785     res = GST_RTSP_EINTR;
786     goto exit;
787   }
788 wrong_result:
789   {
790     GST_ERROR ("got failure response %d %s", code, resultstr);
791     res = GST_RTSP_ERROR;
792     goto exit;
793   }
794 not_resolved:
795   {
796     GST_ERROR ("could not resolve %s", conn->ip);
797     res = GST_RTSP_ENET;
798     goto exit;
799   }
800 connect_failed:
801   {
802     GST_ERROR ("failed to connect");
803     goto exit;
804   }
805 }
806
807 /**
808  * gst_rtsp_connection_connect:
809  * @conn: a #GstRTSPConnection 
810  * @timeout: a #GTimeVal timeout
811  *
812  * Attempt to connect to the url of @conn made with
813  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
814  * forever. If @timeout contains a valid timeout, this function will return
815  * #GST_RTSP_ETIMEOUT after the timeout expired.
816  *
817  * This function can be cancelled with gst_rtsp_connection_flush().
818  *
819  * Returns: #GST_RTSP_OK when a connection could be made.
820  */
821 GstRTSPResult
822 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
823 {
824   GstRTSPResult res;
825   gchar *ip;
826   guint16 port;
827   GstRTSPUrl *url;
828
829   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
830   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
831   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
832
833   url = conn->url;
834
835   if (conn->proxy_host && conn->tunneled) {
836     if (!(ip = do_resolve (conn->proxy_host))) {
837       GST_ERROR ("could not resolve %s", conn->proxy_host);
838       goto not_resolved;
839     }
840     port = conn->proxy_port;
841     g_free (conn->proxy_host);
842     conn->proxy_host = ip;
843   } else {
844     if (!(ip = do_resolve (url->host))) {
845       GST_ERROR ("could not resolve %s", url->host);
846       goto not_resolved;
847     }
848     /* get the port from the url */
849     gst_rtsp_url_get_port (url, &port);
850
851     g_free (conn->ip);
852     conn->ip = ip;
853   }
854
855   /* connect to the host/port */
856   res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
857   if (res != GST_RTSP_OK)
858     goto connect_failed;
859
860   /* this is our read URL */
861   conn->readfd = &conn->fd0;
862
863   if (conn->tunneled) {
864     res = setup_tunneling (conn, timeout);
865     if (res != GST_RTSP_OK)
866       goto tunneling_failed;
867   } else {
868     conn->writefd = &conn->fd0;
869   }
870
871   return GST_RTSP_OK;
872
873 not_resolved:
874   {
875     return GST_RTSP_ENET;
876   }
877 connect_failed:
878   {
879     GST_ERROR ("failed to connect");
880     return res;
881   }
882 tunneling_failed:
883   {
884     GST_ERROR ("failed to setup tunneling");
885     return res;
886   }
887 }
888
889 static void
890 auth_digest_compute_hex_urp (const gchar * username,
891     const gchar * realm, const gchar * password, gchar hex_urp[33])
892 {
893   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
894   const gchar *digest_string;
895
896   g_checksum_update (md5_context, (const guchar *) username, strlen (username));
897   g_checksum_update (md5_context, (const guchar *) ":", 1);
898   g_checksum_update (md5_context, (const guchar *) realm, strlen (realm));
899   g_checksum_update (md5_context, (const guchar *) ":", 1);
900   g_checksum_update (md5_context, (const guchar *) password, strlen (password));
901   digest_string = g_checksum_get_string (md5_context);
902
903   memset (hex_urp, 0, 33);
904   memcpy (hex_urp, digest_string, strlen (digest_string));
905
906   g_checksum_free (md5_context);
907 }
908
909 static void
910 auth_digest_compute_response (const gchar * method,
911     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
912     gchar response[33])
913 {
914   char hex_a2[33] = { 0, };
915   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
916   const gchar *digest_string;
917
918   /* compute A2 */
919   g_checksum_update (md5_context, (const guchar *) method, strlen (method));
920   g_checksum_update (md5_context, (const guchar *) ":", 1);
921   g_checksum_update (md5_context, (const guchar *) uri, strlen (uri));
922   digest_string = g_checksum_get_string (md5_context);
923   memcpy (hex_a2, digest_string, strlen (digest_string));
924
925   /* compute KD */
926 #if GLIB_CHECK_VERSION (2, 18, 0)
927   g_checksum_reset (md5_context);
928 #else
929   g_checksum_free (md5_context);
930   md5_context = g_checksum_new (G_CHECKSUM_MD5);
931 #endif
932   g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1));
933   g_checksum_update (md5_context, (const guchar *) ":", 1);
934   g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce));
935   g_checksum_update (md5_context, (const guchar *) ":", 1);
936
937   g_checksum_update (md5_context, (const guchar *) hex_a2, 32);
938   digest_string = g_checksum_get_string (md5_context);
939   memset (response, 0, 33);
940   memcpy (response, digest_string, strlen (digest_string));
941
942   g_checksum_free (md5_context);
943 }
944
945 static void
946 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
947 {
948   switch (conn->auth_method) {
949     case GST_RTSP_AUTH_BASIC:{
950       gchar *user_pass;
951       gchar *user_pass64;
952       gchar *auth_string;
953
954       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
955       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
956       auth_string = g_strdup_printf ("Basic %s", user_pass64);
957
958       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
959           auth_string);
960
961       g_free (user_pass);
962       g_free (user_pass64);
963       break;
964     }
965     case GST_RTSP_AUTH_DIGEST:{
966       gchar response[33], hex_urp[33];
967       gchar *auth_string, *auth_string2;
968       gchar *realm;
969       gchar *nonce;
970       gchar *opaque;
971       const gchar *uri;
972       const gchar *method;
973
974       /* we need to have some params set */
975       if (conn->auth_params == NULL)
976         break;
977
978       /* we need the realm and nonce */
979       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
980       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
981       if (realm == NULL || nonce == NULL)
982         break;
983
984       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
985           hex_urp);
986
987       method = gst_rtsp_method_as_text (message->type_data.request.method);
988       uri = message->type_data.request.uri;
989
990       /* Assume no qop, algorithm=md5, stale=false */
991       /* For algorithm MD5, a1 = urp. */
992       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
993       auth_string = g_strdup_printf ("Digest username=\"%s\", "
994           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
995           conn->username, realm, nonce, uri, response);
996
997       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
998       if (opaque) {
999         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1000             opaque);
1001         g_free (auth_string);
1002         auth_string = auth_string2;
1003       }
1004       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1005           auth_string);
1006       break;
1007     }
1008     default:
1009       /* Nothing to do */
1010       break;
1011   }
1012 }
1013
1014 static void
1015 gen_date_string (gchar * date_string, guint len)
1016 {
1017   GTimeVal tv;
1018   time_t t;
1019 #ifdef HAVE_GMTIME_R
1020   struct tm tm_;
1021 #endif
1022
1023   g_get_current_time (&tv);
1024   t = (time_t) tv.tv_sec;
1025
1026 #ifdef HAVE_GMTIME_R
1027   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
1028 #else
1029   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
1030 #endif
1031 }
1032
1033 static GstRTSPResult
1034 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
1035 {
1036   guint left;
1037
1038   if (G_UNLIKELY (*idx > size))
1039     return GST_RTSP_ERROR;
1040
1041   left = size - *idx;
1042
1043   while (left) {
1044     gint r;
1045
1046     r = WRITE_SOCKET (fd, &buffer[*idx], left);
1047     if (G_UNLIKELY (r == 0)) {
1048       return GST_RTSP_EINTR;
1049     } else if (G_UNLIKELY (r < 0)) {
1050       if (ERRNO_IS_EAGAIN)
1051         return GST_RTSP_EINTR;
1052       if (!ERRNO_IS_EINTR)
1053         return GST_RTSP_ESYS;
1054     } else {
1055       left -= r;
1056       *idx += r;
1057     }
1058   }
1059   return GST_RTSP_OK;
1060 }
1061
1062 static gint
1063 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1064 {
1065   gint out = 0;
1066
1067   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1068     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1069
1070     out = MIN (left, size);
1071     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1072
1073     if (left == (gsize) out) {
1074       g_free (conn->initial_buffer);
1075       conn->initial_buffer = NULL;
1076       conn->initial_buffer_offset = 0;
1077     } else
1078       conn->initial_buffer_offset += out;
1079   }
1080
1081   if (G_LIKELY (size > (guint) out)) {
1082     gint r;
1083
1084     r = READ_SOCKET (conn->readfd->fd, &buffer[out], size - out);
1085     if (r <= 0) {
1086       if (out == 0)
1087         out = r;
1088     } else
1089       out += r;
1090   }
1091
1092   return out;
1093 }
1094
1095 static gint
1096 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1097 {
1098   DecodeCtx *ctx = conn->ctxp;
1099   gint out = 0;
1100
1101   if (ctx) {
1102     while (size > 0) {
1103       guint8 in[sizeof (ctx->out) * 4 / 3];
1104       gint r;
1105
1106       while (size > 0 && ctx->cout < ctx->coutl) {
1107         /* we have some leftover bytes */
1108         *buffer++ = ctx->out[ctx->cout++];
1109         size--;
1110         out++;
1111       }
1112
1113       /* got what we needed? */
1114       if (size == 0)
1115         break;
1116
1117       /* try to read more bytes */
1118       r = fill_raw_bytes (conn, in, sizeof (in));
1119       if (r <= 0) {
1120         if (out == 0)
1121           out = r;
1122         break;
1123       }
1124
1125       ctx->cout = 0;
1126       ctx->coutl =
1127           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1128           &ctx->save);
1129     }
1130   } else {
1131     out = fill_raw_bytes (conn, buffer, size);
1132   }
1133
1134   return out;
1135 }
1136
1137 static GstRTSPResult
1138 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1139 {
1140   guint left;
1141
1142   if (G_UNLIKELY (*idx > size))
1143     return GST_RTSP_ERROR;
1144
1145   left = size - *idx;
1146
1147   while (left) {
1148     gint r;
1149
1150     r = fill_bytes (conn, &buffer[*idx], left);
1151     if (G_UNLIKELY (r == 0)) {
1152       return GST_RTSP_EEOF;
1153     } else if (G_UNLIKELY (r < 0)) {
1154       if (ERRNO_IS_EAGAIN)
1155         return GST_RTSP_EINTR;
1156       if (!ERRNO_IS_EINTR)
1157         return GST_RTSP_ESYS;
1158     } else {
1159       left -= r;
1160       *idx += r;
1161     }
1162   }
1163   return GST_RTSP_OK;
1164 }
1165
1166 static GstRTSPResult
1167 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1168 {
1169   while (TRUE) {
1170     guint8 c;
1171     gint r;
1172
1173     r = fill_bytes (conn, &c, 1);
1174     if (G_UNLIKELY (r == 0)) {
1175       return GST_RTSP_EEOF;
1176     } else if (G_UNLIKELY (r < 0)) {
1177       if (ERRNO_IS_EAGAIN)
1178         return GST_RTSP_EINTR;
1179       if (!ERRNO_IS_EINTR)
1180         return GST_RTSP_ESYS;
1181     } else {
1182       if (c == '\n')            /* end on \n */
1183         break;
1184       if (c == '\r')            /* ignore \r */
1185         continue;
1186
1187       if (G_LIKELY (*idx < size - 1))
1188         buffer[(*idx)++] = c;
1189     }
1190   }
1191   buffer[*idx] = '\0';
1192
1193   return GST_RTSP_OK;
1194 }
1195
1196 /**
1197  * gst_rtsp_connection_write:
1198  * @conn: a #GstRTSPConnection
1199  * @data: the data to write
1200  * @size: the size of @data
1201  * @timeout: a timeout value or #NULL
1202  *
1203  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1204  * the specified @timeout. @timeout can be #NULL, in which case this function
1205  * might block forever.
1206  * 
1207  * This function can be cancelled with gst_rtsp_connection_flush().
1208  *
1209  * Returns: #GST_RTSP_OK on success.
1210  */
1211 GstRTSPResult
1212 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1213     guint size, GTimeVal * timeout)
1214 {
1215   guint offset;
1216   gint retval;
1217   GstClockTime to;
1218   GstRTSPResult res;
1219
1220   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1221   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1222   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1223
1224   gst_poll_set_controllable (conn->fdset, TRUE);
1225   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
1226   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
1227   /* clear all previous poll results */
1228   gst_poll_fd_ignored (conn->fdset, conn->writefd);
1229   gst_poll_fd_ignored (conn->fdset, conn->readfd);
1230
1231   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1232
1233   offset = 0;
1234
1235   while (TRUE) {
1236     /* try to write */
1237     res = write_bytes (conn->writefd->fd, data, &offset, size);
1238     if (G_LIKELY (res == GST_RTSP_OK))
1239       break;
1240     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1241       goto write_error;
1242
1243     /* not all is written, wait until we can write more */
1244     do {
1245       retval = gst_poll_wait (conn->fdset, to);
1246     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1247
1248     if (G_UNLIKELY (retval == 0))
1249       goto timeout;
1250
1251     if (G_UNLIKELY (retval == -1)) {
1252       if (errno == EBUSY)
1253         goto stopped;
1254       else
1255         goto select_error;
1256     }
1257   }
1258   return GST_RTSP_OK;
1259
1260   /* ERRORS */
1261 timeout:
1262   {
1263     return GST_RTSP_ETIMEOUT;
1264   }
1265 select_error:
1266   {
1267     return GST_RTSP_ESYS;
1268   }
1269 stopped:
1270   {
1271     return GST_RTSP_EINTR;
1272   }
1273 write_error:
1274   {
1275     return res;
1276   }
1277 }
1278
1279 static GString *
1280 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
1281 {
1282   GString *str = NULL;
1283
1284   str = g_string_new ("");
1285
1286   switch (message->type) {
1287     case GST_RTSP_MESSAGE_REQUEST:
1288       /* create request string, add CSeq */
1289       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
1290           "CSeq: %d\r\n",
1291           gst_rtsp_method_as_text (message->type_data.request.method),
1292           message->type_data.request.uri, conn->cseq++);
1293       /* add session id if we have one */
1294       if (conn->session_id[0] != '\0') {
1295         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1296         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1297             conn->session_id);
1298       }
1299       /* add any authentication headers */
1300       add_auth_header (conn, message);
1301       break;
1302     case GST_RTSP_MESSAGE_RESPONSE:
1303       /* create response string */
1304       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
1305           message->type_data.response.code, message->type_data.response.reason);
1306       break;
1307     case GST_RTSP_MESSAGE_DATA:
1308     {
1309       guint8 data_header[4];
1310
1311       /* prepare data header */
1312       data_header[0] = '$';
1313       data_header[1] = message->type_data.data.channel;
1314       data_header[2] = (message->body_size >> 8) & 0xff;
1315       data_header[3] = message->body_size & 0xff;
1316
1317       /* create string with header and data */
1318       str = g_string_append_len (str, (gchar *) data_header, 4);
1319       str =
1320           g_string_append_len (str, (gchar *) message->body,
1321           message->body_size);
1322       break;
1323     }
1324     default:
1325       g_string_free (str, TRUE);
1326       g_return_val_if_reached (NULL);
1327       break;
1328   }
1329
1330   /* append headers and body */
1331   if (message->type != GST_RTSP_MESSAGE_DATA) {
1332     gchar date_string[100];
1333
1334     gen_date_string (date_string, sizeof (date_string));
1335
1336     /* add date header */
1337     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1338     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1339
1340     /* append headers */
1341     gst_rtsp_message_append_headers (message, str);
1342
1343     /* append Content-Length and body if needed */
1344     if (message->body != NULL && message->body_size > 0) {
1345       gchar *len;
1346
1347       len = g_strdup_printf ("%d", message->body_size);
1348       g_string_append_printf (str, "%s: %s\r\n",
1349           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1350       g_free (len);
1351       /* header ends here */
1352       g_string_append (str, "\r\n");
1353       str =
1354           g_string_append_len (str, (gchar *) message->body,
1355           message->body_size);
1356     } else {
1357       /* just end headers */
1358       g_string_append (str, "\r\n");
1359     }
1360   }
1361
1362   return str;
1363 }
1364
1365 /**
1366  * gst_rtsp_connection_send:
1367  * @conn: a #GstRTSPConnection
1368  * @message: the message to send
1369  * @timeout: a timeout value or #NULL
1370  *
1371  * Attempt to send @message to the connected @conn, blocking up to
1372  * the specified @timeout. @timeout can be #NULL, in which case this function
1373  * might block forever.
1374  * 
1375  * This function can be cancelled with gst_rtsp_connection_flush().
1376  *
1377  * Returns: #GST_RTSP_OK on success.
1378  */
1379 GstRTSPResult
1380 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1381     GTimeVal * timeout)
1382 {
1383   GString *string = NULL;
1384   GstRTSPResult res;
1385   gchar *str;
1386   gsize len;
1387
1388   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1389   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1390
1391   if (G_UNLIKELY (!(string = message_to_string (conn, message))))
1392     goto no_message;
1393
1394   if (conn->tunneled) {
1395     str = g_base64_encode ((const guchar *) string->str, string->len);
1396     g_string_free (string, TRUE);
1397     len = strlen (str);
1398   } else {
1399     str = string->str;
1400     len = string->len;
1401     g_string_free (string, FALSE);
1402   }
1403
1404   /* write request */
1405   res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
1406
1407   g_free (str);
1408
1409   return res;
1410
1411 no_message:
1412   {
1413     g_warning ("Wrong message");
1414     return GST_RTSP_EINVAL;
1415   }
1416 }
1417
1418 static void
1419 parse_string (gchar * dest, gint size, gchar ** src)
1420 {
1421   gint idx;
1422
1423   idx = 0;
1424   /* skip spaces */
1425   while (g_ascii_isspace (**src))
1426     (*src)++;
1427
1428   while (!g_ascii_isspace (**src) && **src != '\0') {
1429     if (idx < size - 1)
1430       dest[idx++] = **src;
1431     (*src)++;
1432   }
1433   if (size > 0)
1434     dest[idx] = '\0';
1435 }
1436
1437 static void
1438 parse_key (gchar * dest, gint size, gchar ** src)
1439 {
1440   gint idx;
1441
1442   idx = 0;
1443   while (**src != ':' && **src != '\0') {
1444     if (idx < size - 1)
1445       dest[idx++] = **src;
1446     (*src)++;
1447   }
1448   if (size > 0)
1449     dest[idx] = '\0';
1450 }
1451
1452 static GstRTSPResult
1453 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
1454 {
1455   GstRTSPResult res;
1456   gchar versionstr[20];
1457   gchar codestr[4];
1458   gint code;
1459   gchar *bptr;
1460
1461   bptr = (gchar *) buffer;
1462
1463   parse_string (versionstr, sizeof (versionstr), &bptr);
1464   parse_string (codestr, sizeof (codestr), &bptr);
1465   code = atoi (codestr);
1466
1467   while (g_ascii_isspace (*bptr))
1468     bptr++;
1469
1470   if (strcmp (versionstr, "RTSP/1.0") == 0)
1471     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1472         parse_error);
1473   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1474     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1475         parse_error);
1476     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1477   } else
1478     goto parse_error;
1479
1480   return GST_RTSP_OK;
1481
1482 parse_error:
1483   {
1484     return GST_RTSP_EPARSE;
1485   }
1486 }
1487
1488 static GstRTSPResult
1489 parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
1490     GstRTSPMessage * msg)
1491 {
1492   GstRTSPResult res = GST_RTSP_OK;
1493   gchar versionstr[20];
1494   gchar methodstr[20];
1495   gchar urlstr[4096];
1496   gchar *bptr;
1497   GstRTSPMethod method;
1498   GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
1499
1500   bptr = (gchar *) buffer;
1501
1502   parse_string (methodstr, sizeof (methodstr), &bptr);
1503   method = gst_rtsp_find_method (methodstr);
1504   if (method == GST_RTSP_INVALID) {
1505     /* a tunnel request is allowed when we don't have one yet */
1506     if (conn->tstate != TUNNEL_STATE_NONE)
1507       goto invalid_method;
1508     /* we need GET or POST for a valid tunnel request */
1509     if (!strcmp (methodstr, "GET"))
1510       tstate = TUNNEL_STATE_GET;
1511     else if (!strcmp (methodstr, "POST"))
1512       tstate = TUNNEL_STATE_POST;
1513     else
1514       goto invalid_method;
1515   }
1516
1517   parse_string (urlstr, sizeof (urlstr), &bptr);
1518   if (G_UNLIKELY (*urlstr == '\0'))
1519     goto invalid_url;
1520
1521   parse_string (versionstr, sizeof (versionstr), &bptr);
1522
1523   if (G_UNLIKELY (*bptr != '\0'))
1524     goto invalid_version;
1525
1526   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1527     res = gst_rtsp_message_init_request (msg, method, urlstr);
1528   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1529     res = gst_rtsp_message_init_request (msg, method, urlstr);
1530     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1531   } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
1532     /* tunnel request, we need a tunnel method */
1533     if (tstate == TUNNEL_STATE_NONE) {
1534       res = GST_RTSP_EPARSE;
1535     } else {
1536       conn->tstate = tstate;
1537     }
1538   } else {
1539     res = GST_RTSP_EPARSE;
1540   }
1541
1542   return res;
1543
1544   /* ERRORS */
1545 invalid_method:
1546   {
1547     GST_ERROR ("invalid method %s", methodstr);
1548     return GST_RTSP_EPARSE;
1549   }
1550 invalid_url:
1551   {
1552     GST_ERROR ("invalid url %s", urlstr);
1553     return GST_RTSP_EPARSE;
1554   }
1555 invalid_version:
1556   {
1557     GST_ERROR ("invalid version");
1558     return GST_RTSP_EPARSE;
1559   }
1560 }
1561
1562 static GstRTSPResult
1563 parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
1564 {
1565   gchar *bptr;
1566
1567   bptr = (gchar *) buffer;
1568
1569   /* read key */
1570   parse_key (key, keysize, &bptr);
1571   if (G_UNLIKELY (*bptr != ':'))
1572     goto no_column;
1573
1574   bptr++;
1575   while (g_ascii_isspace (*bptr))
1576     bptr++;
1577
1578   *value = bptr;
1579
1580   return GST_RTSP_OK;
1581
1582   /* ERRORS */
1583 no_column:
1584   {
1585     return GST_RTSP_EPARSE;
1586   }
1587 }
1588
1589 /* parsing lines means reading a Key: Value pair */
1590 static GstRTSPResult
1591 parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
1592 {
1593   GstRTSPResult res;
1594   gchar key[32];
1595   gchar *value;
1596   GstRTSPHeaderField field;
1597
1598   res = parse_key_value (buffer, key, sizeof (key), &value);
1599   if (G_UNLIKELY (res != GST_RTSP_OK))
1600     goto parse_error;
1601
1602   if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
1603     /* save the tunnel session in the connection */
1604     if (!strcmp (key, "x-sessioncookie")) {
1605       strncpy (conn->tunnelid, value, TUNNELID_LEN);
1606       conn->tunnelid[TUNNELID_LEN - 1] = '\0';
1607       conn->tunneled = TRUE;
1608     }
1609   } else {
1610     field = gst_rtsp_find_header_field (key);
1611     if (field != GST_RTSP_HDR_INVALID)
1612       gst_rtsp_message_add_header (msg, field, value);
1613   }
1614
1615   return GST_RTSP_OK;
1616
1617   /* ERRORS */
1618 parse_error:
1619   {
1620     return res;
1621   }
1622 }
1623
1624 /* returns:
1625  *  GST_RTSP_OK when a complete message was read.
1626  *  GST_RTSP_EEOF: when the socket is closed
1627  *  GST_RTSP_EINTR: when more data is needed.
1628  *  GST_RTSP_..: some other error occured.
1629  */
1630 static GstRTSPResult
1631 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1632     GstRTSPConnection * conn)
1633 {
1634   GstRTSPResult res;
1635
1636   while (TRUE) {
1637     switch (builder->state) {
1638       case STATE_START:
1639         builder->offset = 0;
1640         res =
1641             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
1642         if (res != GST_RTSP_OK)
1643           goto done;
1644
1645         /* we have 1 bytes now and we can see if this is a data message or
1646          * not */
1647         if (builder->buffer[0] == '$') {
1648           /* data message, prepare for the header */
1649           builder->state = STATE_DATA_HEADER;
1650         } else {
1651           builder->line = 0;
1652           builder->state = STATE_READ_LINES;
1653         }
1654         break;
1655       case STATE_DATA_HEADER:
1656       {
1657         res =
1658             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
1659         if (res != GST_RTSP_OK)
1660           goto done;
1661
1662         gst_rtsp_message_init_data (message, builder->buffer[1]);
1663
1664         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1665         builder->body_data = g_malloc (builder->body_len + 1);
1666         builder->body_data[builder->body_len] = '\0';
1667         builder->offset = 0;
1668         builder->state = STATE_DATA_BODY;
1669         break;
1670       }
1671       case STATE_DATA_BODY:
1672       {
1673         res =
1674             read_bytes (conn, builder->body_data, &builder->offset,
1675             builder->body_len);
1676         if (res != GST_RTSP_OK)
1677           goto done;
1678
1679         /* we have the complete body now, store in the message adjusting the
1680          * length to include the traling '\0' */
1681         gst_rtsp_message_take_body (message,
1682             (guint8 *) builder->body_data, builder->body_len + 1);
1683         builder->body_data = NULL;
1684         builder->body_len = 0;
1685
1686         builder->state = STATE_END;
1687         break;
1688       }
1689       case STATE_READ_LINES:
1690       {
1691         res = read_line (conn, builder->buffer, &builder->offset,
1692             sizeof (builder->buffer));
1693         if (res != GST_RTSP_OK)
1694           goto done;
1695
1696         /* we have a regular response */
1697         if (builder->buffer[0] == '\r') {
1698           builder->buffer[0] = '\0';
1699         }
1700
1701         if (builder->buffer[0] == '\0') {
1702           gchar *hdrval;
1703
1704           /* empty line, end of message header */
1705           /* see if there is a Content-Length header */
1706           if (gst_rtsp_message_get_header (message,
1707                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1708             /* there is, prepare to read the body */
1709             builder->body_len = atol (hdrval);
1710             builder->body_data = g_malloc (builder->body_len + 1);
1711             builder->body_data[builder->body_len] = '\0';
1712             builder->offset = 0;
1713             builder->state = STATE_DATA_BODY;
1714           } else {
1715             builder->state = STATE_END;
1716           }
1717           break;
1718         }
1719
1720         /* we have a line */
1721         if (builder->line == 0) {
1722           /* first line, check for response status */
1723           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1724             res = parse_response_status (builder->buffer, message);
1725           } else {
1726             res = parse_request_line (conn, builder->buffer, message);
1727           }
1728           /* the first line must parse without errors */
1729           if (res != GST_RTSP_OK)
1730             goto done;
1731         } else {
1732           /* else just parse the line, ignore errors */
1733           parse_line (conn, builder->buffer, message);
1734         }
1735         builder->line++;
1736         builder->offset = 0;
1737         break;
1738       }
1739       case STATE_END:
1740       {
1741         gchar *session_id;
1742
1743         if (conn->tstate == TUNNEL_STATE_GET) {
1744           res = GST_RTSP_ETGET;
1745           goto done;
1746         } else if (conn->tstate == TUNNEL_STATE_POST) {
1747           res = GST_RTSP_ETPOST;
1748           goto done;
1749         }
1750
1751         if (message->type == GST_RTSP_MESSAGE_DATA) {
1752           /* data messages don't have headers */
1753           res = GST_RTSP_OK;
1754           goto done;
1755         }
1756
1757         /* save session id in the connection for further use */
1758         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
1759             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1760                 &session_id, 0) == GST_RTSP_OK) {
1761           gint maxlen, i;
1762
1763           maxlen = sizeof (conn->session_id) - 1;
1764           /* the sessionid can have attributes marked with ;
1765            * Make sure we strip them */
1766           for (i = 0; session_id[i] != '\0'; i++) {
1767             if (session_id[i] == ';') {
1768               maxlen = i;
1769               /* parse timeout */
1770               do {
1771                 i++;
1772               } while (g_ascii_isspace (session_id[i]));
1773               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1774                 gint to;
1775
1776                 /* if we parsed something valid, configure */
1777                 if ((to = atoi (&session_id[i + 8])) > 0)
1778                   conn->timeout = to;
1779               }
1780               break;
1781             }
1782           }
1783
1784           /* make sure to not overflow */
1785           strncpy (conn->session_id, session_id, maxlen);
1786           conn->session_id[maxlen] = '\0';
1787         }
1788         res = GST_RTSP_OK;
1789         goto done;
1790       }
1791       default:
1792         res = GST_RTSP_ERROR;
1793         break;
1794     }
1795   }
1796 done:
1797   return res;
1798 }
1799
1800 /**
1801  * gst_rtsp_connection_read:
1802  * @conn: a #GstRTSPConnection
1803  * @data: the data to read
1804  * @size: the size of @data
1805  * @timeout: a timeout value or #NULL
1806  *
1807  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1808  * the specified @timeout. @timeout can be #NULL, in which case this function
1809  * might block forever.
1810  *
1811  * This function can be cancelled with gst_rtsp_connection_flush().
1812  *
1813  * Returns: #GST_RTSP_OK on success.
1814  */
1815 GstRTSPResult
1816 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1817     GTimeVal * timeout)
1818 {
1819   guint offset;
1820   gint retval;
1821   GstClockTime to;
1822   GstRTSPResult res;
1823
1824   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1825   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1826   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1827
1828   if (G_UNLIKELY (size == 0))
1829     return GST_RTSP_OK;
1830
1831   offset = 0;
1832
1833   /* configure timeout if any */
1834   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1835
1836   gst_poll_set_controllable (conn->fdset, TRUE);
1837   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1838   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1839
1840   while (TRUE) {
1841     res = read_bytes (conn, data, &offset, size);
1842     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1843       goto eof;
1844     if (G_LIKELY (res == GST_RTSP_OK))
1845       break;
1846     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1847       goto read_error;
1848
1849     do {
1850       retval = gst_poll_wait (conn->fdset, to);
1851     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1852
1853     /* check for timeout */
1854     if (G_UNLIKELY (retval == 0))
1855       goto select_timeout;
1856
1857     if (G_UNLIKELY (retval == -1)) {
1858       if (errno == EBUSY)
1859         goto stopped;
1860       else
1861         goto select_error;
1862     }
1863     gst_poll_set_controllable (conn->fdset, FALSE);
1864   }
1865   return GST_RTSP_OK;
1866
1867   /* ERRORS */
1868 select_error:
1869   {
1870     return GST_RTSP_ESYS;
1871   }
1872 select_timeout:
1873   {
1874     return GST_RTSP_ETIMEOUT;
1875   }
1876 stopped:
1877   {
1878     return GST_RTSP_EINTR;
1879   }
1880 eof:
1881   {
1882     return GST_RTSP_EEOF;
1883   }
1884 read_error:
1885   {
1886     return res;
1887   }
1888 }
1889
1890 static GString *
1891 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
1892 {
1893   GString *str;
1894   gchar date_string[100];
1895   const gchar *status;
1896
1897   gen_date_string (date_string, sizeof (date_string));
1898
1899   status = gst_rtsp_status_as_text (code);
1900   if (status == NULL) {
1901     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
1902     status = "Internal Server Error";
1903   }
1904
1905   str = g_string_new ("");
1906
1907   /* */
1908   g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
1909   g_string_append_printf (str,
1910       "Server: GStreamer RTSP Server\r\n"
1911       "Date: %s\r\n"
1912       "Connection: close\r\n"
1913       "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
1914   if (code == GST_RTSP_STS_OK) {
1915     if (conn->ip)
1916       g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
1917     g_string_append_printf (str,
1918         "Content-Type: application/x-rtsp-tunnelled\r\n");
1919   }
1920   g_string_append_printf (str, "\r\n");
1921   return str;
1922 }
1923
1924 /**
1925  * gst_rtsp_connection_receive:
1926  * @conn: a #GstRTSPConnection
1927  * @message: the message to read
1928  * @timeout: a timeout value or #NULL
1929  *
1930  * Attempt to read into @message from the connected @conn, blocking up to
1931  * the specified @timeout. @timeout can be #NULL, in which case this function
1932  * might block forever.
1933  * 
1934  * This function can be cancelled with gst_rtsp_connection_flush().
1935  *
1936  * Returns: #GST_RTSP_OK on success.
1937  */
1938 GstRTSPResult
1939 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
1940     GTimeVal * timeout)
1941 {
1942   GstRTSPResult res;
1943   GstRTSPBuilder builder;
1944   gint retval;
1945   GstClockTime to;
1946
1947   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1948   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1949   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1950
1951   /* configure timeout if any */
1952   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1953
1954   gst_poll_set_controllable (conn->fdset, TRUE);
1955   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1956   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1957
1958   memset (&builder, 0, sizeof (GstRTSPBuilder));
1959   while (TRUE) {
1960     res = build_next (&builder, message, conn);
1961     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1962       goto eof;
1963     if (G_LIKELY (res == GST_RTSP_OK))
1964       break;
1965     if (res == GST_RTSP_ETGET) {
1966       GString *str;
1967
1968       /* tunnel GET request, we can reply now */
1969       str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
1970       res =
1971           gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
1972           timeout);
1973       g_string_free (str, TRUE);
1974     } else if (res == GST_RTSP_ETPOST) {
1975       /* tunnel POST request, return the value, the caller now has to link the
1976        * two connections. */
1977       break;
1978     } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
1979       goto read_error;
1980
1981     do {
1982       retval = gst_poll_wait (conn->fdset, to);
1983     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1984
1985     /* check for timeout */
1986     if (G_UNLIKELY (retval == 0))
1987       goto select_timeout;
1988
1989     if (G_UNLIKELY (retval == -1)) {
1990       if (errno == EBUSY)
1991         goto stopped;
1992       else
1993         goto select_error;
1994     }
1995     gst_poll_set_controllable (conn->fdset, FALSE);
1996   }
1997
1998   /* we have a message here */
1999   build_reset (&builder);
2000
2001   return GST_RTSP_OK;
2002
2003   /* ERRORS */
2004 select_error:
2005   {
2006     res = GST_RTSP_ESYS;
2007     goto cleanup;
2008   }
2009 select_timeout:
2010   {
2011     res = GST_RTSP_ETIMEOUT;
2012     goto cleanup;
2013   }
2014 stopped:
2015   {
2016     res = GST_RTSP_EINTR;
2017     goto cleanup;
2018   }
2019 eof:
2020   {
2021     res = GST_RTSP_EEOF;
2022     goto cleanup;
2023   }
2024 read_error:
2025 cleanup:
2026   {
2027     build_reset (&builder);
2028     gst_rtsp_message_unset (message);
2029     return res;
2030   }
2031 }
2032
2033 /**
2034  * gst_rtsp_connection_close:
2035  * @conn: a #GstRTSPConnection
2036  *
2037  * Close the connected @conn. After this call, the connection is in the same
2038  * state as when it was first created.
2039  * 
2040  * Returns: #GST_RTSP_OK on success.
2041  */
2042 GstRTSPResult
2043 gst_rtsp_connection_close (GstRTSPConnection * conn)
2044 {
2045   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2046
2047   g_free (conn->ip);
2048   conn->ip = NULL;
2049
2050   g_free (conn->initial_buffer);
2051   conn->initial_buffer = NULL;
2052   conn->initial_buffer_offset = 0;
2053
2054   REMOVE_POLLFD (conn->fdset, &conn->fd0);
2055   REMOVE_POLLFD (conn->fdset, &conn->fd1);
2056   conn->writefd = NULL;
2057   conn->readfd = NULL;
2058   conn->tunneled = FALSE;
2059   conn->tstate = TUNNEL_STATE_NONE;
2060   conn->ctxp = NULL;
2061   g_free (conn->username);
2062   conn->username = NULL;
2063   g_free (conn->passwd);
2064   conn->passwd = NULL;
2065   gst_rtsp_connection_clear_auth_params (conn);
2066   conn->timeout = 60;
2067   conn->cseq = 0;
2068   conn->session_id[0] = '\0';
2069
2070   return GST_RTSP_OK;
2071 }
2072
2073 /**
2074  * gst_rtsp_connection_free:
2075  * @conn: a #GstRTSPConnection
2076  *
2077  * Close and free @conn.
2078  * 
2079  * Returns: #GST_RTSP_OK on success.
2080  */
2081 GstRTSPResult
2082 gst_rtsp_connection_free (GstRTSPConnection * conn)
2083 {
2084   GstRTSPResult res;
2085
2086   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2087
2088   res = gst_rtsp_connection_close (conn);
2089   gst_poll_free (conn->fdset);
2090   g_timer_destroy (conn->timer);
2091   gst_rtsp_url_free (conn->url);
2092   g_free (conn->proxy_host);
2093   g_free (conn);
2094 #ifdef G_OS_WIN32
2095   WSACleanup ();
2096 #endif
2097
2098   return res;
2099 }
2100
2101 /**
2102  * gst_rtsp_connection_poll:
2103  * @conn: a #GstRTSPConnection
2104  * @events: a bitmask of #GstRTSPEvent flags to check
2105  * @revents: location for result flags 
2106  * @timeout: a timeout
2107  *
2108  * Wait up to the specified @timeout for the connection to become available for
2109  * at least one of the operations specified in @events. When the function returns
2110  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2111  * @conn.
2112  *
2113  * @timeout can be #NULL, in which case this function might block forever.
2114  *
2115  * This function can be cancelled with gst_rtsp_connection_flush().
2116  * 
2117  * Returns: #GST_RTSP_OK on success.
2118  *
2119  * Since: 0.10.15
2120  */
2121 GstRTSPResult
2122 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2123     GstRTSPEvent * revents, GTimeVal * timeout)
2124 {
2125   GstClockTime to;
2126   gint retval;
2127
2128   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2129   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2130   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2131   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2132   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2133
2134   gst_poll_set_controllable (conn->fdset, TRUE);
2135
2136   /* add fd to writer set when asked to */
2137   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
2138       events & GST_RTSP_EV_WRITE);
2139
2140   /* add fd to reader set when asked to */
2141   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
2142
2143   /* configure timeout if any */
2144   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2145
2146   do {
2147     retval = gst_poll_wait (conn->fdset, to);
2148   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2149
2150   if (G_UNLIKELY (retval == 0))
2151     goto select_timeout;
2152
2153   if (G_UNLIKELY (retval == -1)) {
2154     if (errno == EBUSY)
2155       goto stopped;
2156     else
2157       goto select_error;
2158   }
2159
2160   *revents = 0;
2161   if (events & GST_RTSP_EV_READ) {
2162     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
2163       *revents |= GST_RTSP_EV_READ;
2164   }
2165   if (events & GST_RTSP_EV_WRITE) {
2166     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
2167       *revents |= GST_RTSP_EV_WRITE;
2168   }
2169   return GST_RTSP_OK;
2170
2171   /* ERRORS */
2172 select_timeout:
2173   {
2174     return GST_RTSP_ETIMEOUT;
2175   }
2176 select_error:
2177   {
2178     return GST_RTSP_ESYS;
2179   }
2180 stopped:
2181   {
2182     return GST_RTSP_EINTR;
2183   }
2184 }
2185
2186 /**
2187  * gst_rtsp_connection_next_timeout:
2188  * @conn: a #GstRTSPConnection
2189  * @timeout: a timeout
2190  *
2191  * Calculate the next timeout for @conn, storing the result in @timeout.
2192  * 
2193  * Returns: #GST_RTSP_OK.
2194  */
2195 GstRTSPResult
2196 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2197 {
2198   gdouble elapsed;
2199   glong sec;
2200   gulong usec;
2201
2202   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2203   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2204
2205   elapsed = g_timer_elapsed (conn->timer, &usec);
2206   if (elapsed >= conn->timeout) {
2207     sec = 0;
2208     usec = 0;
2209   } else {
2210     sec = conn->timeout - elapsed;
2211   }
2212
2213   timeout->tv_sec = sec;
2214   timeout->tv_usec = usec;
2215
2216   return GST_RTSP_OK;
2217 }
2218
2219 /**
2220  * gst_rtsp_connection_reset_timeout:
2221  * @conn: a #GstRTSPConnection
2222  *
2223  * Reset the timeout of @conn.
2224  * 
2225  * Returns: #GST_RTSP_OK.
2226  */
2227 GstRTSPResult
2228 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2229 {
2230   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2231
2232   g_timer_start (conn->timer);
2233
2234   return GST_RTSP_OK;
2235 }
2236
2237 /**
2238  * gst_rtsp_connection_flush:
2239  * @conn: a #GstRTSPConnection
2240  * @flush: start or stop the flush
2241  *
2242  * Start or stop the flushing action on @conn. When flushing, all current
2243  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2244  * is set to non-flushing mode again.
2245  * 
2246  * Returns: #GST_RTSP_OK.
2247  */
2248 GstRTSPResult
2249 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2250 {
2251   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2252
2253   gst_poll_set_flushing (conn->fdset, flush);
2254
2255   return GST_RTSP_OK;
2256 }
2257
2258 /**
2259  * gst_rtsp_connection_set_proxy:
2260  * @conn: a #GstRTSPConnection
2261  * @host: the proxy host
2262  * @port: the proxy port
2263  *
2264  * Set the proxy host and port.
2265  * 
2266  * Returns: #GST_RTSP_OK.
2267  *
2268  * Since: 0.10.23
2269  */
2270 GstRTSPResult
2271 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
2272     const gchar * host, guint port)
2273 {
2274   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2275
2276   g_free (conn->proxy_host);
2277   conn->proxy_host = g_strdup (host);
2278   conn->proxy_port = port;
2279
2280   return GST_RTSP_OK;
2281 }
2282
2283 /**
2284  * gst_rtsp_connection_set_auth:
2285  * @conn: a #GstRTSPConnection
2286  * @method: authentication method
2287  * @user: the user
2288  * @pass: the password
2289  *
2290  * Configure @conn for authentication mode @method with @user and @pass as the
2291  * user and password respectively.
2292  * 
2293  * Returns: #GST_RTSP_OK.
2294  */
2295 GstRTSPResult
2296 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
2297     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
2298 {
2299   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2300
2301   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
2302           || g_strrstr (user, ":") != NULL))
2303     return GST_RTSP_EINVAL;
2304
2305   /* Make sure the username and passwd are being set for authentication */
2306   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
2307     return GST_RTSP_EINVAL;
2308
2309   /* ":" chars are not allowed in usernames for basic auth */
2310   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
2311     return GST_RTSP_EINVAL;
2312
2313   g_free (conn->username);
2314   g_free (conn->passwd);
2315
2316   conn->auth_method = method;
2317   conn->username = g_strdup (user);
2318   conn->passwd = g_strdup (pass);
2319
2320   return GST_RTSP_OK;
2321 }
2322
2323 /**
2324  * str_case_hash:
2325  * @key: ASCII string to hash
2326  *
2327  * Hashes @key in a case-insensitive manner.
2328  *
2329  * Returns: the hash code.
2330  **/
2331 static guint
2332 str_case_hash (gconstpointer key)
2333 {
2334   const char *p = key;
2335   guint h = g_ascii_toupper (*p);
2336
2337   if (h)
2338     for (p += 1; *p != '\0'; p++)
2339       h = (h << 5) - h + g_ascii_toupper (*p);
2340
2341   return h;
2342 }
2343
2344 /**
2345  * str_case_equal:
2346  * @v1: an ASCII string
2347  * @v2: another ASCII string
2348  *
2349  * Compares @v1 and @v2 in a case-insensitive manner
2350  *
2351  * Returns: %TRUE if they are equal (modulo case)
2352  **/
2353 static gboolean
2354 str_case_equal (gconstpointer v1, gconstpointer v2)
2355 {
2356   const char *string1 = v1;
2357   const char *string2 = v2;
2358
2359   return g_ascii_strcasecmp (string1, string2) == 0;
2360 }
2361
2362 /**
2363  * gst_rtsp_connection_set_auth_param:
2364  * @conn: a #GstRTSPConnection
2365  * @param: authentication directive
2366  * @value: value
2367  *
2368  * Setup @conn with authentication directives. This is not necesary for
2369  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
2370  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
2371  * in the WWW-Authenticate response header and can include realm, domain,
2372  * nonce, opaque, stale, algorithm, qop as per RFC2617.
2373  * 
2374  * Since: 0.10.20
2375  */
2376 void
2377 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
2378     const gchar * param, const gchar * value)
2379 {
2380   g_return_if_fail (conn != NULL);
2381   g_return_if_fail (param != NULL);
2382
2383   if (conn->auth_params == NULL) {
2384     conn->auth_params =
2385         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
2386   }
2387   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
2388 }
2389
2390 /**
2391  * gst_rtsp_connection_clear_auth_params:
2392  * @conn: a #GstRTSPConnection
2393  *
2394  * Clear the list of authentication directives stored in @conn.
2395  *
2396  * Since: 0.10.20
2397  */
2398 void
2399 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
2400 {
2401   g_return_if_fail (conn != NULL);
2402
2403   if (conn->auth_params != NULL) {
2404     g_hash_table_destroy (conn->auth_params);
2405     conn->auth_params = NULL;
2406   }
2407 }
2408
2409 static GstRTSPResult
2410 set_qos_dscp (gint fd, guint qos_dscp)
2411 {
2412   union gst_sockaddr sa;
2413   socklen_t slen = sizeof (sa);
2414   gint af;
2415   gint tos;
2416
2417   if (fd == -1)
2418     return GST_RTSP_OK;
2419
2420   if (getsockname (fd, &sa.sa, &slen) < 0)
2421     goto no_getsockname;
2422
2423   af = sa.sa.sa_family;
2424
2425   /* if this is an IPv4-mapped address then do IPv4 QoS */
2426   if (af == AF_INET6) {
2427     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
2428       af = AF_INET;
2429   }
2430
2431   /* extract and shift 6 bits of the DSCP */
2432   tos = (qos_dscp & 0x3f) << 2;
2433
2434   switch (af) {
2435     case AF_INET:
2436       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
2437         goto no_setsockopt;
2438       break;
2439     case AF_INET6:
2440 #ifdef IPV6_TCLASS
2441       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
2442         goto no_setsockopt;
2443       break;
2444 #endif
2445     default:
2446       goto wrong_family;
2447   }
2448
2449   return GST_RTSP_OK;
2450
2451   /* ERRORS */
2452 no_getsockname:
2453 no_setsockopt:
2454   {
2455     return GST_RTSP_ESYS;
2456   }
2457
2458 wrong_family:
2459   {
2460     return GST_RTSP_ERROR;
2461   }
2462 }
2463
2464 /**
2465  * gst_rtsp_connection_set_qos_dscp:
2466  * @conn: a #GstRTSPConnection
2467  * @qos_dscp: DSCP value
2468  *
2469  * Configure @conn to use the specified DSCP value.
2470  *
2471  * Returns: #GST_RTSP_OK on success.
2472  *
2473  * Since: 0.10.20
2474  */
2475 GstRTSPResult
2476 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
2477 {
2478   GstRTSPResult res;
2479
2480   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2481   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2482   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2483
2484   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
2485   if (res == GST_RTSP_OK)
2486     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
2487
2488   return res;
2489 }
2490
2491
2492 /**
2493  * gst_rtsp_connection_get_url:
2494  * @conn: a #GstRTSPConnection
2495  *
2496  * Retrieve the URL of the other end of @conn.
2497  *
2498  * Returns: The URL. This value remains valid until the
2499  * connection is freed.
2500  *
2501  * Since: 0.10.23
2502  */
2503 GstRTSPUrl *
2504 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
2505 {
2506   g_return_val_if_fail (conn != NULL, NULL);
2507
2508   return conn->url;
2509 }
2510
2511 /**
2512  * gst_rtsp_connection_get_ip:
2513  * @conn: a #GstRTSPConnection
2514  *
2515  * Retrieve the IP address of the other end of @conn.
2516  *
2517  * Returns: The IP address as a string. this value remains valid until the
2518  * connection is closed.
2519  *
2520  * Since: 0.10.20
2521  */
2522 const gchar *
2523 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
2524 {
2525   g_return_val_if_fail (conn != NULL, NULL);
2526
2527   return conn->ip;
2528 }
2529
2530 /**
2531  * gst_rtsp_connection_set_ip:
2532  * @conn: a #GstRTSPConnection
2533  * @ip: an ip address
2534  *
2535  * Set the IP address of the server.
2536  *
2537  * Since: 0.10.23
2538  */
2539 void
2540 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
2541 {
2542   g_return_if_fail (conn != NULL);
2543
2544   g_free (conn->ip);
2545   conn->ip = g_strdup (ip);
2546 }
2547
2548 /**
2549  * gst_rtsp_connection_get_readfd:
2550  * @conn: a #GstRTSPConnection
2551  *
2552  * Get the file descriptor for reading.
2553  *
2554  * Returns: the file descriptor used for reading or -1 on error. The file
2555  * descriptor remains valid until the connection is closed.
2556  *
2557  * Since: 0.10.23
2558  */
2559 gint
2560 gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
2561 {
2562   g_return_val_if_fail (conn != NULL, -1);
2563   g_return_val_if_fail (conn->readfd != NULL, -1);
2564
2565   return conn->readfd->fd;
2566 }
2567
2568 /**
2569  * gst_rtsp_connection_get_writefd:
2570  * @conn: a #GstRTSPConnection
2571  *
2572  * Get the file descriptor for writing.
2573  *
2574  * Returns: the file descriptor used for writing or -1 on error. The file
2575  * descriptor remains valid until the connection is closed.
2576  *
2577  * Since: 0.10.23
2578  */
2579 gint
2580 gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
2581 {
2582   g_return_val_if_fail (conn != NULL, -1);
2583   g_return_val_if_fail (conn->writefd != NULL, -1);
2584
2585   return conn->writefd->fd;
2586 }
2587
2588
2589 /**
2590  * gst_rtsp_connection_set_tunneled:
2591  * @conn: a #GstRTSPConnection
2592  * @tunneled: the new state
2593  *
2594  * Set the HTTP tunneling state of the connection. This must be configured before
2595  * the @conn is connected.
2596  *
2597  * Since: 0.10.23
2598  */
2599 void
2600 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
2601 {
2602   g_return_if_fail (conn != NULL);
2603   g_return_if_fail (conn->readfd == NULL);
2604   g_return_if_fail (conn->writefd == NULL);
2605
2606   conn->tunneled = tunneled;
2607 }
2608
2609 /**
2610  * gst_rtsp_connection_is_tunneled:
2611  * @conn: a #GstRTSPConnection
2612  *
2613  * Get the tunneling state of the connection. 
2614  *
2615  * Returns: if @conn is using HTTP tunneling.
2616  *
2617  * Since: 0.10.23
2618  */
2619 gboolean
2620 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
2621 {
2622   g_return_val_if_fail (conn != NULL, FALSE);
2623
2624   return conn->tunneled;
2625 }
2626
2627 /**
2628  * gst_rtsp_connection_get_tunnelid:
2629  * @conn: a #GstRTSPConnection
2630  *
2631  * Get the tunnel session id the connection. 
2632  *
2633  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
2634  *
2635  * Since: 0.10.23
2636  */
2637 const gchar *
2638 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
2639 {
2640   g_return_val_if_fail (conn != NULL, NULL);
2641
2642   if (!conn->tunneled)
2643     return NULL;
2644
2645   return conn->tunnelid;
2646 }
2647
2648 /**
2649  * gst_rtsp_connection_do_tunnel:
2650  * @conn: a #GstRTSPConnection
2651  * @conn2: a #GstRTSPConnection
2652  *
2653  * If @conn received the first tunnel connection and @conn2 received
2654  * the second tunnel connection, link the two connections together so that
2655  * @conn manages the tunneled connection.
2656  *
2657  * After this call, @conn2 cannot be used anymore and must be freed with
2658  * gst_rtsp_connection_free().
2659  *
2660  * Returns: return GST_RTSP_OK on success.
2661  *
2662  * Since: 0.10.23
2663  */
2664 GstRTSPResult
2665 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
2666     GstRTSPConnection * conn2)
2667 {
2668   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2669   g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
2670   g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
2671   g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
2672   g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
2673       GST_RTSP_EINVAL);
2674
2675   /* both connections have fd0 as the read/write socket. start by taking the
2676    * socket from conn2 and set it as the socket in conn */
2677   conn->fd1 = conn2->fd0;
2678
2679   /* clean up some of the state of conn2 */
2680   gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
2681   conn2->fd0.fd = -1;
2682   conn2->readfd = conn2->writefd = NULL;
2683
2684   /* We make fd0 the write socket and fd1 the read socket. */
2685   conn->writefd = &conn->fd0;
2686   conn->readfd = &conn->fd1;
2687
2688   conn->tstate = TUNNEL_STATE_COMPLETE;
2689
2690   /* we need base64 decoding for the readfd */
2691   conn->ctx.state = 0;
2692   conn->ctx.save = 0;
2693   conn->ctx.cout = 0;
2694   conn->ctx.coutl = 0;
2695   conn->ctxp = &conn->ctx;
2696
2697   return GST_RTSP_OK;
2698 }
2699
2700 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
2701 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
2702
2703 typedef struct
2704 {
2705   guint8 *data;
2706   guint size;
2707   guint id;
2708 } GstRTSPRec;
2709
2710 /* async functions */
2711 struct _GstRTSPWatch
2712 {
2713   GSource source;
2714
2715   GstRTSPConnection *conn;
2716
2717   GstRTSPBuilder builder;
2718   GstRTSPMessage message;
2719
2720   GPollFD readfd;
2721   GPollFD writefd;
2722   gboolean write_added;
2723
2724   /* queued message for transmission */
2725   guint id;
2726   GAsyncQueue *messages;
2727   guint8 *write_data;
2728   guint write_off;
2729   guint write_size;
2730   guint write_id;
2731
2732   GstRTSPWatchFuncs funcs;
2733
2734   gpointer user_data;
2735   GDestroyNotify notify;
2736 };
2737
2738 static gboolean
2739 gst_rtsp_source_prepare (GSource * source, gint * timeout)
2740 {
2741   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2742
2743   if (watch->conn->initial_buffer != NULL)
2744     return TRUE;
2745
2746   *timeout = (watch->conn->timeout * 1000);
2747
2748   return FALSE;
2749 }
2750
2751 static gboolean
2752 gst_rtsp_source_check (GSource * source)
2753 {
2754   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2755
2756   if (watch->readfd.revents & READ_COND)
2757     return TRUE;
2758
2759   if (watch->writefd.revents & WRITE_COND)
2760     return TRUE;
2761
2762   return FALSE;
2763 }
2764
2765 static gboolean
2766 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
2767     gpointer user_data G_GNUC_UNUSED)
2768 {
2769   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2770   GstRTSPResult res;
2771
2772   /* first read as much as we can */
2773   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
2774     do {
2775       res = build_next (&watch->builder, &watch->message, watch->conn);
2776       if (res == GST_RTSP_EINTR)
2777         break;
2778       if (G_UNLIKELY (res == GST_RTSP_EEOF))
2779         goto eof;
2780       if (res == GST_RTSP_ETGET) {
2781         GString *str;
2782         GstRTSPStatusCode code;
2783         guint size;
2784
2785         if (watch->funcs.tunnel_start)
2786           code = watch->funcs.tunnel_start (watch, watch->user_data);
2787         else
2788           code = GST_RTSP_STS_OK;
2789
2790         /* queue the response string */
2791         str = gen_tunnel_reply (watch->conn, code);
2792         size = str->len;
2793         gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, FALSE),
2794             size);
2795       } else if (res == GST_RTSP_ETPOST) {
2796         /* in the callback the connection should be tunneled with the
2797          * GET connection */
2798         if (watch->funcs.tunnel_complete)
2799           watch->funcs.tunnel_complete (watch, watch->user_data);
2800       } else if (G_UNLIKELY (res != GST_RTSP_OK))
2801         goto error;
2802
2803       if (G_LIKELY (res == GST_RTSP_OK)) {
2804         if (watch->funcs.message_received)
2805           watch->funcs.message_received (watch, &watch->message,
2806               watch->user_data);
2807
2808         gst_rtsp_message_unset (&watch->message);
2809       }
2810       build_reset (&watch->builder);
2811     } while (FALSE);
2812   }
2813
2814   if (watch->writefd.revents & WRITE_COND) {
2815     do {
2816       if (watch->write_data == NULL) {
2817         GstRTSPRec *rec;
2818
2819         /* get a new message from the queue */
2820         rec = g_async_queue_try_pop (watch->messages);
2821         if (rec == NULL)
2822           goto done;
2823
2824         watch->write_off = 0;
2825         watch->write_data = rec->data;
2826         watch->write_size = rec->size;
2827         watch->write_id = rec->id;
2828
2829         g_slice_free (GstRTSPRec, rec);
2830       }
2831
2832       res = write_bytes (watch->writefd.fd, watch->write_data,
2833           &watch->write_off, watch->write_size);
2834       if (res == GST_RTSP_EINTR)
2835         break;
2836       if (G_UNLIKELY (res != GST_RTSP_OK))
2837         goto error;
2838
2839       if (watch->funcs.message_sent)
2840         watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
2841
2842     done:
2843       if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
2844         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2845         watch->write_added = FALSE;
2846         watch->writefd.revents = 0;
2847       }
2848       g_free (watch->write_data);
2849       watch->write_data = NULL;
2850     } while (FALSE);
2851   }
2852
2853   return TRUE;
2854
2855   /* ERRORS */
2856 eof:
2857   {
2858     if (watch->funcs.closed)
2859       watch->funcs.closed (watch, watch->user_data);
2860     return FALSE;
2861   }
2862 error:
2863   {
2864     if (watch->funcs.error)
2865       watch->funcs.error (watch, res, watch->user_data);
2866     return FALSE;
2867   }
2868 }
2869
2870 static void
2871 gst_rtsp_rec_free (gpointer data)
2872 {
2873   GstRTSPRec *rec = data;
2874
2875   g_free (rec->data);
2876   g_slice_free (GstRTSPRec, rec);
2877 }
2878
2879 static void
2880 gst_rtsp_source_finalize (GSource * source)
2881 {
2882   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2883
2884   build_reset (&watch->builder);
2885   gst_rtsp_message_unset (&watch->message);
2886
2887   g_async_queue_unref (watch->messages);
2888   watch->messages = NULL;
2889
2890   g_free (watch->write_data);
2891
2892   if (watch->notify)
2893     watch->notify (watch->user_data);
2894 }
2895
2896 static GSourceFuncs gst_rtsp_source_funcs = {
2897   gst_rtsp_source_prepare,
2898   gst_rtsp_source_check,
2899   gst_rtsp_source_dispatch,
2900   gst_rtsp_source_finalize,
2901   NULL,
2902   NULL
2903 };
2904
2905 /**
2906  * gst_rtsp_watch_new:
2907  * @conn: a #GstRTSPConnection
2908  * @funcs: watch functions
2909  * @user_data: user data to pass to @funcs
2910  * @notify: notify when @user_data is not referenced anymore
2911  *
2912  * Create a watch object for @conn. The functions provided in @funcs will be
2913  * called with @user_data when activity happened on the watch.
2914  *
2915  * The new watch is usually created so that it can be attached to a
2916  * maincontext with gst_rtsp_watch_attach(). 
2917  *
2918  * @conn must exist for the entire lifetime of the watch.
2919  *
2920  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2921  * communication. Free with gst_rtsp_watch_unref () after usage.
2922  *
2923  * Since: 0.10.23
2924  */
2925 GstRTSPWatch *
2926 gst_rtsp_watch_new (GstRTSPConnection * conn,
2927     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2928 {
2929   GstRTSPWatch *result;
2930
2931   g_return_val_if_fail (conn != NULL, NULL);
2932   g_return_val_if_fail (funcs != NULL, NULL);
2933   g_return_val_if_fail (conn->readfd != NULL, NULL);
2934   g_return_val_if_fail (conn->writefd != NULL, NULL);
2935
2936   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2937       sizeof (GstRTSPWatch));
2938
2939   result->conn = conn;
2940   result->builder.state = STATE_START;
2941
2942   result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
2943
2944   result->readfd.fd = -1;
2945   result->writefd.fd = -1;
2946
2947   gst_rtsp_watch_reset (result);
2948
2949   result->funcs = *funcs;
2950   result->user_data = user_data;
2951   result->notify = notify;
2952
2953   /* only add the read fd, the write fd is only added when we have data
2954    * to send. */
2955   g_source_add_poll ((GSource *) result, &result->readfd);
2956
2957   return result;
2958 }
2959
2960 /**
2961  * gst_rtsp_watch_reset:
2962  * @watch: a #GstRTSPWatch
2963  *
2964  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
2965  * when the file descriptors of the connection might have changed.
2966  *
2967  * Since: 0.10.23
2968  */
2969 void
2970 gst_rtsp_watch_reset (GstRTSPWatch * watch)
2971 {
2972   if (watch->readfd.fd != -1)
2973     g_source_remove_poll ((GSource *) watch, &watch->readfd);
2974   if (watch->writefd.fd != -1)
2975     g_source_remove_poll ((GSource *) watch, &watch->writefd);
2976
2977   watch->readfd.fd = watch->conn->readfd->fd;
2978   watch->readfd.events = READ_COND;
2979   watch->readfd.revents = 0;
2980
2981   watch->writefd.fd = watch->conn->writefd->fd;
2982   watch->writefd.events = WRITE_COND;
2983   watch->writefd.revents = 0;
2984   watch->write_added = FALSE;
2985
2986   g_source_add_poll ((GSource *) watch, &watch->readfd);
2987 }
2988
2989 /**
2990  * gst_rtsp_watch_attach:
2991  * @watch: a #GstRTSPWatch
2992  * @context: a GMainContext (if NULL, the default context will be used)
2993  *
2994  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
2995  *
2996  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
2997  *
2998  * Since: 0.10.23
2999  */
3000 guint
3001 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
3002 {
3003   g_return_val_if_fail (watch != NULL, 0);
3004
3005   return g_source_attach ((GSource *) watch, context);
3006 }
3007
3008 /**
3009  * gst_rtsp_watch_unref:
3010  * @watch: a #GstRTSPWatch
3011  *
3012  * Decreases the reference count of @watch by one. If the resulting reference
3013  * count is zero the watch and associated memory will be destroyed.
3014  *
3015  * Since: 0.10.23
3016  */
3017 void
3018 gst_rtsp_watch_unref (GstRTSPWatch * watch)
3019 {
3020   g_return_if_fail (watch != NULL);
3021
3022   g_source_unref ((GSource *) watch);
3023 }
3024
3025 /**
3026  * gst_rtsp_watch_queue_data:
3027  * @watch: a #GstRTSPWatch
3028  * @data: the data to queue
3029  * @size: the size of @data
3030  *
3031  * Queue @data for transmission in @watch. It will be transmitted when the
3032  * connection of the @watch becomes writable.
3033  *
3034  * This function will take ownership of @data and g_free() it after use.
3035  *
3036  * The return value of this function will be used as the id argument in the
3037  * message_sent callback.
3038  *
3039  * Returns: an id.
3040  *
3041  * Since: 0.10.24
3042  */
3043 guint
3044 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
3045     guint size)
3046 {
3047   GstRTSPRec *rec;
3048
3049   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3050   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
3051   g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
3052
3053   /* make a record with the data and id */
3054   rec = g_slice_new (GstRTSPRec);
3055   rec->data = (guint8 *) data;
3056   rec->size = size;
3057   do {
3058     /* make sure rec->id is never 0 */
3059     rec->id = ++watch->id;
3060   } while (G_UNLIKELY (rec->id == 0));
3061
3062   /* add the record to a queue. FIXME we would like to have an upper limit here */
3063   g_async_queue_push (watch->messages, rec);
3064
3065   /* FIXME: does the following need to be made thread-safe? (this might be
3066    * called from a streaming thread, like appsink's render function) */
3067   /* make sure the main context will now also check for writability on the
3068    * socket */
3069   if (!watch->write_added) {
3070     g_source_add_poll ((GSource *) watch, &watch->writefd);
3071     watch->write_added = TRUE;
3072   }
3073
3074   return rec->id;
3075 }
3076
3077 /**
3078  * gst_rtsp_watch_queue_message:
3079  * @watch: a #GstRTSPWatch
3080  * @message: a #GstRTSPMessage
3081  *
3082  * Queue a @message for transmission in @watch. The contents of this
3083  * message will be serialized and transmitted when the connection of the
3084  * @watch becomes writable.
3085  *
3086  * The return value of this function will be used as the id argument in the
3087  * message_sent callback.
3088  *
3089  * Returns: an id.
3090  *
3091  * Since: 0.10.23
3092  */
3093 guint
3094 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
3095 {
3096   GString *str;
3097   guint size;
3098
3099   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3100   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
3101
3102   /* make a record with the message as a string and id */
3103   str = message_to_string (watch->conn, message);
3104   size = str->len;
3105   return gst_rtsp_watch_queue_data (watch,
3106       (guint8 *) g_string_free (str, FALSE), size);
3107 }