rtsp: Added gst_rtsp_connection_create_from_fd().
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72 /* we include this here to get the G_OS_* defines */
73 #include <glib.h>
74 #include <gst/gst.h>
75
76 #ifdef G_OS_WIN32
77 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
78  * minwg32 headers check WINVER before allowing the use of these */
79 #ifndef WINVER
80 #define WINVER 0x0501
81 #endif
82 #include <winsock2.h>
83 #include <ws2tcpip.h>
84 #define EINPROGRESS WSAEINPROGRESS
85 #else
86 #include <sys/ioctl.h>
87 #include <netdb.h>
88 #include <sys/socket.h>
89 #include <fcntl.h>
90 #include <netinet/in.h>
91 #endif
92
93 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
94 #include <sys/filio.h>
95 #endif
96
97 #include "gstrtspconnection.h"
98 #include "gstrtspbase64.h"
99
100 union gst_sockaddr
101 {
102   struct sockaddr sa;
103   struct sockaddr_in sa_in;
104   struct sockaddr_in6 sa_in6;
105   struct sockaddr_storage sa_stor;
106 };
107
108 typedef struct
109 {
110   gint state;
111   guint save;
112   guchar out[3];                /* the size must be evenly divisible by 3 */
113   guint cout;
114   guint coutl;
115 } DecodeCtx;
116
117 static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer,
118     guint * idx, guint size);
119 static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
120     guint keysize, gchar ** value);
121 static void parse_string (gchar * dest, gint size, gchar ** src);
122
123 #ifdef G_OS_WIN32
124 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
125 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
126 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
127 #define CLOSE_SOCKET(sock) closesocket (sock)
128 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
129 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
130 /* According to Microsoft's connect() documentation this one returns
131  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
132 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
133 #else
134 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
135 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
136 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
137 #define CLOSE_SOCKET(sock) close (sock)
138 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
139 #define ERRNO_IS_EINTR (errno == EINTR)
140 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
141 #endif
142
143 #define ADD_POLLFD(fdset, pfd, fd)        \
144 G_STMT_START {                            \
145   (pfd)->fd = fd;                         \
146   gst_poll_add_fd (fdset, pfd);           \
147 } G_STMT_END
148
149 #define REMOVE_POLLFD(fdset, pfd)          \
150 G_STMT_START {                             \
151   if ((pfd)->fd != -1) {                   \
152     GST_DEBUG ("remove fd %d", (pfd)->fd); \
153     gst_poll_remove_fd (fdset, pfd);       \
154     CLOSE_SOCKET ((pfd)->fd);              \
155     (pfd)->fd = -1;                        \
156   }                                        \
157 } G_STMT_END
158
159 typedef enum
160 {
161   TUNNEL_STATE_NONE,
162   TUNNEL_STATE_GET,
163   TUNNEL_STATE_POST,
164   TUNNEL_STATE_COMPLETE
165 } GstRTSPTunnelState;
166
167 #define TUNNELID_LEN   24
168
169 struct _GstRTSPConnection
170 {
171   /*< private > */
172   /* URL for the connection */
173   GstRTSPUrl *url;
174
175   /* connection state */
176   GstPollFD fd0;
177   GstPollFD fd1;
178
179   GstPollFD *readfd;
180   GstPollFD *writefd;
181
182   gchar tunnelid[TUNNELID_LEN];
183   gboolean tunneled;
184   GstRTSPTunnelState tstate;
185
186   GstPoll *fdset;
187   gchar *ip;
188
189   gchar *initial_buffer;
190   gsize initial_buffer_offset;
191
192   /* Session state */
193   gint cseq;                    /* sequence number */
194   gchar session_id[512];        /* session id */
195   gint timeout;                 /* session timeout in seconds */
196   GTimer *timer;                /* timeout timer */
197
198   /* Authentication */
199   GstRTSPAuthMethod auth_method;
200   gchar *username;
201   gchar *passwd;
202   GHashTable *auth_params;
203
204   DecodeCtx ctx;
205   DecodeCtx *ctxp;
206
207   gchar *proxy_host;
208   guint proxy_port;
209 };
210
211 enum
212 {
213   STATE_START = 0,
214   STATE_DATA_HEADER,
215   STATE_DATA_BODY,
216   STATE_READ_LINES,
217   STATE_END,
218   STATE_LAST
219 };
220
221 /* a structure for constructing RTSPMessages */
222 typedef struct
223 {
224   gint state;
225   guint8 buffer[4096];
226   guint offset;
227
228   guint line;
229   guint8 *body_data;
230   glong body_len;
231 } GstRTSPBuilder;
232
233 static void
234 build_reset (GstRTSPBuilder * builder)
235 {
236   g_free (builder->body_data);
237   memset (builder, 0, sizeof (GstRTSPBuilder));
238 }
239
240 /**
241  * gst_rtsp_connection_create:
242  * @url: a #GstRTSPUrl 
243  * @conn: storage for a #GstRTSPConnection
244  *
245  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
246  * The connection will not yet attempt to connect to @url, use
247  * gst_rtsp_connection_connect().
248  *
249  * A copy of @url will be made.
250  *
251  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
252  */
253 GstRTSPResult
254 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
255 {
256   GstRTSPConnection *newconn;
257 #ifdef G_OS_WIN32
258   WSADATA w;
259   int error;
260 #endif
261
262   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
263
264 #ifdef G_OS_WIN32
265   error = WSAStartup (0x0202, &w);
266
267   if (error)
268     goto startup_error;
269
270   if (w.wVersion != 0x0202)
271     goto version_error;
272 #endif
273
274   newconn = g_new0 (GstRTSPConnection, 1);
275
276   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
277     goto no_fdset;
278
279   newconn->url = gst_rtsp_url_copy (url);
280   newconn->fd0.fd = -1;
281   newconn->fd1.fd = -1;
282   newconn->timer = g_timer_new ();
283   newconn->timeout = 60;
284   newconn->cseq = 1;
285
286   newconn->auth_method = GST_RTSP_AUTH_NONE;
287   newconn->username = NULL;
288   newconn->passwd = NULL;
289   newconn->auth_params = NULL;
290
291   *conn = newconn;
292
293   return GST_RTSP_OK;
294
295   /* ERRORS */
296 #ifdef G_OS_WIN32
297 startup_error:
298   {
299     g_warning ("Error %d on WSAStartup", error);
300     return GST_RTSP_EWSASTART;
301   }
302 version_error:
303   {
304     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
305         w.wVersion);
306     WSACleanup ();
307     return GST_RTSP_EWSAVERSION;
308   }
309 #endif
310 no_fdset:
311   {
312     g_free (newconn);
313 #ifdef G_OS_WIN32
314     WSACleanup ();
315 #endif
316     return GST_RTSP_ESYS;
317   }
318 }
319
320 /**
321  * gst_rtsp_connection_create_from_fd:
322  * @fd: a file descriptor
323  * @ip: the IP address of the other end
324  * @port: the port used by the other end
325  * @initial_buffer: data already read from @fd
326  * @conn: storage for a #GstRTSPConnection
327  *
328  * Create a new #GstRTSPConnection for handling communication on the existing
329  * file descriptor @fd. The @initial_buffer contains any data already read from
330  * @fd which should be used before starting to read new data.
331  *
332  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
333  *
334  * Since: 0.10.25
335  */
336 GstRTSPResult
337 gst_rtsp_connection_create_from_fd (gint fd, const gchar * ip, guint16 port,
338     const gchar * initial_buffer, GstRTSPConnection ** conn)
339 {
340   GstRTSPConnection *newconn = NULL;
341   GstRTSPUrl *url;
342 #ifdef G_OS_WIN32
343   gulong flags = 1;
344 #endif
345   GstRTSPResult res;
346
347   g_return_val_if_fail (fd >= 0, GST_RTSP_EINVAL);
348   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
349   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
350
351   /* set to non-blocking mode so that we can cancel the communication */
352 #ifndef G_OS_WIN32
353   fcntl (fd, F_SETFL, O_NONBLOCK);
354 #else
355   ioctlsocket (fd, FIONBIO, &flags);
356 #endif /* G_OS_WIN32 */
357
358   /* create a url for the client address */
359   url = g_new0 (GstRTSPUrl, 1);
360   url->host = g_strdup (ip);
361   url->port = port;
362
363   /* now create the connection object */
364   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
365   gst_rtsp_url_free (url);
366
367   ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
368
369   /* both read and write initially */
370   newconn->readfd = &newconn->fd0;
371   newconn->writefd = &newconn->fd0;
372
373   newconn->ip = g_strdup (ip);
374
375   newconn->initial_buffer = g_strdup (initial_buffer);
376
377   *conn = newconn;
378
379   return GST_RTSP_OK;
380
381   /* ERRORS */
382 newconn_failed:
383   {
384     gst_rtsp_url_free (url);
385     return res;
386   }
387 }
388
389 /**
390  * gst_rtsp_connection_accept:
391  * @sock: a socket
392  * @conn: storage for a #GstRTSPConnection
393  *
394  * Accept a new connection on @sock and create a new #GstRTSPConnection for
395  * handling communication on new socket.
396  *
397  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
398  *
399  * Since: 0.10.23
400  */
401 GstRTSPResult
402 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
403 {
404   int fd;
405   union gst_sockaddr sa;
406   socklen_t slen = sizeof (sa);
407   gchar ip[INET6_ADDRSTRLEN];
408   guint16 port;
409
410   g_return_val_if_fail (sock >= 0, GST_RTSP_EINVAL);
411   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
412
413   memset (&sa, 0, slen);
414
415 #ifndef G_OS_WIN32
416   fd = accept (sock, &sa.sa, &slen);
417 #else
418   fd = accept (sock, &sa.sa, (gint *) & slen);
419 #endif /* G_OS_WIN32 */
420   if (fd == -1)
421     goto accept_failed;
422
423   if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0)
424     goto getnameinfo_failed;
425
426   if (sa.sa.sa_family == AF_INET)
427     port = sa.sa_in.sin_port;
428   else if (sa.sa.sa_family == AF_INET6)
429     port = sa.sa_in6.sin6_port;
430   else
431     goto wrong_family;
432
433   return gst_rtsp_connection_create_from_fd (fd, ip, port, NULL, conn);
434
435   /* ERRORS */
436 accept_failed:
437   {
438     return GST_RTSP_ESYS;
439   }
440 getnameinfo_failed:
441 wrong_family:
442   {
443     close (fd);
444     return GST_RTSP_ERROR;
445   }
446 }
447
448 static gchar *
449 do_resolve (const gchar * host)
450 {
451   static gchar ip[INET6_ADDRSTRLEN];
452   struct addrinfo *aires;
453   struct addrinfo *ai;
454   gint aierr;
455
456   aierr = getaddrinfo (host, NULL, NULL, &aires);
457   if (aierr != 0)
458     goto no_addrinfo;
459
460   for (ai = aires; ai; ai = ai->ai_next) {
461     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
462       break;
463     }
464   }
465   if (ai == NULL)
466     goto no_family;
467
468   aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0,
469       NI_NUMERICHOST | NI_NUMERICSERV);
470   if (aierr != 0)
471     goto no_address;
472
473   freeaddrinfo (aires);
474
475   return g_strdup (ip);
476
477   /* ERRORS */
478 no_addrinfo:
479   {
480     GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr));
481     return NULL;
482   }
483 no_family:
484   {
485     GST_ERROR ("no family found for %s", host);
486     freeaddrinfo (aires);
487     return NULL;
488   }
489 no_address:
490   {
491     GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr));
492     freeaddrinfo (aires);
493     return NULL;
494   }
495 }
496
497 static GstRTSPResult
498 do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
499     GstPoll * fdset, GTimeVal * timeout)
500 {
501   gint fd;
502   struct addrinfo hints;
503   struct addrinfo *aires;
504   struct addrinfo *ai;
505   gint aierr;
506   gchar service[NI_MAXSERV];
507   gint ret;
508 #ifdef G_OS_WIN32
509   unsigned long flags = 1;
510 #endif /* G_OS_WIN32 */
511   GstClockTime to;
512   gint retval;
513
514   memset (&hints, 0, sizeof hints);
515   hints.ai_flags = AI_NUMERICHOST;
516   hints.ai_family = AF_UNSPEC;
517   hints.ai_socktype = SOCK_STREAM;
518   g_snprintf (service, sizeof (service) - 1, "%hu", port);
519   service[sizeof (service) - 1] = '\0';
520
521   aierr = getaddrinfo (ip, service, &hints, &aires);
522   if (aierr != 0)
523     goto no_addrinfo;
524
525   for (ai = aires; ai; ai = ai->ai_next) {
526     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
527       break;
528     }
529   }
530   if (ai == NULL)
531     goto no_family;
532
533   fd = socket (ai->ai_family, SOCK_STREAM, 0);
534   if (fd == -1)
535     goto no_socket;
536
537   /* set to non-blocking mode so that we can cancel the connect */
538 #ifndef G_OS_WIN32
539   fcntl (fd, F_SETFL, O_NONBLOCK);
540 #else
541   ioctlsocket (fd, FIONBIO, &flags);
542 #endif /* G_OS_WIN32 */
543
544   /* add the socket to our fdset */
545   ADD_POLLFD (fdset, fdout, fd);
546
547   /* we are going to connect ASYNC now */
548   ret = connect (fd, ai->ai_addr, ai->ai_addrlen);
549   if (ret == 0)
550     goto done;
551   if (!ERRNO_IS_EINPROGRESS)
552     goto sys_error;
553
554   /* wait for connect to complete up to the specified timeout or until we got
555    * interrupted. */
556   gst_poll_fd_ctl_write (fdset, fdout, TRUE);
557
558   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
559
560   do {
561     retval = gst_poll_wait (fdset, to);
562   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
563
564   if (retval == 0)
565     goto timeout;
566   else if (retval == -1)
567     goto sys_error;
568
569   /* we can still have an error connecting on windows */
570   if (gst_poll_fd_has_error (fdset, fdout)) {
571     socklen_t len = sizeof (errno);
572 #ifndef G_OS_WIN32
573     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
574 #else
575     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
576 #endif
577     goto sys_error;
578   }
579
580   gst_poll_fd_ignored (fdset, fdout);
581
582 done:
583   freeaddrinfo (aires);
584
585   return GST_RTSP_OK;
586
587   /* ERRORS */
588 no_addrinfo:
589   {
590     GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr));
591     return GST_RTSP_ERROR;
592   }
593 no_family:
594   {
595     GST_ERROR ("no family found for %s", ip);
596     freeaddrinfo (aires);
597     return GST_RTSP_ERROR;
598   }
599 no_socket:
600   {
601     GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
602     freeaddrinfo (aires);
603     return GST_RTSP_ESYS;
604   }
605 sys_error:
606   {
607     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
608     REMOVE_POLLFD (fdset, fdout);
609     freeaddrinfo (aires);
610     return GST_RTSP_ESYS;
611   }
612 timeout:
613   {
614     GST_ERROR ("timeout");
615     REMOVE_POLLFD (fdset, fdout);
616     freeaddrinfo (aires);
617     return GST_RTSP_ETIMEOUT;
618   }
619 }
620
621 static GstRTSPResult
622 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
623 {
624   gint i;
625   GstRTSPResult res;
626   gchar *str;
627   guint idx, line;
628   gint retval;
629   GstClockTime to;
630   gchar *ip, *url_port_str;
631   guint16 port, url_port;
632   gchar codestr[4], *resultstr;
633   gint code;
634   GstRTSPUrl *url;
635   gchar *hostparam;
636
637   /* create a random sessionid */
638   for (i = 0; i < TUNNELID_LEN; i++)
639     conn->tunnelid[i] = g_random_int_range ('a', 'z');
640   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
641
642   url = conn->url;
643   /* get the port from the url */
644   gst_rtsp_url_get_port (url, &url_port);
645
646   if (conn->proxy_host) {
647     hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
648     url_port_str = g_strdup_printf (":%d", url_port);
649     ip = conn->proxy_host;
650     port = conn->proxy_port;
651   } else {
652     hostparam = NULL;
653     url_port_str = NULL;
654     ip = conn->ip;
655     port = url_port;
656   }
657
658   /* */
659   str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
660       "%s"
661       "x-sessioncookie: %s\r\n"
662       "Accept: application/x-rtsp-tunnelled\r\n"
663       "Pragma: no-cache\r\n"
664       "Cache-Control: no-cache\r\n" "\r\n",
665       conn->proxy_host ? "http://" : "",
666       conn->proxy_host ? url->host : "",
667       conn->proxy_host ? url_port_str : "",
668       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
669       hostparam ? hostparam : "", conn->tunnelid);
670
671   /* we start by writing to this fd */
672   conn->writefd = &conn->fd0;
673
674   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
675   g_free (str);
676   if (res != GST_RTSP_OK)
677     goto write_failed;
678
679   gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
680   gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);
681
682   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
683
684   line = 0;
685   while (TRUE) {
686     guint8 buffer[4096];
687
688     idx = 0;
689     while (TRUE) {
690       res = read_line (conn, buffer, &idx, sizeof (buffer));
691       if (res == GST_RTSP_EEOF)
692         goto eof;
693       if (res == GST_RTSP_OK)
694         break;
695       if (res != GST_RTSP_EINTR)
696         goto read_error;
697
698       do {
699         retval = gst_poll_wait (conn->fdset, to);
700       } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
701
702       /* check for timeout */
703       if (retval == 0)
704         goto timeout;
705
706       if (retval == -1) {
707         if (errno == EBUSY)
708           goto stopped;
709         else
710           goto select_error;
711       }
712     }
713
714     /* check for last line */
715     if (buffer[0] == '\r')
716       buffer[0] = '\0';
717     if (buffer[0] == '\0')
718       break;
719
720     if (line == 0) {
721       /* first line, parse response */
722       gchar versionstr[20];
723       gchar *bptr;
724
725       bptr = (gchar *) buffer;
726
727       parse_string (versionstr, sizeof (versionstr), &bptr);
728       parse_string (codestr, sizeof (codestr), &bptr);
729       code = atoi (codestr);
730
731       while (g_ascii_isspace (*bptr))
732         bptr++;
733
734       resultstr = bptr;
735
736       if (code != GST_RTSP_STS_OK)
737         goto wrong_result;
738     } else {
739       gchar key[32];
740       gchar *value;
741
742       /* other lines, parse key/value */
743       res = parse_key_value (buffer, key, sizeof (key), &value);
744       if (res == GST_RTSP_OK) {
745         /* we got a new ip address */
746         if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
747           if (conn->proxy_host) {
748             /* if we use a proxy we need to change the destination url */
749             g_free (url->host);
750             url->host = g_strdup (value);
751             g_free (hostparam);
752             g_free (url_port_str);
753             hostparam =
754                 g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
755             url_port_str = g_strdup_printf (":%d", url_port);
756           } else {
757             /* and resolve the new ip address */
758             if (!(ip = do_resolve (conn->ip)))
759               goto not_resolved;
760             g_free (conn->ip);
761             conn->ip = ip;
762           }
763         }
764       }
765     }
766     line++;
767   }
768
769   /* connect to the host/port */
770   res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
771   if (res != GST_RTSP_OK)
772     goto connect_failed;
773
774   /* this is now our writing socket */
775   conn->writefd = &conn->fd1;
776
777   /* */
778   str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
779       "%s"
780       "x-sessioncookie: %s\r\n"
781       "Content-Type: application/x-rtsp-tunnelled\r\n"
782       "Pragma: no-cache\r\n"
783       "Cache-Control: no-cache\r\n"
784       "Content-Length: 32767\r\n"
785       "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
786       "\r\n",
787       conn->proxy_host ? "http://" : "",
788       conn->proxy_host ? url->host : "",
789       conn->proxy_host ? url_port_str : "",
790       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
791       hostparam ? hostparam : "", conn->tunnelid);
792
793   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
794   g_free (str);
795   if (res != GST_RTSP_OK)
796     goto write_failed;
797
798 exit:
799   g_free (hostparam);
800   g_free (url_port_str);
801
802   return res;
803
804   /* ERRORS */
805 write_failed:
806   {
807     GST_ERROR ("write failed (%d)", res);
808     goto exit;
809   }
810 eof:
811   {
812     res = GST_RTSP_EEOF;
813     goto exit;
814   }
815 read_error:
816   {
817     goto exit;
818   }
819 timeout:
820   {
821     res = GST_RTSP_ETIMEOUT;
822     goto exit;
823   }
824 select_error:
825   {
826     res = GST_RTSP_ESYS;
827     goto exit;
828   }
829 stopped:
830   {
831     res = GST_RTSP_EINTR;
832     goto exit;
833   }
834 wrong_result:
835   {
836     GST_ERROR ("got failure response %d %s", code, resultstr);
837     res = GST_RTSP_ERROR;
838     goto exit;
839   }
840 not_resolved:
841   {
842     GST_ERROR ("could not resolve %s", conn->ip);
843     res = GST_RTSP_ENET;
844     goto exit;
845   }
846 connect_failed:
847   {
848     GST_ERROR ("failed to connect");
849     goto exit;
850   }
851 }
852
853 /**
854  * gst_rtsp_connection_connect:
855  * @conn: a #GstRTSPConnection 
856  * @timeout: a #GTimeVal timeout
857  *
858  * Attempt to connect to the url of @conn made with
859  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
860  * forever. If @timeout contains a valid timeout, this function will return
861  * #GST_RTSP_ETIMEOUT after the timeout expired.
862  *
863  * This function can be cancelled with gst_rtsp_connection_flush().
864  *
865  * Returns: #GST_RTSP_OK when a connection could be made.
866  */
867 GstRTSPResult
868 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
869 {
870   GstRTSPResult res;
871   gchar *ip;
872   guint16 port;
873   GstRTSPUrl *url;
874
875   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
876   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
877   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
878
879   url = conn->url;
880
881   if (conn->proxy_host && conn->tunneled) {
882     if (!(ip = do_resolve (conn->proxy_host))) {
883       GST_ERROR ("could not resolve %s", conn->proxy_host);
884       goto not_resolved;
885     }
886     port = conn->proxy_port;
887     g_free (conn->proxy_host);
888     conn->proxy_host = ip;
889   } else {
890     if (!(ip = do_resolve (url->host))) {
891       GST_ERROR ("could not resolve %s", url->host);
892       goto not_resolved;
893     }
894     /* get the port from the url */
895     gst_rtsp_url_get_port (url, &port);
896
897     g_free (conn->ip);
898     conn->ip = ip;
899   }
900
901   /* connect to the host/port */
902   res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
903   if (res != GST_RTSP_OK)
904     goto connect_failed;
905
906   /* this is our read URL */
907   conn->readfd = &conn->fd0;
908
909   if (conn->tunneled) {
910     res = setup_tunneling (conn, timeout);
911     if (res != GST_RTSP_OK)
912       goto tunneling_failed;
913   } else {
914     conn->writefd = &conn->fd0;
915   }
916
917   return GST_RTSP_OK;
918
919 not_resolved:
920   {
921     return GST_RTSP_ENET;
922   }
923 connect_failed:
924   {
925     GST_ERROR ("failed to connect");
926     return res;
927   }
928 tunneling_failed:
929   {
930     GST_ERROR ("failed to setup tunneling");
931     return res;
932   }
933 }
934
935 static void
936 auth_digest_compute_hex_urp (const gchar * username,
937     const gchar * realm, const gchar * password, gchar hex_urp[33])
938 {
939   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
940   const gchar *digest_string;
941
942   g_checksum_update (md5_context, (const guchar *) username, strlen (username));
943   g_checksum_update (md5_context, (const guchar *) ":", 1);
944   g_checksum_update (md5_context, (const guchar *) realm, strlen (realm));
945   g_checksum_update (md5_context, (const guchar *) ":", 1);
946   g_checksum_update (md5_context, (const guchar *) password, strlen (password));
947   digest_string = g_checksum_get_string (md5_context);
948
949   memset (hex_urp, 0, 33);
950   memcpy (hex_urp, digest_string, strlen (digest_string));
951
952   g_checksum_free (md5_context);
953 }
954
955 static void
956 auth_digest_compute_response (const gchar * method,
957     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
958     gchar response[33])
959 {
960   char hex_a2[33] = { 0, };
961   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
962   const gchar *digest_string;
963
964   /* compute A2 */
965   g_checksum_update (md5_context, (const guchar *) method, strlen (method));
966   g_checksum_update (md5_context, (const guchar *) ":", 1);
967   g_checksum_update (md5_context, (const guchar *) uri, strlen (uri));
968   digest_string = g_checksum_get_string (md5_context);
969   memcpy (hex_a2, digest_string, strlen (digest_string));
970
971   /* compute KD */
972 #if GLIB_CHECK_VERSION (2, 18, 0)
973   g_checksum_reset (md5_context);
974 #else
975   g_checksum_free (md5_context);
976   md5_context = g_checksum_new (G_CHECKSUM_MD5);
977 #endif
978   g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1));
979   g_checksum_update (md5_context, (const guchar *) ":", 1);
980   g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce));
981   g_checksum_update (md5_context, (const guchar *) ":", 1);
982
983   g_checksum_update (md5_context, (const guchar *) hex_a2, 32);
984   digest_string = g_checksum_get_string (md5_context);
985   memset (response, 0, 33);
986   memcpy (response, digest_string, strlen (digest_string));
987
988   g_checksum_free (md5_context);
989 }
990
991 static void
992 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
993 {
994   switch (conn->auth_method) {
995     case GST_RTSP_AUTH_BASIC:{
996       gchar *user_pass;
997       gchar *user_pass64;
998       gchar *auth_string;
999
1000       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1001       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1002       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1003
1004       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1005           auth_string);
1006
1007       g_free (user_pass);
1008       g_free (user_pass64);
1009       break;
1010     }
1011     case GST_RTSP_AUTH_DIGEST:{
1012       gchar response[33], hex_urp[33];
1013       gchar *auth_string, *auth_string2;
1014       gchar *realm;
1015       gchar *nonce;
1016       gchar *opaque;
1017       const gchar *uri;
1018       const gchar *method;
1019
1020       /* we need to have some params set */
1021       if (conn->auth_params == NULL)
1022         break;
1023
1024       /* we need the realm and nonce */
1025       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1026       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1027       if (realm == NULL || nonce == NULL)
1028         break;
1029
1030       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
1031           hex_urp);
1032
1033       method = gst_rtsp_method_as_text (message->type_data.request.method);
1034       uri = message->type_data.request.uri;
1035
1036       /* Assume no qop, algorithm=md5, stale=false */
1037       /* For algorithm MD5, a1 = urp. */
1038       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
1039       auth_string = g_strdup_printf ("Digest username=\"%s\", "
1040           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1041           conn->username, realm, nonce, uri, response);
1042
1043       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1044       if (opaque) {
1045         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1046             opaque);
1047         g_free (auth_string);
1048         auth_string = auth_string2;
1049       }
1050       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1051           auth_string);
1052       break;
1053     }
1054     default:
1055       /* Nothing to do */
1056       break;
1057   }
1058 }
1059
1060 static void
1061 gen_date_string (gchar * date_string, guint len)
1062 {
1063   GTimeVal tv;
1064   time_t t;
1065 #ifdef HAVE_GMTIME_R
1066   struct tm tm_;
1067 #endif
1068
1069   g_get_current_time (&tv);
1070   t = (time_t) tv.tv_sec;
1071
1072 #ifdef HAVE_GMTIME_R
1073   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
1074 #else
1075   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
1076 #endif
1077 }
1078
1079 static GstRTSPResult
1080 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
1081 {
1082   guint left;
1083
1084   if (G_UNLIKELY (*idx > size))
1085     return GST_RTSP_ERROR;
1086
1087   left = size - *idx;
1088
1089   while (left) {
1090     gint r;
1091
1092     r = WRITE_SOCKET (fd, &buffer[*idx], left);
1093     if (G_UNLIKELY (r == 0)) {
1094       return GST_RTSP_EINTR;
1095     } else if (G_UNLIKELY (r < 0)) {
1096       if (ERRNO_IS_EAGAIN)
1097         return GST_RTSP_EINTR;
1098       if (!ERRNO_IS_EINTR)
1099         return GST_RTSP_ESYS;
1100     } else {
1101       left -= r;
1102       *idx += r;
1103     }
1104   }
1105   return GST_RTSP_OK;
1106 }
1107
1108 static gint
1109 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1110 {
1111   gint out = 0;
1112
1113   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1114     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1115
1116     out = MIN (left, size);
1117     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1118
1119     if (left == (gsize) out) {
1120       g_free (conn->initial_buffer);
1121       conn->initial_buffer = NULL;
1122       conn->initial_buffer_offset = 0;
1123     } else
1124       conn->initial_buffer_offset += out;
1125   }
1126
1127   if (G_LIKELY (size > (guint) out)) {
1128     gint r;
1129
1130     r = READ_SOCKET (conn->readfd->fd, &buffer[out], size - out);
1131     if (r <= 0) {
1132       if (out == 0)
1133         out = r;
1134     } else
1135       out += r;
1136   }
1137
1138   return out;
1139 }
1140
1141 static gint
1142 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
1143 {
1144   DecodeCtx *ctx = conn->ctxp;
1145   gint out = 0;
1146
1147   if (ctx) {
1148     while (size > 0) {
1149       guint8 in[sizeof (ctx->out) * 4 / 3];
1150       gint r;
1151
1152       while (size > 0 && ctx->cout < ctx->coutl) {
1153         /* we have some leftover bytes */
1154         *buffer++ = ctx->out[ctx->cout++];
1155         size--;
1156         out++;
1157       }
1158
1159       /* got what we needed? */
1160       if (size == 0)
1161         break;
1162
1163       /* try to read more bytes */
1164       r = fill_raw_bytes (conn, in, sizeof (in));
1165       if (r <= 0) {
1166         if (out == 0)
1167           out = r;
1168         break;
1169       }
1170
1171       ctx->cout = 0;
1172       ctx->coutl =
1173           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1174           &ctx->save);
1175     }
1176   } else {
1177     out = fill_raw_bytes (conn, buffer, size);
1178   }
1179
1180   return out;
1181 }
1182
1183 static GstRTSPResult
1184 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1185 {
1186   guint left;
1187
1188   if (G_UNLIKELY (*idx > size))
1189     return GST_RTSP_ERROR;
1190
1191   left = size - *idx;
1192
1193   while (left) {
1194     gint r;
1195
1196     r = fill_bytes (conn, &buffer[*idx], left);
1197     if (G_UNLIKELY (r == 0)) {
1198       return GST_RTSP_EEOF;
1199     } else if (G_UNLIKELY (r < 0)) {
1200       if (ERRNO_IS_EAGAIN)
1201         return GST_RTSP_EINTR;
1202       if (!ERRNO_IS_EINTR)
1203         return GST_RTSP_ESYS;
1204     } else {
1205       left -= r;
1206       *idx += r;
1207     }
1208   }
1209   return GST_RTSP_OK;
1210 }
1211
1212 static GstRTSPResult
1213 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
1214 {
1215   while (TRUE) {
1216     guint8 c;
1217     gint r;
1218
1219     r = fill_bytes (conn, &c, 1);
1220     if (G_UNLIKELY (r == 0)) {
1221       return GST_RTSP_EEOF;
1222     } else if (G_UNLIKELY (r < 0)) {
1223       if (ERRNO_IS_EAGAIN)
1224         return GST_RTSP_EINTR;
1225       if (!ERRNO_IS_EINTR)
1226         return GST_RTSP_ESYS;
1227     } else {
1228       if (c == '\n')            /* end on \n */
1229         break;
1230       if (c == '\r')            /* ignore \r */
1231         continue;
1232
1233       if (G_LIKELY (*idx < size - 1))
1234         buffer[(*idx)++] = c;
1235     }
1236   }
1237   buffer[*idx] = '\0';
1238
1239   return GST_RTSP_OK;
1240 }
1241
1242 /**
1243  * gst_rtsp_connection_write:
1244  * @conn: a #GstRTSPConnection
1245  * @data: the data to write
1246  * @size: the size of @data
1247  * @timeout: a timeout value or #NULL
1248  *
1249  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1250  * the specified @timeout. @timeout can be #NULL, in which case this function
1251  * might block forever.
1252  * 
1253  * This function can be cancelled with gst_rtsp_connection_flush().
1254  *
1255  * Returns: #GST_RTSP_OK on success.
1256  */
1257 GstRTSPResult
1258 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1259     guint size, GTimeVal * timeout)
1260 {
1261   guint offset;
1262   gint retval;
1263   GstClockTime to;
1264   GstRTSPResult res;
1265
1266   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1267   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1268   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1269
1270   gst_poll_set_controllable (conn->fdset, TRUE);
1271   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
1272   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
1273   /* clear all previous poll results */
1274   gst_poll_fd_ignored (conn->fdset, conn->writefd);
1275   gst_poll_fd_ignored (conn->fdset, conn->readfd);
1276
1277   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1278
1279   offset = 0;
1280
1281   while (TRUE) {
1282     /* try to write */
1283     res = write_bytes (conn->writefd->fd, data, &offset, size);
1284     if (G_LIKELY (res == GST_RTSP_OK))
1285       break;
1286     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1287       goto write_error;
1288
1289     /* not all is written, wait until we can write more */
1290     do {
1291       retval = gst_poll_wait (conn->fdset, to);
1292     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1293
1294     if (G_UNLIKELY (retval == 0))
1295       goto timeout;
1296
1297     if (G_UNLIKELY (retval == -1)) {
1298       if (errno == EBUSY)
1299         goto stopped;
1300       else
1301         goto select_error;
1302     }
1303   }
1304   return GST_RTSP_OK;
1305
1306   /* ERRORS */
1307 timeout:
1308   {
1309     return GST_RTSP_ETIMEOUT;
1310   }
1311 select_error:
1312   {
1313     return GST_RTSP_ESYS;
1314   }
1315 stopped:
1316   {
1317     return GST_RTSP_EINTR;
1318   }
1319 write_error:
1320   {
1321     return res;
1322   }
1323 }
1324
1325 static GString *
1326 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
1327 {
1328   GString *str = NULL;
1329
1330   str = g_string_new ("");
1331
1332   switch (message->type) {
1333     case GST_RTSP_MESSAGE_REQUEST:
1334       /* create request string, add CSeq */
1335       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
1336           "CSeq: %d\r\n",
1337           gst_rtsp_method_as_text (message->type_data.request.method),
1338           message->type_data.request.uri, conn->cseq++);
1339       /* add session id if we have one */
1340       if (conn->session_id[0] != '\0') {
1341         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1342         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1343             conn->session_id);
1344       }
1345       /* add any authentication headers */
1346       add_auth_header (conn, message);
1347       break;
1348     case GST_RTSP_MESSAGE_RESPONSE:
1349       /* create response string */
1350       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
1351           message->type_data.response.code, message->type_data.response.reason);
1352       break;
1353     case GST_RTSP_MESSAGE_DATA:
1354     {
1355       guint8 data_header[4];
1356
1357       /* prepare data header */
1358       data_header[0] = '$';
1359       data_header[1] = message->type_data.data.channel;
1360       data_header[2] = (message->body_size >> 8) & 0xff;
1361       data_header[3] = message->body_size & 0xff;
1362
1363       /* create string with header and data */
1364       str = g_string_append_len (str, (gchar *) data_header, 4);
1365       str =
1366           g_string_append_len (str, (gchar *) message->body,
1367           message->body_size);
1368       break;
1369     }
1370     default:
1371       g_string_free (str, TRUE);
1372       g_return_val_if_reached (NULL);
1373       break;
1374   }
1375
1376   /* append headers and body */
1377   if (message->type != GST_RTSP_MESSAGE_DATA) {
1378     gchar date_string[100];
1379
1380     gen_date_string (date_string, sizeof (date_string));
1381
1382     /* add date header */
1383     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1384     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1385
1386     /* append headers */
1387     gst_rtsp_message_append_headers (message, str);
1388
1389     /* append Content-Length and body if needed */
1390     if (message->body != NULL && message->body_size > 0) {
1391       gchar *len;
1392
1393       len = g_strdup_printf ("%d", message->body_size);
1394       g_string_append_printf (str, "%s: %s\r\n",
1395           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1396       g_free (len);
1397       /* header ends here */
1398       g_string_append (str, "\r\n");
1399       str =
1400           g_string_append_len (str, (gchar *) message->body,
1401           message->body_size);
1402     } else {
1403       /* just end headers */
1404       g_string_append (str, "\r\n");
1405     }
1406   }
1407
1408   return str;
1409 }
1410
1411 /**
1412  * gst_rtsp_connection_send:
1413  * @conn: a #GstRTSPConnection
1414  * @message: the message to send
1415  * @timeout: a timeout value or #NULL
1416  *
1417  * Attempt to send @message to the connected @conn, blocking up to
1418  * the specified @timeout. @timeout can be #NULL, in which case this function
1419  * might block forever.
1420  * 
1421  * This function can be cancelled with gst_rtsp_connection_flush().
1422  *
1423  * Returns: #GST_RTSP_OK on success.
1424  */
1425 GstRTSPResult
1426 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1427     GTimeVal * timeout)
1428 {
1429   GString *string = NULL;
1430   GstRTSPResult res;
1431   gchar *str;
1432   gsize len;
1433
1434   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1435   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1436
1437   if (G_UNLIKELY (!(string = message_to_string (conn, message))))
1438     goto no_message;
1439
1440   if (conn->tunneled) {
1441     str = g_base64_encode ((const guchar *) string->str, string->len);
1442     g_string_free (string, TRUE);
1443     len = strlen (str);
1444   } else {
1445     str = string->str;
1446     len = string->len;
1447     g_string_free (string, FALSE);
1448   }
1449
1450   /* write request */
1451   res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
1452
1453   g_free (str);
1454
1455   return res;
1456
1457 no_message:
1458   {
1459     g_warning ("Wrong message");
1460     return GST_RTSP_EINVAL;
1461   }
1462 }
1463
1464 static void
1465 parse_string (gchar * dest, gint size, gchar ** src)
1466 {
1467   gint idx;
1468
1469   idx = 0;
1470   /* skip spaces */
1471   while (g_ascii_isspace (**src))
1472     (*src)++;
1473
1474   while (!g_ascii_isspace (**src) && **src != '\0') {
1475     if (idx < size - 1)
1476       dest[idx++] = **src;
1477     (*src)++;
1478   }
1479   if (size > 0)
1480     dest[idx] = '\0';
1481 }
1482
1483 static void
1484 parse_key (gchar * dest, gint size, gchar ** src)
1485 {
1486   gint idx;
1487
1488   idx = 0;
1489   while (**src != ':' && **src != '\0') {
1490     if (idx < size - 1)
1491       dest[idx++] = **src;
1492     (*src)++;
1493   }
1494   if (size > 0)
1495     dest[idx] = '\0';
1496 }
1497
1498 static GstRTSPResult
1499 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
1500 {
1501   GstRTSPResult res;
1502   gchar versionstr[20];
1503   gchar codestr[4];
1504   gint code;
1505   gchar *bptr;
1506
1507   bptr = (gchar *) buffer;
1508
1509   parse_string (versionstr, sizeof (versionstr), &bptr);
1510   parse_string (codestr, sizeof (codestr), &bptr);
1511   code = atoi (codestr);
1512
1513   while (g_ascii_isspace (*bptr))
1514     bptr++;
1515
1516   if (strcmp (versionstr, "RTSP/1.0") == 0)
1517     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1518         parse_error);
1519   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1520     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1521         parse_error);
1522     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1523   } else
1524     goto parse_error;
1525
1526   return GST_RTSP_OK;
1527
1528 parse_error:
1529   {
1530     return GST_RTSP_EPARSE;
1531   }
1532 }
1533
1534 static GstRTSPResult
1535 parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
1536     GstRTSPMessage * msg)
1537 {
1538   GstRTSPResult res = GST_RTSP_OK;
1539   gchar versionstr[20];
1540   gchar methodstr[20];
1541   gchar urlstr[4096];
1542   gchar *bptr;
1543   GstRTSPMethod method;
1544   GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
1545
1546   bptr = (gchar *) buffer;
1547
1548   parse_string (methodstr, sizeof (methodstr), &bptr);
1549   method = gst_rtsp_find_method (methodstr);
1550   if (method == GST_RTSP_INVALID) {
1551     /* a tunnel request is allowed when we don't have one yet */
1552     if (conn->tstate != TUNNEL_STATE_NONE)
1553       goto invalid_method;
1554     /* we need GET or POST for a valid tunnel request */
1555     if (!strcmp (methodstr, "GET"))
1556       tstate = TUNNEL_STATE_GET;
1557     else if (!strcmp (methodstr, "POST"))
1558       tstate = TUNNEL_STATE_POST;
1559     else
1560       goto invalid_method;
1561   }
1562
1563   parse_string (urlstr, sizeof (urlstr), &bptr);
1564   if (G_UNLIKELY (*urlstr == '\0'))
1565     goto invalid_url;
1566
1567   parse_string (versionstr, sizeof (versionstr), &bptr);
1568
1569   if (G_UNLIKELY (*bptr != '\0'))
1570     goto invalid_version;
1571
1572   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1573     res = gst_rtsp_message_init_request (msg, method, urlstr);
1574   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1575     res = gst_rtsp_message_init_request (msg, method, urlstr);
1576     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1577   } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
1578     /* tunnel request, we need a tunnel method */
1579     if (tstate == TUNNEL_STATE_NONE) {
1580       res = GST_RTSP_EPARSE;
1581     } else {
1582       conn->tstate = tstate;
1583     }
1584   } else {
1585     res = GST_RTSP_EPARSE;
1586   }
1587
1588   return res;
1589
1590   /* ERRORS */
1591 invalid_method:
1592   {
1593     GST_ERROR ("invalid method %s", methodstr);
1594     return GST_RTSP_EPARSE;
1595   }
1596 invalid_url:
1597   {
1598     GST_ERROR ("invalid url %s", urlstr);
1599     return GST_RTSP_EPARSE;
1600   }
1601 invalid_version:
1602   {
1603     GST_ERROR ("invalid version");
1604     return GST_RTSP_EPARSE;
1605   }
1606 }
1607
1608 static GstRTSPResult
1609 parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
1610 {
1611   gchar *bptr;
1612
1613   bptr = (gchar *) buffer;
1614
1615   /* read key */
1616   parse_key (key, keysize, &bptr);
1617   if (G_UNLIKELY (*bptr != ':'))
1618     goto no_column;
1619
1620   bptr++;
1621   while (g_ascii_isspace (*bptr))
1622     bptr++;
1623
1624   *value = bptr;
1625
1626   return GST_RTSP_OK;
1627
1628   /* ERRORS */
1629 no_column:
1630   {
1631     return GST_RTSP_EPARSE;
1632   }
1633 }
1634
1635 /* parsing lines means reading a Key: Value pair */
1636 static GstRTSPResult
1637 parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
1638 {
1639   GstRTSPResult res;
1640   gchar key[32];
1641   gchar *value;
1642   GstRTSPHeaderField field;
1643
1644   res = parse_key_value (buffer, key, sizeof (key), &value);
1645   if (G_UNLIKELY (res != GST_RTSP_OK))
1646     goto parse_error;
1647
1648   if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
1649     /* save the tunnel session in the connection */
1650     if (!strcmp (key, "x-sessioncookie")) {
1651       strncpy (conn->tunnelid, value, TUNNELID_LEN);
1652       conn->tunnelid[TUNNELID_LEN - 1] = '\0';
1653       conn->tunneled = TRUE;
1654     }
1655   } else {
1656     field = gst_rtsp_find_header_field (key);
1657     if (field != GST_RTSP_HDR_INVALID)
1658       gst_rtsp_message_add_header (msg, field, value);
1659   }
1660
1661   return GST_RTSP_OK;
1662
1663   /* ERRORS */
1664 parse_error:
1665   {
1666     return res;
1667   }
1668 }
1669
1670 /* returns:
1671  *  GST_RTSP_OK when a complete message was read.
1672  *  GST_RTSP_EEOF: when the socket is closed
1673  *  GST_RTSP_EINTR: when more data is needed.
1674  *  GST_RTSP_..: some other error occured.
1675  */
1676 static GstRTSPResult
1677 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1678     GstRTSPConnection * conn)
1679 {
1680   GstRTSPResult res;
1681
1682   while (TRUE) {
1683     switch (builder->state) {
1684       case STATE_START:
1685         builder->offset = 0;
1686         res =
1687             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
1688         if (res != GST_RTSP_OK)
1689           goto done;
1690
1691         /* we have 1 bytes now and we can see if this is a data message or
1692          * not */
1693         if (builder->buffer[0] == '$') {
1694           /* data message, prepare for the header */
1695           builder->state = STATE_DATA_HEADER;
1696         } else {
1697           builder->line = 0;
1698           builder->state = STATE_READ_LINES;
1699         }
1700         break;
1701       case STATE_DATA_HEADER:
1702       {
1703         res =
1704             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
1705         if (res != GST_RTSP_OK)
1706           goto done;
1707
1708         gst_rtsp_message_init_data (message, builder->buffer[1]);
1709
1710         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1711         builder->body_data = g_malloc (builder->body_len + 1);
1712         builder->body_data[builder->body_len] = '\0';
1713         builder->offset = 0;
1714         builder->state = STATE_DATA_BODY;
1715         break;
1716       }
1717       case STATE_DATA_BODY:
1718       {
1719         res =
1720             read_bytes (conn, builder->body_data, &builder->offset,
1721             builder->body_len);
1722         if (res != GST_RTSP_OK)
1723           goto done;
1724
1725         /* we have the complete body now, store in the message adjusting the
1726          * length to include the traling '\0' */
1727         gst_rtsp_message_take_body (message,
1728             (guint8 *) builder->body_data, builder->body_len + 1);
1729         builder->body_data = NULL;
1730         builder->body_len = 0;
1731
1732         builder->state = STATE_END;
1733         break;
1734       }
1735       case STATE_READ_LINES:
1736       {
1737         res = read_line (conn, builder->buffer, &builder->offset,
1738             sizeof (builder->buffer));
1739         if (res != GST_RTSP_OK)
1740           goto done;
1741
1742         /* we have a regular response */
1743         if (builder->buffer[0] == '\r') {
1744           builder->buffer[0] = '\0';
1745         }
1746
1747         if (builder->buffer[0] == '\0') {
1748           gchar *hdrval;
1749
1750           /* empty line, end of message header */
1751           /* see if there is a Content-Length header */
1752           if (gst_rtsp_message_get_header (message,
1753                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1754             /* there is, prepare to read the body */
1755             builder->body_len = atol (hdrval);
1756             builder->body_data = g_malloc (builder->body_len + 1);
1757             builder->body_data[builder->body_len] = '\0';
1758             builder->offset = 0;
1759             builder->state = STATE_DATA_BODY;
1760           } else {
1761             builder->state = STATE_END;
1762           }
1763           break;
1764         }
1765
1766         /* we have a line */
1767         if (builder->line == 0) {
1768           /* first line, check for response status */
1769           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1770             res = parse_response_status (builder->buffer, message);
1771           } else {
1772             res = parse_request_line (conn, builder->buffer, message);
1773           }
1774           /* the first line must parse without errors */
1775           if (res != GST_RTSP_OK)
1776             goto done;
1777         } else {
1778           /* else just parse the line, ignore errors */
1779           parse_line (conn, builder->buffer, message);
1780         }
1781         builder->line++;
1782         builder->offset = 0;
1783         break;
1784       }
1785       case STATE_END:
1786       {
1787         gchar *session_id;
1788
1789         if (conn->tstate == TUNNEL_STATE_GET) {
1790           res = GST_RTSP_ETGET;
1791           goto done;
1792         } else if (conn->tstate == TUNNEL_STATE_POST) {
1793           res = GST_RTSP_ETPOST;
1794           goto done;
1795         }
1796
1797         if (message->type == GST_RTSP_MESSAGE_DATA) {
1798           /* data messages don't have headers */
1799           res = GST_RTSP_OK;
1800           goto done;
1801         }
1802
1803         /* save session id in the connection for further use */
1804         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
1805             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1806                 &session_id, 0) == GST_RTSP_OK) {
1807           gint maxlen, i;
1808
1809           maxlen = sizeof (conn->session_id) - 1;
1810           /* the sessionid can have attributes marked with ;
1811            * Make sure we strip them */
1812           for (i = 0; session_id[i] != '\0'; i++) {
1813             if (session_id[i] == ';') {
1814               maxlen = i;
1815               /* parse timeout */
1816               do {
1817                 i++;
1818               } while (g_ascii_isspace (session_id[i]));
1819               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1820                 gint to;
1821
1822                 /* if we parsed something valid, configure */
1823                 if ((to = atoi (&session_id[i + 8])) > 0)
1824                   conn->timeout = to;
1825               }
1826               break;
1827             }
1828           }
1829
1830           /* make sure to not overflow */
1831           strncpy (conn->session_id, session_id, maxlen);
1832           conn->session_id[maxlen] = '\0';
1833         }
1834         res = GST_RTSP_OK;
1835         goto done;
1836       }
1837       default:
1838         res = GST_RTSP_ERROR;
1839         break;
1840     }
1841   }
1842 done:
1843   return res;
1844 }
1845
1846 /**
1847  * gst_rtsp_connection_read:
1848  * @conn: a #GstRTSPConnection
1849  * @data: the data to read
1850  * @size: the size of @data
1851  * @timeout: a timeout value or #NULL
1852  *
1853  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1854  * the specified @timeout. @timeout can be #NULL, in which case this function
1855  * might block forever.
1856  *
1857  * This function can be cancelled with gst_rtsp_connection_flush().
1858  *
1859  * Returns: #GST_RTSP_OK on success.
1860  */
1861 GstRTSPResult
1862 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1863     GTimeVal * timeout)
1864 {
1865   guint offset;
1866   gint retval;
1867   GstClockTime to;
1868   GstRTSPResult res;
1869
1870   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1871   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1872   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1873
1874   if (G_UNLIKELY (size == 0))
1875     return GST_RTSP_OK;
1876
1877   offset = 0;
1878
1879   /* configure timeout if any */
1880   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1881
1882   gst_poll_set_controllable (conn->fdset, TRUE);
1883   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1884   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1885
1886   while (TRUE) {
1887     res = read_bytes (conn, data, &offset, size);
1888     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1889       goto eof;
1890     if (G_LIKELY (res == GST_RTSP_OK))
1891       break;
1892     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1893       goto read_error;
1894
1895     do {
1896       retval = gst_poll_wait (conn->fdset, to);
1897     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1898
1899     /* check for timeout */
1900     if (G_UNLIKELY (retval == 0))
1901       goto select_timeout;
1902
1903     if (G_UNLIKELY (retval == -1)) {
1904       if (errno == EBUSY)
1905         goto stopped;
1906       else
1907         goto select_error;
1908     }
1909     gst_poll_set_controllable (conn->fdset, FALSE);
1910   }
1911   return GST_RTSP_OK;
1912
1913   /* ERRORS */
1914 select_error:
1915   {
1916     return GST_RTSP_ESYS;
1917   }
1918 select_timeout:
1919   {
1920     return GST_RTSP_ETIMEOUT;
1921   }
1922 stopped:
1923   {
1924     return GST_RTSP_EINTR;
1925   }
1926 eof:
1927   {
1928     return GST_RTSP_EEOF;
1929   }
1930 read_error:
1931   {
1932     return res;
1933   }
1934 }
1935
1936 static GString *
1937 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
1938 {
1939   GString *str;
1940   gchar date_string[100];
1941   const gchar *status;
1942
1943   gen_date_string (date_string, sizeof (date_string));
1944
1945   status = gst_rtsp_status_as_text (code);
1946   if (status == NULL) {
1947     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
1948     status = "Internal Server Error";
1949   }
1950
1951   str = g_string_new ("");
1952
1953   /* */
1954   g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
1955   g_string_append_printf (str,
1956       "Server: GStreamer RTSP Server\r\n"
1957       "Date: %s\r\n"
1958       "Connection: close\r\n"
1959       "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
1960   if (code == GST_RTSP_STS_OK) {
1961     if (conn->ip)
1962       g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
1963     g_string_append_printf (str,
1964         "Content-Type: application/x-rtsp-tunnelled\r\n");
1965   }
1966   g_string_append_printf (str, "\r\n");
1967   return str;
1968 }
1969
1970 /**
1971  * gst_rtsp_connection_receive:
1972  * @conn: a #GstRTSPConnection
1973  * @message: the message to read
1974  * @timeout: a timeout value or #NULL
1975  *
1976  * Attempt to read into @message from the connected @conn, blocking up to
1977  * the specified @timeout. @timeout can be #NULL, in which case this function
1978  * might block forever.
1979  * 
1980  * This function can be cancelled with gst_rtsp_connection_flush().
1981  *
1982  * Returns: #GST_RTSP_OK on success.
1983  */
1984 GstRTSPResult
1985 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
1986     GTimeVal * timeout)
1987 {
1988   GstRTSPResult res;
1989   GstRTSPBuilder builder;
1990   gint retval;
1991   GstClockTime to;
1992
1993   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1994   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1995   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1996
1997   /* configure timeout if any */
1998   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1999
2000   gst_poll_set_controllable (conn->fdset, TRUE);
2001   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
2002   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
2003
2004   memset (&builder, 0, sizeof (GstRTSPBuilder));
2005   while (TRUE) {
2006     res = build_next (&builder, message, conn);
2007     if (G_UNLIKELY (res == GST_RTSP_EEOF))
2008       goto eof;
2009     if (G_LIKELY (res == GST_RTSP_OK))
2010       break;
2011     if (res == GST_RTSP_ETGET) {
2012       GString *str;
2013
2014       /* tunnel GET request, we can reply now */
2015       str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
2016       res =
2017           gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
2018           timeout);
2019       g_string_free (str, TRUE);
2020     } else if (res == GST_RTSP_ETPOST) {
2021       /* tunnel POST request, return the value, the caller now has to link the
2022        * two connections. */
2023       break;
2024     } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
2025       goto read_error;
2026
2027     do {
2028       retval = gst_poll_wait (conn->fdset, to);
2029     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2030
2031     /* check for timeout */
2032     if (G_UNLIKELY (retval == 0))
2033       goto select_timeout;
2034
2035     if (G_UNLIKELY (retval == -1)) {
2036       if (errno == EBUSY)
2037         goto stopped;
2038       else
2039         goto select_error;
2040     }
2041     gst_poll_set_controllable (conn->fdset, FALSE);
2042   }
2043
2044   /* we have a message here */
2045   build_reset (&builder);
2046
2047   return GST_RTSP_OK;
2048
2049   /* ERRORS */
2050 select_error:
2051   {
2052     res = GST_RTSP_ESYS;
2053     goto cleanup;
2054   }
2055 select_timeout:
2056   {
2057     res = GST_RTSP_ETIMEOUT;
2058     goto cleanup;
2059   }
2060 stopped:
2061   {
2062     res = GST_RTSP_EINTR;
2063     goto cleanup;
2064   }
2065 eof:
2066   {
2067     res = GST_RTSP_EEOF;
2068     goto cleanup;
2069   }
2070 read_error:
2071 cleanup:
2072   {
2073     build_reset (&builder);
2074     gst_rtsp_message_unset (message);
2075     return res;
2076   }
2077 }
2078
2079 /**
2080  * gst_rtsp_connection_close:
2081  * @conn: a #GstRTSPConnection
2082  *
2083  * Close the connected @conn. After this call, the connection is in the same
2084  * state as when it was first created.
2085  * 
2086  * Returns: #GST_RTSP_OK on success.
2087  */
2088 GstRTSPResult
2089 gst_rtsp_connection_close (GstRTSPConnection * conn)
2090 {
2091   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2092
2093   g_free (conn->ip);
2094   conn->ip = NULL;
2095
2096   g_free (conn->initial_buffer);
2097   conn->initial_buffer = NULL;
2098   conn->initial_buffer_offset = 0;
2099
2100   REMOVE_POLLFD (conn->fdset, &conn->fd0);
2101   REMOVE_POLLFD (conn->fdset, &conn->fd1);
2102   conn->writefd = NULL;
2103   conn->readfd = NULL;
2104   conn->tunneled = FALSE;
2105   conn->tstate = TUNNEL_STATE_NONE;
2106   conn->ctxp = NULL;
2107   g_free (conn->username);
2108   conn->username = NULL;
2109   g_free (conn->passwd);
2110   conn->passwd = NULL;
2111   gst_rtsp_connection_clear_auth_params (conn);
2112   conn->timeout = 60;
2113   conn->cseq = 0;
2114   conn->session_id[0] = '\0';
2115
2116   return GST_RTSP_OK;
2117 }
2118
2119 /**
2120  * gst_rtsp_connection_free:
2121  * @conn: a #GstRTSPConnection
2122  *
2123  * Close and free @conn.
2124  * 
2125  * Returns: #GST_RTSP_OK on success.
2126  */
2127 GstRTSPResult
2128 gst_rtsp_connection_free (GstRTSPConnection * conn)
2129 {
2130   GstRTSPResult res;
2131
2132   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2133
2134   res = gst_rtsp_connection_close (conn);
2135   gst_poll_free (conn->fdset);
2136   g_timer_destroy (conn->timer);
2137   gst_rtsp_url_free (conn->url);
2138   g_free (conn->proxy_host);
2139   g_free (conn);
2140 #ifdef G_OS_WIN32
2141   WSACleanup ();
2142 #endif
2143
2144   return res;
2145 }
2146
2147 /**
2148  * gst_rtsp_connection_poll:
2149  * @conn: a #GstRTSPConnection
2150  * @events: a bitmask of #GstRTSPEvent flags to check
2151  * @revents: location for result flags 
2152  * @timeout: a timeout
2153  *
2154  * Wait up to the specified @timeout for the connection to become available for
2155  * at least one of the operations specified in @events. When the function returns
2156  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2157  * @conn.
2158  *
2159  * @timeout can be #NULL, in which case this function might block forever.
2160  *
2161  * This function can be cancelled with gst_rtsp_connection_flush().
2162  * 
2163  * Returns: #GST_RTSP_OK on success.
2164  *
2165  * Since: 0.10.15
2166  */
2167 GstRTSPResult
2168 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2169     GstRTSPEvent * revents, GTimeVal * timeout)
2170 {
2171   GstClockTime to;
2172   gint retval;
2173
2174   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2175   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2176   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2177   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2178   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2179
2180   gst_poll_set_controllable (conn->fdset, TRUE);
2181
2182   /* add fd to writer set when asked to */
2183   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
2184       events & GST_RTSP_EV_WRITE);
2185
2186   /* add fd to reader set when asked to */
2187   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
2188
2189   /* configure timeout if any */
2190   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2191
2192   do {
2193     retval = gst_poll_wait (conn->fdset, to);
2194   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2195
2196   if (G_UNLIKELY (retval == 0))
2197     goto select_timeout;
2198
2199   if (G_UNLIKELY (retval == -1)) {
2200     if (errno == EBUSY)
2201       goto stopped;
2202     else
2203       goto select_error;
2204   }
2205
2206   *revents = 0;
2207   if (events & GST_RTSP_EV_READ) {
2208     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
2209       *revents |= GST_RTSP_EV_READ;
2210   }
2211   if (events & GST_RTSP_EV_WRITE) {
2212     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
2213       *revents |= GST_RTSP_EV_WRITE;
2214   }
2215   return GST_RTSP_OK;
2216
2217   /* ERRORS */
2218 select_timeout:
2219   {
2220     return GST_RTSP_ETIMEOUT;
2221   }
2222 select_error:
2223   {
2224     return GST_RTSP_ESYS;
2225   }
2226 stopped:
2227   {
2228     return GST_RTSP_EINTR;
2229   }
2230 }
2231
2232 /**
2233  * gst_rtsp_connection_next_timeout:
2234  * @conn: a #GstRTSPConnection
2235  * @timeout: a timeout
2236  *
2237  * Calculate the next timeout for @conn, storing the result in @timeout.
2238  * 
2239  * Returns: #GST_RTSP_OK.
2240  */
2241 GstRTSPResult
2242 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2243 {
2244   gdouble elapsed;
2245   glong sec;
2246   gulong usec;
2247
2248   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2249   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2250
2251   elapsed = g_timer_elapsed (conn->timer, &usec);
2252   if (elapsed >= conn->timeout) {
2253     sec = 0;
2254     usec = 0;
2255   } else {
2256     sec = conn->timeout - elapsed;
2257   }
2258
2259   timeout->tv_sec = sec;
2260   timeout->tv_usec = usec;
2261
2262   return GST_RTSP_OK;
2263 }
2264
2265 /**
2266  * gst_rtsp_connection_reset_timeout:
2267  * @conn: a #GstRTSPConnection
2268  *
2269  * Reset the timeout of @conn.
2270  * 
2271  * Returns: #GST_RTSP_OK.
2272  */
2273 GstRTSPResult
2274 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2275 {
2276   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2277
2278   g_timer_start (conn->timer);
2279
2280   return GST_RTSP_OK;
2281 }
2282
2283 /**
2284  * gst_rtsp_connection_flush:
2285  * @conn: a #GstRTSPConnection
2286  * @flush: start or stop the flush
2287  *
2288  * Start or stop the flushing action on @conn. When flushing, all current
2289  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2290  * is set to non-flushing mode again.
2291  * 
2292  * Returns: #GST_RTSP_OK.
2293  */
2294 GstRTSPResult
2295 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2296 {
2297   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2298
2299   gst_poll_set_flushing (conn->fdset, flush);
2300
2301   return GST_RTSP_OK;
2302 }
2303
2304 /**
2305  * gst_rtsp_connection_set_proxy:
2306  * @conn: a #GstRTSPConnection
2307  * @host: the proxy host
2308  * @port: the proxy port
2309  *
2310  * Set the proxy host and port.
2311  * 
2312  * Returns: #GST_RTSP_OK.
2313  *
2314  * Since: 0.10.23
2315  */
2316 GstRTSPResult
2317 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
2318     const gchar * host, guint port)
2319 {
2320   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2321
2322   g_free (conn->proxy_host);
2323   conn->proxy_host = g_strdup (host);
2324   conn->proxy_port = port;
2325
2326   return GST_RTSP_OK;
2327 }
2328
2329 /**
2330  * gst_rtsp_connection_set_auth:
2331  * @conn: a #GstRTSPConnection
2332  * @method: authentication method
2333  * @user: the user
2334  * @pass: the password
2335  *
2336  * Configure @conn for authentication mode @method with @user and @pass as the
2337  * user and password respectively.
2338  * 
2339  * Returns: #GST_RTSP_OK.
2340  */
2341 GstRTSPResult
2342 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
2343     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
2344 {
2345   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2346
2347   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
2348           || g_strrstr (user, ":") != NULL))
2349     return GST_RTSP_EINVAL;
2350
2351   /* Make sure the username and passwd are being set for authentication */
2352   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
2353     return GST_RTSP_EINVAL;
2354
2355   /* ":" chars are not allowed in usernames for basic auth */
2356   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
2357     return GST_RTSP_EINVAL;
2358
2359   g_free (conn->username);
2360   g_free (conn->passwd);
2361
2362   conn->auth_method = method;
2363   conn->username = g_strdup (user);
2364   conn->passwd = g_strdup (pass);
2365
2366   return GST_RTSP_OK;
2367 }
2368
2369 /**
2370  * str_case_hash:
2371  * @key: ASCII string to hash
2372  *
2373  * Hashes @key in a case-insensitive manner.
2374  *
2375  * Returns: the hash code.
2376  **/
2377 static guint
2378 str_case_hash (gconstpointer key)
2379 {
2380   const char *p = key;
2381   guint h = g_ascii_toupper (*p);
2382
2383   if (h)
2384     for (p += 1; *p != '\0'; p++)
2385       h = (h << 5) - h + g_ascii_toupper (*p);
2386
2387   return h;
2388 }
2389
2390 /**
2391  * str_case_equal:
2392  * @v1: an ASCII string
2393  * @v2: another ASCII string
2394  *
2395  * Compares @v1 and @v2 in a case-insensitive manner
2396  *
2397  * Returns: %TRUE if they are equal (modulo case)
2398  **/
2399 static gboolean
2400 str_case_equal (gconstpointer v1, gconstpointer v2)
2401 {
2402   const char *string1 = v1;
2403   const char *string2 = v2;
2404
2405   return g_ascii_strcasecmp (string1, string2) == 0;
2406 }
2407
2408 /**
2409  * gst_rtsp_connection_set_auth_param:
2410  * @conn: a #GstRTSPConnection
2411  * @param: authentication directive
2412  * @value: value
2413  *
2414  * Setup @conn with authentication directives. This is not necesary for
2415  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
2416  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
2417  * in the WWW-Authenticate response header and can include realm, domain,
2418  * nonce, opaque, stale, algorithm, qop as per RFC2617.
2419  * 
2420  * Since: 0.10.20
2421  */
2422 void
2423 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
2424     const gchar * param, const gchar * value)
2425 {
2426   g_return_if_fail (conn != NULL);
2427   g_return_if_fail (param != NULL);
2428
2429   if (conn->auth_params == NULL) {
2430     conn->auth_params =
2431         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
2432   }
2433   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
2434 }
2435
2436 /**
2437  * gst_rtsp_connection_clear_auth_params:
2438  * @conn: a #GstRTSPConnection
2439  *
2440  * Clear the list of authentication directives stored in @conn.
2441  *
2442  * Since: 0.10.20
2443  */
2444 void
2445 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
2446 {
2447   g_return_if_fail (conn != NULL);
2448
2449   if (conn->auth_params != NULL) {
2450     g_hash_table_destroy (conn->auth_params);
2451     conn->auth_params = NULL;
2452   }
2453 }
2454
2455 static GstRTSPResult
2456 set_qos_dscp (gint fd, guint qos_dscp)
2457 {
2458   union gst_sockaddr sa;
2459   socklen_t slen = sizeof (sa);
2460   gint af;
2461   gint tos;
2462
2463   if (fd == -1)
2464     return GST_RTSP_OK;
2465
2466   if (getsockname (fd, &sa.sa, &slen) < 0)
2467     goto no_getsockname;
2468
2469   af = sa.sa.sa_family;
2470
2471   /* if this is an IPv4-mapped address then do IPv4 QoS */
2472   if (af == AF_INET6) {
2473     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
2474       af = AF_INET;
2475   }
2476
2477   /* extract and shift 6 bits of the DSCP */
2478   tos = (qos_dscp & 0x3f) << 2;
2479
2480   switch (af) {
2481     case AF_INET:
2482       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
2483         goto no_setsockopt;
2484       break;
2485     case AF_INET6:
2486 #ifdef IPV6_TCLASS
2487       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
2488         goto no_setsockopt;
2489       break;
2490 #endif
2491     default:
2492       goto wrong_family;
2493   }
2494
2495   return GST_RTSP_OK;
2496
2497   /* ERRORS */
2498 no_getsockname:
2499 no_setsockopt:
2500   {
2501     return GST_RTSP_ESYS;
2502   }
2503
2504 wrong_family:
2505   {
2506     return GST_RTSP_ERROR;
2507   }
2508 }
2509
2510 /**
2511  * gst_rtsp_connection_set_qos_dscp:
2512  * @conn: a #GstRTSPConnection
2513  * @qos_dscp: DSCP value
2514  *
2515  * Configure @conn to use the specified DSCP value.
2516  *
2517  * Returns: #GST_RTSP_OK on success.
2518  *
2519  * Since: 0.10.20
2520  */
2521 GstRTSPResult
2522 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
2523 {
2524   GstRTSPResult res;
2525
2526   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2527   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2528   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2529
2530   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
2531   if (res == GST_RTSP_OK)
2532     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
2533
2534   return res;
2535 }
2536
2537
2538 /**
2539  * gst_rtsp_connection_get_url:
2540  * @conn: a #GstRTSPConnection
2541  *
2542  * Retrieve the URL of the other end of @conn.
2543  *
2544  * Returns: The URL. This value remains valid until the
2545  * connection is freed.
2546  *
2547  * Since: 0.10.23
2548  */
2549 GstRTSPUrl *
2550 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
2551 {
2552   g_return_val_if_fail (conn != NULL, NULL);
2553
2554   return conn->url;
2555 }
2556
2557 /**
2558  * gst_rtsp_connection_get_ip:
2559  * @conn: a #GstRTSPConnection
2560  *
2561  * Retrieve the IP address of the other end of @conn.
2562  *
2563  * Returns: The IP address as a string. this value remains valid until the
2564  * connection is closed.
2565  *
2566  * Since: 0.10.20
2567  */
2568 const gchar *
2569 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
2570 {
2571   g_return_val_if_fail (conn != NULL, NULL);
2572
2573   return conn->ip;
2574 }
2575
2576 /**
2577  * gst_rtsp_connection_set_ip:
2578  * @conn: a #GstRTSPConnection
2579  * @ip: an ip address
2580  *
2581  * Set the IP address of the server.
2582  *
2583  * Since: 0.10.23
2584  */
2585 void
2586 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
2587 {
2588   g_return_if_fail (conn != NULL);
2589
2590   g_free (conn->ip);
2591   conn->ip = g_strdup (ip);
2592 }
2593
2594 /**
2595  * gst_rtsp_connection_get_readfd:
2596  * @conn: a #GstRTSPConnection
2597  *
2598  * Get the file descriptor for reading.
2599  *
2600  * Returns: the file descriptor used for reading or -1 on error. The file
2601  * descriptor remains valid until the connection is closed.
2602  *
2603  * Since: 0.10.23
2604  */
2605 gint
2606 gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
2607 {
2608   g_return_val_if_fail (conn != NULL, -1);
2609   g_return_val_if_fail (conn->readfd != NULL, -1);
2610
2611   return conn->readfd->fd;
2612 }
2613
2614 /**
2615  * gst_rtsp_connection_get_writefd:
2616  * @conn: a #GstRTSPConnection
2617  *
2618  * Get the file descriptor for writing.
2619  *
2620  * Returns: the file descriptor used for writing or -1 on error. The file
2621  * descriptor remains valid until the connection is closed.
2622  *
2623  * Since: 0.10.23
2624  */
2625 gint
2626 gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
2627 {
2628   g_return_val_if_fail (conn != NULL, -1);
2629   g_return_val_if_fail (conn->writefd != NULL, -1);
2630
2631   return conn->writefd->fd;
2632 }
2633
2634
2635 /**
2636  * gst_rtsp_connection_set_tunneled:
2637  * @conn: a #GstRTSPConnection
2638  * @tunneled: the new state
2639  *
2640  * Set the HTTP tunneling state of the connection. This must be configured before
2641  * the @conn is connected.
2642  *
2643  * Since: 0.10.23
2644  */
2645 void
2646 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
2647 {
2648   g_return_if_fail (conn != NULL);
2649   g_return_if_fail (conn->readfd == NULL);
2650   g_return_if_fail (conn->writefd == NULL);
2651
2652   conn->tunneled = tunneled;
2653 }
2654
2655 /**
2656  * gst_rtsp_connection_is_tunneled:
2657  * @conn: a #GstRTSPConnection
2658  *
2659  * Get the tunneling state of the connection. 
2660  *
2661  * Returns: if @conn is using HTTP tunneling.
2662  *
2663  * Since: 0.10.23
2664  */
2665 gboolean
2666 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
2667 {
2668   g_return_val_if_fail (conn != NULL, FALSE);
2669
2670   return conn->tunneled;
2671 }
2672
2673 /**
2674  * gst_rtsp_connection_get_tunnelid:
2675  * @conn: a #GstRTSPConnection
2676  *
2677  * Get the tunnel session id the connection. 
2678  *
2679  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
2680  *
2681  * Since: 0.10.23
2682  */
2683 const gchar *
2684 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
2685 {
2686   g_return_val_if_fail (conn != NULL, NULL);
2687
2688   if (!conn->tunneled)
2689     return NULL;
2690
2691   return conn->tunnelid;
2692 }
2693
2694 /**
2695  * gst_rtsp_connection_do_tunnel:
2696  * @conn: a #GstRTSPConnection
2697  * @conn2: a #GstRTSPConnection
2698  *
2699  * If @conn received the first tunnel connection and @conn2 received
2700  * the second tunnel connection, link the two connections together so that
2701  * @conn manages the tunneled connection.
2702  *
2703  * After this call, @conn2 cannot be used anymore and must be freed with
2704  * gst_rtsp_connection_free().
2705  *
2706  * Returns: return GST_RTSP_OK on success.
2707  *
2708  * Since: 0.10.23
2709  */
2710 GstRTSPResult
2711 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
2712     GstRTSPConnection * conn2)
2713 {
2714   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2715   g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
2716   g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
2717   g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
2718   g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
2719       GST_RTSP_EINVAL);
2720
2721   /* both connections have fd0 as the read/write socket. start by taking the
2722    * socket from conn2 and set it as the socket in conn */
2723   conn->fd1 = conn2->fd0;
2724
2725   /* clean up some of the state of conn2 */
2726   gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
2727   conn2->fd0.fd = -1;
2728   conn2->readfd = conn2->writefd = NULL;
2729
2730   /* We make fd0 the write socket and fd1 the read socket. */
2731   conn->writefd = &conn->fd0;
2732   conn->readfd = &conn->fd1;
2733
2734   conn->tstate = TUNNEL_STATE_COMPLETE;
2735
2736   /* we need base64 decoding for the readfd */
2737   conn->ctx.state = 0;
2738   conn->ctx.save = 0;
2739   conn->ctx.cout = 0;
2740   conn->ctx.coutl = 0;
2741   conn->ctxp = &conn->ctx;
2742
2743   return GST_RTSP_OK;
2744 }
2745
2746 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
2747 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
2748
2749 typedef struct
2750 {
2751   guint8 *data;
2752   guint size;
2753   guint id;
2754 } GstRTSPRec;
2755
2756 /* async functions */
2757 struct _GstRTSPWatch
2758 {
2759   GSource source;
2760
2761   GstRTSPConnection *conn;
2762
2763   GstRTSPBuilder builder;
2764   GstRTSPMessage message;
2765
2766   GPollFD readfd;
2767   GPollFD writefd;
2768   gboolean write_added;
2769
2770   /* queued message for transmission */
2771   guint id;
2772   GAsyncQueue *messages;
2773   guint8 *write_data;
2774   guint write_off;
2775   guint write_size;
2776   guint write_id;
2777
2778   GstRTSPWatchFuncs funcs;
2779
2780   gpointer user_data;
2781   GDestroyNotify notify;
2782 };
2783
2784 static gboolean
2785 gst_rtsp_source_prepare (GSource * source, gint * timeout)
2786 {
2787   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2788
2789   if (watch->conn->initial_buffer != NULL)
2790     return TRUE;
2791
2792   *timeout = (watch->conn->timeout * 1000);
2793
2794   return FALSE;
2795 }
2796
2797 static gboolean
2798 gst_rtsp_source_check (GSource * source)
2799 {
2800   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2801
2802   if (watch->readfd.revents & READ_COND)
2803     return TRUE;
2804
2805   if (watch->writefd.revents & WRITE_COND)
2806     return TRUE;
2807
2808   return FALSE;
2809 }
2810
2811 static gboolean
2812 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
2813     gpointer user_data G_GNUC_UNUSED)
2814 {
2815   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2816   GstRTSPResult res;
2817
2818   /* first read as much as we can */
2819   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
2820     do {
2821       res = build_next (&watch->builder, &watch->message, watch->conn);
2822       if (res == GST_RTSP_EINTR)
2823         break;
2824       if (G_UNLIKELY (res == GST_RTSP_EEOF))
2825         goto eof;
2826       if (res == GST_RTSP_ETGET) {
2827         GString *str;
2828         GstRTSPStatusCode code;
2829         guint size;
2830
2831         if (watch->funcs.tunnel_start)
2832           code = watch->funcs.tunnel_start (watch, watch->user_data);
2833         else
2834           code = GST_RTSP_STS_OK;
2835
2836         /* queue the response string */
2837         str = gen_tunnel_reply (watch->conn, code);
2838         size = str->len;
2839         gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, FALSE),
2840             size);
2841       } else if (res == GST_RTSP_ETPOST) {
2842         /* in the callback the connection should be tunneled with the
2843          * GET connection */
2844         if (watch->funcs.tunnel_complete)
2845           watch->funcs.tunnel_complete (watch, watch->user_data);
2846       } else if (G_UNLIKELY (res != GST_RTSP_OK))
2847         goto error;
2848
2849       if (G_LIKELY (res == GST_RTSP_OK)) {
2850         if (watch->funcs.message_received)
2851           watch->funcs.message_received (watch, &watch->message,
2852               watch->user_data);
2853
2854         gst_rtsp_message_unset (&watch->message);
2855       }
2856       build_reset (&watch->builder);
2857     } while (FALSE);
2858   }
2859
2860   if (watch->writefd.revents & WRITE_COND) {
2861     do {
2862       if (watch->write_data == NULL) {
2863         GstRTSPRec *rec;
2864
2865         /* get a new message from the queue */
2866         rec = g_async_queue_try_pop (watch->messages);
2867         if (rec == NULL)
2868           goto done;
2869
2870         watch->write_off = 0;
2871         watch->write_data = rec->data;
2872         watch->write_size = rec->size;
2873         watch->write_id = rec->id;
2874
2875         g_slice_free (GstRTSPRec, rec);
2876       }
2877
2878       res = write_bytes (watch->writefd.fd, watch->write_data,
2879           &watch->write_off, watch->write_size);
2880       if (res == GST_RTSP_EINTR)
2881         break;
2882       if (G_UNLIKELY (res != GST_RTSP_OK))
2883         goto error;
2884
2885       if (watch->funcs.message_sent)
2886         watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
2887
2888     done:
2889       if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
2890         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2891         watch->write_added = FALSE;
2892         watch->writefd.revents = 0;
2893       }
2894       g_free (watch->write_data);
2895       watch->write_data = NULL;
2896     } while (FALSE);
2897   }
2898
2899   return TRUE;
2900
2901   /* ERRORS */
2902 eof:
2903   {
2904     if (watch->funcs.closed)
2905       watch->funcs.closed (watch, watch->user_data);
2906     return FALSE;
2907   }
2908 error:
2909   {
2910     if (watch->funcs.error)
2911       watch->funcs.error (watch, res, watch->user_data);
2912     return FALSE;
2913   }
2914 }
2915
2916 static void
2917 gst_rtsp_rec_free (gpointer data)
2918 {
2919   GstRTSPRec *rec = data;
2920
2921   g_free (rec->data);
2922   g_slice_free (GstRTSPRec, rec);
2923 }
2924
2925 static void
2926 gst_rtsp_source_finalize (GSource * source)
2927 {
2928   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2929
2930   build_reset (&watch->builder);
2931   gst_rtsp_message_unset (&watch->message);
2932
2933   g_async_queue_unref (watch->messages);
2934   watch->messages = NULL;
2935
2936   g_free (watch->write_data);
2937
2938   if (watch->notify)
2939     watch->notify (watch->user_data);
2940 }
2941
2942 static GSourceFuncs gst_rtsp_source_funcs = {
2943   gst_rtsp_source_prepare,
2944   gst_rtsp_source_check,
2945   gst_rtsp_source_dispatch,
2946   gst_rtsp_source_finalize,
2947   NULL,
2948   NULL
2949 };
2950
2951 /**
2952  * gst_rtsp_watch_new:
2953  * @conn: a #GstRTSPConnection
2954  * @funcs: watch functions
2955  * @user_data: user data to pass to @funcs
2956  * @notify: notify when @user_data is not referenced anymore
2957  *
2958  * Create a watch object for @conn. The functions provided in @funcs will be
2959  * called with @user_data when activity happened on the watch.
2960  *
2961  * The new watch is usually created so that it can be attached to a
2962  * maincontext with gst_rtsp_watch_attach(). 
2963  *
2964  * @conn must exist for the entire lifetime of the watch.
2965  *
2966  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2967  * communication. Free with gst_rtsp_watch_unref () after usage.
2968  *
2969  * Since: 0.10.23
2970  */
2971 GstRTSPWatch *
2972 gst_rtsp_watch_new (GstRTSPConnection * conn,
2973     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2974 {
2975   GstRTSPWatch *result;
2976
2977   g_return_val_if_fail (conn != NULL, NULL);
2978   g_return_val_if_fail (funcs != NULL, NULL);
2979   g_return_val_if_fail (conn->readfd != NULL, NULL);
2980   g_return_val_if_fail (conn->writefd != NULL, NULL);
2981
2982   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2983       sizeof (GstRTSPWatch));
2984
2985   result->conn = conn;
2986   result->builder.state = STATE_START;
2987
2988   result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
2989
2990   result->readfd.fd = -1;
2991   result->writefd.fd = -1;
2992
2993   gst_rtsp_watch_reset (result);
2994
2995   result->funcs = *funcs;
2996   result->user_data = user_data;
2997   result->notify = notify;
2998
2999   /* only add the read fd, the write fd is only added when we have data
3000    * to send. */
3001   g_source_add_poll ((GSource *) result, &result->readfd);
3002
3003   return result;
3004 }
3005
3006 /**
3007  * gst_rtsp_watch_reset:
3008  * @watch: a #GstRTSPWatch
3009  *
3010  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
3011  * when the file descriptors of the connection might have changed.
3012  *
3013  * Since: 0.10.23
3014  */
3015 void
3016 gst_rtsp_watch_reset (GstRTSPWatch * watch)
3017 {
3018   if (watch->readfd.fd != -1)
3019     g_source_remove_poll ((GSource *) watch, &watch->readfd);
3020   if (watch->writefd.fd != -1)
3021     g_source_remove_poll ((GSource *) watch, &watch->writefd);
3022
3023   watch->readfd.fd = watch->conn->readfd->fd;
3024   watch->readfd.events = READ_COND;
3025   watch->readfd.revents = 0;
3026
3027   watch->writefd.fd = watch->conn->writefd->fd;
3028   watch->writefd.events = WRITE_COND;
3029   watch->writefd.revents = 0;
3030   watch->write_added = FALSE;
3031
3032   g_source_add_poll ((GSource *) watch, &watch->readfd);
3033 }
3034
3035 /**
3036  * gst_rtsp_watch_attach:
3037  * @watch: a #GstRTSPWatch
3038  * @context: a GMainContext (if NULL, the default context will be used)
3039  *
3040  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
3041  *
3042  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
3043  *
3044  * Since: 0.10.23
3045  */
3046 guint
3047 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
3048 {
3049   g_return_val_if_fail (watch != NULL, 0);
3050
3051   return g_source_attach ((GSource *) watch, context);
3052 }
3053
3054 /**
3055  * gst_rtsp_watch_unref:
3056  * @watch: a #GstRTSPWatch
3057  *
3058  * Decreases the reference count of @watch by one. If the resulting reference
3059  * count is zero the watch and associated memory will be destroyed.
3060  *
3061  * Since: 0.10.23
3062  */
3063 void
3064 gst_rtsp_watch_unref (GstRTSPWatch * watch)
3065 {
3066   g_return_if_fail (watch != NULL);
3067
3068   g_source_unref ((GSource *) watch);
3069 }
3070
3071 /**
3072  * gst_rtsp_watch_queue_data:
3073  * @watch: a #GstRTSPWatch
3074  * @data: the data to queue
3075  * @size: the size of @data
3076  *
3077  * Queue @data for transmission in @watch. It will be transmitted when the
3078  * connection of the @watch becomes writable.
3079  *
3080  * This function will take ownership of @data and g_free() it after use.
3081  *
3082  * The return value of this function will be used as the id argument in the
3083  * message_sent callback.
3084  *
3085  * Returns: an id.
3086  *
3087  * Since: 0.10.24
3088  */
3089 guint
3090 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
3091     guint size)
3092 {
3093   GstRTSPRec *rec;
3094
3095   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3096   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
3097   g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
3098
3099   /* make a record with the data and id */
3100   rec = g_slice_new (GstRTSPRec);
3101   rec->data = (guint8 *) data;
3102   rec->size = size;
3103   do {
3104     /* make sure rec->id is never 0 */
3105     rec->id = ++watch->id;
3106   } while (G_UNLIKELY (rec->id == 0));
3107
3108   /* add the record to a queue. FIXME we would like to have an upper limit here */
3109   g_async_queue_push (watch->messages, rec);
3110
3111   /* FIXME: does the following need to be made thread-safe? (this might be
3112    * called from a streaming thread, like appsink's render function) */
3113   /* make sure the main context will now also check for writability on the
3114    * socket */
3115   if (!watch->write_added) {
3116     g_source_add_poll ((GSource *) watch, &watch->writefd);
3117     watch->write_added = TRUE;
3118   }
3119
3120   return rec->id;
3121 }
3122
3123 /**
3124  * gst_rtsp_watch_queue_message:
3125  * @watch: a #GstRTSPWatch
3126  * @message: a #GstRTSPMessage
3127  *
3128  * Queue a @message for transmission in @watch. The contents of this
3129  * message will be serialized and transmitted when the connection of the
3130  * @watch becomes writable.
3131  *
3132  * The return value of this function will be used as the id argument in the
3133  * message_sent callback.
3134  *
3135  * Returns: an id.
3136  *
3137  * Since: 0.10.23
3138  */
3139 guint
3140 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
3141 {
3142   GString *str;
3143   guint size;
3144
3145   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3146   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
3147
3148   /* make a record with the message as a string and id */
3149   str = message_to_string (watch->conn, message);
3150   size = str->len;
3151   return gst_rtsp_watch_queue_data (watch,
3152       (guint8 *) g_string_free (str, FALSE), size);
3153 }