rtsp: Call message_sent() callback for all sent messages.
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72
73 /* we include this here to get the G_OS_* defines */
74 #include <glib.h>
75 #include <gst/gst.h>
76
77 #ifdef G_OS_WIN32
78 #include <winsock2.h>
79 #include <ws2tcpip.h>
80 #define EINPROGRESS WSAEINPROGRESS
81 #else
82 #include <sys/ioctl.h>
83 #include <netdb.h>
84 #include <sys/socket.h>
85 #include <netinet/in.h>
86 #include <arpa/inet.h>
87 #include <fcntl.h>
88 #endif
89
90 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
91 #include <sys/filio.h>
92 #endif
93
94 #include "gstrtspconnection.h"
95 #include "gstrtspbase64.h"
96 #include "md5.h"
97
98 typedef struct
99 {
100   gint state;
101   guint save;
102   gchar in[4];
103   guint cin;
104   gchar out[3];
105   guint cout;
106   guint coutl;
107 } DecodeCtx;
108
109 static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx,
110     guint size, DecodeCtx * ctxp);
111 static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
112     guint keysize, gchar ** value);
113 static void parse_string (gchar * dest, gint size, gchar ** src);
114
115 #ifdef G_OS_WIN32
116 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
117 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
118 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
119 #define CLOSE_SOCKET(sock) closesocket (sock)
120 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
121 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
122 /* According to Microsoft's connect() documentation this one returns
123  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
124 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
125 #else
126 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
127 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
128 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
129 #define CLOSE_SOCKET(sock) close (sock)
130 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
131 #define ERRNO_IS_EINTR (errno == EINTR)
132 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
133 #endif
134
135 #define ADD_POLLFD(fdset, pfd, fd)        \
136 G_STMT_START {                            \
137   (pfd)->fd = fd;                         \
138   gst_poll_add_fd (fdset, pfd);           \
139 } G_STMT_END
140
141 #define REMOVE_POLLFD(fdset, pfd)          \
142 G_STMT_START {                             \
143   if ((pfd)->fd != -1) {                   \
144     GST_DEBUG ("remove fd %d", (pfd)->fd); \
145     gst_poll_remove_fd (fdset, pfd);       \
146     CLOSE_SOCKET ((pfd)->fd);              \
147     (pfd)->fd = -1;                        \
148   }                                        \
149 } G_STMT_END
150
151 typedef enum
152 {
153   TUNNEL_STATE_NONE,
154   TUNNEL_STATE_GET,
155   TUNNEL_STATE_POST,
156   TUNNEL_STATE_COMPLETE
157 } GstRTSPTunnelState;
158
159 #define TUNNELID_LEN   24
160
161 struct _GstRTSPConnection
162 {
163   /*< private > */
164   /* URL for the connection */
165   GstRTSPUrl *url;
166
167   /* connection state */
168   GstPollFD fd0;
169   GstPollFD fd1;
170
171   GstPollFD *readfd;
172   GstPollFD *writefd;
173
174   gchar tunnelid[TUNNELID_LEN];
175   gboolean tunneled;
176   GstRTSPTunnelState tstate;
177
178   GstPoll *fdset;
179   gchar *ip;
180
181   /* Session state */
182   gint cseq;                    /* sequence number */
183   gchar session_id[512];        /* session id */
184   gint timeout;                 /* session timeout in seconds */
185   GTimer *timer;                /* timeout timer */
186
187   /* Authentication */
188   GstRTSPAuthMethod auth_method;
189   gchar *username;
190   gchar *passwd;
191   GHashTable *auth_params;
192
193   DecodeCtx ctx;
194   DecodeCtx *ctxp;
195
196   gchar *proxy_host;
197   guint proxy_port;
198 };
199
200 #ifdef G_OS_WIN32
201 static int
202 inet_aton (const char *c, struct in_addr *paddr)
203 {
204   /* note that inet_addr is deprecated on unix because
205    * inet_addr returns -1 (INADDR_NONE) for the valid 255.255.255.255
206    * address. */
207   paddr->s_addr = inet_addr (c);
208
209   if (paddr->s_addr == INADDR_NONE)
210     return 0;
211
212   return 1;
213 }
214 #endif
215
216 enum
217 {
218   STATE_START = 0,
219   STATE_DATA_HEADER,
220   STATE_DATA_BODY,
221   STATE_READ_LINES,
222   STATE_END,
223   STATE_LAST
224 };
225
226 /* a structure for constructing RTSPMessages */
227 typedef struct
228 {
229   gint state;
230   guint8 buffer[4096];
231   guint offset;
232
233   guint line;
234   guint8 *body_data;
235   glong body_len;
236 } GstRTSPBuilder;
237
238 static void
239 build_reset (GstRTSPBuilder * builder)
240 {
241   g_free (builder->body_data);
242   memset (builder, 0, sizeof (GstRTSPBuilder));
243 }
244
245 /**
246  * gst_rtsp_connection_create:
247  * @url: a #GstRTSPUrl 
248  * @conn: storage for a #GstRTSPConnection
249  *
250  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
251  * The connection will not yet attempt to connect to @url, use
252  * gst_rtsp_connection_connect().
253  *
254  * A copy of @url will be made.
255  *
256  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
257  */
258 GstRTSPResult
259 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
260 {
261   GstRTSPConnection *newconn;
262 #ifdef G_OS_WIN32
263   WSADATA w;
264   int error;
265 #endif
266
267   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
268
269 #ifdef G_OS_WIN32
270   error = WSAStartup (0x0202, &w);
271
272   if (error)
273     goto startup_error;
274
275   if (w.wVersion != 0x0202)
276     goto version_error;
277 #endif
278
279   newconn = g_new0 (GstRTSPConnection, 1);
280
281   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
282     goto no_fdset;
283
284   newconn->url = gst_rtsp_url_copy (url);
285   newconn->fd0.fd = -1;
286   newconn->fd1.fd = -1;
287   newconn->timer = g_timer_new ();
288   newconn->timeout = 60;
289   newconn->cseq = 1;
290
291   newconn->auth_method = GST_RTSP_AUTH_NONE;
292   newconn->username = NULL;
293   newconn->passwd = NULL;
294   newconn->auth_params = NULL;
295
296   *conn = newconn;
297
298   return GST_RTSP_OK;
299
300   /* ERRORS */
301 #ifdef G_OS_WIN32
302 startup_error:
303   {
304     g_warning ("Error %d on WSAStartup", error);
305     return GST_RTSP_EWSASTART;
306   }
307 version_error:
308   {
309     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
310         w.wVersion);
311     WSACleanup ();
312     return GST_RTSP_EWSAVERSION;
313   }
314 #endif
315 no_fdset:
316   {
317     g_free (newconn);
318 #ifdef G_OS_WIN32
319     WSACleanup ();
320 #endif
321     return GST_RTSP_ESYS;
322   }
323 }
324
325 /**
326  * gst_rtsp_connection_accept:
327  * @sock: a socket
328  * @conn: storage for a #GstRTSPConnection
329  *
330  * Accept a new connection on @sock and create a new #GstRTSPConnection for
331  * handling communication on new socket.
332  *
333  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
334  *
335  * Since: 0.10.23
336  */
337 GstRTSPResult
338 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
339 {
340   int fd;
341   unsigned int address_len;
342   GstRTSPConnection *newconn = NULL;
343   struct sockaddr_in address;
344   GstRTSPUrl *url;
345 #ifdef G_OS_WIN32
346   gulong flags = 1;
347 #endif
348
349   address_len = sizeof (address);
350   memset (&address, 0, address_len);
351
352 #ifndef G_OS_WIN32
353   fd = accept (sock, (struct sockaddr *) &address, &address_len);
354 #else
355   fd = accept (sock, (struct sockaddr *) &address, (gint *) & address_len);
356 #endif /* G_OS_WIN32 */
357   if (fd == -1)
358     goto accept_failed;
359
360   /* set to non-blocking mode so that we can cancel the communication */
361 #ifndef G_OS_WIN32
362   fcntl (fd, F_SETFL, O_NONBLOCK);
363 #else
364   ioctlsocket (fd, FIONBIO, &flags);
365 #endif /* G_OS_WIN32 */
366
367   /* create a url for the client address */
368   url = g_new0 (GstRTSPUrl, 1);
369   url->host = g_strdup_printf ("%s", inet_ntoa (address.sin_addr));
370   url->port = address.sin_port;
371
372   /* now create the connection object */
373   gst_rtsp_connection_create (url, &newconn);
374   gst_rtsp_url_free (url);
375
376   ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
377
378   /* both read and write initially */
379   newconn->readfd = &newconn->fd0;
380   newconn->writefd = &newconn->fd0;
381
382   *conn = newconn;
383
384   return GST_RTSP_OK;
385
386   /* ERRORS */
387 accept_failed:
388   {
389     return GST_RTSP_ESYS;
390   }
391 }
392
393 static gchar *
394 do_resolve (const gchar * host)
395 {
396   struct hostent *hostinfo;
397   struct in_addr addr;
398   const gchar *ip;
399 #ifdef G_OS_WIN32
400   struct in_addr *addrp;
401 #else
402   char **addrs;
403   gchar ipbuf[INET_ADDRSTRLEN];
404 #endif /* G_OS_WIN32 */
405
406   ip = NULL;
407
408   /* first check if it already is an IP address */
409   if (inet_aton (host, &addr)) {
410     ip = host;
411   } else {
412     hostinfo = gethostbyname (host);
413     if (!hostinfo)
414       goto not_resolved;        /* h_errno set */
415
416     if (hostinfo->h_addrtype != AF_INET)
417       goto not_ip;              /* host not an IP host */
418 #ifdef G_OS_WIN32
419     addrp = (struct in_addr *) hostinfo->h_addr_list[0];
420     /* this is not threadsafe */
421     ip = inet_ntoa (*addrp);
422 #else
423     addrs = hostinfo->h_addr_list;
424     ip = inet_ntop (AF_INET, (struct in_addr *) addrs[0], ipbuf,
425         sizeof (ipbuf));
426 #endif /* G_OS_WIN32 */
427   }
428   return g_strdup (ip);
429
430   /* ERRORS */
431 not_resolved:
432   {
433     GST_ERROR ("could not resolve %s", host);
434     return NULL;
435   }
436 not_ip:
437   {
438     GST_ERROR ("not an IP address");
439     return NULL;
440   }
441 }
442
443 static GstRTSPResult
444 do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
445     GstPoll * fdset, GTimeVal * timeout)
446 {
447   gint fd;
448   struct sockaddr_in sa_in;
449   gint ret;
450 #ifdef G_OS_WIN32
451   unsigned long flags = 1;
452 #endif /* G_OS_WIN32 */
453   GstClockTime to;
454   gint retval;
455
456   memset (&sa_in, 0, sizeof (sa_in));
457   sa_in.sin_family = AF_INET;   /* network socket */
458   sa_in.sin_port = htons (port);        /* on port */
459   sa_in.sin_addr.s_addr = inet_addr (ip);       /* on host ip */
460
461   fd = socket (AF_INET, SOCK_STREAM, 0);
462   if (fd == -1)
463     goto no_socket;
464
465   /* set to non-blocking mode so that we can cancel the connect */
466 #ifndef G_OS_WIN32
467   fcntl (fd, F_SETFL, O_NONBLOCK);
468 #else
469   ioctlsocket (fd, FIONBIO, &flags);
470 #endif /* G_OS_WIN32 */
471
472   /* add the socket to our fdset */
473   ADD_POLLFD (fdset, fdout, fd);
474
475   /* we are going to connect ASYNC now */
476   ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in));
477   if (ret == 0)
478     goto done;
479   if (!ERRNO_IS_EINPROGRESS)
480     goto sys_error;
481
482   /* wait for connect to complete up to the specified timeout or until we got
483    * interrupted. */
484   gst_poll_fd_ctl_write (fdset, fdout, TRUE);
485
486   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
487
488   do {
489     retval = gst_poll_wait (fdset, to);
490   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
491
492   if (retval == 0)
493     goto timeout;
494   else if (retval == -1)
495     goto sys_error;
496
497   /* we can still have an error connecting on windows */
498   if (gst_poll_fd_has_error (fdset, fdout)) {
499     socklen_t len = sizeof (errno);
500 #ifndef G_OS_WIN32
501     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
502 #else
503     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
504 #endif
505     goto sys_error;
506   }
507
508   gst_poll_fd_ignored (fdset, fdout);
509 done:
510
511   return GST_RTSP_OK;
512
513   /* ERRORS */
514 no_socket:
515   {
516     GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
517     return GST_RTSP_ESYS;
518   }
519 sys_error:
520   {
521     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
522     REMOVE_POLLFD (fdset, fdout);
523     return GST_RTSP_ESYS;
524   }
525 timeout:
526   {
527     GST_ERROR ("timeout");
528     REMOVE_POLLFD (fdset, fdout);
529     return GST_RTSP_ETIMEOUT;
530   }
531 }
532
533 static GstRTSPResult
534 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
535 {
536   gint i;
537   GstRTSPResult res;
538   gchar *str;
539   guint idx, line;
540   gint retval;
541   GstClockTime to;
542   gchar *ip, *url_port_str;
543   guint16 port, url_port;
544   gchar codestr[4], *resultstr;
545   gint code;
546   GstRTSPUrl *url;
547   gchar *hostparam;
548
549   /* create a random sessionid */
550   for (i = 0; i < TUNNELID_LEN; i++)
551     conn->tunnelid[i] = g_random_int_range ('a', 'z');
552   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
553
554   url = conn->url;
555   /* get the port from the url */
556   gst_rtsp_url_get_port (url, &url_port);
557
558   if (conn->proxy_host) {
559     hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
560     url_port_str = g_strdup_printf (":%d", url_port);
561     ip = conn->proxy_host;
562     port = conn->proxy_port;
563   } else {
564     hostparam = NULL;
565     url_port_str = NULL;
566     ip = conn->ip;
567     port = url_port;
568   }
569
570   /* */
571   str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
572       "%s"
573       "x-sessioncookie: %s\r\n"
574       "Accept: application/x-rtsp-tunnelled\r\n"
575       "Pragma: no-cache\r\n"
576       "Cache-Control: no-cache\r\n" "\r\n",
577       conn->proxy_host ? "http://" : "",
578       conn->proxy_host ? url->host : "",
579       conn->proxy_host ? url_port_str : "",
580       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
581       hostparam ? hostparam : "", conn->tunnelid);
582
583   /* we start by writing to this fd */
584   conn->writefd = &conn->fd0;
585
586   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
587   g_free (str);
588   if (res != GST_RTSP_OK)
589     goto write_failed;
590
591   gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
592   gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);
593
594   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
595
596   line = 0;
597   while (TRUE) {
598     guint8 buffer[4096];
599
600     idx = 0;
601     while (TRUE) {
602       res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL);
603       if (res == GST_RTSP_EEOF)
604         goto eof;
605       if (res == GST_RTSP_OK)
606         break;
607       if (res != GST_RTSP_EINTR)
608         goto read_error;
609
610       do {
611         retval = gst_poll_wait (conn->fdset, to);
612       } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
613
614       /* check for timeout */
615       if (retval == 0)
616         goto timeout;
617
618       if (retval == -1) {
619         if (errno == EBUSY)
620           goto stopped;
621         else
622           goto select_error;
623       }
624     }
625
626     /* check for last line */
627     if (buffer[0] == '\r')
628       buffer[0] = '\0';
629     if (buffer[0] == '\0')
630       break;
631
632     if (line == 0) {
633       /* first line, parse response */
634       gchar versionstr[20];
635       gchar *bptr;
636
637       bptr = (gchar *) buffer;
638
639       parse_string (versionstr, sizeof (versionstr), &bptr);
640       parse_string (codestr, sizeof (codestr), &bptr);
641       code = atoi (codestr);
642
643       while (g_ascii_isspace (*bptr))
644         bptr++;
645
646       resultstr = bptr;
647
648       if (code != GST_RTSP_STS_OK)
649         goto wrong_result;
650     } else {
651       gchar key[32];
652       gchar *value;
653
654       /* other lines, parse key/value */
655       res = parse_key_value (buffer, key, sizeof (key), &value);
656       if (res == GST_RTSP_OK) {
657         /* we got a new ip address */
658         if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
659           if (conn->proxy_host) {
660             /* if we use a proxy we need to change the destination url */
661             g_free (url->host);
662             url->host = g_strdup (value);
663             g_free (hostparam);
664             g_free (url_port_str);
665             hostparam =
666                 g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
667             url_port_str = g_strdup_printf (":%d", url_port);
668           } else {
669             /* and resolve the new ip address */
670             if (!(ip = do_resolve (conn->ip)))
671               goto not_resolved;
672             g_free (conn->ip);
673             conn->ip = ip;
674           }
675         }
676       }
677     }
678     line++;
679   }
680
681   /* connect to the host/port */
682   res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
683   if (res != GST_RTSP_OK)
684     goto connect_failed;
685
686   /* this is now our writing socket */
687   conn->writefd = &conn->fd1;
688
689   /* */
690   str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
691       "%s"
692       "x-sessioncookie: %s\r\n"
693       "Content-Type: application/x-rtsp-tunnelled\r\n"
694       "Pragma: no-cache\r\n"
695       "Cache-Control: no-cache\r\n"
696       "Content-Length: 32767\r\n"
697       "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
698       "\r\n",
699       conn->proxy_host ? "http://" : "",
700       conn->proxy_host ? url->host : "",
701       conn->proxy_host ? url_port_str : "",
702       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
703       hostparam ? hostparam : "", conn->tunnelid);
704
705   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
706   g_free (str);
707   if (res != GST_RTSP_OK)
708     goto write_failed;
709
710 exit:
711   g_free (hostparam);
712   g_free (url_port_str);
713
714   return res;
715
716   /* ERRORS */
717 write_failed:
718   {
719     GST_ERROR ("write failed (%d)", res);
720     goto exit;
721   }
722 eof:
723   {
724     res = GST_RTSP_EEOF;
725     goto exit;
726   }
727 read_error:
728   {
729     goto exit;
730   }
731 timeout:
732   {
733     res = GST_RTSP_ETIMEOUT;
734     goto exit;
735   }
736 select_error:
737   {
738     res = GST_RTSP_ESYS;
739     goto exit;
740   }
741 stopped:
742   {
743     res = GST_RTSP_EINTR;
744     goto exit;
745   }
746 wrong_result:
747   {
748     GST_ERROR ("got failure response %d %s", code, resultstr);
749     res = GST_RTSP_ERROR;
750     goto exit;
751   }
752 not_resolved:
753   {
754     GST_ERROR ("could not resolve %s", conn->ip);
755     res = GST_RTSP_ENET;
756     goto exit;
757   }
758 connect_failed:
759   {
760     GST_ERROR ("failed to connect");
761     goto exit;
762   }
763 }
764
765 /**
766  * gst_rtsp_connection_connect:
767  * @conn: a #GstRTSPConnection 
768  * @timeout: a #GTimeVal timeout
769  *
770  * Attempt to connect to the url of @conn made with
771  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
772  * forever. If @timeout contains a valid timeout, this function will return
773  * #GST_RTSP_ETIMEOUT after the timeout expired.
774  *
775  * This function can be cancelled with gst_rtsp_connection_flush().
776  *
777  * Returns: #GST_RTSP_OK when a connection could be made.
778  */
779 GstRTSPResult
780 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
781 {
782   GstRTSPResult res;
783   gchar *ip;
784   guint16 port;
785   GstRTSPUrl *url;
786
787   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
788   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
789   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
790
791   url = conn->url;
792
793   if (conn->proxy_host && conn->tunneled) {
794     if (!(ip = do_resolve (conn->proxy_host))) {
795       GST_ERROR ("could not resolve %s", conn->proxy_host);
796       goto not_resolved;
797     }
798     port = conn->proxy_port;
799     g_free (conn->proxy_host);
800     conn->proxy_host = ip;
801   } else {
802     if (!(ip = do_resolve (url->host))) {
803       GST_ERROR ("could not resolve %s", url->host);
804       goto not_resolved;
805     }
806     /* get the port from the url */
807     gst_rtsp_url_get_port (url, &port);
808
809     g_free (conn->ip);
810     conn->ip = ip;
811   }
812
813   /* connect to the host/port */
814   res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
815   if (res != GST_RTSP_OK)
816     goto connect_failed;
817
818   /* this is our read URL */
819   conn->readfd = &conn->fd0;
820
821   if (conn->tunneled) {
822     res = setup_tunneling (conn, timeout);
823     if (res != GST_RTSP_OK)
824       goto tunneling_failed;
825   } else {
826     conn->writefd = &conn->fd0;
827   }
828
829   return GST_RTSP_OK;
830
831 not_resolved:
832   {
833     return GST_RTSP_ENET;
834   }
835 connect_failed:
836   {
837     GST_ERROR ("failed to connect");
838     return res;
839   }
840 tunneling_failed:
841   {
842     GST_ERROR ("failed to setup tunneling");
843     return res;
844   }
845 }
846
847 static void
848 md5_digest_to_hex_string (unsigned char digest[16], char string[33])
849 {
850   static const char hexdigits[] = "0123456789abcdef";
851   int i;
852
853   for (i = 0; i < 16; i++) {
854     string[i * 2] = hexdigits[(digest[i] >> 4) & 0x0f];
855     string[i * 2 + 1] = hexdigits[digest[i] & 0x0f];
856   }
857   string[32] = 0;
858 }
859
860 static void
861 auth_digest_compute_hex_urp (const gchar * username,
862     const gchar * realm, const gchar * password, gchar hex_urp[33])
863 {
864   struct MD5Context md5_context;
865   unsigned char digest[16];
866
867   MD5Init (&md5_context);
868   MD5Update (&md5_context, username, strlen (username));
869   MD5Update (&md5_context, ":", 1);
870   MD5Update (&md5_context, realm, strlen (realm));
871   MD5Update (&md5_context, ":", 1);
872   MD5Update (&md5_context, password, strlen (password));
873   MD5Final (digest, &md5_context);
874   md5_digest_to_hex_string (digest, hex_urp);
875 }
876
877 static void
878 auth_digest_compute_response (const gchar * method,
879     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
880     gchar response[33])
881 {
882   char hex_a2[33];
883   struct MD5Context md5_context;
884   unsigned char digest[16];
885
886   /* compute A2 */
887   MD5Init (&md5_context);
888   MD5Update (&md5_context, method, strlen (method));
889   MD5Update (&md5_context, ":", 1);
890   MD5Update (&md5_context, uri, strlen (uri));
891   MD5Final (digest, &md5_context);
892   md5_digest_to_hex_string (digest, hex_a2);
893
894   /* compute KD */
895   MD5Init (&md5_context);
896   MD5Update (&md5_context, hex_a1, strlen (hex_a1));
897   MD5Update (&md5_context, ":", 1);
898   MD5Update (&md5_context, nonce, strlen (nonce));
899   MD5Update (&md5_context, ":", 1);
900
901   MD5Update (&md5_context, hex_a2, 32);
902   MD5Final (digest, &md5_context);
903   md5_digest_to_hex_string (digest, response);
904 }
905
906 static void
907 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
908 {
909   switch (conn->auth_method) {
910     case GST_RTSP_AUTH_BASIC:{
911       gchar *user_pass;
912       gchar *user_pass64;
913       gchar *auth_string;
914
915       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
916       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
917       auth_string = g_strdup_printf ("Basic %s", user_pass64);
918
919       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
920           auth_string);
921
922       g_free (user_pass);
923       g_free (user_pass64);
924       break;
925     }
926     case GST_RTSP_AUTH_DIGEST:{
927       gchar response[33], hex_urp[33];
928       gchar *auth_string, *auth_string2;
929       gchar *realm;
930       gchar *nonce;
931       gchar *opaque;
932       const gchar *uri;
933       const gchar *method;
934
935       /* we need to have some params set */
936       if (conn->auth_params == NULL)
937         break;
938
939       /* we need the realm and nonce */
940       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
941       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
942       if (realm == NULL || nonce == NULL)
943         break;
944
945       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
946           hex_urp);
947
948       method = gst_rtsp_method_as_text (message->type_data.request.method);
949       uri = message->type_data.request.uri;
950
951       /* Assume no qop, algorithm=md5, stale=false */
952       /* For algorithm MD5, a1 = urp. */
953       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
954       auth_string = g_strdup_printf ("Digest username=\"%s\", "
955           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
956           conn->username, realm, nonce, uri, response);
957
958       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
959       if (opaque) {
960         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
961             opaque);
962         g_free (auth_string);
963         auth_string = auth_string2;
964       }
965       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
966           auth_string);
967       break;
968     }
969     default:
970       /* Nothing to do */
971       break;
972   }
973 }
974
975 static void
976 gen_date_string (gchar * date_string, guint len)
977 {
978   GTimeVal tv;
979   time_t t;
980 #ifdef HAVE_GMTIME_R
981   struct tm tm_;
982 #endif
983
984   g_get_current_time (&tv);
985   t = (time_t) tv.tv_sec;
986
987 #ifdef HAVE_GMTIME_R
988   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
989 #else
990   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
991 #endif
992 }
993
994 static GstRTSPResult
995 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
996 {
997   guint left;
998
999   if (G_UNLIKELY (*idx > size))
1000     return GST_RTSP_ERROR;
1001
1002   left = size - *idx;
1003
1004   while (left) {
1005     gint r;
1006
1007     r = WRITE_SOCKET (fd, &buffer[*idx], left);
1008     if (G_UNLIKELY (r == 0)) {
1009       return GST_RTSP_EINTR;
1010     } else if (G_UNLIKELY (r < 0)) {
1011       if (ERRNO_IS_EAGAIN)
1012         return GST_RTSP_EINTR;
1013       if (!ERRNO_IS_EINTR)
1014         return GST_RTSP_ESYS;
1015     } else {
1016       left -= r;
1017       *idx += r;
1018     }
1019   }
1020   return GST_RTSP_OK;
1021 }
1022
1023 static gint
1024 fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
1025 {
1026   gint out = 0;
1027
1028   if (ctx) {
1029     gint r;
1030
1031     while (size > 0) {
1032       while (size > 0 && ctx->cout < ctx->coutl) {
1033         /* we have some leftover bytes */
1034         *buffer++ = ctx->out[ctx->cout];
1035         ctx->cout++;
1036         size--;
1037         out++;
1038       }
1039       /* nothing in the buffer */
1040       if (size == 0)
1041         break;
1042
1043       /* try to read more bytes */
1044       r = READ_SOCKET (fd, &ctx->in[ctx->cin], 4 - ctx->cin);
1045       if (r <= 0) {
1046         if (out == 0)
1047           out = r;
1048         break;
1049       }
1050
1051       ctx->cin += r;
1052       if (ctx->cin == 4) {
1053         r = g_base64_decode_step ((const gchar *) ctx->in, 4,
1054             (guchar *) ctx->out, &ctx->state, &ctx->save);
1055         ctx->cout = 0;
1056         ctx->coutl = r;
1057         ctx->cin = 0;
1058       }
1059     }
1060   } else {
1061     out = READ_SOCKET (fd, buffer, size);
1062   }
1063
1064   return out;
1065 }
1066
1067 static GstRTSPResult
1068 read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
1069 {
1070   guint left;
1071
1072   if (G_UNLIKELY (*idx > size))
1073     return GST_RTSP_ERROR;
1074
1075   left = size - *idx;
1076
1077   while (left) {
1078     gint r;
1079
1080     r = fill_bytes (fd, &buffer[*idx], left, ctx);
1081     if (G_UNLIKELY (r == 0)) {
1082       return GST_RTSP_EEOF;
1083     } else if (G_UNLIKELY (r < 0)) {
1084       if (ERRNO_IS_EAGAIN)
1085         return GST_RTSP_EINTR;
1086       if (!ERRNO_IS_EINTR)
1087         return GST_RTSP_ESYS;
1088     } else {
1089       left -= r;
1090       *idx += r;
1091     }
1092   }
1093   return GST_RTSP_OK;
1094 }
1095
1096 static GstRTSPResult
1097 read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
1098 {
1099   while (TRUE) {
1100     guint8 c;
1101     gint r;
1102
1103     r = fill_bytes (fd, &c, 1, ctx);
1104     if (G_UNLIKELY (r == 0)) {
1105       return GST_RTSP_EEOF;
1106     } else if (G_UNLIKELY (r < 0)) {
1107       if (ERRNO_IS_EAGAIN)
1108         return GST_RTSP_EINTR;
1109       if (!ERRNO_IS_EINTR)
1110         return GST_RTSP_ESYS;
1111     } else {
1112       if (c == '\n')            /* end on \n */
1113         break;
1114       if (c == '\r')            /* ignore \r */
1115         continue;
1116
1117       if (G_LIKELY (*idx < size - 1))
1118         buffer[(*idx)++] = c;
1119     }
1120   }
1121   buffer[*idx] = '\0';
1122
1123   return GST_RTSP_OK;
1124 }
1125
1126 /**
1127  * gst_rtsp_connection_write:
1128  * @conn: a #GstRTSPConnection
1129  * @data: the data to write
1130  * @size: the size of @data
1131  * @timeout: a timeout value or #NULL
1132  *
1133  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1134  * the specified @timeout. @timeout can be #NULL, in which case this function
1135  * might block forever.
1136  * 
1137  * This function can be cancelled with gst_rtsp_connection_flush().
1138  *
1139  * Returns: #GST_RTSP_OK on success.
1140  */
1141 GstRTSPResult
1142 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1143     guint size, GTimeVal * timeout)
1144 {
1145   guint offset;
1146   gint retval;
1147   GstClockTime to;
1148   GstRTSPResult res;
1149
1150   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1151   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1152   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1153
1154   gst_poll_set_controllable (conn->fdset, TRUE);
1155   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
1156   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
1157   /* clear all previous poll results */
1158   gst_poll_fd_ignored (conn->fdset, conn->writefd);
1159   gst_poll_fd_ignored (conn->fdset, conn->readfd);
1160
1161   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1162
1163   offset = 0;
1164
1165   while (TRUE) {
1166     /* try to write */
1167     res = write_bytes (conn->writefd->fd, data, &offset, size);
1168     if (G_LIKELY (res == GST_RTSP_OK))
1169       break;
1170     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1171       goto write_error;
1172
1173     /* not all is written, wait until we can write more */
1174     do {
1175       retval = gst_poll_wait (conn->fdset, to);
1176     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1177
1178     if (G_UNLIKELY (retval == 0))
1179       goto timeout;
1180
1181     if (G_UNLIKELY (retval == -1)) {
1182       if (errno == EBUSY)
1183         goto stopped;
1184       else
1185         goto select_error;
1186     }
1187   }
1188   return GST_RTSP_OK;
1189
1190   /* ERRORS */
1191 timeout:
1192   {
1193     return GST_RTSP_ETIMEOUT;
1194   }
1195 select_error:
1196   {
1197     return GST_RTSP_ESYS;
1198   }
1199 stopped:
1200   {
1201     return GST_RTSP_EINTR;
1202   }
1203 write_error:
1204   {
1205     return res;
1206   }
1207 }
1208
1209 static GString *
1210 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
1211 {
1212   GString *str = NULL;
1213
1214   str = g_string_new ("");
1215
1216   switch (message->type) {
1217     case GST_RTSP_MESSAGE_REQUEST:
1218       /* create request string, add CSeq */
1219       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
1220           "CSeq: %d\r\n",
1221           gst_rtsp_method_as_text (message->type_data.request.method),
1222           message->type_data.request.uri, conn->cseq++);
1223       /* add session id if we have one */
1224       if (conn->session_id[0] != '\0') {
1225         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1226             conn->session_id);
1227       }
1228       /* add any authentication headers */
1229       add_auth_header (conn, message);
1230       break;
1231     case GST_RTSP_MESSAGE_RESPONSE:
1232       /* create response string */
1233       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
1234           message->type_data.response.code, message->type_data.response.reason);
1235       break;
1236     case GST_RTSP_MESSAGE_DATA:
1237     {
1238       guint8 data_header[4];
1239
1240       /* prepare data header */
1241       data_header[0] = '$';
1242       data_header[1] = message->type_data.data.channel;
1243       data_header[2] = (message->body_size >> 8) & 0xff;
1244       data_header[3] = message->body_size & 0xff;
1245
1246       /* create string with header and data */
1247       str = g_string_append_len (str, (gchar *) data_header, 4);
1248       str =
1249           g_string_append_len (str, (gchar *) message->body,
1250           message->body_size);
1251       break;
1252     }
1253     default:
1254       g_string_free (str, TRUE);
1255       g_return_val_if_reached (NULL);
1256       break;
1257   }
1258
1259   /* append headers and body */
1260   if (message->type != GST_RTSP_MESSAGE_DATA) {
1261     gchar date_string[100];
1262
1263     gen_date_string (date_string, sizeof (date_string));
1264
1265     /* add date header */
1266     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1267
1268     /* append headers */
1269     gst_rtsp_message_append_headers (message, str);
1270
1271     /* append Content-Length and body if needed */
1272     if (message->body != NULL && message->body_size > 0) {
1273       gchar *len;
1274
1275       len = g_strdup_printf ("%d", message->body_size);
1276       g_string_append_printf (str, "%s: %s\r\n",
1277           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1278       g_free (len);
1279       /* header ends here */
1280       g_string_append (str, "\r\n");
1281       str =
1282           g_string_append_len (str, (gchar *) message->body,
1283           message->body_size);
1284     } else {
1285       /* just end headers */
1286       g_string_append (str, "\r\n");
1287     }
1288   }
1289
1290   return str;
1291 }
1292
1293 /**
1294  * gst_rtsp_connection_send:
1295  * @conn: a #GstRTSPConnection
1296  * @message: the message to send
1297  * @timeout: a timeout value or #NULL
1298  *
1299  * Attempt to send @message to the connected @conn, blocking up to
1300  * the specified @timeout. @timeout can be #NULL, in which case this function
1301  * might block forever.
1302  * 
1303  * This function can be cancelled with gst_rtsp_connection_flush().
1304  *
1305  * Returns: #GST_RTSP_OK on success.
1306  */
1307 GstRTSPResult
1308 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1309     GTimeVal * timeout)
1310 {
1311   GString *string = NULL;
1312   GstRTSPResult res;
1313   gchar *str;
1314   gsize len;
1315
1316   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1317   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1318
1319   if (G_UNLIKELY (!(string = message_to_string (conn, message))))
1320     goto no_message;
1321
1322   if (conn->tunneled) {
1323     str = g_base64_encode ((const guchar *) string->str, string->len);
1324     g_string_free (string, TRUE);
1325     len = strlen (str);
1326   } else {
1327     str = string->str;
1328     len = string->len;
1329     g_string_free (string, FALSE);
1330   }
1331
1332   /* write request */
1333   res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
1334
1335   g_free (str);
1336
1337   return res;
1338
1339 no_message:
1340   {
1341     g_warning ("Wrong message");
1342     return GST_RTSP_EINVAL;
1343   }
1344 }
1345
1346 static void
1347 parse_string (gchar * dest, gint size, gchar ** src)
1348 {
1349   gint idx;
1350
1351   idx = 0;
1352   /* skip spaces */
1353   while (g_ascii_isspace (**src))
1354     (*src)++;
1355
1356   while (!g_ascii_isspace (**src) && **src != '\0') {
1357     if (idx < size - 1)
1358       dest[idx++] = **src;
1359     (*src)++;
1360   }
1361   if (size > 0)
1362     dest[idx] = '\0';
1363 }
1364
1365 static void
1366 parse_key (gchar * dest, gint size, gchar ** src)
1367 {
1368   gint idx;
1369
1370   idx = 0;
1371   while (**src != ':' && **src != '\0') {
1372     if (idx < size - 1)
1373       dest[idx++] = **src;
1374     (*src)++;
1375   }
1376   if (size > 0)
1377     dest[idx] = '\0';
1378 }
1379
1380 static GstRTSPResult
1381 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
1382 {
1383   GstRTSPResult res;
1384   gchar versionstr[20];
1385   gchar codestr[4];
1386   gint code;
1387   gchar *bptr;
1388
1389   bptr = (gchar *) buffer;
1390
1391   parse_string (versionstr, sizeof (versionstr), &bptr);
1392   parse_string (codestr, sizeof (codestr), &bptr);
1393   code = atoi (codestr);
1394
1395   while (g_ascii_isspace (*bptr))
1396     bptr++;
1397
1398   if (strcmp (versionstr, "RTSP/1.0") == 0)
1399     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1400         parse_error);
1401   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1402     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1403         parse_error);
1404     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1405   } else
1406     goto parse_error;
1407
1408   return GST_RTSP_OK;
1409
1410 parse_error:
1411   {
1412     return GST_RTSP_EPARSE;
1413   }
1414 }
1415
1416 static GstRTSPResult
1417 parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
1418     GstRTSPMessage * msg)
1419 {
1420   GstRTSPResult res = GST_RTSP_OK;
1421   gchar versionstr[20];
1422   gchar methodstr[20];
1423   gchar urlstr[4096];
1424   gchar *bptr;
1425   GstRTSPMethod method;
1426   GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
1427
1428   bptr = (gchar *) buffer;
1429
1430   parse_string (methodstr, sizeof (methodstr), &bptr);
1431   method = gst_rtsp_find_method (methodstr);
1432   if (method == GST_RTSP_INVALID) {
1433     /* a tunnel request is allowed when we don't have one yet */
1434     if (conn->tstate != TUNNEL_STATE_NONE)
1435       goto invalid_method;
1436     /* we need GET or POST for a valid tunnel request */
1437     if (!strcmp (methodstr, "GET"))
1438       tstate = TUNNEL_STATE_GET;
1439     else if (!strcmp (methodstr, "POST"))
1440       tstate = TUNNEL_STATE_POST;
1441     else
1442       goto invalid_method;
1443   }
1444
1445   parse_string (urlstr, sizeof (urlstr), &bptr);
1446   if (G_UNLIKELY (*urlstr == '\0'))
1447     goto invalid_url;
1448
1449   parse_string (versionstr, sizeof (versionstr), &bptr);
1450
1451   if (G_UNLIKELY (*bptr != '\0'))
1452     goto invalid_version;
1453
1454   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1455     res = gst_rtsp_message_init_request (msg, method, urlstr);
1456   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1457     res = gst_rtsp_message_init_request (msg, method, urlstr);
1458     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1459   } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
1460     /* tunnel request, we need a tunnel method */
1461     if (tstate == TUNNEL_STATE_NONE) {
1462       res = GST_RTSP_EPARSE;
1463     } else {
1464       conn->tstate = tstate;
1465     }
1466   } else {
1467     res = GST_RTSP_EPARSE;
1468   }
1469
1470   return res;
1471
1472   /* ERRORS */
1473 invalid_method:
1474   {
1475     GST_ERROR ("invalid method %s", methodstr);
1476     return GST_RTSP_EPARSE;
1477   }
1478 invalid_url:
1479   {
1480     GST_ERROR ("invalid url %s", urlstr);
1481     return GST_RTSP_EPARSE;
1482   }
1483 invalid_version:
1484   {
1485     GST_ERROR ("invalid version");
1486     return GST_RTSP_EPARSE;
1487   }
1488 }
1489
1490 static GstRTSPResult
1491 parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
1492 {
1493   gchar *bptr;
1494
1495   bptr = (gchar *) buffer;
1496
1497   /* read key */
1498   parse_key (key, keysize, &bptr);
1499   if (G_UNLIKELY (*bptr != ':'))
1500     goto no_column;
1501
1502   bptr++;
1503   while (g_ascii_isspace (*bptr))
1504     bptr++;
1505
1506   *value = bptr;
1507
1508   return GST_RTSP_OK;
1509
1510   /* ERRORS */
1511 no_column:
1512   {
1513     return GST_RTSP_EPARSE;
1514   }
1515 }
1516
1517 /* parsing lines means reading a Key: Value pair */
1518 static GstRTSPResult
1519 parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
1520 {
1521   GstRTSPResult res;
1522   gchar key[32];
1523   gchar *value;
1524   GstRTSPHeaderField field;
1525
1526   res = parse_key_value (buffer, key, sizeof (key), &value);
1527   if (G_UNLIKELY (res != GST_RTSP_OK))
1528     goto parse_error;
1529
1530   if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
1531     /* save the tunnel session in the connection */
1532     if (!strcmp (key, "x-sessioncookie")) {
1533       strncpy (conn->tunnelid, value, TUNNELID_LEN);
1534       conn->tunnelid[TUNNELID_LEN - 1] = '\0';
1535       conn->tunneled = TRUE;
1536     }
1537   } else {
1538     field = gst_rtsp_find_header_field (key);
1539     if (field != GST_RTSP_HDR_INVALID)
1540       gst_rtsp_message_add_header (msg, field, value);
1541   }
1542
1543   return GST_RTSP_OK;
1544
1545   /* ERRORS */
1546 parse_error:
1547   {
1548     return res;
1549   }
1550 }
1551
1552 /* returns:
1553  *  GST_RTSP_OK when a complete message was read.
1554  *  GST_RTSP_EEOF: when the socket is closed
1555  *  GST_RTSP_EINTR: when more data is needed.
1556  *  GST_RTSP_..: some other error occured.
1557  */
1558 static GstRTSPResult
1559 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1560     GstRTSPConnection * conn)
1561 {
1562   GstRTSPResult res;
1563
1564   while (TRUE) {
1565     switch (builder->state) {
1566       case STATE_START:
1567         builder->offset = 0;
1568         res =
1569             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1570             &builder->offset, 1, conn->ctxp);
1571         if (res != GST_RTSP_OK)
1572           goto done;
1573
1574         /* we have 1 bytes now and we can see if this is a data message or
1575          * not */
1576         if (builder->buffer[0] == '$') {
1577           /* data message, prepare for the header */
1578           builder->state = STATE_DATA_HEADER;
1579         } else {
1580           builder->line = 0;
1581           builder->state = STATE_READ_LINES;
1582         }
1583         break;
1584       case STATE_DATA_HEADER:
1585       {
1586         res =
1587             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1588             &builder->offset, 4, conn->ctxp);
1589         if (res != GST_RTSP_OK)
1590           goto done;
1591
1592         gst_rtsp_message_init_data (message, builder->buffer[1]);
1593
1594         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1595         builder->body_data = g_malloc (builder->body_len + 1);
1596         builder->body_data[builder->body_len] = '\0';
1597         builder->offset = 0;
1598         builder->state = STATE_DATA_BODY;
1599         break;
1600       }
1601       case STATE_DATA_BODY:
1602       {
1603         res =
1604             read_bytes (conn->readfd->fd, builder->body_data, &builder->offset,
1605             builder->body_len, conn->ctxp);
1606         if (res != GST_RTSP_OK)
1607           goto done;
1608
1609         /* we have the complete body now, store in the message adjusting the
1610          * length to include the traling '\0' */
1611         gst_rtsp_message_take_body (message,
1612             (guint8 *) builder->body_data, builder->body_len + 1);
1613         builder->body_data = NULL;
1614         builder->body_len = 0;
1615
1616         builder->state = STATE_END;
1617         break;
1618       }
1619       case STATE_READ_LINES:
1620       {
1621         res = read_line (conn->readfd->fd, builder->buffer, &builder->offset,
1622             sizeof (builder->buffer), conn->ctxp);
1623         if (res != GST_RTSP_OK)
1624           goto done;
1625
1626         /* we have a regular response */
1627         if (builder->buffer[0] == '\r') {
1628           builder->buffer[0] = '\0';
1629         }
1630
1631         if (builder->buffer[0] == '\0') {
1632           gchar *hdrval;
1633
1634           /* empty line, end of message header */
1635           /* see if there is a Content-Length header */
1636           if (gst_rtsp_message_get_header (message,
1637                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1638             /* there is, prepare to read the body */
1639             builder->body_len = atol (hdrval);
1640             builder->body_data = g_malloc (builder->body_len + 1);
1641             builder->body_data[builder->body_len] = '\0';
1642             builder->offset = 0;
1643             builder->state = STATE_DATA_BODY;
1644           } else {
1645             builder->state = STATE_END;
1646           }
1647           break;
1648         }
1649
1650         /* we have a line */
1651         if (builder->line == 0) {
1652           /* first line, check for response status */
1653           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1654             res = parse_response_status (builder->buffer, message);
1655           } else {
1656             res = parse_request_line (conn, builder->buffer, message);
1657           }
1658           /* the first line must parse without errors */
1659           if (res != GST_RTSP_OK)
1660             goto done;
1661         } else {
1662           /* else just parse the line, ignore errors */
1663           parse_line (conn, builder->buffer, message);
1664         }
1665         builder->line++;
1666         builder->offset = 0;
1667         break;
1668       }
1669       case STATE_END:
1670       {
1671         gchar *session_id;
1672
1673         if (conn->tstate == TUNNEL_STATE_GET) {
1674           res = GST_RTSP_ETGET;
1675           goto done;
1676         } else if (conn->tstate == TUNNEL_STATE_POST) {
1677           res = GST_RTSP_ETPOST;
1678           goto done;
1679         }
1680
1681         if (message->type == GST_RTSP_MESSAGE_DATA) {
1682           /* data messages don't have headers */
1683           res = GST_RTSP_OK;
1684           goto done;
1685         }
1686
1687         /* save session id in the connection for further use */
1688         if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1689                 &session_id, 0) == GST_RTSP_OK) {
1690           gint maxlen, i;
1691
1692           maxlen = sizeof (conn->session_id) - 1;
1693           /* the sessionid can have attributes marked with ;
1694            * Make sure we strip them */
1695           for (i = 0; session_id[i] != '\0'; i++) {
1696             if (session_id[i] == ';') {
1697               maxlen = i;
1698               /* parse timeout */
1699               do {
1700                 i++;
1701               } while (g_ascii_isspace (session_id[i]));
1702               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1703                 gint to;
1704
1705                 /* if we parsed something valid, configure */
1706                 if ((to = atoi (&session_id[i + 8])) > 0)
1707                   conn->timeout = to;
1708               }
1709               break;
1710             }
1711           }
1712
1713           /* make sure to not overflow */
1714           strncpy (conn->session_id, session_id, maxlen);
1715           conn->session_id[maxlen] = '\0';
1716         }
1717         res = GST_RTSP_OK;
1718         goto done;
1719       }
1720       default:
1721         res = GST_RTSP_ERROR;
1722         break;
1723     }
1724   }
1725 done:
1726   return res;
1727 }
1728
1729 /**
1730  * gst_rtsp_connection_read:
1731  * @conn: a #GstRTSPConnection
1732  * @data: the data to read
1733  * @size: the size of @data
1734  * @timeout: a timeout value or #NULL
1735  *
1736  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1737  * the specified @timeout. @timeout can be #NULL, in which case this function
1738  * might block forever.
1739  *
1740  * This function can be cancelled with gst_rtsp_connection_flush().
1741  *
1742  * Returns: #GST_RTSP_OK on success.
1743  */
1744 GstRTSPResult
1745 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1746     GTimeVal * timeout)
1747 {
1748   guint offset;
1749   gint retval;
1750   GstClockTime to;
1751   GstRTSPResult res;
1752
1753   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1754   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1755   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1756
1757   if (G_UNLIKELY (size == 0))
1758     return GST_RTSP_OK;
1759
1760   offset = 0;
1761
1762   /* configure timeout if any */
1763   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1764
1765   gst_poll_set_controllable (conn->fdset, TRUE);
1766   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1767   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1768
1769   while (TRUE) {
1770     res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp);
1771     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1772       goto eof;
1773     if (G_LIKELY (res == GST_RTSP_OK))
1774       break;
1775     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1776       goto read_error;
1777
1778     do {
1779       retval = gst_poll_wait (conn->fdset, to);
1780     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1781
1782     /* check for timeout */
1783     if (G_UNLIKELY (retval == 0))
1784       goto select_timeout;
1785
1786     if (G_UNLIKELY (retval == -1)) {
1787       if (errno == EBUSY)
1788         goto stopped;
1789       else
1790         goto select_error;
1791     }
1792     gst_poll_set_controllable (conn->fdset, FALSE);
1793   }
1794   return GST_RTSP_OK;
1795
1796   /* ERRORS */
1797 select_error:
1798   {
1799     return GST_RTSP_ESYS;
1800   }
1801 select_timeout:
1802   {
1803     return GST_RTSP_ETIMEOUT;
1804   }
1805 stopped:
1806   {
1807     return GST_RTSP_EINTR;
1808   }
1809 eof:
1810   {
1811     return GST_RTSP_EEOF;
1812   }
1813 read_error:
1814   {
1815     return res;
1816   }
1817 }
1818
1819 static GString *
1820 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
1821 {
1822   GString *str;
1823   gchar date_string[100];
1824   const gchar *status;
1825
1826   gen_date_string (date_string, sizeof (date_string));
1827
1828   status = gst_rtsp_status_as_text (code);
1829   if (status == NULL) {
1830     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
1831     status = "Internal Server Error";
1832   }
1833
1834   str = g_string_new ("");
1835
1836   /* */
1837   g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
1838   g_string_append_printf (str,
1839       "Server: GStreamer RTSP Server\r\n"
1840       "Date: %s\r\n"
1841       "Connection: close\r\n"
1842       "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
1843   if (code == GST_RTSP_STS_OK) {
1844     if (conn->ip)
1845       g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
1846     g_string_append_printf (str,
1847         "Content-Type: application/x-rtsp-tunnelled\r\n");
1848   }
1849   g_string_append_printf (str, "\r\n");
1850   return str;
1851 }
1852
1853 /**
1854  * gst_rtsp_connection_receive:
1855  * @conn: a #GstRTSPConnection
1856  * @message: the message to read
1857  * @timeout: a timeout value or #NULL
1858  *
1859  * Attempt to read into @message from the connected @conn, blocking up to
1860  * the specified @timeout. @timeout can be #NULL, in which case this function
1861  * might block forever.
1862  * 
1863  * This function can be cancelled with gst_rtsp_connection_flush().
1864  *
1865  * Returns: #GST_RTSP_OK on success.
1866  */
1867 GstRTSPResult
1868 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
1869     GTimeVal * timeout)
1870 {
1871   GstRTSPResult res;
1872   GstRTSPBuilder builder;
1873   gint retval;
1874   GstClockTime to;
1875
1876   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1877   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1878   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1879
1880   /* configure timeout if any */
1881   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1882
1883   gst_poll_set_controllable (conn->fdset, TRUE);
1884   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1885   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1886
1887   memset (&builder, 0, sizeof (GstRTSPBuilder));
1888   while (TRUE) {
1889     res = build_next (&builder, message, conn);
1890     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1891       goto eof;
1892     if (G_LIKELY (res == GST_RTSP_OK))
1893       break;
1894     if (res == GST_RTSP_ETGET) {
1895       GString *str;
1896
1897       /* tunnel GET request, we can reply now */
1898       str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
1899       res =
1900           gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
1901           timeout);
1902       g_string_free (str, TRUE);
1903     } else if (res == GST_RTSP_ETPOST) {
1904       /* tunnel POST request, return the value, the caller now has to link the
1905        * two connections. */
1906       break;
1907     } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
1908       goto read_error;
1909
1910     do {
1911       retval = gst_poll_wait (conn->fdset, to);
1912     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1913
1914     /* check for timeout */
1915     if (G_UNLIKELY (retval == 0))
1916       goto select_timeout;
1917
1918     if (G_UNLIKELY (retval == -1)) {
1919       if (errno == EBUSY)
1920         goto stopped;
1921       else
1922         goto select_error;
1923     }
1924     gst_poll_set_controllable (conn->fdset, FALSE);
1925   }
1926
1927   /* we have a message here */
1928   build_reset (&builder);
1929
1930   return GST_RTSP_OK;
1931
1932   /* ERRORS */
1933 select_error:
1934   {
1935     res = GST_RTSP_ESYS;
1936     goto cleanup;
1937   }
1938 select_timeout:
1939   {
1940     res = GST_RTSP_ETIMEOUT;
1941     goto cleanup;
1942   }
1943 stopped:
1944   {
1945     res = GST_RTSP_EINTR;
1946     goto cleanup;
1947   }
1948 eof:
1949   {
1950     res = GST_RTSP_EEOF;
1951     goto cleanup;
1952   }
1953 read_error:
1954 cleanup:
1955   {
1956     build_reset (&builder);
1957     gst_rtsp_message_unset (message);
1958     return res;
1959   }
1960 }
1961
1962 /**
1963  * gst_rtsp_connection_close:
1964  * @conn: a #GstRTSPConnection
1965  *
1966  * Close the connected @conn. After this call, the connection is in the same
1967  * state as when it was first created.
1968  * 
1969  * Returns: #GST_RTSP_OK on success.
1970  */
1971 GstRTSPResult
1972 gst_rtsp_connection_close (GstRTSPConnection * conn)
1973 {
1974   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1975
1976   g_free (conn->ip);
1977   conn->ip = NULL;
1978
1979   REMOVE_POLLFD (conn->fdset, &conn->fd0);
1980   REMOVE_POLLFD (conn->fdset, &conn->fd1);
1981   conn->writefd = NULL;
1982   conn->readfd = NULL;
1983   conn->tunneled = FALSE;
1984   conn->tstate = TUNNEL_STATE_NONE;
1985   conn->ctxp = NULL;
1986   g_free (conn->username);
1987   conn->username = NULL;
1988   g_free (conn->passwd);
1989   conn->passwd = NULL;
1990   gst_rtsp_connection_clear_auth_params (conn);
1991   conn->timeout = 60;
1992   conn->cseq = 0;
1993   conn->session_id[0] = '\0';
1994
1995   return GST_RTSP_OK;
1996 }
1997
1998 /**
1999  * gst_rtsp_connection_free:
2000  * @conn: a #GstRTSPConnection
2001  *
2002  * Close and free @conn.
2003  * 
2004  * Returns: #GST_RTSP_OK on success.
2005  */
2006 GstRTSPResult
2007 gst_rtsp_connection_free (GstRTSPConnection * conn)
2008 {
2009   GstRTSPResult res;
2010
2011   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2012
2013   res = gst_rtsp_connection_close (conn);
2014   gst_poll_free (conn->fdset);
2015   g_timer_destroy (conn->timer);
2016   gst_rtsp_url_free (conn->url);
2017   g_free (conn->proxy_host);
2018   g_free (conn);
2019 #ifdef G_OS_WIN32
2020   WSACleanup ();
2021 #endif
2022
2023   return res;
2024 }
2025
2026 /**
2027  * gst_rtsp_connection_poll:
2028  * @conn: a #GstRTSPConnection
2029  * @events: a bitmask of #GstRTSPEvent flags to check
2030  * @revents: location for result flags 
2031  * @timeout: a timeout
2032  *
2033  * Wait up to the specified @timeout for the connection to become available for
2034  * at least one of the operations specified in @events. When the function returns
2035  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2036  * @conn.
2037  *
2038  * @timeout can be #NULL, in which case this function might block forever.
2039  *
2040  * This function can be cancelled with gst_rtsp_connection_flush().
2041  * 
2042  * Returns: #GST_RTSP_OK on success.
2043  *
2044  * Since: 0.10.15
2045  */
2046 GstRTSPResult
2047 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2048     GstRTSPEvent * revents, GTimeVal * timeout)
2049 {
2050   GstClockTime to;
2051   gint retval;
2052
2053   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2054   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2055   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2056   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2057   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2058
2059   gst_poll_set_controllable (conn->fdset, TRUE);
2060
2061   /* add fd to writer set when asked to */
2062   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
2063       events & GST_RTSP_EV_WRITE);
2064
2065   /* add fd to reader set when asked to */
2066   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
2067
2068   /* configure timeout if any */
2069   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2070
2071   do {
2072     retval = gst_poll_wait (conn->fdset, to);
2073   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2074
2075   if (G_UNLIKELY (retval == 0))
2076     goto select_timeout;
2077
2078   if (G_UNLIKELY (retval == -1)) {
2079     if (errno == EBUSY)
2080       goto stopped;
2081     else
2082       goto select_error;
2083   }
2084
2085   *revents = 0;
2086   if (events & GST_RTSP_EV_READ) {
2087     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
2088       *revents |= GST_RTSP_EV_READ;
2089   }
2090   if (events & GST_RTSP_EV_WRITE) {
2091     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
2092       *revents |= GST_RTSP_EV_WRITE;
2093   }
2094   return GST_RTSP_OK;
2095
2096   /* ERRORS */
2097 select_timeout:
2098   {
2099     return GST_RTSP_ETIMEOUT;
2100   }
2101 select_error:
2102   {
2103     return GST_RTSP_ESYS;
2104   }
2105 stopped:
2106   {
2107     return GST_RTSP_EINTR;
2108   }
2109 }
2110
2111 /**
2112  * gst_rtsp_connection_next_timeout:
2113  * @conn: a #GstRTSPConnection
2114  * @timeout: a timeout
2115  *
2116  * Calculate the next timeout for @conn, storing the result in @timeout.
2117  * 
2118  * Returns: #GST_RTSP_OK.
2119  */
2120 GstRTSPResult
2121 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2122 {
2123   gdouble elapsed;
2124   glong sec;
2125   gulong usec;
2126
2127   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2128   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2129
2130   elapsed = g_timer_elapsed (conn->timer, &usec);
2131   if (elapsed >= conn->timeout) {
2132     sec = 0;
2133     usec = 0;
2134   } else {
2135     sec = conn->timeout - elapsed;
2136   }
2137
2138   timeout->tv_sec = sec;
2139   timeout->tv_usec = usec;
2140
2141   return GST_RTSP_OK;
2142 }
2143
2144 /**
2145  * gst_rtsp_connection_reset_timeout:
2146  * @conn: a #GstRTSPConnection
2147  *
2148  * Reset the timeout of @conn.
2149  * 
2150  * Returns: #GST_RTSP_OK.
2151  */
2152 GstRTSPResult
2153 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2154 {
2155   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2156
2157   g_timer_start (conn->timer);
2158
2159   return GST_RTSP_OK;
2160 }
2161
2162 /**
2163  * gst_rtsp_connection_flush:
2164  * @conn: a #GstRTSPConnection
2165  * @flush: start or stop the flush
2166  *
2167  * Start or stop the flushing action on @conn. When flushing, all current
2168  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2169  * is set to non-flushing mode again.
2170  * 
2171  * Returns: #GST_RTSP_OK.
2172  */
2173 GstRTSPResult
2174 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2175 {
2176   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2177
2178   gst_poll_set_flushing (conn->fdset, flush);
2179
2180   return GST_RTSP_OK;
2181 }
2182
2183 /**
2184  * gst_rtsp_connection_set_proxy:
2185  * @conn: a #GstRTSPConnection
2186  * @host: the proxy host
2187  * @port: the proxy port
2188  *
2189  * Set the proxy host and port.
2190  * 
2191  * Returns: #GST_RTSP_OK.
2192  *
2193  * Since: 0.10.23
2194  */
2195 GstRTSPResult
2196 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
2197     const gchar * host, guint port)
2198 {
2199   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2200
2201   g_free (conn->proxy_host);
2202   conn->proxy_host = g_strdup (host);
2203   conn->proxy_port = port;
2204
2205   return GST_RTSP_OK;
2206 }
2207
2208 /**
2209  * gst_rtsp_connection_set_auth:
2210  * @conn: a #GstRTSPConnection
2211  * @method: authentication method
2212  * @user: the user
2213  * @pass: the password
2214  *
2215  * Configure @conn for authentication mode @method with @user and @pass as the
2216  * user and password respectively.
2217  * 
2218  * Returns: #GST_RTSP_OK.
2219  */
2220 GstRTSPResult
2221 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
2222     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
2223 {
2224   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2225
2226   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
2227           || g_strrstr (user, ":") != NULL))
2228     return GST_RTSP_EINVAL;
2229
2230   /* Make sure the username and passwd are being set for authentication */
2231   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
2232     return GST_RTSP_EINVAL;
2233
2234   /* ":" chars are not allowed in usernames for basic auth */
2235   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
2236     return GST_RTSP_EINVAL;
2237
2238   g_free (conn->username);
2239   g_free (conn->passwd);
2240
2241   conn->auth_method = method;
2242   conn->username = g_strdup (user);
2243   conn->passwd = g_strdup (pass);
2244
2245   return GST_RTSP_OK;
2246 }
2247
2248 /**
2249  * str_case_hash:
2250  * @key: ASCII string to hash
2251  *
2252  * Hashes @key in a case-insensitive manner.
2253  *
2254  * Returns: the hash code.
2255  **/
2256 static guint
2257 str_case_hash (gconstpointer key)
2258 {
2259   const char *p = key;
2260   guint h = g_ascii_toupper (*p);
2261
2262   if (h)
2263     for (p += 1; *p != '\0'; p++)
2264       h = (h << 5) - h + g_ascii_toupper (*p);
2265
2266   return h;
2267 }
2268
2269 /**
2270  * str_case_equal:
2271  * @v1: an ASCII string
2272  * @v2: another ASCII string
2273  *
2274  * Compares @v1 and @v2 in a case-insensitive manner
2275  *
2276  * Returns: %TRUE if they are equal (modulo case)
2277  **/
2278 static gboolean
2279 str_case_equal (gconstpointer v1, gconstpointer v2)
2280 {
2281   const char *string1 = v1;
2282   const char *string2 = v2;
2283
2284   return g_ascii_strcasecmp (string1, string2) == 0;
2285 }
2286
2287 /**
2288  * gst_rtsp_connection_set_auth_param:
2289  * @conn: a #GstRTSPConnection
2290  * @param: authentication directive
2291  * @value: value
2292  *
2293  * Setup @conn with authentication directives. This is not necesary for
2294  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
2295  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
2296  * in the WWW-Authenticate response header and can include realm, domain,
2297  * nonce, opaque, stale, algorithm, qop as per RFC2617.
2298  * 
2299  * Since: 0.10.20
2300  */
2301 void
2302 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
2303     const gchar * param, const gchar * value)
2304 {
2305   g_return_if_fail (conn != NULL);
2306   g_return_if_fail (param != NULL);
2307
2308   if (conn->auth_params == NULL) {
2309     conn->auth_params =
2310         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
2311   }
2312   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
2313 }
2314
2315 /**
2316  * gst_rtsp_connection_clear_auth_params:
2317  * @conn: a #GstRTSPConnection
2318  *
2319  * Clear the list of authentication directives stored in @conn.
2320  *
2321  * Since: 0.10.20
2322  */
2323 void
2324 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
2325 {
2326   g_return_if_fail (conn != NULL);
2327
2328   if (conn->auth_params != NULL) {
2329     g_hash_table_destroy (conn->auth_params);
2330     conn->auth_params = NULL;
2331   }
2332 }
2333
2334 static GstRTSPResult
2335 set_qos_dscp (gint fd, guint qos_dscp)
2336 {
2337   union gst_sockaddr
2338   {
2339     struct sockaddr sa;
2340     struct sockaddr_in6 sa_in6;
2341     struct sockaddr_storage sa_stor;
2342   } sa;
2343   socklen_t slen = sizeof (sa);
2344   gint af;
2345   gint tos;
2346
2347   if (fd == -1)
2348     return GST_RTSP_OK;
2349
2350   if (getsockname (fd, &sa.sa, &slen) < 0)
2351     goto no_getsockname;
2352
2353   af = sa.sa.sa_family;
2354
2355   /* if this is an IPv4-mapped address then do IPv4 QoS */
2356   if (af == AF_INET6) {
2357     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
2358       af = AF_INET;
2359   }
2360
2361   /* extract and shift 6 bits of the DSCP */
2362   tos = (qos_dscp & 0x3f) << 2;
2363
2364   switch (af) {
2365     case AF_INET:
2366       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
2367         goto no_setsockopt;
2368       break;
2369     case AF_INET6:
2370 #ifdef IPV6_TCLASS
2371       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
2372         goto no_setsockopt;
2373       break;
2374 #endif
2375     default:
2376       goto wrong_family;
2377   }
2378
2379   return GST_RTSP_OK;
2380
2381   /* ERRORS */
2382 no_getsockname:
2383 no_setsockopt:
2384   {
2385     return GST_RTSP_ESYS;
2386   }
2387
2388 wrong_family:
2389   {
2390     return GST_RTSP_ERROR;
2391   }
2392 }
2393
2394 /**
2395  * gst_rtsp_connection_set_qos_dscp:
2396  * @conn: a #GstRTSPConnection
2397  * @qos_dscp: DSCP value
2398  *
2399  * Configure @conn to use the specified DSCP value.
2400  *
2401  * Returns: #GST_RTSP_OK on success.
2402  *
2403  * Since: 0.10.20
2404  */
2405 GstRTSPResult
2406 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
2407 {
2408   GstRTSPResult res;
2409
2410   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2411   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2412   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2413
2414   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
2415   if (res == GST_RTSP_OK)
2416     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
2417
2418   return res;
2419 }
2420
2421
2422 /**
2423  * gst_rtsp_connection_get_url:
2424  * @conn: a #GstRTSPConnection
2425  *
2426  * Retrieve the URL of the other end of @conn.
2427  *
2428  * Returns: The URL. This value remains valid until the
2429  * connection is freed.
2430  *
2431  * Since: 0.10.23
2432  */
2433 GstRTSPUrl *
2434 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
2435 {
2436   g_return_val_if_fail (conn != NULL, NULL);
2437
2438   return conn->url;
2439 }
2440
2441 /**
2442  * gst_rtsp_connection_get_ip:
2443  * @conn: a #GstRTSPConnection
2444  *
2445  * Retrieve the IP address of the other end of @conn.
2446  *
2447  * Returns: The IP address as a string. this value remains valid until the
2448  * connection is closed.
2449  *
2450  * Since: 0.10.20
2451  */
2452 const gchar *
2453 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
2454 {
2455   g_return_val_if_fail (conn != NULL, NULL);
2456
2457   return conn->ip;
2458 }
2459
2460 /**
2461  * gst_rtsp_connection_set_ip:
2462  * @conn: a #GstRTSPConnection
2463  * @ip: an ip address
2464  *
2465  * Set the IP address of the server.
2466  *
2467  * Since: 0.10.23
2468  */
2469 void
2470 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
2471 {
2472   g_return_if_fail (conn != NULL);
2473
2474   g_free (conn->ip);
2475   conn->ip = g_strdup (ip);
2476 }
2477
2478 /**
2479  * gst_rtsp_connection_get_readfd:
2480  * @conn: a #GstRTSPConnection
2481  *
2482  * Get the file descriptor for reading.
2483  *
2484  * Returns: the file descriptor used for reading or -1 on error. The file
2485  * descriptor remains valid until the connection is closed.
2486  *
2487  * Since: 0.10.23
2488  */
2489 gint
2490 gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
2491 {
2492   g_return_val_if_fail (conn != NULL, -1);
2493   g_return_val_if_fail (conn->readfd != NULL, -1);
2494
2495   return conn->readfd->fd;
2496 }
2497
2498 /**
2499  * gst_rtsp_connection_get_writefd:
2500  * @conn: a #GstRTSPConnection
2501  *
2502  * Get the file descriptor for writing.
2503  *
2504  * Returns: the file descriptor used for writing or -1 on error. The file
2505  * descriptor remains valid until the connection is closed.
2506  *
2507  * Since: 0.10.23
2508  */
2509 gint
2510 gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
2511 {
2512   g_return_val_if_fail (conn != NULL, -1);
2513   g_return_val_if_fail (conn->writefd != NULL, -1);
2514
2515   return conn->writefd->fd;
2516 }
2517
2518
2519 /**
2520  * gst_rtsp_connection_set_tunneled:
2521  * @conn: a #GstRTSPConnection
2522  * @tunneled: the new state
2523  *
2524  * Set the HTTP tunneling state of the connection. This must be configured before
2525  * the @conn is connected.
2526  *
2527  * Since: 0.10.23
2528  */
2529 void
2530 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
2531 {
2532   g_return_if_fail (conn != NULL);
2533   g_return_if_fail (conn->readfd == NULL);
2534   g_return_if_fail (conn->writefd == NULL);
2535
2536   conn->tunneled = tunneled;
2537 }
2538
2539 /**
2540  * gst_rtsp_connection_is_tunneled:
2541  * @conn: a #GstRTSPConnection
2542  *
2543  * Get the tunneling state of the connection. 
2544  *
2545  * Returns: if @conn is using HTTP tunneling.
2546  *
2547  * Since: 0.10.23
2548  */
2549 gboolean
2550 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
2551 {
2552   g_return_val_if_fail (conn != NULL, FALSE);
2553
2554   return conn->tunneled;
2555 }
2556
2557 /**
2558  * gst_rtsp_connection_get_tunnelid:
2559  * @conn: a #GstRTSPConnection
2560  *
2561  * Get the tunnel session id the connection. 
2562  *
2563  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
2564  *
2565  * Since: 0.10.23
2566  */
2567 const gchar *
2568 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
2569 {
2570   g_return_val_if_fail (conn != NULL, NULL);
2571
2572   if (!conn->tunneled)
2573     return NULL;
2574
2575   return conn->tunnelid;
2576 }
2577
2578 /**
2579  * gst_rtsp_connection_do_tunnel:
2580  * @conn: a #GstRTSPConnection
2581  * @conn2: a #GstRTSPConnection
2582  *
2583  * If @conn received the first tunnel connection and @conn2 received
2584  * the second tunnel connection, link the two connections together so that
2585  * @conn manages the tunneled connection.
2586  *
2587  * After this call, @conn2 cannot be used anymore and must be freed with
2588  * gst_rtsp_connection_free().
2589  *
2590  * Returns: return GST_RTSP_OK on success.
2591  *
2592  * Since: 0.10.23
2593  */
2594 GstRTSPResult
2595 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
2596     GstRTSPConnection * conn2)
2597 {
2598   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2599   g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
2600   g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
2601   g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
2602   g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
2603       GST_RTSP_EINVAL);
2604
2605   /* both connections have fd0 as the read/write socket. start by taking the
2606    * socket from conn2 and set it as the socket in conn */
2607   conn->fd1 = conn2->fd0;
2608
2609   /* clean up some of the state of conn2 */
2610   gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
2611   conn2->fd0.fd = -1;
2612   conn2->readfd = conn2->writefd = NULL;
2613
2614   /* We make fd0 the write socket and fd1 the read socket. */
2615   conn->writefd = &conn->fd0;
2616   conn->readfd = &conn->fd1;
2617
2618   conn->tstate = TUNNEL_STATE_COMPLETE;
2619
2620   /* we need base64 decoding for the readfd */
2621   conn->ctx.state = 0;
2622   conn->ctx.cin = 0;
2623   conn->ctx.cout = 3;
2624   conn->ctx.save = 0;
2625   conn->ctxp = &conn->ctx;
2626
2627   return GST_RTSP_OK;
2628 }
2629
2630 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
2631 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
2632
2633 typedef struct
2634 {
2635   GString *str;
2636   guint id;
2637 } GstRTSPRec;
2638
2639 static guint queue_response (GstRTSPWatch * watch, GString * str);
2640
2641 /* async functions */
2642 struct _GstRTSPWatch
2643 {
2644   GSource source;
2645
2646   GstRTSPConnection *conn;
2647
2648   GstRTSPBuilder builder;
2649   GstRTSPMessage message;
2650
2651   GPollFD readfd;
2652   GPollFD writefd;
2653   gboolean write_added;
2654
2655   /* queued message for transmission */
2656   guint id;
2657   GAsyncQueue *messages;
2658   guint8 *write_data;
2659   guint write_off;
2660   guint write_len;
2661   guint write_id;
2662
2663   GstRTSPWatchFuncs funcs;
2664
2665   gpointer user_data;
2666   GDestroyNotify notify;
2667 };
2668
2669 static gboolean
2670 gst_rtsp_source_prepare (GSource * source, gint * timeout)
2671 {
2672   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2673
2674   *timeout = (watch->conn->timeout * 1000);
2675
2676   return FALSE;
2677 }
2678
2679 static gboolean
2680 gst_rtsp_source_check (GSource * source)
2681 {
2682   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2683
2684   if (watch->readfd.revents & READ_COND)
2685     return TRUE;
2686
2687   if (watch->writefd.revents & WRITE_COND)
2688     return TRUE;
2689
2690   return FALSE;
2691 }
2692
2693 static gboolean
2694 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
2695     gpointer user_data G_GNUC_UNUSED)
2696 {
2697   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2698   GstRTSPResult res;
2699
2700   /* first read as much as we can */
2701   if (watch->readfd.revents & READ_COND) {
2702     do {
2703       res = build_next (&watch->builder, &watch->message, watch->conn);
2704       if (res == GST_RTSP_EINTR)
2705         break;
2706       if (G_UNLIKELY (res == GST_RTSP_EEOF))
2707         goto eof;
2708       if (res == GST_RTSP_ETGET) {
2709         GString *str;
2710         GstRTSPStatusCode code;
2711
2712         if (watch->funcs.tunnel_start)
2713           code = watch->funcs.tunnel_start (watch, watch->user_data);
2714         else
2715           code = GST_RTSP_STS_OK;
2716
2717         /* queue the response string */
2718         str = gen_tunnel_reply (watch->conn, code);
2719         queue_response (watch, str);
2720       } else if (res == GST_RTSP_ETPOST) {
2721         /* in the callback the connection should be tunneled with the
2722          * GET connection */
2723         if (watch->funcs.tunnel_complete)
2724           watch->funcs.tunnel_complete (watch, watch->user_data);
2725       } else if (G_UNLIKELY (res != GST_RTSP_OK))
2726         goto error;
2727
2728       if (G_LIKELY (res == GST_RTSP_OK)) {
2729         if (watch->funcs.message_received)
2730           watch->funcs.message_received (watch, &watch->message,
2731               watch->user_data);
2732
2733         gst_rtsp_message_unset (&watch->message);
2734       }
2735       build_reset (&watch->builder);
2736     } while (FALSE);
2737   }
2738
2739   if (watch->writefd.revents & WRITE_COND) {
2740     do {
2741       if (watch->write_data == NULL) {
2742         GstRTSPRec *data;
2743
2744         /* get a new message from the queue */
2745         data = g_async_queue_try_pop (watch->messages);
2746         if (data == NULL)
2747           goto done;
2748
2749         watch->write_off = 0;
2750         watch->write_len = data->str->len;
2751         watch->write_data = (guint8 *) g_string_free (data->str, FALSE);
2752         watch->write_id = data->id;
2753
2754         data->str = NULL;
2755         g_slice_free (GstRTSPRec, data);
2756       }
2757
2758       res = write_bytes (watch->writefd.fd, watch->write_data,
2759           &watch->write_off, watch->write_len);
2760       if (res == GST_RTSP_EINTR)
2761         break;
2762       if (G_UNLIKELY (res != GST_RTSP_OK))
2763         goto error;
2764
2765       if (watch->funcs.message_sent)
2766         watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
2767
2768     done:
2769       if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
2770         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2771         watch->write_added = FALSE;
2772         watch->writefd.revents = 0;
2773       }
2774       g_free (watch->write_data);
2775       watch->write_data = NULL;
2776     } while (FALSE);
2777   }
2778
2779   return TRUE;
2780
2781   /* ERRORS */
2782 eof:
2783   {
2784     if (watch->funcs.closed)
2785       watch->funcs.closed (watch, watch->user_data);
2786     return FALSE;
2787   }
2788 error:
2789   {
2790     if (watch->funcs.error)
2791       watch->funcs.error (watch, res, watch->user_data);
2792     return FALSE;
2793   }
2794 }
2795
2796 static void
2797 gst_rtsp_rec_free (gpointer data)
2798 {
2799   GstRTSPRec *rec = data;
2800
2801   g_string_free (rec->str, TRUE);
2802   rec->str = NULL;
2803   g_slice_free (GstRTSPRec, rec);
2804 }
2805
2806 static void
2807 gst_rtsp_source_finalize (GSource * source)
2808 {
2809   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2810
2811   build_reset (&watch->builder);
2812   gst_rtsp_message_unset (&watch->message);
2813
2814   g_async_queue_unref (watch->messages);
2815   watch->messages = NULL;
2816
2817   g_free (watch->write_data);
2818
2819   if (watch->notify)
2820     watch->notify (watch->user_data);
2821 }
2822
2823 static GSourceFuncs gst_rtsp_source_funcs = {
2824   gst_rtsp_source_prepare,
2825   gst_rtsp_source_check,
2826   gst_rtsp_source_dispatch,
2827   gst_rtsp_source_finalize,
2828   NULL,
2829   NULL
2830 };
2831
2832 /**
2833  * gst_rtsp_watch_new:
2834  * @conn: a #GstRTSPConnection
2835  * @funcs: watch functions
2836  * @user_data: user data to pass to @funcs
2837  * @notify: notify when @user_data is not referenced anymore
2838  *
2839  * Create a watch object for @conn. The functions provided in @funcs will be
2840  * called with @user_data when activity happened on the watch.
2841  *
2842  * The new watch is usually created so that it can be attached to a
2843  * maincontext with gst_rtsp_watch_attach(). 
2844  *
2845  * @conn must exist for the entire lifetime of the watch.
2846  *
2847  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2848  * communication. Free with gst_rtsp_watch_unref () after usage.
2849  *
2850  * Since: 0.10.23
2851  */
2852 GstRTSPWatch *
2853 gst_rtsp_watch_new (GstRTSPConnection * conn,
2854     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2855 {
2856   GstRTSPWatch *result;
2857
2858   g_return_val_if_fail (conn != NULL, NULL);
2859   g_return_val_if_fail (funcs != NULL, NULL);
2860   g_return_val_if_fail (conn->readfd != NULL, NULL);
2861   g_return_val_if_fail (conn->writefd != NULL, NULL);
2862
2863   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2864       sizeof (GstRTSPWatch));
2865
2866   result->conn = conn;
2867   result->builder.state = STATE_START;
2868
2869   result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
2870
2871   result->readfd.fd = -1;
2872   result->writefd.fd = -1;
2873
2874   gst_rtsp_watch_reset (result);
2875
2876   result->funcs = *funcs;
2877   result->user_data = user_data;
2878   result->notify = notify;
2879
2880   /* only add the read fd, the write fd is only added when we have data
2881    * to send. */
2882   g_source_add_poll ((GSource *) result, &result->readfd);
2883
2884   return result;
2885 }
2886
2887 /**
2888  * gst_rtsp_watch_reset:
2889  * @watch: a #GstRTSPWatch
2890  *
2891  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
2892  * when the file descriptors of the connection might have changed.
2893  *
2894  * Since: 0.10.23
2895  */
2896 void
2897 gst_rtsp_watch_reset (GstRTSPWatch * watch)
2898 {
2899   if (watch->readfd.fd != -1)
2900     g_source_remove_poll ((GSource *) watch, &watch->readfd);
2901   if (watch->writefd.fd != -1)
2902     g_source_remove_poll ((GSource *) watch, &watch->writefd);
2903
2904   watch->readfd.fd = watch->conn->readfd->fd;
2905   watch->readfd.events = READ_COND;
2906   watch->readfd.revents = 0;
2907
2908   watch->writefd.fd = watch->conn->writefd->fd;
2909   watch->writefd.events = WRITE_COND;
2910   watch->writefd.revents = 0;
2911   watch->write_added = FALSE;
2912
2913   g_source_add_poll ((GSource *) watch, &watch->readfd);
2914 }
2915
2916 /**
2917  * gst_rtsp_watch_attach:
2918  * @watch: a #GstRTSPWatch
2919  * @context: a GMainContext (if NULL, the default context will be used)
2920  *
2921  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
2922  *
2923  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
2924  *
2925  * Since: 0.10.23
2926  */
2927 guint
2928 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
2929 {
2930   g_return_val_if_fail (watch != NULL, 0);
2931
2932   return g_source_attach ((GSource *) watch, context);
2933 }
2934
2935 /**
2936  * gst_rtsp_watch_unref:
2937  * @watch: a #GstRTSPWatch
2938  *
2939  * Decreases the reference count of @watch by one. If the resulting reference
2940  * count is zero the watch and associated memory will be destroyed.
2941  *
2942  * Since: 0.10.23
2943  */
2944 void
2945 gst_rtsp_watch_unref (GstRTSPWatch * watch)
2946 {
2947   g_return_if_fail (watch != NULL);
2948
2949   g_source_unref ((GSource *) watch);
2950 }
2951
2952 static guint
2953 queue_response (GstRTSPWatch * watch, GString * str)
2954 {
2955   GstRTSPRec *data;
2956
2957   /* make a record with the message as a string and id */
2958   data = g_slice_new (GstRTSPRec);
2959   data->str = str;
2960   data->id = ++watch->id;
2961
2962   /* add the record to a queue. FIXME we would like to have an upper limit here */
2963   g_async_queue_push (watch->messages, data);
2964
2965   /* FIXME: does the following need to be made thread-safe? (this might be
2966    * called from a streaming thread, like appsink's render function) */
2967   /* make sure the main context will now also check for writability on the
2968    * socket */
2969   if (!watch->write_added) {
2970     g_source_add_poll ((GSource *) watch, &watch->writefd);
2971     watch->write_added = TRUE;
2972   }
2973
2974   return data->id;
2975 }
2976
2977 /**
2978  * gst_rtsp_watch_queue_message:
2979  * @watch: a #GstRTSPWatch
2980  * @message: a #GstRTSPMessage
2981  *
2982  * Queue a @message for transmission in @watch. The contents of this
2983  * message will be serialized and transmitted when the connection of the
2984  * @watch becomes writable.
2985  *
2986  * The return value of this function will be used as the id argument in the
2987  * message_sent callback.
2988  *
2989  * Returns: an id.
2990  *
2991  * Since: 0.10.23
2992  */
2993 guint
2994 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
2995 {
2996   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
2997   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2998
2999   /* make a record with the message as a string and id */
3000   return queue_response (watch, message_to_string (watch->conn, message));
3001 }