rtsp: Added support for HTTP messages
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72 /* we include this here to get the G_OS_* defines */
73 #include <glib.h>
74 #include <gst/gst.h>
75
76 #ifdef G_OS_WIN32
77 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
78  * minwg32 headers check WINVER before allowing the use of these */
79 #ifndef WINVER
80 #define WINVER 0x0501
81 #endif
82 #include <winsock2.h>
83 #include <ws2tcpip.h>
84 #define EINPROGRESS WSAEINPROGRESS
85 #else
86 #include <sys/ioctl.h>
87 #include <netdb.h>
88 #include <sys/socket.h>
89 #include <fcntl.h>
90 #include <netinet/in.h>
91 #endif
92
93 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
94 #include <sys/filio.h>
95 #endif
96
97 #include "gstrtspconnection.h"
98 #include "gstrtspbase64.h"
99
100 union gst_sockaddr
101 {
102   struct sockaddr sa;
103   struct sockaddr_in sa_in;
104   struct sockaddr_in6 sa_in6;
105   struct sockaddr_storage sa_stor;
106 };
107
108 typedef struct
109 {
110   gint state;
111   guint save;
112   guchar out[3];                /* the size must be evenly divisible by 3 */
113   guint cout;
114   guint coutl;
115 } DecodeCtx;
116
117 static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer,
118     guint * idx, guint size);
119 static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
120     guint keysize, gchar ** value);
121 static void parse_string (gchar * dest, gint size, gchar ** src);
122
123 #ifdef G_OS_WIN32
124 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
125 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
126 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
127 #define CLOSE_SOCKET(sock) closesocket (sock)
128 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
129 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
130 /* According to Microsoft's connect() documentation this one returns
131  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
132 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
133 #else
134 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
135 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
136 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
137 #define CLOSE_SOCKET(sock) close (sock)
138 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
139 #define ERRNO_IS_EINTR (errno == EINTR)
140 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
141 #endif
142
143 #define ADD_POLLFD(fdset, pfd, fd)        \
144 G_STMT_START {                            \
145   (pfd)->fd = fd;                         \
146   gst_poll_add_fd (fdset, pfd);           \
147 } G_STMT_END
148
149 #define REMOVE_POLLFD(fdset, pfd)          \
150 G_STMT_START {                             \
151   if ((pfd)->fd != -1) {                   \
152     GST_DEBUG ("remove fd %d", (pfd)->fd); \
153     gst_poll_remove_fd (fdset, pfd);       \
154     CLOSE_SOCKET ((pfd)->fd);              \
155     (pfd)->fd = -1;                        \
156   }                                        \
157 } G_STMT_END
158
159 typedef enum
160 {
161   TUNNEL_STATE_NONE,
162   TUNNEL_STATE_GET,
163   TUNNEL_STATE_POST,
164   TUNNEL_STATE_COMPLETE
165 } GstRTSPTunnelState;
166
167 #define TUNNELID_LEN   24
168
169 struct _GstRTSPConnection
170 {
171   /*< private > */
172   /* URL for the connection */
173   GstRTSPUrl *url;
174
175   /* connection state */
176   GstPollFD fd0;
177   GstPollFD fd1;
178
179   GstPollFD *readfd;
180   GstPollFD *writefd;
181
182   gchar tunnelid[TUNNELID_LEN];
183   gboolean tunneled;
184   GstRTSPTunnelState tstate;
185
186   GstPoll *fdset;
187   gchar *ip;
188
189   gchar *initial_buffer;
190   gsize initial_buffer_offset;
191
192   /* Session state */
193   gint cseq;                    /* sequence number */
194   gchar session_id[512];        /* session id */
195   gint timeout;                 /* session timeout in seconds */
196   GTimer *timer;                /* timeout timer */
197
198   /* Authentication */
199   GstRTSPAuthMethod auth_method;
200   gchar *username;
201   gchar *passwd;
202   GHashTable *auth_params;
203
204   DecodeCtx ctx;
205   DecodeCtx *ctxp;
206
207   gchar *proxy_host;
208   guint proxy_port;
209 };
210
211 enum
212 {
213   STATE_START = 0,
214   STATE_DATA_HEADER,
215   STATE_DATA_BODY,
216   STATE_READ_LINES,
217   STATE_END,
218   STATE_LAST
219 };
220
221 /* a structure for constructing RTSPMessages */
222 typedef struct
223 {
224   gint state;
225   guint8 buffer[4096];
226   guint offset;
227
228   guint line;
229   guint8 *body_data;
230   glong body_len;
231 } GstRTSPBuilder;
232
233 static void
234 build_reset (GstRTSPBuilder * builder)
235 {
236   g_free (builder->body_data);
237   memset (builder, 0, sizeof (GstRTSPBuilder));
238 }
239
240 /**
241  * gst_rtsp_connection_create:
242  * @url: a #GstRTSPUrl 
243  * @conn: storage for a #GstRTSPConnection
244  *
245  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
246  * The connection will not yet attempt to connect to @url, use
247  * gst_rtsp_connection_connect().
248  *
249  * A copy of @url will be made.
250  *
251  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
252  */
253 GstRTSPResult
254 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
255 {
256   GstRTSPConnection *newconn;
257 #ifdef G_OS_WIN32
258   WSADATA w;
259   int error;
260 #endif
261
262   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
263
264 #ifdef G_OS_WIN32
265   error = WSAStartup (0x0202, &w);
266
267   if (error)
268     goto startup_error;
269
270   if (w.wVersion != 0x0202)
271     goto version_error;
272 #endif
273
274   newconn = g_new0 (GstRTSPConnection, 1);
275
276   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
277     goto no_fdset;
278
279   newconn->url = gst_rtsp_url_copy (url);
280   newconn->fd0.fd = -1;
281   newconn->fd1.fd = -1;
282   newconn->timer = g_timer_new ();
283   newconn->timeout = 60;
284   newconn->cseq = 1;
285
286   newconn->auth_method = GST_RTSP_AUTH_NONE;
287   newconn->username = NULL;
288   newconn->passwd = NULL;
289   newconn->auth_params = NULL;
290
291   *conn = newconn;
292
293   return GST_RTSP_OK;
294
295   /* ERRORS */
296 #ifdef G_OS_WIN32
297 startup_error:
298   {
299     g_warning ("Error %d on WSAStartup", error);
300     return GST_RTSP_EWSASTART;
301   }
302 version_error:
303   {
304     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
305         w.wVersion);
306     WSACleanup ();
307     return GST_RTSP_EWSAVERSION;
308   }
309 #endif
310 no_fdset:
311   {
312     g_free (newconn);
313 #ifdef G_OS_WIN32
314     WSACleanup ();
315 #endif
316     return GST_RTSP_ESYS;
317   }
318 }
319
320 /**
321  * gst_rtsp_connection_create_from_fd:
322  * @fd: a file descriptor
323  * @ip: the IP address of the other end
324  * @port: the port used by the other end
325  * @initial_buffer: data already read from @fd
326  * @conn: storage for a #GstRTSPConnection
327  *
328  * Create a new #GstRTSPConnection for handling communication on the existing
329  * file descriptor @fd. The @initial_buffer contains any data already read from
330  * @fd which should be used before starting to read new data.
331  *
332  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
333  *
334  * Since: 0.10.25
335  */
336 GstRTSPResult
337 gst_rtsp_connection_create_from_fd (gint fd, const gchar * ip, guint16 port,
338     const gchar * initial_buffer, GstRTSPConnection ** conn)
339 {
340   GstRTSPConnection *newconn = NULL;
341   GstRTSPUrl *url;
342 #ifdef G_OS_WIN32
343   gulong flags = 1;
344 #endif
345   GstRTSPResult res;
346
347   g_return_val_if_fail (fd >= 0, GST_RTSP_EINVAL);
348   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
349   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
350
351   /* set to non-blocking mode so that we can cancel the communication */
352 #ifndef G_OS_WIN32
353   fcntl (fd, F_SETFL, O_NONBLOCK);
354 #else
355   ioctlsocket (fd, FIONBIO, &flags);
356 #endif /* G_OS_WIN32 */
357
358   /* create a url for the client address */
359   url = g_new0 (GstRTSPUrl, 1);
360   url->host = g_strdup (ip);
361   url->port = port;
362
363   /* now create the connection object */
364   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
365   gst_rtsp_url_free (url);
366
367   ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
368
369   /* both read and write initially */
370   newconn->readfd = &newconn->fd0;
371   newconn->writefd = &newconn->fd0;
372
373   newconn->ip = g_strdup (ip);
374
375   newconn->initial_buffer = g_strdup (initial_buffer);
376
377   *conn = newconn;
378
379   return GST_RTSP_OK;
380
381   /* ERRORS */
382 newconn_failed:
383   {
384     gst_rtsp_url_free (url);
385     return res;
386   }
387 }
388
389 /**
390  * gst_rtsp_connection_accept:
391  * @sock: a socket
392  * @conn: storage for a #GstRTSPConnection
393  *
394  * Accept a new connection on @sock and create a new #GstRTSPConnection for
395  * handling communication on new socket.
396  *
397  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
398  *
399  * Since: 0.10.23
400  */
401 GstRTSPResult
402 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
403 {
404   int fd;
405   union gst_sockaddr sa;
406   socklen_t slen = sizeof (sa);
407   gchar ip[INET6_ADDRSTRLEN];
408   guint16 port;
409
410   g_return_val_if_fail (sock >= 0, GST_RTSP_EINVAL);
411   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
412
413   memset (&sa, 0, slen);
414
415 #ifndef G_OS_WIN32
416   fd = accept (sock, &sa.sa, &slen);
417 #else
418   fd = accept (sock, &sa.sa, (gint *) & slen);
419 #endif /* G_OS_WIN32 */
420   if (fd == -1)
421     goto accept_failed;
422
423   if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0)
424     goto getnameinfo_failed;
425
426   if (sa.sa.sa_family == AF_INET)
427     port = sa.sa_in.sin_port;
428   else if (sa.sa.sa_family == AF_INET6)
429     port = sa.sa_in6.sin6_port;
430   else
431     goto wrong_family;
432
433   return gst_rtsp_connection_create_from_fd (fd, ip, port, NULL, conn);
434
435   /* ERRORS */
436 accept_failed:
437   {
438     return GST_RTSP_ESYS;
439   }
440 getnameinfo_failed:
441 wrong_family:
442   {
443     close (fd);
444     return GST_RTSP_ERROR;
445   }
446 }
447
448 static gchar *
449 do_resolve (const gchar * host)
450 {
451   static gchar ip[INET6_ADDRSTRLEN];
452   struct addrinfo *aires;
453   struct addrinfo *ai;
454   gint aierr;
455
456   aierr = getaddrinfo (host, NULL, NULL, &aires);
457   if (aierr != 0)
458     goto no_addrinfo;
459
460   for (ai = aires; ai; ai = ai->ai_next) {
461     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
462       break;
463     }
464   }
465   if (ai == NULL)
466     goto no_family;
467
468   aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0,
469       NI_NUMERICHOST | NI_NUMERICSERV);
470   if (aierr != 0)
471     goto no_address;
472
473   freeaddrinfo (aires);
474
475   return g_strdup (ip);
476
477   /* ERRORS */
478 no_addrinfo:
479   {
480     GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr));
481     return NULL;
482   }
483 no_family:
484   {
485     GST_ERROR ("no family found for %s", host);
486     freeaddrinfo (aires);
487     return NULL;
488   }
489 no_address:
490   {
491     GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr));
492     freeaddrinfo (aires);
493     return NULL;
494   }
495 }
496
497 static GstRTSPResult
498 do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
499     GstPoll * fdset, GTimeVal * timeout)
500 {
501   gint fd;
502   struct addrinfo hints;
503   struct addrinfo *aires;
504   struct addrinfo *ai;
505   gint aierr;
506   gchar service[NI_MAXSERV];
507   gint ret;
508 #ifdef G_OS_WIN32
509   unsigned long flags = 1;
510 #endif /* G_OS_WIN32 */
511   GstClockTime to;
512   gint retval;
513
514   memset (&hints, 0, sizeof hints);
515   hints.ai_flags = AI_NUMERICHOST;
516   hints.ai_family = AF_UNSPEC;
517   hints.ai_socktype = SOCK_STREAM;
518   g_snprintf (service, sizeof (service) - 1, "%hu", port);
519   service[sizeof (service) - 1] = '\0';
520
521   aierr = getaddrinfo (ip, service, &hints, &aires);
522   if (aierr != 0)
523     goto no_addrinfo;
524
525   for (ai = aires; ai; ai = ai->ai_next) {
526     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
527       break;
528     }
529   }
530   if (ai == NULL)
531     goto no_family;
532
533   fd = socket (ai->ai_family, SOCK_STREAM, 0);
534   if (fd == -1)
535     goto no_socket;
536
537   /* set to non-blocking mode so that we can cancel the connect */
538 #ifndef G_OS_WIN32
539   fcntl (fd, F_SETFL, O_NONBLOCK);
540 #else
541   ioctlsocket (fd, FIONBIO, &flags);
542 #endif /* G_OS_WIN32 */
543
544   /* add the socket to our fdset */
545   ADD_POLLFD (fdset, fdout, fd);
546
547   /* we are going to connect ASYNC now */
548   ret = connect (fd, ai->ai_addr, ai->ai_addrlen);
549   if (ret == 0)
550     goto done;
551   if (!ERRNO_IS_EINPROGRESS)
552     goto sys_error;
553
554   /* wait for connect to complete up to the specified timeout or until we got
555    * interrupted. */
556   gst_poll_fd_ctl_write (fdset, fdout, TRUE);
557
558   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
559
560   do {
561     retval = gst_poll_wait (fdset, to);
562   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
563
564   if (retval == 0)
565     goto timeout;
566   else if (retval == -1)
567     goto sys_error;
568
569   /* we can still have an error connecting on windows */
570   if (gst_poll_fd_has_error (fdset, fdout)) {
571     socklen_t len = sizeof (errno);
572 #ifndef G_OS_WIN32
573     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
574 #else
575     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
576 #endif
577     goto sys_error;
578   }
579
580   gst_poll_fd_ignored (fdset, fdout);
581
582 done:
583   freeaddrinfo (aires);
584
585   return GST_RTSP_OK;
586
587   /* ERRORS */
588 no_addrinfo:
589   {
590     GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr));
591     return GST_RTSP_ERROR;
592   }
593 no_family:
594   {
595     GST_ERROR ("no family found for %s", ip);
596     freeaddrinfo (aires);
597     return GST_RTSP_ERROR;
598   }
599 no_socket:
600   {
601     GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
602     freeaddrinfo (aires);
603     return GST_RTSP_ESYS;
604   }
605 sys_error:
606   {
607     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
608     REMOVE_POLLFD (fdset, fdout);
609     freeaddrinfo (aires);
610     return GST_RTSP_ESYS;
611   }
612 timeout:
613   {
614     GST_ERROR ("timeout");
615     REMOVE_POLLFD (fdset, fdout);
616     freeaddrinfo (aires);
617     return GST_RTSP_ETIMEOUT;
618   }
619 }
620
621 static GstRTSPResult
622 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
623 {
624   gint i;
625   GstRTSPResult res;
626   gchar *str;
627   guint idx, line;
628   gint retval;
629   GstClockTime to;
630   gchar *ip, *url_port_str;
631   guint16 port, url_port;
632   gchar codestr[4], *resultstr;
633   gint code;
634   GstRTSPUrl *url;
635   gchar *hostparam;
636
637   /* create a random sessionid */
638   for (i = 0; i < TUNNELID_LEN; i++)
639     conn->tunnelid[i] = g_random_int_range ('a', 'z');
640   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
641
642   url = conn->url;
643   /* get the port from the url */
644   gst_rtsp_url_get_port (url, &url_port);
645
646   if (conn->proxy_host) {
647     hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
648     url_port_str = g_strdup_printf (":%d", url_port);
649     ip = conn->proxy_host;
650     port = conn->proxy_port;
651   } else {
652     hostparam = NULL;
653     url_port_str = NULL;
654     ip = conn->ip;
655     port = url_port;
656   }
657
658   /* */
659   str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
660       "%s"
661       "x-sessioncookie: %s\r\n"
662       "Accept: application/x-rtsp-tunnelled\r\n"
663       "Pragma: no-cache\r\n"
664       "Cache-Control: no-cache\r\n" "\r\n",
665       conn->proxy_host ? "http://" : "",
666       conn->proxy_host ? url->host : "",
667       conn->proxy_host ? url_port_str : "",
668       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
669       hostparam ? hostparam : "", conn->tunnelid);
670
671   /* we start by writing to this fd */
672   conn->writefd = &conn->fd0;
673
674   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
675   g_free (str);
676   if (res != GST_RTSP_OK)
677     goto write_failed;
678
679   gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
680   gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);
681
682   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
683
684   line = 0;
685   while (TRUE) {
686     guint8 buffer[4096];
687
688     idx = 0;
689     while (TRUE) {
690       res = read_line (conn, buffer, &idx, sizeof (buffer));
691       if (res == GST_RTSP_EEOF)
692         goto eof;
693       if (res == GST_RTSP_OK)
694         break;
695       if (res != GST_RTSP_EINTR)
696         goto read_error;
697
698       do {
699         retval = gst_poll_wait (conn->fdset, to);
700       } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
701
702       /* check for timeout */
703       if (retval == 0)
704         goto timeout;
705
706       if (retval == -1) {
707         if (errno == EBUSY)
708           goto stopped;
709         else
710           goto select_error;
711       }
712     }
713
714     /* check for last line */
715     if (buffer[0] == '\r')
716       buffer[0] = '\0';
717     if (buffer[0] == '\0')
718       break;
719
720     if (line == 0) {
721       /* first line, parse response */
722       gchar versionstr[20];
723       gchar *bptr;
724
725       bptr = (gchar *) buffer;
726
727       parse_string (versionstr, sizeof (versionstr), &bptr);
728       parse_string (codestr, sizeof (codestr), &bptr);
729       code = atoi (codestr);
730
731       while (g_ascii_isspace (*bptr))
732         bptr++;
733
734       resultstr = bptr;
735
736       if (code != GST_RTSP_STS_OK)
737         goto wrong_result;
738     } else {
739       gchar key[32];
740       gchar *value;
741
742       /* other lines, parse key/value */
743       res = parse_key_value (buffer, key, sizeof (key), &value);
744       if (res == GST_RTSP_OK) {
745         /* we got a new ip address */
746         if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
747           if (conn->proxy_host) {
748             /* if we use a proxy we need to change the destination url */
749             g_free (url->host);
750             url->host = g_strdup (value);
751             g_free (hostparam);
752             g_free (url_port_str);
753             hostparam =
754                 g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
755             url_port_str = g_strdup_printf (":%d", url_port);
756           } else {
757             /* and resolve the new ip address */
758             if (!(ip = do_resolve (conn->ip)))
759               goto not_resolved;
760             g_free (conn->ip);
761             conn->ip = ip;
762           }
763         }
764       }
765     }
766     line++;
767   }
768
769   /* connect to the host/port */
770   res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
771   if (res != GST_RTSP_OK)
772     goto connect_failed;
773
774   /* this is now our writing socket */
775   conn->writefd = &conn->fd1;
776
777   /* */
778   str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
779       "%s"
780       "x-sessioncookie: %s\r\n"
781       "Content-Type: application/x-rtsp-tunnelled\r\n"
782       "Pragma: no-cache\r\n"
783       "Cache-Control: no-cache\r\n"
784       "Content-Length: 32767\r\n"
785       "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
786       "\r\n",
787       conn->proxy_host ? "http://" : "",
788       conn->proxy_host ? url->host : "",
789       conn->proxy_host ? url_port_str : "",
790       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
791       hostparam ? hostparam : "", conn->tunnelid);
792
793   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
794   g_free (str);
795   if (res != GST_RTSP_OK)
796     goto write_failed;
797
798 exit:
799   g_free (hostparam);
800   g_free (url_port_str);
801
802   return res;
803
804   /* ERRORS */
805 write_failed:
806   {
807     GST_ERROR ("write failed (%d)", res);
808     goto exit;
809   }
810 eof:
811   {
812     res = GST_RTSP_EEOF;
813     goto exit;
814   }
815 read_error:
816   {
817     goto exit;
818   }
819 timeout:
820   {
821     res = GST_RTSP_ETIMEOUT;
822     goto exit;
823   }
824 select_error:
825   {
826     res = GST_RTSP_ESYS;
827     goto exit;
828   }
829 stopped:
830   {
831     res = GST_RTSP_EINTR;
832     goto exit;
833   }
834 wrong_result:
835   {
836     GST_ERROR ("got failure response %d %s", code, resultstr);
837     res = GST_RTSP_ERROR;
838     goto exit;
839   }
840 not_resolved:
841   {
842     GST_ERROR ("could not resolve %s", conn->ip);
843     res = GST_RTSP_ENET;
844     goto exit;
845   }
846 connect_failed:
847   {
848     GST_ERROR ("failed to connect");
849     goto exit;
850   }
851 }
852
853 /**
854  * gst_rtsp_connection_connect:
855  * @conn: a #GstRTSPConnection 
856  * @timeout: a #GTimeVal timeout
857  *
858  * Attempt to connect to the url of @conn made with
859  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
860  * forever. If @timeout contains a valid timeout, this function will return
861  * #GST_RTSP_ETIMEOUT after the timeout expired.
862  *
863  * This function can be cancelled with gst_rtsp_connection_flush().
864  *
865  * Returns: #GST_RTSP_OK when a connection could be made.
866  */
867 GstRTSPResult
868 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
869 {
870   GstRTSPResult res;
871   gchar *ip;
872   guint16 port;
873   GstRTSPUrl *url;
874
875   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
876   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
877   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
878
879   url = conn->url;
880
881   if (conn->proxy_host && conn->tunneled) {
882     if (!(ip = do_resolve (conn->proxy_host))) {
883       GST_ERROR ("could not resolve %s", conn->proxy_host);
884       goto not_resolved;
885     }
886     port = conn->proxy_port;
887     g_free (conn->proxy_host);
888     conn->proxy_host = ip;
889   } else {
890     if (!(ip = do_resolve (url->host))) {
891       GST_ERROR ("could not resolve %s", url->host);
892       goto not_resolved;
893     }
894     /* get the port from the url */
895     gst_rtsp_url_get_port (url, &port);
896
897     g_free (conn->ip);
898     conn->ip = ip;
899   }
900
901   /* connect to the host/port */
902   res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
903   if (res != GST_RTSP_OK)
904     goto connect_failed;
905
906   /* this is our read URL */
907   conn->readfd = &conn->fd0;
908
909   if (conn->tunneled) {
910     res = setup_tunneling (conn, timeout);
911     if (res != GST_RTSP_OK)
912       goto tunneling_failed;
913   } else {
914     conn->writefd = &conn->fd0;
915   }
916
917   return GST_RTSP_OK;
918
919 not_resolved:
920   {
921     return GST_RTSP_ENET;
922   }
923 connect_failed:
924   {
925     GST_ERROR ("failed to connect");
926     return res;
927   }
928 tunneling_failed:
929   {
930     GST_ERROR ("failed to setup tunneling");
931     return res;
932   }
933 }
934
935 static void
936 auth_digest_compute_hex_urp (const gchar * username,
937     const gchar * realm, const gchar * password, gchar hex_urp[33])
938 {
939   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
940   const gchar *digest_string;
941
942   g_checksum_update (md5_context, (const guchar *) username, strlen (username));
943   g_checksum_update (md5_context, (const guchar *) ":", 1);
944   g_checksum_update (md5_context, (const guchar *) realm, strlen (realm));
945   g_checksum_update (md5_context, (const guchar *) ":", 1);
946   g_checksum_update (md5_context, (const guchar *) password, strlen (password));
947   digest_string = g_checksum_get_string (md5_context);
948
949   memset (hex_urp, 0, 33);
950   memcpy (hex_urp, digest_string, strlen (digest_string));
951
952   g_checksum_free (md5_context);
953 }
954
955 static void
956 auth_digest_compute_response (const gchar * method,
957     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
958     gchar response[33])
959 {
960   char hex_a2[33] = { 0, };
961   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
962   const gchar *digest_string;
963
964   /* compute A2 */
965   g_checksum_update (md5_context, (const guchar *) method, strlen (method));
966   g_checksum_update (md5_context, (const guchar *) ":", 1);
967   g_checksum_update (md5_context, (const guchar *) uri, strlen (uri));
968   digest_string = g_checksum_get_string (md5_context);
969   memcpy (hex_a2, digest_string, strlen (digest_string));
970
971   /* compute KD */
972 #if GLIB_CHECK_VERSION (2, 18, 0)
973   g_checksum_reset (md5_context);
974 #else
975   g_checksum_free (md5_context);
976   md5_context = g_checksum_new (G_CHECKSUM_MD5);
977 #endif
978   g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1));
979   g_checksum_update (md5_context, (const guchar *) ":", 1);
980   g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce));
981   g_checksum_update (md5_context, (const guchar *) ":", 1);
982
983   g_checksum_update (md5_context, (const guchar *) hex_a2, 32);
984   digest_string = g_checksum_get_string (md5_context);
985   memset (response, 0, 33);
986   memcpy (response, digest_string, strlen (digest_string));
987
988   g_checksum_free (md5_context);
989 }
990
991 static void
992 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
993 {
994   switch (conn->auth_method) {
995     case GST_RTSP_AUTH_BASIC:{
996       gchar *user_pass;
997       gchar *user_pass64;
998       gchar *auth_string;
999
1000       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1001       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1002       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1003
1004       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1005           auth_string);
1006
1007       g_free (user_pass);
1008       g_free (user_pass64);
1009       break;
1010     }
1011     case GST_RTSP_AUTH_DIGEST:{
1012       gchar response[33], hex_urp[33];
1013       gchar *auth_string, *auth_string2;
1014       gchar *realm;
1015       gchar *nonce;
1016       gchar *opaque;
1017       const gchar *uri;
1018       const gchar *method;
1019
1020       /* we need to have some params set */
1021       if (conn->auth_params == NULL)
1022         break;
1023
1024       /* we need the realm and nonce */
1025       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1026       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1027       if (realm == NULL || nonce == NULL)
1028         break;
1029
1030       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
1031           hex_urp);
1032
1033       method = gst_rtsp_method_as_text (message->type_data.request.method);
1034       uri = message->type_data.request.uri;
1035
1036       /* Assume no qop, algorithm=md5, stale=false */
1037       /* For algorithm MD5, a1 = urp. */
1038       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
1039       auth_string = g_strdup_printf ("Digest username=\"%s\", "
1040           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1041           conn->username, realm, nonce, uri, response);
1042
1043       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1044       if (opaque) {
1045         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1046             opaque);
1047         g_free (auth_string);
1048         auth_string = auth_string2;
1049       }
1050       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1051           auth_string);
1052       break;
1053     }
1054     default:
1055       /* Nothing to do */
1056       break;
1057   }
1058 }
1059
1060 static void
1061 gen_date_string (gchar * date_string, guint len)
1062 {
1063   GTimeVal tv;
1064   time_t t;
1065 #ifdef HAVE_GMTIME_R
1066   struct tm tm_;
1067 #endif
1068
1069   g_get_current_time (&tv);
1070   t = (time_t) tv.tv_sec;
1071
1072 #ifdef HAVE_GMTIME_R
1073   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
1074 #else
1075   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
1076 #endif
1077 }
1078
1079 static GstRTSPResult
1080 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
1081 {
1082   guint left;
1083
1084   if (G_UNLIKELY (*idx > size))
1085     return GST_RTSP_ERROR;
1086
1087   left = size - *idx;
1088
1089   while (left) {
1090     gint r;
1091
1092     r = WRITE_SOCKET (fd, &buffer[*idx], left);
1093     if (G_UNLIKELY (r == 0)) {
1094       return GST_RTSP_EINTR;
1095     } else if (G_UNLIKELY (r < 0)) {
1096       if (ERRNO_IS_EAGAIN)
1097         return GST_RTSP_EINTR;
1098       if (!ERRNO_IS_EINTR)
1099         return GST_RTSP_ESYS;
1100     } else {
1101       left -= r;
1102       *idx += r;
1103     }
1104   }
1105   return GST_RTSP_OK;
1106 }
1107
1108 static gint
1109 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1110 {
1111   gint out = 0;
1112
1113   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1114     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1115
1116     out = MIN (left, size);
1117     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1118
1119     if (left == (gsize) out) {
1120       g_free (conn->initial_buffer);
1121       conn->initial_buffer = NULL;
1122       conn->initial_buffer_offset = 0;
1123     } else
1124       conn->initial_buffer_offset += out;
1125   }
1126
1127   if (G_LIKELY (size > (guint) out)) {
1128     gint r;
1129
1130     r = READ_SOCKET (conn->readfd->fd, &buffer[out], size - out);
1131     if (r <= 0) {
1132       if (out == 0)
1133         out = r;
1134     } else
1135       out += r;
1136   }
1137
1138   return out;
1139 }
1140
1141 static gint
1142 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1143 {
1144   DecodeCtx *ctx = conn->ctxp;
1145   gint out = 0;
1146
1147   if (ctx) {
1148     while (size > 0) {
1149       guint8 in[sizeof (ctx->out) * 4 / 3];
1150       gint r;
1151
1152       while (size > 0 && ctx->cout < ctx->coutl) {
1153         /* we have some leftover bytes */
1154         *buffer++ = ctx->out[ctx->cout++];
1155         size--;
1156         out++;
1157       }
1158
1159       /* got what we needed? */
1160       if (size == 0)
1161         break;
1162
1163       /* try to read more bytes */
1164       r = fill_raw_bytes (conn, in, sizeof (in));
1165       if (r <= 0) {
1166         if (out == 0)
1167           out = r;
1168         break;
1169       }
1170
1171       ctx->cout = 0;
1172       ctx->coutl =
1173           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1174           &ctx->save);
1175     }
1176   } else {
1177     out = fill_raw_bytes (conn, buffer, size);
1178   }
1179
1180   return out;
1181 }
1182
1183 static GstRTSPResult
1184 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1185 {
1186   guint left;
1187
1188   if (G_UNLIKELY (*idx > size))
1189     return GST_RTSP_ERROR;
1190
1191   left = size - *idx;
1192
1193   while (left) {
1194     gint r;
1195
1196     r = fill_bytes (conn, &buffer[*idx], left);
1197     if (G_UNLIKELY (r == 0)) {
1198       return GST_RTSP_EEOF;
1199     } else if (G_UNLIKELY (r < 0)) {
1200       if (ERRNO_IS_EAGAIN)
1201         return GST_RTSP_EINTR;
1202       if (!ERRNO_IS_EINTR)
1203         return GST_RTSP_ESYS;
1204     } else {
1205       left -= r;
1206       *idx += r;
1207     }
1208   }
1209   return GST_RTSP_OK;
1210 }
1211
1212 static GstRTSPResult
1213 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1214 {
1215   while (TRUE) {
1216     guint8 c;
1217     gint r;
1218
1219     r = fill_bytes (conn, &c, 1);
1220     if (G_UNLIKELY (r == 0)) {
1221       return GST_RTSP_EEOF;
1222     } else if (G_UNLIKELY (r < 0)) {
1223       if (ERRNO_IS_EAGAIN)
1224         return GST_RTSP_EINTR;
1225       if (!ERRNO_IS_EINTR)
1226         return GST_RTSP_ESYS;
1227     } else {
1228       if (c == '\n')            /* end on \n */
1229         break;
1230       if (c == '\r')            /* ignore \r */
1231         continue;
1232
1233       if (G_LIKELY (*idx < size - 1))
1234         buffer[(*idx)++] = c;
1235     }
1236   }
1237   buffer[*idx] = '\0';
1238
1239   return GST_RTSP_OK;
1240 }
1241
1242 /**
1243  * gst_rtsp_connection_write:
1244  * @conn: a #GstRTSPConnection
1245  * @data: the data to write
1246  * @size: the size of @data
1247  * @timeout: a timeout value or #NULL
1248  *
1249  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1250  * the specified @timeout. @timeout can be #NULL, in which case this function
1251  * might block forever.
1252  * 
1253  * This function can be cancelled with gst_rtsp_connection_flush().
1254  *
1255  * Returns: #GST_RTSP_OK on success.
1256  */
1257 GstRTSPResult
1258 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1259     guint size, GTimeVal * timeout)
1260 {
1261   guint offset;
1262   gint retval;
1263   GstClockTime to;
1264   GstRTSPResult res;
1265
1266   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1267   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1268   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1269
1270   gst_poll_set_controllable (conn->fdset, TRUE);
1271   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
1272   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
1273   /* clear all previous poll results */
1274   gst_poll_fd_ignored (conn->fdset, conn->writefd);
1275   gst_poll_fd_ignored (conn->fdset, conn->readfd);
1276
1277   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1278
1279   offset = 0;
1280
1281   while (TRUE) {
1282     /* try to write */
1283     res = write_bytes (conn->writefd->fd, data, &offset, size);
1284     if (G_LIKELY (res == GST_RTSP_OK))
1285       break;
1286     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1287       goto write_error;
1288
1289     /* not all is written, wait until we can write more */
1290     do {
1291       retval = gst_poll_wait (conn->fdset, to);
1292     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1293
1294     if (G_UNLIKELY (retval == 0))
1295       goto timeout;
1296
1297     if (G_UNLIKELY (retval == -1)) {
1298       if (errno == EBUSY)
1299         goto stopped;
1300       else
1301         goto select_error;
1302     }
1303   }
1304   return GST_RTSP_OK;
1305
1306   /* ERRORS */
1307 timeout:
1308   {
1309     return GST_RTSP_ETIMEOUT;
1310   }
1311 select_error:
1312   {
1313     return GST_RTSP_ESYS;
1314   }
1315 stopped:
1316   {
1317     return GST_RTSP_EINTR;
1318   }
1319 write_error:
1320   {
1321     return res;
1322   }
1323 }
1324
1325 static GString *
1326 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
1327 {
1328   GString *str = NULL;
1329
1330   str = g_string_new ("");
1331
1332   switch (message->type) {
1333     case GST_RTSP_MESSAGE_REQUEST:
1334       /* create request string, add CSeq */
1335       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
1336           "CSeq: %d\r\n",
1337           gst_rtsp_method_as_text (message->type_data.request.method),
1338           message->type_data.request.uri, conn->cseq++);
1339       /* add session id if we have one */
1340       if (conn->session_id[0] != '\0') {
1341         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1342         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1343             conn->session_id);
1344       }
1345       /* add any authentication headers */
1346       add_auth_header (conn, message);
1347       break;
1348     case GST_RTSP_MESSAGE_RESPONSE:
1349       /* create response string */
1350       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
1351           message->type_data.response.code, message->type_data.response.reason);
1352       break;
1353     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1354       /* create request string */
1355       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1356           gst_rtsp_method_as_text (message->type_data.request.method),
1357           message->type_data.request.uri,
1358           gst_rtsp_version_as_text (message->type_data.request.version));
1359       /* add any authentication headers */
1360       add_auth_header (conn, message);
1361       break;
1362     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1363       /* create response string */
1364       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1365           gst_rtsp_version_as_text (message->type_data.request.version),
1366           message->type_data.response.code, message->type_data.response.reason);
1367       break;
1368     case GST_RTSP_MESSAGE_DATA:
1369     {
1370       guint8 data_header[4];
1371
1372       /* prepare data header */
1373       data_header[0] = '$';
1374       data_header[1] = message->type_data.data.channel;
1375       data_header[2] = (message->body_size >> 8) & 0xff;
1376       data_header[3] = message->body_size & 0xff;
1377
1378       /* create string with header and data */
1379       str = g_string_append_len (str, (gchar *) data_header, 4);
1380       str =
1381           g_string_append_len (str, (gchar *) message->body,
1382           message->body_size);
1383       break;
1384     }
1385     default:
1386       g_string_free (str, TRUE);
1387       g_return_val_if_reached (NULL);
1388       break;
1389   }
1390
1391   /* append headers and body */
1392   if (message->type != GST_RTSP_MESSAGE_DATA) {
1393     gchar date_string[100];
1394
1395     gen_date_string (date_string, sizeof (date_string));
1396
1397     /* add date header */
1398     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1399     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1400
1401     /* append headers */
1402     gst_rtsp_message_append_headers (message, str);
1403
1404     /* append Content-Length and body if needed */
1405     if (message->body != NULL && message->body_size > 0) {
1406       gchar *len;
1407
1408       len = g_strdup_printf ("%d", message->body_size);
1409       g_string_append_printf (str, "%s: %s\r\n",
1410           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1411       g_free (len);
1412       /* header ends here */
1413       g_string_append (str, "\r\n");
1414       str =
1415           g_string_append_len (str, (gchar *) message->body,
1416           message->body_size);
1417     } else {
1418       /* just end headers */
1419       g_string_append (str, "\r\n");
1420     }
1421   }
1422
1423   return str;
1424 }
1425
1426 /**
1427  * gst_rtsp_connection_send:
1428  * @conn: a #GstRTSPConnection
1429  * @message: the message to send
1430  * @timeout: a timeout value or #NULL
1431  *
1432  * Attempt to send @message to the connected @conn, blocking up to
1433  * the specified @timeout. @timeout can be #NULL, in which case this function
1434  * might block forever.
1435  * 
1436  * This function can be cancelled with gst_rtsp_connection_flush().
1437  *
1438  * Returns: #GST_RTSP_OK on success.
1439  */
1440 GstRTSPResult
1441 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1442     GTimeVal * timeout)
1443 {
1444   GString *string = NULL;
1445   GstRTSPResult res;
1446   gchar *str;
1447   gsize len;
1448
1449   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1450   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1451
1452   if (G_UNLIKELY (!(string = message_to_string (conn, message))))
1453     goto no_message;
1454
1455   if (conn->tunneled) {
1456     str = g_base64_encode ((const guchar *) string->str, string->len);
1457     g_string_free (string, TRUE);
1458     len = strlen (str);
1459   } else {
1460     str = string->str;
1461     len = string->len;
1462     g_string_free (string, FALSE);
1463   }
1464
1465   /* write request */
1466   res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
1467
1468   g_free (str);
1469
1470   return res;
1471
1472 no_message:
1473   {
1474     g_warning ("Wrong message");
1475     return GST_RTSP_EINVAL;
1476   }
1477 }
1478
1479 static void
1480 parse_string (gchar * dest, gint size, gchar ** src)
1481 {
1482   gint idx;
1483
1484   idx = 0;
1485   /* skip spaces */
1486   while (g_ascii_isspace (**src))
1487     (*src)++;
1488
1489   while (!g_ascii_isspace (**src) && **src != '\0') {
1490     if (idx < size - 1)
1491       dest[idx++] = **src;
1492     (*src)++;
1493   }
1494   if (size > 0)
1495     dest[idx] = '\0';
1496 }
1497
1498 static void
1499 parse_key (gchar * dest, gint size, gchar ** src)
1500 {
1501   gint idx;
1502
1503   idx = 0;
1504   while (**src != ':' && **src != '\0') {
1505     if (idx < size - 1)
1506       dest[idx++] = **src;
1507     (*src)++;
1508   }
1509   if (size > 0)
1510     dest[idx] = '\0';
1511 }
1512
1513 static GstRTSPResult
1514 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
1515 {
1516   GstRTSPResult res;
1517   gchar versionstr[20];
1518   gchar codestr[4];
1519   gint code;
1520   gchar *bptr;
1521
1522   bptr = (gchar *) buffer;
1523
1524   parse_string (versionstr, sizeof (versionstr), &bptr);
1525   parse_string (codestr, sizeof (codestr), &bptr);
1526   code = atoi (codestr);
1527
1528   while (g_ascii_isspace (*bptr))
1529     bptr++;
1530
1531   if (strcmp (versionstr, "RTSP/1.0") == 0)
1532     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1533         parse_error);
1534   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1535     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1536         parse_error);
1537     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1538   } else
1539     goto parse_error;
1540
1541   return GST_RTSP_OK;
1542
1543 parse_error:
1544   {
1545     return GST_RTSP_EPARSE;
1546   }
1547 }
1548
1549 static GstRTSPResult
1550 parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
1551     GstRTSPMessage * msg)
1552 {
1553   GstRTSPResult res = GST_RTSP_OK;
1554   gchar versionstr[20];
1555   gchar methodstr[20];
1556   gchar urlstr[4096];
1557   gchar *bptr;
1558   GstRTSPMethod method;
1559   GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
1560
1561   bptr = (gchar *) buffer;
1562
1563   parse_string (methodstr, sizeof (methodstr), &bptr);
1564   method = gst_rtsp_find_method (methodstr);
1565   if (method == GST_RTSP_INVALID) {
1566     /* a tunnel request is allowed when we don't have one yet */
1567     if (conn->tstate != TUNNEL_STATE_NONE)
1568       goto invalid_method;
1569     /* we need GET or POST for a valid tunnel request */
1570     if (!strcmp (methodstr, "GET"))
1571       tstate = TUNNEL_STATE_GET;
1572     else if (!strcmp (methodstr, "POST"))
1573       tstate = TUNNEL_STATE_POST;
1574     else
1575       goto invalid_method;
1576   }
1577
1578   parse_string (urlstr, sizeof (urlstr), &bptr);
1579   if (G_UNLIKELY (*urlstr == '\0'))
1580     goto invalid_url;
1581
1582   parse_string (versionstr, sizeof (versionstr), &bptr);
1583
1584   if (G_UNLIKELY (*bptr != '\0'))
1585     goto invalid_version;
1586
1587   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1588     res = gst_rtsp_message_init_request (msg, method, urlstr);
1589   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1590     res = gst_rtsp_message_init_request (msg, method, urlstr);
1591     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1592   } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
1593     /* tunnel request, we need a tunnel method */
1594     if (tstate == TUNNEL_STATE_NONE) {
1595       res = GST_RTSP_EPARSE;
1596     } else {
1597       conn->tstate = tstate;
1598     }
1599   } else {
1600     res = GST_RTSP_EPARSE;
1601   }
1602
1603   return res;
1604
1605   /* ERRORS */
1606 invalid_method:
1607   {
1608     GST_ERROR ("invalid method %s", methodstr);
1609     return GST_RTSP_EPARSE;
1610   }
1611 invalid_url:
1612   {
1613     GST_ERROR ("invalid url %s", urlstr);
1614     return GST_RTSP_EPARSE;
1615   }
1616 invalid_version:
1617   {
1618     GST_ERROR ("invalid version");
1619     return GST_RTSP_EPARSE;
1620   }
1621 }
1622
1623 static GstRTSPResult
1624 parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
1625 {
1626   gchar *bptr;
1627
1628   bptr = (gchar *) buffer;
1629
1630   /* read key */
1631   parse_key (key, keysize, &bptr);
1632   if (G_UNLIKELY (*bptr != ':'))
1633     goto no_column;
1634
1635   bptr++;
1636   while (g_ascii_isspace (*bptr))
1637     bptr++;
1638
1639   *value = bptr;
1640
1641   return GST_RTSP_OK;
1642
1643   /* ERRORS */
1644 no_column:
1645   {
1646     return GST_RTSP_EPARSE;
1647   }
1648 }
1649
1650 /* parsing lines means reading a Key: Value pair */
1651 static GstRTSPResult
1652 parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
1653 {
1654   GstRTSPResult res;
1655   gchar key[32];
1656   gchar *value;
1657   GstRTSPHeaderField field;
1658
1659   res = parse_key_value (buffer, key, sizeof (key), &value);
1660   if (G_UNLIKELY (res != GST_RTSP_OK))
1661     goto parse_error;
1662
1663   if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
1664     /* save the tunnel session in the connection */
1665     if (!strcmp (key, "x-sessioncookie")) {
1666       strncpy (conn->tunnelid, value, TUNNELID_LEN);
1667       conn->tunnelid[TUNNELID_LEN - 1] = '\0';
1668       conn->tunneled = TRUE;
1669     }
1670   } else {
1671     field = gst_rtsp_find_header_field (key);
1672     if (field != GST_RTSP_HDR_INVALID)
1673       gst_rtsp_message_add_header (msg, field, value);
1674   }
1675
1676   return GST_RTSP_OK;
1677
1678   /* ERRORS */
1679 parse_error:
1680   {
1681     return res;
1682   }
1683 }
1684
1685 /* returns:
1686  *  GST_RTSP_OK when a complete message was read.
1687  *  GST_RTSP_EEOF: when the socket is closed
1688  *  GST_RTSP_EINTR: when more data is needed.
1689  *  GST_RTSP_..: some other error occured.
1690  */
1691 static GstRTSPResult
1692 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1693     GstRTSPConnection * conn)
1694 {
1695   GstRTSPResult res;
1696
1697   while (TRUE) {
1698     switch (builder->state) {
1699       case STATE_START:
1700         builder->offset = 0;
1701         res =
1702             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
1703         if (res != GST_RTSP_OK)
1704           goto done;
1705
1706         /* we have 1 bytes now and we can see if this is a data message or
1707          * not */
1708         if (builder->buffer[0] == '$') {
1709           /* data message, prepare for the header */
1710           builder->state = STATE_DATA_HEADER;
1711         } else {
1712           builder->line = 0;
1713           builder->state = STATE_READ_LINES;
1714         }
1715         break;
1716       case STATE_DATA_HEADER:
1717       {
1718         res =
1719             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
1720         if (res != GST_RTSP_OK)
1721           goto done;
1722
1723         gst_rtsp_message_init_data (message, builder->buffer[1]);
1724
1725         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1726         builder->body_data = g_malloc (builder->body_len + 1);
1727         builder->body_data[builder->body_len] = '\0';
1728         builder->offset = 0;
1729         builder->state = STATE_DATA_BODY;
1730         break;
1731       }
1732       case STATE_DATA_BODY:
1733       {
1734         res =
1735             read_bytes (conn, builder->body_data, &builder->offset,
1736             builder->body_len);
1737         if (res != GST_RTSP_OK)
1738           goto done;
1739
1740         /* we have the complete body now, store in the message adjusting the
1741          * length to include the traling '\0' */
1742         gst_rtsp_message_take_body (message,
1743             (guint8 *) builder->body_data, builder->body_len + 1);
1744         builder->body_data = NULL;
1745         builder->body_len = 0;
1746
1747         builder->state = STATE_END;
1748         break;
1749       }
1750       case STATE_READ_LINES:
1751       {
1752         res = read_line (conn, builder->buffer, &builder->offset,
1753             sizeof (builder->buffer));
1754         if (res != GST_RTSP_OK)
1755           goto done;
1756
1757         /* we have a regular response */
1758         if (builder->buffer[0] == '\r') {
1759           builder->buffer[0] = '\0';
1760         }
1761
1762         if (builder->buffer[0] == '\0') {
1763           gchar *hdrval;
1764
1765           /* empty line, end of message header */
1766           /* see if there is a Content-Length header */
1767           if (gst_rtsp_message_get_header (message,
1768                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1769             /* there is, prepare to read the body */
1770             builder->body_len = atol (hdrval);
1771             builder->body_data = g_malloc (builder->body_len + 1);
1772             builder->body_data[builder->body_len] = '\0';
1773             builder->offset = 0;
1774             builder->state = STATE_DATA_BODY;
1775           } else {
1776             builder->state = STATE_END;
1777           }
1778           break;
1779         }
1780
1781         /* we have a line */
1782         if (builder->line == 0) {
1783           /* first line, check for response status */
1784           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1785             res = parse_response_status (builder->buffer, message);
1786           } else {
1787             res = parse_request_line (conn, builder->buffer, message);
1788           }
1789           /* the first line must parse without errors */
1790           if (res != GST_RTSP_OK)
1791             goto done;
1792         } else {
1793           /* else just parse the line, ignore errors */
1794           parse_line (conn, builder->buffer, message);
1795         }
1796         builder->line++;
1797         builder->offset = 0;
1798         break;
1799       }
1800       case STATE_END:
1801       {
1802         gchar *session_id;
1803
1804         if (conn->tstate == TUNNEL_STATE_GET) {
1805           res = GST_RTSP_ETGET;
1806           goto done;
1807         } else if (conn->tstate == TUNNEL_STATE_POST) {
1808           res = GST_RTSP_ETPOST;
1809           goto done;
1810         }
1811
1812         if (message->type == GST_RTSP_MESSAGE_DATA) {
1813           /* data messages don't have headers */
1814           res = GST_RTSP_OK;
1815           goto done;
1816         }
1817
1818         /* save session id in the connection for further use */
1819         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
1820             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1821                 &session_id, 0) == GST_RTSP_OK) {
1822           gint maxlen, i;
1823
1824           maxlen = sizeof (conn->session_id) - 1;
1825           /* the sessionid can have attributes marked with ;
1826            * Make sure we strip them */
1827           for (i = 0; session_id[i] != '\0'; i++) {
1828             if (session_id[i] == ';') {
1829               maxlen = i;
1830               /* parse timeout */
1831               do {
1832                 i++;
1833               } while (g_ascii_isspace (session_id[i]));
1834               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1835                 gint to;
1836
1837                 /* if we parsed something valid, configure */
1838                 if ((to = atoi (&session_id[i + 8])) > 0)
1839                   conn->timeout = to;
1840               }
1841               break;
1842             }
1843           }
1844
1845           /* make sure to not overflow */
1846           strncpy (conn->session_id, session_id, maxlen);
1847           conn->session_id[maxlen] = '\0';
1848         }
1849         res = GST_RTSP_OK;
1850         goto done;
1851       }
1852       default:
1853         res = GST_RTSP_ERROR;
1854         break;
1855     }
1856   }
1857 done:
1858   return res;
1859 }
1860
1861 /**
1862  * gst_rtsp_connection_read:
1863  * @conn: a #GstRTSPConnection
1864  * @data: the data to read
1865  * @size: the size of @data
1866  * @timeout: a timeout value or #NULL
1867  *
1868  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1869  * the specified @timeout. @timeout can be #NULL, in which case this function
1870  * might block forever.
1871  *
1872  * This function can be cancelled with gst_rtsp_connection_flush().
1873  *
1874  * Returns: #GST_RTSP_OK on success.
1875  */
1876 GstRTSPResult
1877 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1878     GTimeVal * timeout)
1879 {
1880   guint offset;
1881   gint retval;
1882   GstClockTime to;
1883   GstRTSPResult res;
1884
1885   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1886   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1887   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1888
1889   if (G_UNLIKELY (size == 0))
1890     return GST_RTSP_OK;
1891
1892   offset = 0;
1893
1894   /* configure timeout if any */
1895   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1896
1897   gst_poll_set_controllable (conn->fdset, TRUE);
1898   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1899   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1900
1901   while (TRUE) {
1902     res = read_bytes (conn, data, &offset, size);
1903     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1904       goto eof;
1905     if (G_LIKELY (res == GST_RTSP_OK))
1906       break;
1907     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1908       goto read_error;
1909
1910     do {
1911       retval = gst_poll_wait (conn->fdset, to);
1912     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1913
1914     /* check for timeout */
1915     if (G_UNLIKELY (retval == 0))
1916       goto select_timeout;
1917
1918     if (G_UNLIKELY (retval == -1)) {
1919       if (errno == EBUSY)
1920         goto stopped;
1921       else
1922         goto select_error;
1923     }
1924     gst_poll_set_controllable (conn->fdset, FALSE);
1925   }
1926   return GST_RTSP_OK;
1927
1928   /* ERRORS */
1929 select_error:
1930   {
1931     return GST_RTSP_ESYS;
1932   }
1933 select_timeout:
1934   {
1935     return GST_RTSP_ETIMEOUT;
1936   }
1937 stopped:
1938   {
1939     return GST_RTSP_EINTR;
1940   }
1941 eof:
1942   {
1943     return GST_RTSP_EEOF;
1944   }
1945 read_error:
1946   {
1947     return res;
1948   }
1949 }
1950
1951 static GString *
1952 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
1953 {
1954   GString *str;
1955   gchar date_string[100];
1956   const gchar *status;
1957
1958   gen_date_string (date_string, sizeof (date_string));
1959
1960   status = gst_rtsp_status_as_text (code);
1961   if (status == NULL) {
1962     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
1963     status = "Internal Server Error";
1964   }
1965
1966   str = g_string_new ("");
1967
1968   /* */
1969   g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
1970   g_string_append_printf (str,
1971       "Server: GStreamer RTSP Server\r\n"
1972       "Date: %s\r\n"
1973       "Connection: close\r\n"
1974       "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
1975   if (code == GST_RTSP_STS_OK) {
1976     if (conn->ip)
1977       g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
1978     g_string_append_printf (str,
1979         "Content-Type: application/x-rtsp-tunnelled\r\n");
1980   }
1981   g_string_append_printf (str, "\r\n");
1982   return str;
1983 }
1984
1985 /**
1986  * gst_rtsp_connection_receive:
1987  * @conn: a #GstRTSPConnection
1988  * @message: the message to read
1989  * @timeout: a timeout value or #NULL
1990  *
1991  * Attempt to read into @message from the connected @conn, blocking up to
1992  * the specified @timeout. @timeout can be #NULL, in which case this function
1993  * might block forever.
1994  * 
1995  * This function can be cancelled with gst_rtsp_connection_flush().
1996  *
1997  * Returns: #GST_RTSP_OK on success.
1998  */
1999 GstRTSPResult
2000 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
2001     GTimeVal * timeout)
2002 {
2003   GstRTSPResult res;
2004   GstRTSPBuilder builder;
2005   gint retval;
2006   GstClockTime to;
2007
2008   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2009   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2010   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2011
2012   /* configure timeout if any */
2013   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2014
2015   gst_poll_set_controllable (conn->fdset, TRUE);
2016   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
2017   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
2018
2019   memset (&builder, 0, sizeof (GstRTSPBuilder));
2020   while (TRUE) {
2021     res = build_next (&builder, message, conn);
2022     if (G_UNLIKELY (res == GST_RTSP_EEOF))
2023       goto eof;
2024     if (G_LIKELY (res == GST_RTSP_OK))
2025       break;
2026     if (res == GST_RTSP_ETGET) {
2027       GString *str;
2028
2029       /* tunnel GET request, we can reply now */
2030       str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
2031       res =
2032           gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
2033           timeout);
2034       g_string_free (str, TRUE);
2035     } else if (res == GST_RTSP_ETPOST) {
2036       /* tunnel POST request, return the value, the caller now has to link the
2037        * two connections. */
2038       break;
2039     } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
2040       goto read_error;
2041
2042     do {
2043       retval = gst_poll_wait (conn->fdset, to);
2044     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2045
2046     /* check for timeout */
2047     if (G_UNLIKELY (retval == 0))
2048       goto select_timeout;
2049
2050     if (G_UNLIKELY (retval == -1)) {
2051       if (errno == EBUSY)
2052         goto stopped;
2053       else
2054         goto select_error;
2055     }
2056     gst_poll_set_controllable (conn->fdset, FALSE);
2057   }
2058
2059   /* we have a message here */
2060   build_reset (&builder);
2061
2062   return GST_RTSP_OK;
2063
2064   /* ERRORS */
2065 select_error:
2066   {
2067     res = GST_RTSP_ESYS;
2068     goto cleanup;
2069   }
2070 select_timeout:
2071   {
2072     res = GST_RTSP_ETIMEOUT;
2073     goto cleanup;
2074   }
2075 stopped:
2076   {
2077     res = GST_RTSP_EINTR;
2078     goto cleanup;
2079   }
2080 eof:
2081   {
2082     res = GST_RTSP_EEOF;
2083     goto cleanup;
2084   }
2085 read_error:
2086 cleanup:
2087   {
2088     build_reset (&builder);
2089     gst_rtsp_message_unset (message);
2090     return res;
2091   }
2092 }
2093
2094 /**
2095  * gst_rtsp_connection_close:
2096  * @conn: a #GstRTSPConnection
2097  *
2098  * Close the connected @conn. After this call, the connection is in the same
2099  * state as when it was first created.
2100  * 
2101  * Returns: #GST_RTSP_OK on success.
2102  */
2103 GstRTSPResult
2104 gst_rtsp_connection_close (GstRTSPConnection * conn)
2105 {
2106   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2107
2108   g_free (conn->ip);
2109   conn->ip = NULL;
2110
2111   g_free (conn->initial_buffer);
2112   conn->initial_buffer = NULL;
2113   conn->initial_buffer_offset = 0;
2114
2115   REMOVE_POLLFD (conn->fdset, &conn->fd0);
2116   REMOVE_POLLFD (conn->fdset, &conn->fd1);
2117   conn->writefd = NULL;
2118   conn->readfd = NULL;
2119   conn->tunneled = FALSE;
2120   conn->tstate = TUNNEL_STATE_NONE;
2121   conn->ctxp = NULL;
2122   g_free (conn->username);
2123   conn->username = NULL;
2124   g_free (conn->passwd);
2125   conn->passwd = NULL;
2126   gst_rtsp_connection_clear_auth_params (conn);
2127   conn->timeout = 60;
2128   conn->cseq = 0;
2129   conn->session_id[0] = '\0';
2130
2131   return GST_RTSP_OK;
2132 }
2133
2134 /**
2135  * gst_rtsp_connection_free:
2136  * @conn: a #GstRTSPConnection
2137  *
2138  * Close and free @conn.
2139  * 
2140  * Returns: #GST_RTSP_OK on success.
2141  */
2142 GstRTSPResult
2143 gst_rtsp_connection_free (GstRTSPConnection * conn)
2144 {
2145   GstRTSPResult res;
2146
2147   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2148
2149   res = gst_rtsp_connection_close (conn);
2150   gst_poll_free (conn->fdset);
2151   g_timer_destroy (conn->timer);
2152   gst_rtsp_url_free (conn->url);
2153   g_free (conn->proxy_host);
2154   g_free (conn);
2155 #ifdef G_OS_WIN32
2156   WSACleanup ();
2157 #endif
2158
2159   return res;
2160 }
2161
2162 /**
2163  * gst_rtsp_connection_poll:
2164  * @conn: a #GstRTSPConnection
2165  * @events: a bitmask of #GstRTSPEvent flags to check
2166  * @revents: location for result flags 
2167  * @timeout: a timeout
2168  *
2169  * Wait up to the specified @timeout for the connection to become available for
2170  * at least one of the operations specified in @events. When the function returns
2171  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2172  * @conn.
2173  *
2174  * @timeout can be #NULL, in which case this function might block forever.
2175  *
2176  * This function can be cancelled with gst_rtsp_connection_flush().
2177  * 
2178  * Returns: #GST_RTSP_OK on success.
2179  *
2180  * Since: 0.10.15
2181  */
2182 GstRTSPResult
2183 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2184     GstRTSPEvent * revents, GTimeVal * timeout)
2185 {
2186   GstClockTime to;
2187   gint retval;
2188
2189   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2190   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2191   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2192   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2193   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2194
2195   gst_poll_set_controllable (conn->fdset, TRUE);
2196
2197   /* add fd to writer set when asked to */
2198   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
2199       events & GST_RTSP_EV_WRITE);
2200
2201   /* add fd to reader set when asked to */
2202   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
2203
2204   /* configure timeout if any */
2205   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2206
2207   do {
2208     retval = gst_poll_wait (conn->fdset, to);
2209   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2210
2211   if (G_UNLIKELY (retval == 0))
2212     goto select_timeout;
2213
2214   if (G_UNLIKELY (retval == -1)) {
2215     if (errno == EBUSY)
2216       goto stopped;
2217     else
2218       goto select_error;
2219   }
2220
2221   *revents = 0;
2222   if (events & GST_RTSP_EV_READ) {
2223     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
2224       *revents |= GST_RTSP_EV_READ;
2225   }
2226   if (events & GST_RTSP_EV_WRITE) {
2227     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
2228       *revents |= GST_RTSP_EV_WRITE;
2229   }
2230   return GST_RTSP_OK;
2231
2232   /* ERRORS */
2233 select_timeout:
2234   {
2235     return GST_RTSP_ETIMEOUT;
2236   }
2237 select_error:
2238   {
2239     return GST_RTSP_ESYS;
2240   }
2241 stopped:
2242   {
2243     return GST_RTSP_EINTR;
2244   }
2245 }
2246
2247 /**
2248  * gst_rtsp_connection_next_timeout:
2249  * @conn: a #GstRTSPConnection
2250  * @timeout: a timeout
2251  *
2252  * Calculate the next timeout for @conn, storing the result in @timeout.
2253  * 
2254  * Returns: #GST_RTSP_OK.
2255  */
2256 GstRTSPResult
2257 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2258 {
2259   gdouble elapsed;
2260   glong sec;
2261   gulong usec;
2262
2263   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2264   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2265
2266   elapsed = g_timer_elapsed (conn->timer, &usec);
2267   if (elapsed >= conn->timeout) {
2268     sec = 0;
2269     usec = 0;
2270   } else {
2271     sec = conn->timeout - elapsed;
2272   }
2273
2274   timeout->tv_sec = sec;
2275   timeout->tv_usec = usec;
2276
2277   return GST_RTSP_OK;
2278 }
2279
2280 /**
2281  * gst_rtsp_connection_reset_timeout:
2282  * @conn: a #GstRTSPConnection
2283  *
2284  * Reset the timeout of @conn.
2285  * 
2286  * Returns: #GST_RTSP_OK.
2287  */
2288 GstRTSPResult
2289 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2290 {
2291   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2292
2293   g_timer_start (conn->timer);
2294
2295   return GST_RTSP_OK;
2296 }
2297
2298 /**
2299  * gst_rtsp_connection_flush:
2300  * @conn: a #GstRTSPConnection
2301  * @flush: start or stop the flush
2302  *
2303  * Start or stop the flushing action on @conn. When flushing, all current
2304  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2305  * is set to non-flushing mode again.
2306  * 
2307  * Returns: #GST_RTSP_OK.
2308  */
2309 GstRTSPResult
2310 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2311 {
2312   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2313
2314   gst_poll_set_flushing (conn->fdset, flush);
2315
2316   return GST_RTSP_OK;
2317 }
2318
2319 /**
2320  * gst_rtsp_connection_set_proxy:
2321  * @conn: a #GstRTSPConnection
2322  * @host: the proxy host
2323  * @port: the proxy port
2324  *
2325  * Set the proxy host and port.
2326  * 
2327  * Returns: #GST_RTSP_OK.
2328  *
2329  * Since: 0.10.23
2330  */
2331 GstRTSPResult
2332 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
2333     const gchar * host, guint port)
2334 {
2335   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2336
2337   g_free (conn->proxy_host);
2338   conn->proxy_host = g_strdup (host);
2339   conn->proxy_port = port;
2340
2341   return GST_RTSP_OK;
2342 }
2343
2344 /**
2345  * gst_rtsp_connection_set_auth:
2346  * @conn: a #GstRTSPConnection
2347  * @method: authentication method
2348  * @user: the user
2349  * @pass: the password
2350  *
2351  * Configure @conn for authentication mode @method with @user and @pass as the
2352  * user and password respectively.
2353  * 
2354  * Returns: #GST_RTSP_OK.
2355  */
2356 GstRTSPResult
2357 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
2358     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
2359 {
2360   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2361
2362   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
2363           || g_strrstr (user, ":") != NULL))
2364     return GST_RTSP_EINVAL;
2365
2366   /* Make sure the username and passwd are being set for authentication */
2367   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
2368     return GST_RTSP_EINVAL;
2369
2370   /* ":" chars are not allowed in usernames for basic auth */
2371   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
2372     return GST_RTSP_EINVAL;
2373
2374   g_free (conn->username);
2375   g_free (conn->passwd);
2376
2377   conn->auth_method = method;
2378   conn->username = g_strdup (user);
2379   conn->passwd = g_strdup (pass);
2380
2381   return GST_RTSP_OK;
2382 }
2383
2384 /**
2385  * str_case_hash:
2386  * @key: ASCII string to hash
2387  *
2388  * Hashes @key in a case-insensitive manner.
2389  *
2390  * Returns: the hash code.
2391  **/
2392 static guint
2393 str_case_hash (gconstpointer key)
2394 {
2395   const char *p = key;
2396   guint h = g_ascii_toupper (*p);
2397
2398   if (h)
2399     for (p += 1; *p != '\0'; p++)
2400       h = (h << 5) - h + g_ascii_toupper (*p);
2401
2402   return h;
2403 }
2404
2405 /**
2406  * str_case_equal:
2407  * @v1: an ASCII string
2408  * @v2: another ASCII string
2409  *
2410  * Compares @v1 and @v2 in a case-insensitive manner
2411  *
2412  * Returns: %TRUE if they are equal (modulo case)
2413  **/
2414 static gboolean
2415 str_case_equal (gconstpointer v1, gconstpointer v2)
2416 {
2417   const char *string1 = v1;
2418   const char *string2 = v2;
2419
2420   return g_ascii_strcasecmp (string1, string2) == 0;
2421 }
2422
2423 /**
2424  * gst_rtsp_connection_set_auth_param:
2425  * @conn: a #GstRTSPConnection
2426  * @param: authentication directive
2427  * @value: value
2428  *
2429  * Setup @conn with authentication directives. This is not necesary for
2430  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
2431  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
2432  * in the WWW-Authenticate response header and can include realm, domain,
2433  * nonce, opaque, stale, algorithm, qop as per RFC2617.
2434  * 
2435  * Since: 0.10.20
2436  */
2437 void
2438 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
2439     const gchar * param, const gchar * value)
2440 {
2441   g_return_if_fail (conn != NULL);
2442   g_return_if_fail (param != NULL);
2443
2444   if (conn->auth_params == NULL) {
2445     conn->auth_params =
2446         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
2447   }
2448   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
2449 }
2450
2451 /**
2452  * gst_rtsp_connection_clear_auth_params:
2453  * @conn: a #GstRTSPConnection
2454  *
2455  * Clear the list of authentication directives stored in @conn.
2456  *
2457  * Since: 0.10.20
2458  */
2459 void
2460 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
2461 {
2462   g_return_if_fail (conn != NULL);
2463
2464   if (conn->auth_params != NULL) {
2465     g_hash_table_destroy (conn->auth_params);
2466     conn->auth_params = NULL;
2467   }
2468 }
2469
2470 static GstRTSPResult
2471 set_qos_dscp (gint fd, guint qos_dscp)
2472 {
2473   union gst_sockaddr sa;
2474   socklen_t slen = sizeof (sa);
2475   gint af;
2476   gint tos;
2477
2478   if (fd == -1)
2479     return GST_RTSP_OK;
2480
2481   if (getsockname (fd, &sa.sa, &slen) < 0)
2482     goto no_getsockname;
2483
2484   af = sa.sa.sa_family;
2485
2486   /* if this is an IPv4-mapped address then do IPv4 QoS */
2487   if (af == AF_INET6) {
2488     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
2489       af = AF_INET;
2490   }
2491
2492   /* extract and shift 6 bits of the DSCP */
2493   tos = (qos_dscp & 0x3f) << 2;
2494
2495   switch (af) {
2496     case AF_INET:
2497       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
2498         goto no_setsockopt;
2499       break;
2500     case AF_INET6:
2501 #ifdef IPV6_TCLASS
2502       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
2503         goto no_setsockopt;
2504       break;
2505 #endif
2506     default:
2507       goto wrong_family;
2508   }
2509
2510   return GST_RTSP_OK;
2511
2512   /* ERRORS */
2513 no_getsockname:
2514 no_setsockopt:
2515   {
2516     return GST_RTSP_ESYS;
2517   }
2518
2519 wrong_family:
2520   {
2521     return GST_RTSP_ERROR;
2522   }
2523 }
2524
2525 /**
2526  * gst_rtsp_connection_set_qos_dscp:
2527  * @conn: a #GstRTSPConnection
2528  * @qos_dscp: DSCP value
2529  *
2530  * Configure @conn to use the specified DSCP value.
2531  *
2532  * Returns: #GST_RTSP_OK on success.
2533  *
2534  * Since: 0.10.20
2535  */
2536 GstRTSPResult
2537 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
2538 {
2539   GstRTSPResult res;
2540
2541   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2542   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2543   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2544
2545   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
2546   if (res == GST_RTSP_OK)
2547     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
2548
2549   return res;
2550 }
2551
2552
2553 /**
2554  * gst_rtsp_connection_get_url:
2555  * @conn: a #GstRTSPConnection
2556  *
2557  * Retrieve the URL of the other end of @conn.
2558  *
2559  * Returns: The URL. This value remains valid until the
2560  * connection is freed.
2561  *
2562  * Since: 0.10.23
2563  */
2564 GstRTSPUrl *
2565 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
2566 {
2567   g_return_val_if_fail (conn != NULL, NULL);
2568
2569   return conn->url;
2570 }
2571
2572 /**
2573  * gst_rtsp_connection_get_ip:
2574  * @conn: a #GstRTSPConnection
2575  *
2576  * Retrieve the IP address of the other end of @conn.
2577  *
2578  * Returns: The IP address as a string. this value remains valid until the
2579  * connection is closed.
2580  *
2581  * Since: 0.10.20
2582  */
2583 const gchar *
2584 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
2585 {
2586   g_return_val_if_fail (conn != NULL, NULL);
2587
2588   return conn->ip;
2589 }
2590
2591 /**
2592  * gst_rtsp_connection_set_ip:
2593  * @conn: a #GstRTSPConnection
2594  * @ip: an ip address
2595  *
2596  * Set the IP address of the server.
2597  *
2598  * Since: 0.10.23
2599  */
2600 void
2601 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
2602 {
2603   g_return_if_fail (conn != NULL);
2604
2605   g_free (conn->ip);
2606   conn->ip = g_strdup (ip);
2607 }
2608
2609 /**
2610  * gst_rtsp_connection_get_readfd:
2611  * @conn: a #GstRTSPConnection
2612  *
2613  * Get the file descriptor for reading.
2614  *
2615  * Returns: the file descriptor used for reading or -1 on error. The file
2616  * descriptor remains valid until the connection is closed.
2617  *
2618  * Since: 0.10.23
2619  */
2620 gint
2621 gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
2622 {
2623   g_return_val_if_fail (conn != NULL, -1);
2624   g_return_val_if_fail (conn->readfd != NULL, -1);
2625
2626   return conn->readfd->fd;
2627 }
2628
2629 /**
2630  * gst_rtsp_connection_get_writefd:
2631  * @conn: a #GstRTSPConnection
2632  *
2633  * Get the file descriptor for writing.
2634  *
2635  * Returns: the file descriptor used for writing or -1 on error. The file
2636  * descriptor remains valid until the connection is closed.
2637  *
2638  * Since: 0.10.23
2639  */
2640 gint
2641 gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
2642 {
2643   g_return_val_if_fail (conn != NULL, -1);
2644   g_return_val_if_fail (conn->writefd != NULL, -1);
2645
2646   return conn->writefd->fd;
2647 }
2648
2649
2650 /**
2651  * gst_rtsp_connection_set_tunneled:
2652  * @conn: a #GstRTSPConnection
2653  * @tunneled: the new state
2654  *
2655  * Set the HTTP tunneling state of the connection. This must be configured before
2656  * the @conn is connected.
2657  *
2658  * Since: 0.10.23
2659  */
2660 void
2661 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
2662 {
2663   g_return_if_fail (conn != NULL);
2664   g_return_if_fail (conn->readfd == NULL);
2665   g_return_if_fail (conn->writefd == NULL);
2666
2667   conn->tunneled = tunneled;
2668 }
2669
2670 /**
2671  * gst_rtsp_connection_is_tunneled:
2672  * @conn: a #GstRTSPConnection
2673  *
2674  * Get the tunneling state of the connection. 
2675  *
2676  * Returns: if @conn is using HTTP tunneling.
2677  *
2678  * Since: 0.10.23
2679  */
2680 gboolean
2681 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
2682 {
2683   g_return_val_if_fail (conn != NULL, FALSE);
2684
2685   return conn->tunneled;
2686 }
2687
2688 /**
2689  * gst_rtsp_connection_get_tunnelid:
2690  * @conn: a #GstRTSPConnection
2691  *
2692  * Get the tunnel session id the connection. 
2693  *
2694  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
2695  *
2696  * Since: 0.10.23
2697  */
2698 const gchar *
2699 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
2700 {
2701   g_return_val_if_fail (conn != NULL, NULL);
2702
2703   if (!conn->tunneled)
2704     return NULL;
2705
2706   return conn->tunnelid;
2707 }
2708
2709 /**
2710  * gst_rtsp_connection_do_tunnel:
2711  * @conn: a #GstRTSPConnection
2712  * @conn2: a #GstRTSPConnection
2713  *
2714  * If @conn received the first tunnel connection and @conn2 received
2715  * the second tunnel connection, link the two connections together so that
2716  * @conn manages the tunneled connection.
2717  *
2718  * After this call, @conn2 cannot be used anymore and must be freed with
2719  * gst_rtsp_connection_free().
2720  *
2721  * Returns: return GST_RTSP_OK on success.
2722  *
2723  * Since: 0.10.23
2724  */
2725 GstRTSPResult
2726 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
2727     GstRTSPConnection * conn2)
2728 {
2729   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2730   g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
2731   g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
2732   g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
2733   g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
2734       GST_RTSP_EINVAL);
2735
2736   /* both connections have fd0 as the read/write socket. start by taking the
2737    * socket from conn2 and set it as the socket in conn */
2738   conn->fd1 = conn2->fd0;
2739
2740   /* clean up some of the state of conn2 */
2741   gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
2742   conn2->fd0.fd = -1;
2743   conn2->readfd = conn2->writefd = NULL;
2744
2745   /* We make fd0 the write socket and fd1 the read socket. */
2746   conn->writefd = &conn->fd0;
2747   conn->readfd = &conn->fd1;
2748
2749   conn->tstate = TUNNEL_STATE_COMPLETE;
2750
2751   /* we need base64 decoding for the readfd */
2752   conn->ctx.state = 0;
2753   conn->ctx.save = 0;
2754   conn->ctx.cout = 0;
2755   conn->ctx.coutl = 0;
2756   conn->ctxp = &conn->ctx;
2757
2758   return GST_RTSP_OK;
2759 }
2760
2761 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
2762 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
2763
2764 typedef struct
2765 {
2766   guint8 *data;
2767   guint size;
2768   guint id;
2769 } GstRTSPRec;
2770
2771 /* async functions */
2772 struct _GstRTSPWatch
2773 {
2774   GSource source;
2775
2776   GstRTSPConnection *conn;
2777
2778   GstRTSPBuilder builder;
2779   GstRTSPMessage message;
2780
2781   GPollFD readfd;
2782   GPollFD writefd;
2783   gboolean write_added;
2784
2785   /* queued message for transmission */
2786   guint id;
2787   GAsyncQueue *messages;
2788   guint8 *write_data;
2789   guint write_off;
2790   guint write_size;
2791   guint write_id;
2792
2793   GstRTSPWatchFuncs funcs;
2794
2795   gpointer user_data;
2796   GDestroyNotify notify;
2797 };
2798
2799 static gboolean
2800 gst_rtsp_source_prepare (GSource * source, gint * timeout)
2801 {
2802   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2803
2804   if (watch->conn->initial_buffer != NULL)
2805     return TRUE;
2806
2807   *timeout = (watch->conn->timeout * 1000);
2808
2809   return FALSE;
2810 }
2811
2812 static gboolean
2813 gst_rtsp_source_check (GSource * source)
2814 {
2815   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2816
2817   if (watch->readfd.revents & READ_COND)
2818     return TRUE;
2819
2820   if (watch->writefd.revents & WRITE_COND)
2821     return TRUE;
2822
2823   return FALSE;
2824 }
2825
2826 static gboolean
2827 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
2828     gpointer user_data G_GNUC_UNUSED)
2829 {
2830   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2831   GstRTSPResult res;
2832
2833   /* first read as much as we can */
2834   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
2835     do {
2836       res = build_next (&watch->builder, &watch->message, watch->conn);
2837       if (res == GST_RTSP_EINTR)
2838         break;
2839       if (G_UNLIKELY (res == GST_RTSP_EEOF))
2840         goto eof;
2841       if (res == GST_RTSP_ETGET) {
2842         GString *str;
2843         GstRTSPStatusCode code;
2844         guint size;
2845
2846         if (watch->funcs.tunnel_start)
2847           code = watch->funcs.tunnel_start (watch, watch->user_data);
2848         else
2849           code = GST_RTSP_STS_OK;
2850
2851         /* queue the response string */
2852         str = gen_tunnel_reply (watch->conn, code);
2853         size = str->len;
2854         gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, FALSE),
2855             size);
2856       } else if (res == GST_RTSP_ETPOST) {
2857         /* in the callback the connection should be tunneled with the
2858          * GET connection */
2859         if (watch->funcs.tunnel_complete)
2860           watch->funcs.tunnel_complete (watch, watch->user_data);
2861       } else if (G_UNLIKELY (res != GST_RTSP_OK))
2862         goto error;
2863
2864       if (G_LIKELY (res == GST_RTSP_OK)) {
2865         if (watch->funcs.message_received)
2866           watch->funcs.message_received (watch, &watch->message,
2867               watch->user_data);
2868
2869         gst_rtsp_message_unset (&watch->message);
2870       }
2871       build_reset (&watch->builder);
2872     } while (FALSE);
2873   }
2874
2875   if (watch->writefd.revents & WRITE_COND) {
2876     do {
2877       if (watch->write_data == NULL) {
2878         GstRTSPRec *rec;
2879
2880         /* get a new message from the queue */
2881         rec = g_async_queue_try_pop (watch->messages);
2882         if (rec == NULL)
2883           goto done;
2884
2885         watch->write_off = 0;
2886         watch->write_data = rec->data;
2887         watch->write_size = rec->size;
2888         watch->write_id = rec->id;
2889
2890         g_slice_free (GstRTSPRec, rec);
2891       }
2892
2893       res = write_bytes (watch->writefd.fd, watch->write_data,
2894           &watch->write_off, watch->write_size);
2895       if (res == GST_RTSP_EINTR)
2896         break;
2897       if (G_UNLIKELY (res != GST_RTSP_OK))
2898         goto error;
2899
2900       if (watch->funcs.message_sent)
2901         watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
2902
2903     done:
2904       if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
2905         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2906         watch->write_added = FALSE;
2907         watch->writefd.revents = 0;
2908       }
2909       g_free (watch->write_data);
2910       watch->write_data = NULL;
2911     } while (FALSE);
2912   }
2913
2914   return TRUE;
2915
2916   /* ERRORS */
2917 eof:
2918   {
2919     if (watch->funcs.closed)
2920       watch->funcs.closed (watch, watch->user_data);
2921     return FALSE;
2922   }
2923 error:
2924   {
2925     if (watch->funcs.error)
2926       watch->funcs.error (watch, res, watch->user_data);
2927     return FALSE;
2928   }
2929 }
2930
2931 static void
2932 gst_rtsp_rec_free (gpointer data)
2933 {
2934   GstRTSPRec *rec = data;
2935
2936   g_free (rec->data);
2937   g_slice_free (GstRTSPRec, rec);
2938 }
2939
2940 static void
2941 gst_rtsp_source_finalize (GSource * source)
2942 {
2943   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2944
2945   build_reset (&watch->builder);
2946   gst_rtsp_message_unset (&watch->message);
2947
2948   g_async_queue_unref (watch->messages);
2949   watch->messages = NULL;
2950
2951   g_free (watch->write_data);
2952
2953   if (watch->notify)
2954     watch->notify (watch->user_data);
2955 }
2956
2957 static GSourceFuncs gst_rtsp_source_funcs = {
2958   gst_rtsp_source_prepare,
2959   gst_rtsp_source_check,
2960   gst_rtsp_source_dispatch,
2961   gst_rtsp_source_finalize,
2962   NULL,
2963   NULL
2964 };
2965
2966 /**
2967  * gst_rtsp_watch_new:
2968  * @conn: a #GstRTSPConnection
2969  * @funcs: watch functions
2970  * @user_data: user data to pass to @funcs
2971  * @notify: notify when @user_data is not referenced anymore
2972  *
2973  * Create a watch object for @conn. The functions provided in @funcs will be
2974  * called with @user_data when activity happened on the watch.
2975  *
2976  * The new watch is usually created so that it can be attached to a
2977  * maincontext with gst_rtsp_watch_attach(). 
2978  *
2979  * @conn must exist for the entire lifetime of the watch.
2980  *
2981  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2982  * communication. Free with gst_rtsp_watch_unref () after usage.
2983  *
2984  * Since: 0.10.23
2985  */
2986 GstRTSPWatch *
2987 gst_rtsp_watch_new (GstRTSPConnection * conn,
2988     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2989 {
2990   GstRTSPWatch *result;
2991
2992   g_return_val_if_fail (conn != NULL, NULL);
2993   g_return_val_if_fail (funcs != NULL, NULL);
2994   g_return_val_if_fail (conn->readfd != NULL, NULL);
2995   g_return_val_if_fail (conn->writefd != NULL, NULL);
2996
2997   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2998       sizeof (GstRTSPWatch));
2999
3000   result->conn = conn;
3001   result->builder.state = STATE_START;
3002
3003   result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
3004
3005   result->readfd.fd = -1;
3006   result->writefd.fd = -1;
3007
3008   gst_rtsp_watch_reset (result);
3009
3010   result->funcs = *funcs;
3011   result->user_data = user_data;
3012   result->notify = notify;
3013
3014   /* only add the read fd, the write fd is only added when we have data
3015    * to send. */
3016   g_source_add_poll ((GSource *) result, &result->readfd);
3017
3018   return result;
3019 }
3020
3021 /**
3022  * gst_rtsp_watch_reset:
3023  * @watch: a #GstRTSPWatch
3024  *
3025  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
3026  * when the file descriptors of the connection might have changed.
3027  *
3028  * Since: 0.10.23
3029  */
3030 void
3031 gst_rtsp_watch_reset (GstRTSPWatch * watch)
3032 {
3033   if (watch->readfd.fd != -1)
3034     g_source_remove_poll ((GSource *) watch, &watch->readfd);
3035   if (watch->writefd.fd != -1)
3036     g_source_remove_poll ((GSource *) watch, &watch->writefd);
3037
3038   watch->readfd.fd = watch->conn->readfd->fd;
3039   watch->readfd.events = READ_COND;
3040   watch->readfd.revents = 0;
3041
3042   watch->writefd.fd = watch->conn->writefd->fd;
3043   watch->writefd.events = WRITE_COND;
3044   watch->writefd.revents = 0;
3045   watch->write_added = FALSE;
3046
3047   g_source_add_poll ((GSource *) watch, &watch->readfd);
3048 }
3049
3050 /**
3051  * gst_rtsp_watch_attach:
3052  * @watch: a #GstRTSPWatch
3053  * @context: a GMainContext (if NULL, the default context will be used)
3054  *
3055  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
3056  *
3057  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
3058  *
3059  * Since: 0.10.23
3060  */
3061 guint
3062 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
3063 {
3064   g_return_val_if_fail (watch != NULL, 0);
3065
3066   return g_source_attach ((GSource *) watch, context);
3067 }
3068
3069 /**
3070  * gst_rtsp_watch_unref:
3071  * @watch: a #GstRTSPWatch
3072  *
3073  * Decreases the reference count of @watch by one. If the resulting reference
3074  * count is zero the watch and associated memory will be destroyed.
3075  *
3076  * Since: 0.10.23
3077  */
3078 void
3079 gst_rtsp_watch_unref (GstRTSPWatch * watch)
3080 {
3081   g_return_if_fail (watch != NULL);
3082
3083   g_source_unref ((GSource *) watch);
3084 }
3085
3086 /**
3087  * gst_rtsp_watch_queue_data:
3088  * @watch: a #GstRTSPWatch
3089  * @data: the data to queue
3090  * @size: the size of @data
3091  *
3092  * Queue @data for transmission in @watch. It will be transmitted when the
3093  * connection of the @watch becomes writable.
3094  *
3095  * This function will take ownership of @data and g_free() it after use.
3096  *
3097  * The return value of this function will be used as the id argument in the
3098  * message_sent callback.
3099  *
3100  * Returns: an id.
3101  *
3102  * Since: 0.10.24
3103  */
3104 guint
3105 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
3106     guint size)
3107 {
3108   GstRTSPRec *rec;
3109
3110   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3111   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
3112   g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
3113
3114   /* make a record with the data and id */
3115   rec = g_slice_new (GstRTSPRec);
3116   rec->data = (guint8 *) data;
3117   rec->size = size;
3118   do {
3119     /* make sure rec->id is never 0 */
3120     rec->id = ++watch->id;
3121   } while (G_UNLIKELY (rec->id == 0));
3122
3123   /* add the record to a queue. FIXME we would like to have an upper limit here */
3124   g_async_queue_push (watch->messages, rec);
3125
3126   /* FIXME: does the following need to be made thread-safe? (this might be
3127    * called from a streaming thread, like appsink's render function) */
3128   /* make sure the main context will now also check for writability on the
3129    * socket */
3130   if (!watch->write_added) {
3131     g_source_add_poll ((GSource *) watch, &watch->writefd);
3132     watch->write_added = TRUE;
3133   }
3134
3135   return rec->id;
3136 }
3137
3138 /**
3139  * gst_rtsp_watch_queue_message:
3140  * @watch: a #GstRTSPWatch
3141  * @message: a #GstRTSPMessage
3142  *
3143  * Queue a @message for transmission in @watch. The contents of this
3144  * message will be serialized and transmitted when the connection of the
3145  * @watch becomes writable.
3146  *
3147  * The return value of this function will be used as the id argument in the
3148  * message_sent callback.
3149  *
3150  * Returns: an id.
3151  *
3152  * Since: 0.10.23
3153  */
3154 guint
3155 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
3156 {
3157   GString *str;
3158   guint size;
3159
3160   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3161   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
3162
3163   /* make a record with the message as a string and id */
3164   str = message_to_string (watch->conn, message);
3165   size = str->len;
3166   return gst_rtsp_watch_queue_data (watch,
3167       (guint8 *) g_string_free (str, FALSE), size);
3168 }