rtsp: Use GLib's GChecksum instead of our own MD5 implementation
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72 /* we include this here to get the G_OS_* defines */
73 #include <glib.h>
74 #include <gst/gst.h>
75
76 #ifdef G_OS_WIN32
77 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
78  * minwg32 headers check WINVER before allowing the use of these */
79 #ifndef WINVER
80 #define WINVER 0x0501
81 #endif
82 #include <winsock2.h>
83 #include <ws2tcpip.h>
84 #define EINPROGRESS WSAEINPROGRESS
85 #else
86 #include <sys/ioctl.h>
87 #include <netdb.h>
88 #include <sys/socket.h>
89 #include <fcntl.h>
90 #include <netinet/in.h>
91 #endif
92
93 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
94 #include <sys/filio.h>
95 #endif
96
97 #include "gstrtspconnection.h"
98 #include "gstrtspbase64.h"
99
100 union gst_sockaddr
101 {
102   struct sockaddr sa;
103   struct sockaddr_in sa_in;
104   struct sockaddr_in6 sa_in6;
105   struct sockaddr_storage sa_stor;
106 };
107
108 typedef struct
109 {
110   gint state;
111   guint save;
112   guchar out[3];                /* the size must be evenly divisible by 3 */
113   guint cout;
114   guint coutl;
115 } DecodeCtx;
116
117 static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx,
118     guint size, DecodeCtx * ctxp);
119 static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
120     guint keysize, gchar ** value);
121 static void parse_string (gchar * dest, gint size, gchar ** src);
122
123 #ifdef G_OS_WIN32
124 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
125 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
126 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
127 #define CLOSE_SOCKET(sock) closesocket (sock)
128 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
129 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
130 /* According to Microsoft's connect() documentation this one returns
131  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
132 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
133 #else
134 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
135 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
136 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
137 #define CLOSE_SOCKET(sock) close (sock)
138 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
139 #define ERRNO_IS_EINTR (errno == EINTR)
140 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
141 #endif
142
143 #define ADD_POLLFD(fdset, pfd, fd)        \
144 G_STMT_START {                            \
145   (pfd)->fd = fd;                         \
146   gst_poll_add_fd (fdset, pfd);           \
147 } G_STMT_END
148
149 #define REMOVE_POLLFD(fdset, pfd)          \
150 G_STMT_START {                             \
151   if ((pfd)->fd != -1) {                   \
152     GST_DEBUG ("remove fd %d", (pfd)->fd); \
153     gst_poll_remove_fd (fdset, pfd);       \
154     CLOSE_SOCKET ((pfd)->fd);              \
155     (pfd)->fd = -1;                        \
156   }                                        \
157 } G_STMT_END
158
159 typedef enum
160 {
161   TUNNEL_STATE_NONE,
162   TUNNEL_STATE_GET,
163   TUNNEL_STATE_POST,
164   TUNNEL_STATE_COMPLETE
165 } GstRTSPTunnelState;
166
167 #define TUNNELID_LEN   24
168
169 struct _GstRTSPConnection
170 {
171   /*< private > */
172   /* URL for the connection */
173   GstRTSPUrl *url;
174
175   /* connection state */
176   GstPollFD fd0;
177   GstPollFD fd1;
178
179   GstPollFD *readfd;
180   GstPollFD *writefd;
181
182   gchar tunnelid[TUNNELID_LEN];
183   gboolean tunneled;
184   GstRTSPTunnelState tstate;
185
186   GstPoll *fdset;
187   gchar *ip;
188
189   /* Session state */
190   gint cseq;                    /* sequence number */
191   gchar session_id[512];        /* session id */
192   gint timeout;                 /* session timeout in seconds */
193   GTimer *timer;                /* timeout timer */
194
195   /* Authentication */
196   GstRTSPAuthMethod auth_method;
197   gchar *username;
198   gchar *passwd;
199   GHashTable *auth_params;
200
201   DecodeCtx ctx;
202   DecodeCtx *ctxp;
203
204   gchar *proxy_host;
205   guint proxy_port;
206 };
207
208 enum
209 {
210   STATE_START = 0,
211   STATE_DATA_HEADER,
212   STATE_DATA_BODY,
213   STATE_READ_LINES,
214   STATE_END,
215   STATE_LAST
216 };
217
218 /* a structure for constructing RTSPMessages */
219 typedef struct
220 {
221   gint state;
222   guint8 buffer[4096];
223   guint offset;
224
225   guint line;
226   guint8 *body_data;
227   glong body_len;
228 } GstRTSPBuilder;
229
230 static void
231 build_reset (GstRTSPBuilder * builder)
232 {
233   g_free (builder->body_data);
234   memset (builder, 0, sizeof (GstRTSPBuilder));
235 }
236
237 /**
238  * gst_rtsp_connection_create:
239  * @url: a #GstRTSPUrl 
240  * @conn: storage for a #GstRTSPConnection
241  *
242  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
243  * The connection will not yet attempt to connect to @url, use
244  * gst_rtsp_connection_connect().
245  *
246  * A copy of @url will be made.
247  *
248  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
249  */
250 GstRTSPResult
251 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
252 {
253   GstRTSPConnection *newconn;
254 #ifdef G_OS_WIN32
255   WSADATA w;
256   int error;
257 #endif
258
259   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
260
261 #ifdef G_OS_WIN32
262   error = WSAStartup (0x0202, &w);
263
264   if (error)
265     goto startup_error;
266
267   if (w.wVersion != 0x0202)
268     goto version_error;
269 #endif
270
271   newconn = g_new0 (GstRTSPConnection, 1);
272
273   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
274     goto no_fdset;
275
276   newconn->url = gst_rtsp_url_copy (url);
277   newconn->fd0.fd = -1;
278   newconn->fd1.fd = -1;
279   newconn->timer = g_timer_new ();
280   newconn->timeout = 60;
281   newconn->cseq = 1;
282
283   newconn->auth_method = GST_RTSP_AUTH_NONE;
284   newconn->username = NULL;
285   newconn->passwd = NULL;
286   newconn->auth_params = NULL;
287
288   *conn = newconn;
289
290   return GST_RTSP_OK;
291
292   /* ERRORS */
293 #ifdef G_OS_WIN32
294 startup_error:
295   {
296     g_warning ("Error %d on WSAStartup", error);
297     return GST_RTSP_EWSASTART;
298   }
299 version_error:
300   {
301     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
302         w.wVersion);
303     WSACleanup ();
304     return GST_RTSP_EWSAVERSION;
305   }
306 #endif
307 no_fdset:
308   {
309     g_free (newconn);
310 #ifdef G_OS_WIN32
311     WSACleanup ();
312 #endif
313     return GST_RTSP_ESYS;
314   }
315 }
316
317 /**
318  * gst_rtsp_connection_accept:
319  * @sock: a socket
320  * @conn: storage for a #GstRTSPConnection
321  *
322  * Accept a new connection on @sock and create a new #GstRTSPConnection for
323  * handling communication on new socket.
324  *
325  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
326  *
327  * Since: 0.10.23
328  */
329 GstRTSPResult
330 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
331 {
332   int fd;
333   GstRTSPConnection *newconn = NULL;
334   union gst_sockaddr sa;
335   socklen_t slen = sizeof (sa);
336   gchar ip[INET6_ADDRSTRLEN];
337   GstRTSPUrl *url;
338 #ifdef G_OS_WIN32
339   gulong flags = 1;
340 #endif
341
342   memset (&sa, 0, slen);
343
344 #ifndef G_OS_WIN32
345   fd = accept (sock, &sa.sa, &slen);
346 #else
347   fd = accept (sock, &sa.sa, (gint *) & slen);
348 #endif /* G_OS_WIN32 */
349   if (fd == -1)
350     goto accept_failed;
351
352   if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0)
353     goto getnameinfo_failed;
354   if (sa.sa.sa_family != AF_INET && sa.sa.sa_family != AF_INET6)
355     goto wrong_family;
356
357   /* set to non-blocking mode so that we can cancel the communication */
358 #ifndef G_OS_WIN32
359   fcntl (fd, F_SETFL, O_NONBLOCK);
360 #else
361   ioctlsocket (fd, FIONBIO, &flags);
362 #endif /* G_OS_WIN32 */
363
364   /* create a url for the client address */
365   url = g_new0 (GstRTSPUrl, 1);
366   url->host = g_strdup (ip);
367   if (sa.sa.sa_family == AF_INET)
368     url->port = sa.sa_in.sin_port;
369   else
370     url->port = sa.sa_in6.sin6_port;
371
372   /* now create the connection object */
373   gst_rtsp_connection_create (url, &newconn);
374   gst_rtsp_url_free (url);
375
376   ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
377
378   /* both read and write initially */
379   newconn->readfd = &newconn->fd0;
380   newconn->writefd = &newconn->fd0;
381
382   *conn = newconn;
383
384   return GST_RTSP_OK;
385
386   /* ERRORS */
387 accept_failed:
388   {
389     return GST_RTSP_ESYS;
390   }
391 getnameinfo_failed:
392 wrong_family:
393   {
394     close (fd);
395     return GST_RTSP_ERROR;
396   }
397 }
398
399 static gchar *
400 do_resolve (const gchar * host)
401 {
402   static gchar ip[INET6_ADDRSTRLEN];
403   struct addrinfo *aires;
404   struct addrinfo *ai;
405   gint aierr;
406
407   aierr = getaddrinfo (host, NULL, NULL, &aires);
408   if (aierr != 0)
409     goto no_addrinfo;
410
411   for (ai = aires; ai; ai = ai->ai_next) {
412     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
413       break;
414     }
415   }
416   if (ai == NULL)
417     goto no_family;
418
419   aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0,
420       NI_NUMERICHOST | NI_NUMERICSERV);
421   if (aierr != 0)
422     goto no_address;
423
424   freeaddrinfo (aires);
425
426   return g_strdup (ip);
427
428   /* ERRORS */
429 no_addrinfo:
430   {
431     GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr));
432     return NULL;
433   }
434 no_family:
435   {
436     GST_ERROR ("no family found for %s", host);
437     freeaddrinfo (aires);
438     return NULL;
439   }
440 no_address:
441   {
442     GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr));
443     freeaddrinfo (aires);
444     return NULL;
445   }
446 }
447
448 static GstRTSPResult
449 do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
450     GstPoll * fdset, GTimeVal * timeout)
451 {
452   gint fd;
453   struct addrinfo hints;
454   struct addrinfo *aires;
455   struct addrinfo *ai;
456   gint aierr;
457   gchar service[NI_MAXSERV];
458   gint ret;
459 #ifdef G_OS_WIN32
460   unsigned long flags = 1;
461 #endif /* G_OS_WIN32 */
462   GstClockTime to;
463   gint retval;
464
465   memset (&hints, 0, sizeof hints);
466   hints.ai_flags = AI_NUMERICHOST;
467   hints.ai_family = AF_UNSPEC;
468   hints.ai_socktype = SOCK_STREAM;
469   g_snprintf (service, sizeof (service) - 1, "%hu", port);
470   service[sizeof (service) - 1] = '\0';
471
472   aierr = getaddrinfo (ip, service, &hints, &aires);
473   if (aierr != 0)
474     goto no_addrinfo;
475
476   for (ai = aires; ai; ai = ai->ai_next) {
477     if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
478       break;
479     }
480   }
481   if (ai == NULL)
482     goto no_family;
483
484   fd = socket (ai->ai_family, SOCK_STREAM, 0);
485   if (fd == -1)
486     goto no_socket;
487
488   /* set to non-blocking mode so that we can cancel the connect */
489 #ifndef G_OS_WIN32
490   fcntl (fd, F_SETFL, O_NONBLOCK);
491 #else
492   ioctlsocket (fd, FIONBIO, &flags);
493 #endif /* G_OS_WIN32 */
494
495   /* add the socket to our fdset */
496   ADD_POLLFD (fdset, fdout, fd);
497
498   /* we are going to connect ASYNC now */
499   ret = connect (fd, ai->ai_addr, ai->ai_addrlen);
500   if (ret == 0)
501     goto done;
502   if (!ERRNO_IS_EINPROGRESS)
503     goto sys_error;
504
505   /* wait for connect to complete up to the specified timeout or until we got
506    * interrupted. */
507   gst_poll_fd_ctl_write (fdset, fdout, TRUE);
508
509   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
510
511   do {
512     retval = gst_poll_wait (fdset, to);
513   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
514
515   if (retval == 0)
516     goto timeout;
517   else if (retval == -1)
518     goto sys_error;
519
520   /* we can still have an error connecting on windows */
521   if (gst_poll_fd_has_error (fdset, fdout)) {
522     socklen_t len = sizeof (errno);
523 #ifndef G_OS_WIN32
524     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
525 #else
526     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
527 #endif
528     goto sys_error;
529   }
530
531   gst_poll_fd_ignored (fdset, fdout);
532
533 done:
534   freeaddrinfo (aires);
535
536   return GST_RTSP_OK;
537
538   /* ERRORS */
539 no_addrinfo:
540   {
541     GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr));
542     return GST_RTSP_ERROR;
543   }
544 no_family:
545   {
546     GST_ERROR ("no family found for %s", ip);
547     freeaddrinfo (aires);
548     return GST_RTSP_ERROR;
549   }
550 no_socket:
551   {
552     GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
553     freeaddrinfo (aires);
554     return GST_RTSP_ESYS;
555   }
556 sys_error:
557   {
558     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
559     REMOVE_POLLFD (fdset, fdout);
560     freeaddrinfo (aires);
561     return GST_RTSP_ESYS;
562   }
563 timeout:
564   {
565     GST_ERROR ("timeout");
566     REMOVE_POLLFD (fdset, fdout);
567     freeaddrinfo (aires);
568     return GST_RTSP_ETIMEOUT;
569   }
570 }
571
572 static GstRTSPResult
573 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
574 {
575   gint i;
576   GstRTSPResult res;
577   gchar *str;
578   guint idx, line;
579   gint retval;
580   GstClockTime to;
581   gchar *ip, *url_port_str;
582   guint16 port, url_port;
583   gchar codestr[4], *resultstr;
584   gint code;
585   GstRTSPUrl *url;
586   gchar *hostparam;
587
588   /* create a random sessionid */
589   for (i = 0; i < TUNNELID_LEN; i++)
590     conn->tunnelid[i] = g_random_int_range ('a', 'z');
591   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
592
593   url = conn->url;
594   /* get the port from the url */
595   gst_rtsp_url_get_port (url, &url_port);
596
597   if (conn->proxy_host) {
598     hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
599     url_port_str = g_strdup_printf (":%d", url_port);
600     ip = conn->proxy_host;
601     port = conn->proxy_port;
602   } else {
603     hostparam = NULL;
604     url_port_str = NULL;
605     ip = conn->ip;
606     port = url_port;
607   }
608
609   /* */
610   str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
611       "%s"
612       "x-sessioncookie: %s\r\n"
613       "Accept: application/x-rtsp-tunnelled\r\n"
614       "Pragma: no-cache\r\n"
615       "Cache-Control: no-cache\r\n" "\r\n",
616       conn->proxy_host ? "http://" : "",
617       conn->proxy_host ? url->host : "",
618       conn->proxy_host ? url_port_str : "",
619       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
620       hostparam ? hostparam : "", conn->tunnelid);
621
622   /* we start by writing to this fd */
623   conn->writefd = &conn->fd0;
624
625   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
626   g_free (str);
627   if (res != GST_RTSP_OK)
628     goto write_failed;
629
630   gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
631   gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);
632
633   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
634
635   line = 0;
636   while (TRUE) {
637     guint8 buffer[4096];
638
639     idx = 0;
640     while (TRUE) {
641       res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL);
642       if (res == GST_RTSP_EEOF)
643         goto eof;
644       if (res == GST_RTSP_OK)
645         break;
646       if (res != GST_RTSP_EINTR)
647         goto read_error;
648
649       do {
650         retval = gst_poll_wait (conn->fdset, to);
651       } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
652
653       /* check for timeout */
654       if (retval == 0)
655         goto timeout;
656
657       if (retval == -1) {
658         if (errno == EBUSY)
659           goto stopped;
660         else
661           goto select_error;
662       }
663     }
664
665     /* check for last line */
666     if (buffer[0] == '\r')
667       buffer[0] = '\0';
668     if (buffer[0] == '\0')
669       break;
670
671     if (line == 0) {
672       /* first line, parse response */
673       gchar versionstr[20];
674       gchar *bptr;
675
676       bptr = (gchar *) buffer;
677
678       parse_string (versionstr, sizeof (versionstr), &bptr);
679       parse_string (codestr, sizeof (codestr), &bptr);
680       code = atoi (codestr);
681
682       while (g_ascii_isspace (*bptr))
683         bptr++;
684
685       resultstr = bptr;
686
687       if (code != GST_RTSP_STS_OK)
688         goto wrong_result;
689     } else {
690       gchar key[32];
691       gchar *value;
692
693       /* other lines, parse key/value */
694       res = parse_key_value (buffer, key, sizeof (key), &value);
695       if (res == GST_RTSP_OK) {
696         /* we got a new ip address */
697         if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
698           if (conn->proxy_host) {
699             /* if we use a proxy we need to change the destination url */
700             g_free (url->host);
701             url->host = g_strdup (value);
702             g_free (hostparam);
703             g_free (url_port_str);
704             hostparam =
705                 g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
706             url_port_str = g_strdup_printf (":%d", url_port);
707           } else {
708             /* and resolve the new ip address */
709             if (!(ip = do_resolve (conn->ip)))
710               goto not_resolved;
711             g_free (conn->ip);
712             conn->ip = ip;
713           }
714         }
715       }
716     }
717     line++;
718   }
719
720   /* connect to the host/port */
721   res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
722   if (res != GST_RTSP_OK)
723     goto connect_failed;
724
725   /* this is now our writing socket */
726   conn->writefd = &conn->fd1;
727
728   /* */
729   str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
730       "%s"
731       "x-sessioncookie: %s\r\n"
732       "Content-Type: application/x-rtsp-tunnelled\r\n"
733       "Pragma: no-cache\r\n"
734       "Cache-Control: no-cache\r\n"
735       "Content-Length: 32767\r\n"
736       "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
737       "\r\n",
738       conn->proxy_host ? "http://" : "",
739       conn->proxy_host ? url->host : "",
740       conn->proxy_host ? url_port_str : "",
741       url->abspath, url->query ? "?" : "", url->query ? url->query : "",
742       hostparam ? hostparam : "", conn->tunnelid);
743
744   res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
745   g_free (str);
746   if (res != GST_RTSP_OK)
747     goto write_failed;
748
749 exit:
750   g_free (hostparam);
751   g_free (url_port_str);
752
753   return res;
754
755   /* ERRORS */
756 write_failed:
757   {
758     GST_ERROR ("write failed (%d)", res);
759     goto exit;
760   }
761 eof:
762   {
763     res = GST_RTSP_EEOF;
764     goto exit;
765   }
766 read_error:
767   {
768     goto exit;
769   }
770 timeout:
771   {
772     res = GST_RTSP_ETIMEOUT;
773     goto exit;
774   }
775 select_error:
776   {
777     res = GST_RTSP_ESYS;
778     goto exit;
779   }
780 stopped:
781   {
782     res = GST_RTSP_EINTR;
783     goto exit;
784   }
785 wrong_result:
786   {
787     GST_ERROR ("got failure response %d %s", code, resultstr);
788     res = GST_RTSP_ERROR;
789     goto exit;
790   }
791 not_resolved:
792   {
793     GST_ERROR ("could not resolve %s", conn->ip);
794     res = GST_RTSP_ENET;
795     goto exit;
796   }
797 connect_failed:
798   {
799     GST_ERROR ("failed to connect");
800     goto exit;
801   }
802 }
803
804 /**
805  * gst_rtsp_connection_connect:
806  * @conn: a #GstRTSPConnection 
807  * @timeout: a #GTimeVal timeout
808  *
809  * Attempt to connect to the url of @conn made with
810  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
811  * forever. If @timeout contains a valid timeout, this function will return
812  * #GST_RTSP_ETIMEOUT after the timeout expired.
813  *
814  * This function can be cancelled with gst_rtsp_connection_flush().
815  *
816  * Returns: #GST_RTSP_OK when a connection could be made.
817  */
818 GstRTSPResult
819 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
820 {
821   GstRTSPResult res;
822   gchar *ip;
823   guint16 port;
824   GstRTSPUrl *url;
825
826   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
827   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
828   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
829
830   url = conn->url;
831
832   if (conn->proxy_host && conn->tunneled) {
833     if (!(ip = do_resolve (conn->proxy_host))) {
834       GST_ERROR ("could not resolve %s", conn->proxy_host);
835       goto not_resolved;
836     }
837     port = conn->proxy_port;
838     g_free (conn->proxy_host);
839     conn->proxy_host = ip;
840   } else {
841     if (!(ip = do_resolve (url->host))) {
842       GST_ERROR ("could not resolve %s", url->host);
843       goto not_resolved;
844     }
845     /* get the port from the url */
846     gst_rtsp_url_get_port (url, &port);
847
848     g_free (conn->ip);
849     conn->ip = ip;
850   }
851
852   /* connect to the host/port */
853   res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
854   if (res != GST_RTSP_OK)
855     goto connect_failed;
856
857   /* this is our read URL */
858   conn->readfd = &conn->fd0;
859
860   if (conn->tunneled) {
861     res = setup_tunneling (conn, timeout);
862     if (res != GST_RTSP_OK)
863       goto tunneling_failed;
864   } else {
865     conn->writefd = &conn->fd0;
866   }
867
868   return GST_RTSP_OK;
869
870 not_resolved:
871   {
872     return GST_RTSP_ENET;
873   }
874 connect_failed:
875   {
876     GST_ERROR ("failed to connect");
877     return res;
878   }
879 tunneling_failed:
880   {
881     GST_ERROR ("failed to setup tunneling");
882     return res;
883   }
884 }
885
886 static void
887 auth_digest_compute_hex_urp (const gchar * username,
888     const gchar * realm, const gchar * password, gchar hex_urp[33])
889 {
890   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
891   const gchar *digest_string;
892
893   g_checksum_update (md5_context, (const guchar *) username, strlen (username));
894   g_checksum_update (md5_context, (const guchar *) ":", 1);
895   g_checksum_update (md5_context, (const guchar *) realm, strlen (realm));
896   g_checksum_update (md5_context, (const guchar *) ":", 1);
897   g_checksum_update (md5_context, (const guchar *) password, strlen (password));
898   digest_string = g_checksum_get_string (md5_context);
899
900   memset (hex_urp, 0, 33);
901   memcpy (hex_urp, digest_string, strlen (digest_string));
902
903   g_checksum_free (md5_context);
904 }
905
906 static void
907 auth_digest_compute_response (const gchar * method,
908     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
909     gchar response[33])
910 {
911   char hex_a2[33] = { 0, };
912   GChecksum *md5_context = g_checksum_new (G_CHECKSUM_MD5);
913   const gchar *digest_string;
914
915   /* compute A2 */
916   g_checksum_update (md5_context, (const guchar *) method, strlen (method));
917   g_checksum_update (md5_context, (const guchar *) ":", 1);
918   g_checksum_update (md5_context, (const guchar *) uri, strlen (uri));
919   digest_string = g_checksum_get_string (md5_context);
920   memcpy (hex_a2, digest_string, strlen (digest_string));
921
922   /* compute KD */
923   g_checksum_reset (md5_context);
924   g_checksum_update (md5_context, (const guchar *) hex_a1, strlen (hex_a1));
925   g_checksum_update (md5_context, (const guchar *) ":", 1);
926   g_checksum_update (md5_context, (const guchar *) nonce, strlen (nonce));
927   g_checksum_update (md5_context, (const guchar *) ":", 1);
928
929   g_checksum_update (md5_context, (const guchar *) hex_a2, 32);
930   digest_string = g_checksum_get_string (md5_context);
931   memset (response, 0, sizeof (response));
932   memcpy (response, digest_string, strlen (digest_string));
933
934   g_checksum_free (md5_context);
935 }
936
937 static void
938 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
939 {
940   switch (conn->auth_method) {
941     case GST_RTSP_AUTH_BASIC:{
942       gchar *user_pass;
943       gchar *user_pass64;
944       gchar *auth_string;
945
946       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
947       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
948       auth_string = g_strdup_printf ("Basic %s", user_pass64);
949
950       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
951           auth_string);
952
953       g_free (user_pass);
954       g_free (user_pass64);
955       break;
956     }
957     case GST_RTSP_AUTH_DIGEST:{
958       gchar response[33], hex_urp[33];
959       gchar *auth_string, *auth_string2;
960       gchar *realm;
961       gchar *nonce;
962       gchar *opaque;
963       const gchar *uri;
964       const gchar *method;
965
966       /* we need to have some params set */
967       if (conn->auth_params == NULL)
968         break;
969
970       /* we need the realm and nonce */
971       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
972       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
973       if (realm == NULL || nonce == NULL)
974         break;
975
976       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
977           hex_urp);
978
979       method = gst_rtsp_method_as_text (message->type_data.request.method);
980       uri = message->type_data.request.uri;
981
982       /* Assume no qop, algorithm=md5, stale=false */
983       /* For algorithm MD5, a1 = urp. */
984       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
985       auth_string = g_strdup_printf ("Digest username=\"%s\", "
986           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
987           conn->username, realm, nonce, uri, response);
988
989       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
990       if (opaque) {
991         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
992             opaque);
993         g_free (auth_string);
994         auth_string = auth_string2;
995       }
996       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
997           auth_string);
998       break;
999     }
1000     default:
1001       /* Nothing to do */
1002       break;
1003   }
1004 }
1005
1006 static void
1007 gen_date_string (gchar * date_string, guint len)
1008 {
1009   GTimeVal tv;
1010   time_t t;
1011 #ifdef HAVE_GMTIME_R
1012   struct tm tm_;
1013 #endif
1014
1015   g_get_current_time (&tv);
1016   t = (time_t) tv.tv_sec;
1017
1018 #ifdef HAVE_GMTIME_R
1019   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
1020 #else
1021   strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
1022 #endif
1023 }
1024
1025 static GstRTSPResult
1026 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
1027 {
1028   guint left;
1029
1030   if (G_UNLIKELY (*idx > size))
1031     return GST_RTSP_ERROR;
1032
1033   left = size - *idx;
1034
1035   while (left) {
1036     gint r;
1037
1038     r = WRITE_SOCKET (fd, &buffer[*idx], left);
1039     if (G_UNLIKELY (r == 0)) {
1040       return GST_RTSP_EINTR;
1041     } else if (G_UNLIKELY (r < 0)) {
1042       if (ERRNO_IS_EAGAIN)
1043         return GST_RTSP_EINTR;
1044       if (!ERRNO_IS_EINTR)
1045         return GST_RTSP_ESYS;
1046     } else {
1047       left -= r;
1048       *idx += r;
1049     }
1050   }
1051   return GST_RTSP_OK;
1052 }
1053
1054 static gint
1055 fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
1056 {
1057   gint out = 0;
1058
1059   if (ctx) {
1060     while (size > 0) {
1061       guint8 in[sizeof (ctx->out) * 4 / 3];
1062       gint r;
1063
1064       while (size > 0 && ctx->cout < ctx->coutl) {
1065         /* we have some leftover bytes */
1066         *buffer++ = ctx->out[ctx->cout++];
1067         size--;
1068         out++;
1069       }
1070
1071       /* got what we needed? */
1072       if (size == 0)
1073         break;
1074
1075       /* try to read more bytes */
1076       r = READ_SOCKET (fd, in, sizeof (in));
1077       if (r <= 0) {
1078         if (out == 0)
1079           out = r;
1080         break;
1081       }
1082
1083       ctx->cout = 0;
1084       ctx->coutl =
1085           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1086           &ctx->save);
1087     }
1088   } else {
1089     out = READ_SOCKET (fd, buffer, size);
1090   }
1091
1092   return out;
1093 }
1094
1095 static GstRTSPResult
1096 read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
1097 {
1098   guint left;
1099
1100   if (G_UNLIKELY (*idx > size))
1101     return GST_RTSP_ERROR;
1102
1103   left = size - *idx;
1104
1105   while (left) {
1106     gint r;
1107
1108     r = fill_bytes (fd, &buffer[*idx], left, ctx);
1109     if (G_UNLIKELY (r == 0)) {
1110       return GST_RTSP_EEOF;
1111     } else if (G_UNLIKELY (r < 0)) {
1112       if (ERRNO_IS_EAGAIN)
1113         return GST_RTSP_EINTR;
1114       if (!ERRNO_IS_EINTR)
1115         return GST_RTSP_ESYS;
1116     } else {
1117       left -= r;
1118       *idx += r;
1119     }
1120   }
1121   return GST_RTSP_OK;
1122 }
1123
1124 static GstRTSPResult
1125 read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
1126 {
1127   while (TRUE) {
1128     guint8 c;
1129     gint r;
1130
1131     r = fill_bytes (fd, &c, 1, ctx);
1132     if (G_UNLIKELY (r == 0)) {
1133       return GST_RTSP_EEOF;
1134     } else if (G_UNLIKELY (r < 0)) {
1135       if (ERRNO_IS_EAGAIN)
1136         return GST_RTSP_EINTR;
1137       if (!ERRNO_IS_EINTR)
1138         return GST_RTSP_ESYS;
1139     } else {
1140       if (c == '\n')            /* end on \n */
1141         break;
1142       if (c == '\r')            /* ignore \r */
1143         continue;
1144
1145       if (G_LIKELY (*idx < size - 1))
1146         buffer[(*idx)++] = c;
1147     }
1148   }
1149   buffer[*idx] = '\0';
1150
1151   return GST_RTSP_OK;
1152 }
1153
1154 /**
1155  * gst_rtsp_connection_write:
1156  * @conn: a #GstRTSPConnection
1157  * @data: the data to write
1158  * @size: the size of @data
1159  * @timeout: a timeout value or #NULL
1160  *
1161  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1162  * the specified @timeout. @timeout can be #NULL, in which case this function
1163  * might block forever.
1164  * 
1165  * This function can be cancelled with gst_rtsp_connection_flush().
1166  *
1167  * Returns: #GST_RTSP_OK on success.
1168  */
1169 GstRTSPResult
1170 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1171     guint size, GTimeVal * timeout)
1172 {
1173   guint offset;
1174   gint retval;
1175   GstClockTime to;
1176   GstRTSPResult res;
1177
1178   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1179   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1180   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1181
1182   gst_poll_set_controllable (conn->fdset, TRUE);
1183   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
1184   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
1185   /* clear all previous poll results */
1186   gst_poll_fd_ignored (conn->fdset, conn->writefd);
1187   gst_poll_fd_ignored (conn->fdset, conn->readfd);
1188
1189   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1190
1191   offset = 0;
1192
1193   while (TRUE) {
1194     /* try to write */
1195     res = write_bytes (conn->writefd->fd, data, &offset, size);
1196     if (G_LIKELY (res == GST_RTSP_OK))
1197       break;
1198     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1199       goto write_error;
1200
1201     /* not all is written, wait until we can write more */
1202     do {
1203       retval = gst_poll_wait (conn->fdset, to);
1204     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1205
1206     if (G_UNLIKELY (retval == 0))
1207       goto timeout;
1208
1209     if (G_UNLIKELY (retval == -1)) {
1210       if (errno == EBUSY)
1211         goto stopped;
1212       else
1213         goto select_error;
1214     }
1215   }
1216   return GST_RTSP_OK;
1217
1218   /* ERRORS */
1219 timeout:
1220   {
1221     return GST_RTSP_ETIMEOUT;
1222   }
1223 select_error:
1224   {
1225     return GST_RTSP_ESYS;
1226   }
1227 stopped:
1228   {
1229     return GST_RTSP_EINTR;
1230   }
1231 write_error:
1232   {
1233     return res;
1234   }
1235 }
1236
1237 static GString *
1238 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
1239 {
1240   GString *str = NULL;
1241
1242   str = g_string_new ("");
1243
1244   switch (message->type) {
1245     case GST_RTSP_MESSAGE_REQUEST:
1246       /* create request string, add CSeq */
1247       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
1248           "CSeq: %d\r\n",
1249           gst_rtsp_method_as_text (message->type_data.request.method),
1250           message->type_data.request.uri, conn->cseq++);
1251       /* add session id if we have one */
1252       if (conn->session_id[0] != '\0') {
1253         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1254             conn->session_id);
1255       }
1256       /* add any authentication headers */
1257       add_auth_header (conn, message);
1258       break;
1259     case GST_RTSP_MESSAGE_RESPONSE:
1260       /* create response string */
1261       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
1262           message->type_data.response.code, message->type_data.response.reason);
1263       break;
1264     case GST_RTSP_MESSAGE_DATA:
1265     {
1266       guint8 data_header[4];
1267
1268       /* prepare data header */
1269       data_header[0] = '$';
1270       data_header[1] = message->type_data.data.channel;
1271       data_header[2] = (message->body_size >> 8) & 0xff;
1272       data_header[3] = message->body_size & 0xff;
1273
1274       /* create string with header and data */
1275       str = g_string_append_len (str, (gchar *) data_header, 4);
1276       str =
1277           g_string_append_len (str, (gchar *) message->body,
1278           message->body_size);
1279       break;
1280     }
1281     default:
1282       g_string_free (str, TRUE);
1283       g_return_val_if_reached (NULL);
1284       break;
1285   }
1286
1287   /* append headers and body */
1288   if (message->type != GST_RTSP_MESSAGE_DATA) {
1289     gchar date_string[100];
1290
1291     gen_date_string (date_string, sizeof (date_string));
1292
1293     /* add date header */
1294     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1295
1296     /* append headers */
1297     gst_rtsp_message_append_headers (message, str);
1298
1299     /* append Content-Length and body if needed */
1300     if (message->body != NULL && message->body_size > 0) {
1301       gchar *len;
1302
1303       len = g_strdup_printf ("%d", message->body_size);
1304       g_string_append_printf (str, "%s: %s\r\n",
1305           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1306       g_free (len);
1307       /* header ends here */
1308       g_string_append (str, "\r\n");
1309       str =
1310           g_string_append_len (str, (gchar *) message->body,
1311           message->body_size);
1312     } else {
1313       /* just end headers */
1314       g_string_append (str, "\r\n");
1315     }
1316   }
1317
1318   return str;
1319 }
1320
1321 /**
1322  * gst_rtsp_connection_send:
1323  * @conn: a #GstRTSPConnection
1324  * @message: the message to send
1325  * @timeout: a timeout value or #NULL
1326  *
1327  * Attempt to send @message to the connected @conn, blocking up to
1328  * the specified @timeout. @timeout can be #NULL, in which case this function
1329  * might block forever.
1330  * 
1331  * This function can be cancelled with gst_rtsp_connection_flush().
1332  *
1333  * Returns: #GST_RTSP_OK on success.
1334  */
1335 GstRTSPResult
1336 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1337     GTimeVal * timeout)
1338 {
1339   GString *string = NULL;
1340   GstRTSPResult res;
1341   gchar *str;
1342   gsize len;
1343
1344   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1345   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1346
1347   if (G_UNLIKELY (!(string = message_to_string (conn, message))))
1348     goto no_message;
1349
1350   if (conn->tunneled) {
1351     str = g_base64_encode ((const guchar *) string->str, string->len);
1352     g_string_free (string, TRUE);
1353     len = strlen (str);
1354   } else {
1355     str = string->str;
1356     len = string->len;
1357     g_string_free (string, FALSE);
1358   }
1359
1360   /* write request */
1361   res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
1362
1363   g_free (str);
1364
1365   return res;
1366
1367 no_message:
1368   {
1369     g_warning ("Wrong message");
1370     return GST_RTSP_EINVAL;
1371   }
1372 }
1373
1374 static void
1375 parse_string (gchar * dest, gint size, gchar ** src)
1376 {
1377   gint idx;
1378
1379   idx = 0;
1380   /* skip spaces */
1381   while (g_ascii_isspace (**src))
1382     (*src)++;
1383
1384   while (!g_ascii_isspace (**src) && **src != '\0') {
1385     if (idx < size - 1)
1386       dest[idx++] = **src;
1387     (*src)++;
1388   }
1389   if (size > 0)
1390     dest[idx] = '\0';
1391 }
1392
1393 static void
1394 parse_key (gchar * dest, gint size, gchar ** src)
1395 {
1396   gint idx;
1397
1398   idx = 0;
1399   while (**src != ':' && **src != '\0') {
1400     if (idx < size - 1)
1401       dest[idx++] = **src;
1402     (*src)++;
1403   }
1404   if (size > 0)
1405     dest[idx] = '\0';
1406 }
1407
1408 static GstRTSPResult
1409 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
1410 {
1411   GstRTSPResult res;
1412   gchar versionstr[20];
1413   gchar codestr[4];
1414   gint code;
1415   gchar *bptr;
1416
1417   bptr = (gchar *) buffer;
1418
1419   parse_string (versionstr, sizeof (versionstr), &bptr);
1420   parse_string (codestr, sizeof (codestr), &bptr);
1421   code = atoi (codestr);
1422
1423   while (g_ascii_isspace (*bptr))
1424     bptr++;
1425
1426   if (strcmp (versionstr, "RTSP/1.0") == 0)
1427     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1428         parse_error);
1429   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1430     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1431         parse_error);
1432     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1433   } else
1434     goto parse_error;
1435
1436   return GST_RTSP_OK;
1437
1438 parse_error:
1439   {
1440     return GST_RTSP_EPARSE;
1441   }
1442 }
1443
1444 static GstRTSPResult
1445 parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
1446     GstRTSPMessage * msg)
1447 {
1448   GstRTSPResult res = GST_RTSP_OK;
1449   gchar versionstr[20];
1450   gchar methodstr[20];
1451   gchar urlstr[4096];
1452   gchar *bptr;
1453   GstRTSPMethod method;
1454   GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
1455
1456   bptr = (gchar *) buffer;
1457
1458   parse_string (methodstr, sizeof (methodstr), &bptr);
1459   method = gst_rtsp_find_method (methodstr);
1460   if (method == GST_RTSP_INVALID) {
1461     /* a tunnel request is allowed when we don't have one yet */
1462     if (conn->tstate != TUNNEL_STATE_NONE)
1463       goto invalid_method;
1464     /* we need GET or POST for a valid tunnel request */
1465     if (!strcmp (methodstr, "GET"))
1466       tstate = TUNNEL_STATE_GET;
1467     else if (!strcmp (methodstr, "POST"))
1468       tstate = TUNNEL_STATE_POST;
1469     else
1470       goto invalid_method;
1471   }
1472
1473   parse_string (urlstr, sizeof (urlstr), &bptr);
1474   if (G_UNLIKELY (*urlstr == '\0'))
1475     goto invalid_url;
1476
1477   parse_string (versionstr, sizeof (versionstr), &bptr);
1478
1479   if (G_UNLIKELY (*bptr != '\0'))
1480     goto invalid_version;
1481
1482   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1483     res = gst_rtsp_message_init_request (msg, method, urlstr);
1484   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1485     res = gst_rtsp_message_init_request (msg, method, urlstr);
1486     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1487   } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
1488     /* tunnel request, we need a tunnel method */
1489     if (tstate == TUNNEL_STATE_NONE) {
1490       res = GST_RTSP_EPARSE;
1491     } else {
1492       conn->tstate = tstate;
1493     }
1494   } else {
1495     res = GST_RTSP_EPARSE;
1496   }
1497
1498   return res;
1499
1500   /* ERRORS */
1501 invalid_method:
1502   {
1503     GST_ERROR ("invalid method %s", methodstr);
1504     return GST_RTSP_EPARSE;
1505   }
1506 invalid_url:
1507   {
1508     GST_ERROR ("invalid url %s", urlstr);
1509     return GST_RTSP_EPARSE;
1510   }
1511 invalid_version:
1512   {
1513     GST_ERROR ("invalid version");
1514     return GST_RTSP_EPARSE;
1515   }
1516 }
1517
1518 static GstRTSPResult
1519 parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
1520 {
1521   gchar *bptr;
1522
1523   bptr = (gchar *) buffer;
1524
1525   /* read key */
1526   parse_key (key, keysize, &bptr);
1527   if (G_UNLIKELY (*bptr != ':'))
1528     goto no_column;
1529
1530   bptr++;
1531   while (g_ascii_isspace (*bptr))
1532     bptr++;
1533
1534   *value = bptr;
1535
1536   return GST_RTSP_OK;
1537
1538   /* ERRORS */
1539 no_column:
1540   {
1541     return GST_RTSP_EPARSE;
1542   }
1543 }
1544
1545 /* parsing lines means reading a Key: Value pair */
1546 static GstRTSPResult
1547 parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
1548 {
1549   GstRTSPResult res;
1550   gchar key[32];
1551   gchar *value;
1552   GstRTSPHeaderField field;
1553
1554   res = parse_key_value (buffer, key, sizeof (key), &value);
1555   if (G_UNLIKELY (res != GST_RTSP_OK))
1556     goto parse_error;
1557
1558   if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
1559     /* save the tunnel session in the connection */
1560     if (!strcmp (key, "x-sessioncookie")) {
1561       strncpy (conn->tunnelid, value, TUNNELID_LEN);
1562       conn->tunnelid[TUNNELID_LEN - 1] = '\0';
1563       conn->tunneled = TRUE;
1564     }
1565   } else {
1566     field = gst_rtsp_find_header_field (key);
1567     if (field != GST_RTSP_HDR_INVALID)
1568       gst_rtsp_message_add_header (msg, field, value);
1569   }
1570
1571   return GST_RTSP_OK;
1572
1573   /* ERRORS */
1574 parse_error:
1575   {
1576     return res;
1577   }
1578 }
1579
1580 /* returns:
1581  *  GST_RTSP_OK when a complete message was read.
1582  *  GST_RTSP_EEOF: when the socket is closed
1583  *  GST_RTSP_EINTR: when more data is needed.
1584  *  GST_RTSP_..: some other error occured.
1585  */
1586 static GstRTSPResult
1587 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1588     GstRTSPConnection * conn)
1589 {
1590   GstRTSPResult res;
1591
1592   while (TRUE) {
1593     switch (builder->state) {
1594       case STATE_START:
1595         builder->offset = 0;
1596         res =
1597             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1598             &builder->offset, 1, conn->ctxp);
1599         if (res != GST_RTSP_OK)
1600           goto done;
1601
1602         /* we have 1 bytes now and we can see if this is a data message or
1603          * not */
1604         if (builder->buffer[0] == '$') {
1605           /* data message, prepare for the header */
1606           builder->state = STATE_DATA_HEADER;
1607         } else {
1608           builder->line = 0;
1609           builder->state = STATE_READ_LINES;
1610         }
1611         break;
1612       case STATE_DATA_HEADER:
1613       {
1614         res =
1615             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1616             &builder->offset, 4, conn->ctxp);
1617         if (res != GST_RTSP_OK)
1618           goto done;
1619
1620         gst_rtsp_message_init_data (message, builder->buffer[1]);
1621
1622         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1623         builder->body_data = g_malloc (builder->body_len + 1);
1624         builder->body_data[builder->body_len] = '\0';
1625         builder->offset = 0;
1626         builder->state = STATE_DATA_BODY;
1627         break;
1628       }
1629       case STATE_DATA_BODY:
1630       {
1631         res =
1632             read_bytes (conn->readfd->fd, builder->body_data, &builder->offset,
1633             builder->body_len, conn->ctxp);
1634         if (res != GST_RTSP_OK)
1635           goto done;
1636
1637         /* we have the complete body now, store in the message adjusting the
1638          * length to include the traling '\0' */
1639         gst_rtsp_message_take_body (message,
1640             (guint8 *) builder->body_data, builder->body_len + 1);
1641         builder->body_data = NULL;
1642         builder->body_len = 0;
1643
1644         builder->state = STATE_END;
1645         break;
1646       }
1647       case STATE_READ_LINES:
1648       {
1649         res = read_line (conn->readfd->fd, builder->buffer, &builder->offset,
1650             sizeof (builder->buffer), conn->ctxp);
1651         if (res != GST_RTSP_OK)
1652           goto done;
1653
1654         /* we have a regular response */
1655         if (builder->buffer[0] == '\r') {
1656           builder->buffer[0] = '\0';
1657         }
1658
1659         if (builder->buffer[0] == '\0') {
1660           gchar *hdrval;
1661
1662           /* empty line, end of message header */
1663           /* see if there is a Content-Length header */
1664           if (gst_rtsp_message_get_header (message,
1665                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1666             /* there is, prepare to read the body */
1667             builder->body_len = atol (hdrval);
1668             builder->body_data = g_malloc (builder->body_len + 1);
1669             builder->body_data[builder->body_len] = '\0';
1670             builder->offset = 0;
1671             builder->state = STATE_DATA_BODY;
1672           } else {
1673             builder->state = STATE_END;
1674           }
1675           break;
1676         }
1677
1678         /* we have a line */
1679         if (builder->line == 0) {
1680           /* first line, check for response status */
1681           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1682             res = parse_response_status (builder->buffer, message);
1683           } else {
1684             res = parse_request_line (conn, builder->buffer, message);
1685           }
1686           /* the first line must parse without errors */
1687           if (res != GST_RTSP_OK)
1688             goto done;
1689         } else {
1690           /* else just parse the line, ignore errors */
1691           parse_line (conn, builder->buffer, message);
1692         }
1693         builder->line++;
1694         builder->offset = 0;
1695         break;
1696       }
1697       case STATE_END:
1698       {
1699         gchar *session_id;
1700
1701         if (conn->tstate == TUNNEL_STATE_GET) {
1702           res = GST_RTSP_ETGET;
1703           goto done;
1704         } else if (conn->tstate == TUNNEL_STATE_POST) {
1705           res = GST_RTSP_ETPOST;
1706           goto done;
1707         }
1708
1709         if (message->type == GST_RTSP_MESSAGE_DATA) {
1710           /* data messages don't have headers */
1711           res = GST_RTSP_OK;
1712           goto done;
1713         }
1714
1715         /* save session id in the connection for further use */
1716         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
1717             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1718                 &session_id, 0) == GST_RTSP_OK) {
1719           gint maxlen, i;
1720
1721           maxlen = sizeof (conn->session_id) - 1;
1722           /* the sessionid can have attributes marked with ;
1723            * Make sure we strip them */
1724           for (i = 0; session_id[i] != '\0'; i++) {
1725             if (session_id[i] == ';') {
1726               maxlen = i;
1727               /* parse timeout */
1728               do {
1729                 i++;
1730               } while (g_ascii_isspace (session_id[i]));
1731               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1732                 gint to;
1733
1734                 /* if we parsed something valid, configure */
1735                 if ((to = atoi (&session_id[i + 8])) > 0)
1736                   conn->timeout = to;
1737               }
1738               break;
1739             }
1740           }
1741
1742           /* make sure to not overflow */
1743           strncpy (conn->session_id, session_id, maxlen);
1744           conn->session_id[maxlen] = '\0';
1745         }
1746         res = GST_RTSP_OK;
1747         goto done;
1748       }
1749       default:
1750         res = GST_RTSP_ERROR;
1751         break;
1752     }
1753   }
1754 done:
1755   return res;
1756 }
1757
1758 /**
1759  * gst_rtsp_connection_read:
1760  * @conn: a #GstRTSPConnection
1761  * @data: the data to read
1762  * @size: the size of @data
1763  * @timeout: a timeout value or #NULL
1764  *
1765  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1766  * the specified @timeout. @timeout can be #NULL, in which case this function
1767  * might block forever.
1768  *
1769  * This function can be cancelled with gst_rtsp_connection_flush().
1770  *
1771  * Returns: #GST_RTSP_OK on success.
1772  */
1773 GstRTSPResult
1774 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1775     GTimeVal * timeout)
1776 {
1777   guint offset;
1778   gint retval;
1779   GstClockTime to;
1780   GstRTSPResult res;
1781
1782   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1783   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1784   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1785
1786   if (G_UNLIKELY (size == 0))
1787     return GST_RTSP_OK;
1788
1789   offset = 0;
1790
1791   /* configure timeout if any */
1792   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1793
1794   gst_poll_set_controllable (conn->fdset, TRUE);
1795   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1796   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1797
1798   while (TRUE) {
1799     res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp);
1800     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1801       goto eof;
1802     if (G_LIKELY (res == GST_RTSP_OK))
1803       break;
1804     if (G_UNLIKELY (res != GST_RTSP_EINTR))
1805       goto read_error;
1806
1807     do {
1808       retval = gst_poll_wait (conn->fdset, to);
1809     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1810
1811     /* check for timeout */
1812     if (G_UNLIKELY (retval == 0))
1813       goto select_timeout;
1814
1815     if (G_UNLIKELY (retval == -1)) {
1816       if (errno == EBUSY)
1817         goto stopped;
1818       else
1819         goto select_error;
1820     }
1821     gst_poll_set_controllable (conn->fdset, FALSE);
1822   }
1823   return GST_RTSP_OK;
1824
1825   /* ERRORS */
1826 select_error:
1827   {
1828     return GST_RTSP_ESYS;
1829   }
1830 select_timeout:
1831   {
1832     return GST_RTSP_ETIMEOUT;
1833   }
1834 stopped:
1835   {
1836     return GST_RTSP_EINTR;
1837   }
1838 eof:
1839   {
1840     return GST_RTSP_EEOF;
1841   }
1842 read_error:
1843   {
1844     return res;
1845   }
1846 }
1847
1848 static GString *
1849 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
1850 {
1851   GString *str;
1852   gchar date_string[100];
1853   const gchar *status;
1854
1855   gen_date_string (date_string, sizeof (date_string));
1856
1857   status = gst_rtsp_status_as_text (code);
1858   if (status == NULL) {
1859     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
1860     status = "Internal Server Error";
1861   }
1862
1863   str = g_string_new ("");
1864
1865   /* */
1866   g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
1867   g_string_append_printf (str,
1868       "Server: GStreamer RTSP Server\r\n"
1869       "Date: %s\r\n"
1870       "Connection: close\r\n"
1871       "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
1872   if (code == GST_RTSP_STS_OK) {
1873     if (conn->ip)
1874       g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
1875     g_string_append_printf (str,
1876         "Content-Type: application/x-rtsp-tunnelled\r\n");
1877   }
1878   g_string_append_printf (str, "\r\n");
1879   return str;
1880 }
1881
1882 /**
1883  * gst_rtsp_connection_receive:
1884  * @conn: a #GstRTSPConnection
1885  * @message: the message to read
1886  * @timeout: a timeout value or #NULL
1887  *
1888  * Attempt to read into @message from the connected @conn, blocking up to
1889  * the specified @timeout. @timeout can be #NULL, in which case this function
1890  * might block forever.
1891  * 
1892  * This function can be cancelled with gst_rtsp_connection_flush().
1893  *
1894  * Returns: #GST_RTSP_OK on success.
1895  */
1896 GstRTSPResult
1897 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
1898     GTimeVal * timeout)
1899 {
1900   GstRTSPResult res;
1901   GstRTSPBuilder builder;
1902   gint retval;
1903   GstClockTime to;
1904
1905   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1906   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1907   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1908
1909   /* configure timeout if any */
1910   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1911
1912   gst_poll_set_controllable (conn->fdset, TRUE);
1913   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1914   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1915
1916   memset (&builder, 0, sizeof (GstRTSPBuilder));
1917   while (TRUE) {
1918     res = build_next (&builder, message, conn);
1919     if (G_UNLIKELY (res == GST_RTSP_EEOF))
1920       goto eof;
1921     if (G_LIKELY (res == GST_RTSP_OK))
1922       break;
1923     if (res == GST_RTSP_ETGET) {
1924       GString *str;
1925
1926       /* tunnel GET request, we can reply now */
1927       str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
1928       res =
1929           gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
1930           timeout);
1931       g_string_free (str, TRUE);
1932     } else if (res == GST_RTSP_ETPOST) {
1933       /* tunnel POST request, return the value, the caller now has to link the
1934        * two connections. */
1935       break;
1936     } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
1937       goto read_error;
1938
1939     do {
1940       retval = gst_poll_wait (conn->fdset, to);
1941     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1942
1943     /* check for timeout */
1944     if (G_UNLIKELY (retval == 0))
1945       goto select_timeout;
1946
1947     if (G_UNLIKELY (retval == -1)) {
1948       if (errno == EBUSY)
1949         goto stopped;
1950       else
1951         goto select_error;
1952     }
1953     gst_poll_set_controllable (conn->fdset, FALSE);
1954   }
1955
1956   /* we have a message here */
1957   build_reset (&builder);
1958
1959   return GST_RTSP_OK;
1960
1961   /* ERRORS */
1962 select_error:
1963   {
1964     res = GST_RTSP_ESYS;
1965     goto cleanup;
1966   }
1967 select_timeout:
1968   {
1969     res = GST_RTSP_ETIMEOUT;
1970     goto cleanup;
1971   }
1972 stopped:
1973   {
1974     res = GST_RTSP_EINTR;
1975     goto cleanup;
1976   }
1977 eof:
1978   {
1979     res = GST_RTSP_EEOF;
1980     goto cleanup;
1981   }
1982 read_error:
1983 cleanup:
1984   {
1985     build_reset (&builder);
1986     gst_rtsp_message_unset (message);
1987     return res;
1988   }
1989 }
1990
1991 /**
1992  * gst_rtsp_connection_close:
1993  * @conn: a #GstRTSPConnection
1994  *
1995  * Close the connected @conn. After this call, the connection is in the same
1996  * state as when it was first created.
1997  * 
1998  * Returns: #GST_RTSP_OK on success.
1999  */
2000 GstRTSPResult
2001 gst_rtsp_connection_close (GstRTSPConnection * conn)
2002 {
2003   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2004
2005   g_free (conn->ip);
2006   conn->ip = NULL;
2007
2008   REMOVE_POLLFD (conn->fdset, &conn->fd0);
2009   REMOVE_POLLFD (conn->fdset, &conn->fd1);
2010   conn->writefd = NULL;
2011   conn->readfd = NULL;
2012   conn->tunneled = FALSE;
2013   conn->tstate = TUNNEL_STATE_NONE;
2014   conn->ctxp = NULL;
2015   g_free (conn->username);
2016   conn->username = NULL;
2017   g_free (conn->passwd);
2018   conn->passwd = NULL;
2019   gst_rtsp_connection_clear_auth_params (conn);
2020   conn->timeout = 60;
2021   conn->cseq = 0;
2022   conn->session_id[0] = '\0';
2023
2024   return GST_RTSP_OK;
2025 }
2026
2027 /**
2028  * gst_rtsp_connection_free:
2029  * @conn: a #GstRTSPConnection
2030  *
2031  * Close and free @conn.
2032  * 
2033  * Returns: #GST_RTSP_OK on success.
2034  */
2035 GstRTSPResult
2036 gst_rtsp_connection_free (GstRTSPConnection * conn)
2037 {
2038   GstRTSPResult res;
2039
2040   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2041
2042   res = gst_rtsp_connection_close (conn);
2043   gst_poll_free (conn->fdset);
2044   g_timer_destroy (conn->timer);
2045   gst_rtsp_url_free (conn->url);
2046   g_free (conn->proxy_host);
2047   g_free (conn);
2048 #ifdef G_OS_WIN32
2049   WSACleanup ();
2050 #endif
2051
2052   return res;
2053 }
2054
2055 /**
2056  * gst_rtsp_connection_poll:
2057  * @conn: a #GstRTSPConnection
2058  * @events: a bitmask of #GstRTSPEvent flags to check
2059  * @revents: location for result flags 
2060  * @timeout: a timeout
2061  *
2062  * Wait up to the specified @timeout for the connection to become available for
2063  * at least one of the operations specified in @events. When the function returns
2064  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2065  * @conn.
2066  *
2067  * @timeout can be #NULL, in which case this function might block forever.
2068  *
2069  * This function can be cancelled with gst_rtsp_connection_flush().
2070  * 
2071  * Returns: #GST_RTSP_OK on success.
2072  *
2073  * Since: 0.10.15
2074  */
2075 GstRTSPResult
2076 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2077     GstRTSPEvent * revents, GTimeVal * timeout)
2078 {
2079   GstClockTime to;
2080   gint retval;
2081
2082   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2083   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2084   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2085   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2086   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2087
2088   gst_poll_set_controllable (conn->fdset, TRUE);
2089
2090   /* add fd to writer set when asked to */
2091   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
2092       events & GST_RTSP_EV_WRITE);
2093
2094   /* add fd to reader set when asked to */
2095   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
2096
2097   /* configure timeout if any */
2098   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
2099
2100   do {
2101     retval = gst_poll_wait (conn->fdset, to);
2102   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
2103
2104   if (G_UNLIKELY (retval == 0))
2105     goto select_timeout;
2106
2107   if (G_UNLIKELY (retval == -1)) {
2108     if (errno == EBUSY)
2109       goto stopped;
2110     else
2111       goto select_error;
2112   }
2113
2114   *revents = 0;
2115   if (events & GST_RTSP_EV_READ) {
2116     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
2117       *revents |= GST_RTSP_EV_READ;
2118   }
2119   if (events & GST_RTSP_EV_WRITE) {
2120     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
2121       *revents |= GST_RTSP_EV_WRITE;
2122   }
2123   return GST_RTSP_OK;
2124
2125   /* ERRORS */
2126 select_timeout:
2127   {
2128     return GST_RTSP_ETIMEOUT;
2129   }
2130 select_error:
2131   {
2132     return GST_RTSP_ESYS;
2133   }
2134 stopped:
2135   {
2136     return GST_RTSP_EINTR;
2137   }
2138 }
2139
2140 /**
2141  * gst_rtsp_connection_next_timeout:
2142  * @conn: a #GstRTSPConnection
2143  * @timeout: a timeout
2144  *
2145  * Calculate the next timeout for @conn, storing the result in @timeout.
2146  * 
2147  * Returns: #GST_RTSP_OK.
2148  */
2149 GstRTSPResult
2150 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2151 {
2152   gdouble elapsed;
2153   glong sec;
2154   gulong usec;
2155
2156   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2157   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2158
2159   elapsed = g_timer_elapsed (conn->timer, &usec);
2160   if (elapsed >= conn->timeout) {
2161     sec = 0;
2162     usec = 0;
2163   } else {
2164     sec = conn->timeout - elapsed;
2165   }
2166
2167   timeout->tv_sec = sec;
2168   timeout->tv_usec = usec;
2169
2170   return GST_RTSP_OK;
2171 }
2172
2173 /**
2174  * gst_rtsp_connection_reset_timeout:
2175  * @conn: a #GstRTSPConnection
2176  *
2177  * Reset the timeout of @conn.
2178  * 
2179  * Returns: #GST_RTSP_OK.
2180  */
2181 GstRTSPResult
2182 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2183 {
2184   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2185
2186   g_timer_start (conn->timer);
2187
2188   return GST_RTSP_OK;
2189 }
2190
2191 /**
2192  * gst_rtsp_connection_flush:
2193  * @conn: a #GstRTSPConnection
2194  * @flush: start or stop the flush
2195  *
2196  * Start or stop the flushing action on @conn. When flushing, all current
2197  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2198  * is set to non-flushing mode again.
2199  * 
2200  * Returns: #GST_RTSP_OK.
2201  */
2202 GstRTSPResult
2203 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2204 {
2205   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2206
2207   gst_poll_set_flushing (conn->fdset, flush);
2208
2209   return GST_RTSP_OK;
2210 }
2211
2212 /**
2213  * gst_rtsp_connection_set_proxy:
2214  * @conn: a #GstRTSPConnection
2215  * @host: the proxy host
2216  * @port: the proxy port
2217  *
2218  * Set the proxy host and port.
2219  * 
2220  * Returns: #GST_RTSP_OK.
2221  *
2222  * Since: 0.10.23
2223  */
2224 GstRTSPResult
2225 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
2226     const gchar * host, guint port)
2227 {
2228   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2229
2230   g_free (conn->proxy_host);
2231   conn->proxy_host = g_strdup (host);
2232   conn->proxy_port = port;
2233
2234   return GST_RTSP_OK;
2235 }
2236
2237 /**
2238  * gst_rtsp_connection_set_auth:
2239  * @conn: a #GstRTSPConnection
2240  * @method: authentication method
2241  * @user: the user
2242  * @pass: the password
2243  *
2244  * Configure @conn for authentication mode @method with @user and @pass as the
2245  * user and password respectively.
2246  * 
2247  * Returns: #GST_RTSP_OK.
2248  */
2249 GstRTSPResult
2250 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
2251     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
2252 {
2253   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2254
2255   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
2256           || g_strrstr (user, ":") != NULL))
2257     return GST_RTSP_EINVAL;
2258
2259   /* Make sure the username and passwd are being set for authentication */
2260   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
2261     return GST_RTSP_EINVAL;
2262
2263   /* ":" chars are not allowed in usernames for basic auth */
2264   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
2265     return GST_RTSP_EINVAL;
2266
2267   g_free (conn->username);
2268   g_free (conn->passwd);
2269
2270   conn->auth_method = method;
2271   conn->username = g_strdup (user);
2272   conn->passwd = g_strdup (pass);
2273
2274   return GST_RTSP_OK;
2275 }
2276
2277 /**
2278  * str_case_hash:
2279  * @key: ASCII string to hash
2280  *
2281  * Hashes @key in a case-insensitive manner.
2282  *
2283  * Returns: the hash code.
2284  **/
2285 static guint
2286 str_case_hash (gconstpointer key)
2287 {
2288   const char *p = key;
2289   guint h = g_ascii_toupper (*p);
2290
2291   if (h)
2292     for (p += 1; *p != '\0'; p++)
2293       h = (h << 5) - h + g_ascii_toupper (*p);
2294
2295   return h;
2296 }
2297
2298 /**
2299  * str_case_equal:
2300  * @v1: an ASCII string
2301  * @v2: another ASCII string
2302  *
2303  * Compares @v1 and @v2 in a case-insensitive manner
2304  *
2305  * Returns: %TRUE if they are equal (modulo case)
2306  **/
2307 static gboolean
2308 str_case_equal (gconstpointer v1, gconstpointer v2)
2309 {
2310   const char *string1 = v1;
2311   const char *string2 = v2;
2312
2313   return g_ascii_strcasecmp (string1, string2) == 0;
2314 }
2315
2316 /**
2317  * gst_rtsp_connection_set_auth_param:
2318  * @conn: a #GstRTSPConnection
2319  * @param: authentication directive
2320  * @value: value
2321  *
2322  * Setup @conn with authentication directives. This is not necesary for
2323  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
2324  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
2325  * in the WWW-Authenticate response header and can include realm, domain,
2326  * nonce, opaque, stale, algorithm, qop as per RFC2617.
2327  * 
2328  * Since: 0.10.20
2329  */
2330 void
2331 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
2332     const gchar * param, const gchar * value)
2333 {
2334   g_return_if_fail (conn != NULL);
2335   g_return_if_fail (param != NULL);
2336
2337   if (conn->auth_params == NULL) {
2338     conn->auth_params =
2339         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
2340   }
2341   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
2342 }
2343
2344 /**
2345  * gst_rtsp_connection_clear_auth_params:
2346  * @conn: a #GstRTSPConnection
2347  *
2348  * Clear the list of authentication directives stored in @conn.
2349  *
2350  * Since: 0.10.20
2351  */
2352 void
2353 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
2354 {
2355   g_return_if_fail (conn != NULL);
2356
2357   if (conn->auth_params != NULL) {
2358     g_hash_table_destroy (conn->auth_params);
2359     conn->auth_params = NULL;
2360   }
2361 }
2362
2363 static GstRTSPResult
2364 set_qos_dscp (gint fd, guint qos_dscp)
2365 {
2366   union gst_sockaddr sa;
2367   socklen_t slen = sizeof (sa);
2368   gint af;
2369   gint tos;
2370
2371   if (fd == -1)
2372     return GST_RTSP_OK;
2373
2374   if (getsockname (fd, &sa.sa, &slen) < 0)
2375     goto no_getsockname;
2376
2377   af = sa.sa.sa_family;
2378
2379   /* if this is an IPv4-mapped address then do IPv4 QoS */
2380   if (af == AF_INET6) {
2381     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
2382       af = AF_INET;
2383   }
2384
2385   /* extract and shift 6 bits of the DSCP */
2386   tos = (qos_dscp & 0x3f) << 2;
2387
2388   switch (af) {
2389     case AF_INET:
2390       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
2391         goto no_setsockopt;
2392       break;
2393     case AF_INET6:
2394 #ifdef IPV6_TCLASS
2395       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
2396         goto no_setsockopt;
2397       break;
2398 #endif
2399     default:
2400       goto wrong_family;
2401   }
2402
2403   return GST_RTSP_OK;
2404
2405   /* ERRORS */
2406 no_getsockname:
2407 no_setsockopt:
2408   {
2409     return GST_RTSP_ESYS;
2410   }
2411
2412 wrong_family:
2413   {
2414     return GST_RTSP_ERROR;
2415   }
2416 }
2417
2418 /**
2419  * gst_rtsp_connection_set_qos_dscp:
2420  * @conn: a #GstRTSPConnection
2421  * @qos_dscp: DSCP value
2422  *
2423  * Configure @conn to use the specified DSCP value.
2424  *
2425  * Returns: #GST_RTSP_OK on success.
2426  *
2427  * Since: 0.10.20
2428  */
2429 GstRTSPResult
2430 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
2431 {
2432   GstRTSPResult res;
2433
2434   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2435   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
2436   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
2437
2438   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
2439   if (res == GST_RTSP_OK)
2440     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
2441
2442   return res;
2443 }
2444
2445
2446 /**
2447  * gst_rtsp_connection_get_url:
2448  * @conn: a #GstRTSPConnection
2449  *
2450  * Retrieve the URL of the other end of @conn.
2451  *
2452  * Returns: The URL. This value remains valid until the
2453  * connection is freed.
2454  *
2455  * Since: 0.10.23
2456  */
2457 GstRTSPUrl *
2458 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
2459 {
2460   g_return_val_if_fail (conn != NULL, NULL);
2461
2462   return conn->url;
2463 }
2464
2465 /**
2466  * gst_rtsp_connection_get_ip:
2467  * @conn: a #GstRTSPConnection
2468  *
2469  * Retrieve the IP address of the other end of @conn.
2470  *
2471  * Returns: The IP address as a string. this value remains valid until the
2472  * connection is closed.
2473  *
2474  * Since: 0.10.20
2475  */
2476 const gchar *
2477 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
2478 {
2479   g_return_val_if_fail (conn != NULL, NULL);
2480
2481   return conn->ip;
2482 }
2483
2484 /**
2485  * gst_rtsp_connection_set_ip:
2486  * @conn: a #GstRTSPConnection
2487  * @ip: an ip address
2488  *
2489  * Set the IP address of the server.
2490  *
2491  * Since: 0.10.23
2492  */
2493 void
2494 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
2495 {
2496   g_return_if_fail (conn != NULL);
2497
2498   g_free (conn->ip);
2499   conn->ip = g_strdup (ip);
2500 }
2501
2502 /**
2503  * gst_rtsp_connection_get_readfd:
2504  * @conn: a #GstRTSPConnection
2505  *
2506  * Get the file descriptor for reading.
2507  *
2508  * Returns: the file descriptor used for reading or -1 on error. The file
2509  * descriptor remains valid until the connection is closed.
2510  *
2511  * Since: 0.10.23
2512  */
2513 gint
2514 gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
2515 {
2516   g_return_val_if_fail (conn != NULL, -1);
2517   g_return_val_if_fail (conn->readfd != NULL, -1);
2518
2519   return conn->readfd->fd;
2520 }
2521
2522 /**
2523  * gst_rtsp_connection_get_writefd:
2524  * @conn: a #GstRTSPConnection
2525  *
2526  * Get the file descriptor for writing.
2527  *
2528  * Returns: the file descriptor used for writing or -1 on error. The file
2529  * descriptor remains valid until the connection is closed.
2530  *
2531  * Since: 0.10.23
2532  */
2533 gint
2534 gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
2535 {
2536   g_return_val_if_fail (conn != NULL, -1);
2537   g_return_val_if_fail (conn->writefd != NULL, -1);
2538
2539   return conn->writefd->fd;
2540 }
2541
2542
2543 /**
2544  * gst_rtsp_connection_set_tunneled:
2545  * @conn: a #GstRTSPConnection
2546  * @tunneled: the new state
2547  *
2548  * Set the HTTP tunneling state of the connection. This must be configured before
2549  * the @conn is connected.
2550  *
2551  * Since: 0.10.23
2552  */
2553 void
2554 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
2555 {
2556   g_return_if_fail (conn != NULL);
2557   g_return_if_fail (conn->readfd == NULL);
2558   g_return_if_fail (conn->writefd == NULL);
2559
2560   conn->tunneled = tunneled;
2561 }
2562
2563 /**
2564  * gst_rtsp_connection_is_tunneled:
2565  * @conn: a #GstRTSPConnection
2566  *
2567  * Get the tunneling state of the connection. 
2568  *
2569  * Returns: if @conn is using HTTP tunneling.
2570  *
2571  * Since: 0.10.23
2572  */
2573 gboolean
2574 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
2575 {
2576   g_return_val_if_fail (conn != NULL, FALSE);
2577
2578   return conn->tunneled;
2579 }
2580
2581 /**
2582  * gst_rtsp_connection_get_tunnelid:
2583  * @conn: a #GstRTSPConnection
2584  *
2585  * Get the tunnel session id the connection. 
2586  *
2587  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
2588  *
2589  * Since: 0.10.23
2590  */
2591 const gchar *
2592 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
2593 {
2594   g_return_val_if_fail (conn != NULL, NULL);
2595
2596   if (!conn->tunneled)
2597     return NULL;
2598
2599   return conn->tunnelid;
2600 }
2601
2602 /**
2603  * gst_rtsp_connection_do_tunnel:
2604  * @conn: a #GstRTSPConnection
2605  * @conn2: a #GstRTSPConnection
2606  *
2607  * If @conn received the first tunnel connection and @conn2 received
2608  * the second tunnel connection, link the two connections together so that
2609  * @conn manages the tunneled connection.
2610  *
2611  * After this call, @conn2 cannot be used anymore and must be freed with
2612  * gst_rtsp_connection_free().
2613  *
2614  * Returns: return GST_RTSP_OK on success.
2615  *
2616  * Since: 0.10.23
2617  */
2618 GstRTSPResult
2619 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
2620     GstRTSPConnection * conn2)
2621 {
2622   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2623   g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
2624   g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
2625   g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
2626   g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
2627       GST_RTSP_EINVAL);
2628
2629   /* both connections have fd0 as the read/write socket. start by taking the
2630    * socket from conn2 and set it as the socket in conn */
2631   conn->fd1 = conn2->fd0;
2632
2633   /* clean up some of the state of conn2 */
2634   gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
2635   conn2->fd0.fd = -1;
2636   conn2->readfd = conn2->writefd = NULL;
2637
2638   /* We make fd0 the write socket and fd1 the read socket. */
2639   conn->writefd = &conn->fd0;
2640   conn->readfd = &conn->fd1;
2641
2642   conn->tstate = TUNNEL_STATE_COMPLETE;
2643
2644   /* we need base64 decoding for the readfd */
2645   conn->ctx.state = 0;
2646   conn->ctx.save = 0;
2647   conn->ctx.cout = 0;
2648   conn->ctx.coutl = 0;
2649   conn->ctxp = &conn->ctx;
2650
2651   return GST_RTSP_OK;
2652 }
2653
2654 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
2655 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
2656
2657 typedef struct
2658 {
2659   guint8 *data;
2660   guint size;
2661   guint id;
2662 } GstRTSPRec;
2663
2664 /* async functions */
2665 struct _GstRTSPWatch
2666 {
2667   GSource source;
2668
2669   GstRTSPConnection *conn;
2670
2671   GstRTSPBuilder builder;
2672   GstRTSPMessage message;
2673
2674   GPollFD readfd;
2675   GPollFD writefd;
2676   gboolean write_added;
2677
2678   /* queued message for transmission */
2679   guint id;
2680   GAsyncQueue *messages;
2681   guint8 *write_data;
2682   guint write_off;
2683   guint write_size;
2684   guint write_id;
2685
2686   GstRTSPWatchFuncs funcs;
2687
2688   gpointer user_data;
2689   GDestroyNotify notify;
2690 };
2691
2692 static gboolean
2693 gst_rtsp_source_prepare (GSource * source, gint * timeout)
2694 {
2695   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2696
2697   *timeout = (watch->conn->timeout * 1000);
2698
2699   return FALSE;
2700 }
2701
2702 static gboolean
2703 gst_rtsp_source_check (GSource * source)
2704 {
2705   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2706
2707   if (watch->readfd.revents & READ_COND)
2708     return TRUE;
2709
2710   if (watch->writefd.revents & WRITE_COND)
2711     return TRUE;
2712
2713   return FALSE;
2714 }
2715
2716 static gboolean
2717 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
2718     gpointer user_data G_GNUC_UNUSED)
2719 {
2720   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2721   GstRTSPResult res;
2722
2723   /* first read as much as we can */
2724   if (watch->readfd.revents & READ_COND) {
2725     do {
2726       res = build_next (&watch->builder, &watch->message, watch->conn);
2727       if (res == GST_RTSP_EINTR)
2728         break;
2729       if (G_UNLIKELY (res == GST_RTSP_EEOF))
2730         goto eof;
2731       if (res == GST_RTSP_ETGET) {
2732         GString *str;
2733         GstRTSPStatusCode code;
2734         guint size;
2735
2736         if (watch->funcs.tunnel_start)
2737           code = watch->funcs.tunnel_start (watch, watch->user_data);
2738         else
2739           code = GST_RTSP_STS_OK;
2740
2741         /* queue the response string */
2742         str = gen_tunnel_reply (watch->conn, code);
2743         size = str->len;
2744         gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, FALSE),
2745             size);
2746       } else if (res == GST_RTSP_ETPOST) {
2747         /* in the callback the connection should be tunneled with the
2748          * GET connection */
2749         if (watch->funcs.tunnel_complete)
2750           watch->funcs.tunnel_complete (watch, watch->user_data);
2751       } else if (G_UNLIKELY (res != GST_RTSP_OK))
2752         goto error;
2753
2754       if (G_LIKELY (res == GST_RTSP_OK)) {
2755         if (watch->funcs.message_received)
2756           watch->funcs.message_received (watch, &watch->message,
2757               watch->user_data);
2758
2759         gst_rtsp_message_unset (&watch->message);
2760       }
2761       build_reset (&watch->builder);
2762     } while (FALSE);
2763   }
2764
2765   if (watch->writefd.revents & WRITE_COND) {
2766     do {
2767       if (watch->write_data == NULL) {
2768         GstRTSPRec *rec;
2769
2770         /* get a new message from the queue */
2771         rec = g_async_queue_try_pop (watch->messages);
2772         if (rec == NULL)
2773           goto done;
2774
2775         watch->write_off = 0;
2776         watch->write_data = rec->data;
2777         watch->write_size = rec->size;
2778         watch->write_id = rec->id;
2779
2780         g_slice_free (GstRTSPRec, rec);
2781       }
2782
2783       res = write_bytes (watch->writefd.fd, watch->write_data,
2784           &watch->write_off, watch->write_size);
2785       if (res == GST_RTSP_EINTR)
2786         break;
2787       if (G_UNLIKELY (res != GST_RTSP_OK))
2788         goto error;
2789
2790       if (watch->funcs.message_sent)
2791         watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
2792
2793     done:
2794       if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
2795         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2796         watch->write_added = FALSE;
2797         watch->writefd.revents = 0;
2798       }
2799       g_free (watch->write_data);
2800       watch->write_data = NULL;
2801     } while (FALSE);
2802   }
2803
2804   return TRUE;
2805
2806   /* ERRORS */
2807 eof:
2808   {
2809     if (watch->funcs.closed)
2810       watch->funcs.closed (watch, watch->user_data);
2811     return FALSE;
2812   }
2813 error:
2814   {
2815     if (watch->funcs.error)
2816       watch->funcs.error (watch, res, watch->user_data);
2817     return FALSE;
2818   }
2819 }
2820
2821 static void
2822 gst_rtsp_rec_free (gpointer data)
2823 {
2824   GstRTSPRec *rec = data;
2825
2826   g_free (rec->data);
2827   g_slice_free (GstRTSPRec, rec);
2828 }
2829
2830 static void
2831 gst_rtsp_source_finalize (GSource * source)
2832 {
2833   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2834
2835   build_reset (&watch->builder);
2836   gst_rtsp_message_unset (&watch->message);
2837
2838   g_async_queue_unref (watch->messages);
2839   watch->messages = NULL;
2840
2841   g_free (watch->write_data);
2842
2843   if (watch->notify)
2844     watch->notify (watch->user_data);
2845 }
2846
2847 static GSourceFuncs gst_rtsp_source_funcs = {
2848   gst_rtsp_source_prepare,
2849   gst_rtsp_source_check,
2850   gst_rtsp_source_dispatch,
2851   gst_rtsp_source_finalize,
2852   NULL,
2853   NULL
2854 };
2855
2856 /**
2857  * gst_rtsp_watch_new:
2858  * @conn: a #GstRTSPConnection
2859  * @funcs: watch functions
2860  * @user_data: user data to pass to @funcs
2861  * @notify: notify when @user_data is not referenced anymore
2862  *
2863  * Create a watch object for @conn. The functions provided in @funcs will be
2864  * called with @user_data when activity happened on the watch.
2865  *
2866  * The new watch is usually created so that it can be attached to a
2867  * maincontext with gst_rtsp_watch_attach(). 
2868  *
2869  * @conn must exist for the entire lifetime of the watch.
2870  *
2871  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2872  * communication. Free with gst_rtsp_watch_unref () after usage.
2873  *
2874  * Since: 0.10.23
2875  */
2876 GstRTSPWatch *
2877 gst_rtsp_watch_new (GstRTSPConnection * conn,
2878     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2879 {
2880   GstRTSPWatch *result;
2881
2882   g_return_val_if_fail (conn != NULL, NULL);
2883   g_return_val_if_fail (funcs != NULL, NULL);
2884   g_return_val_if_fail (conn->readfd != NULL, NULL);
2885   g_return_val_if_fail (conn->writefd != NULL, NULL);
2886
2887   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2888       sizeof (GstRTSPWatch));
2889
2890   result->conn = conn;
2891   result->builder.state = STATE_START;
2892
2893   result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
2894
2895   result->readfd.fd = -1;
2896   result->writefd.fd = -1;
2897
2898   gst_rtsp_watch_reset (result);
2899
2900   result->funcs = *funcs;
2901   result->user_data = user_data;
2902   result->notify = notify;
2903
2904   /* only add the read fd, the write fd is only added when we have data
2905    * to send. */
2906   g_source_add_poll ((GSource *) result, &result->readfd);
2907
2908   return result;
2909 }
2910
2911 /**
2912  * gst_rtsp_watch_reset:
2913  * @watch: a #GstRTSPWatch
2914  *
2915  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
2916  * when the file descriptors of the connection might have changed.
2917  *
2918  * Since: 0.10.23
2919  */
2920 void
2921 gst_rtsp_watch_reset (GstRTSPWatch * watch)
2922 {
2923   if (watch->readfd.fd != -1)
2924     g_source_remove_poll ((GSource *) watch, &watch->readfd);
2925   if (watch->writefd.fd != -1)
2926     g_source_remove_poll ((GSource *) watch, &watch->writefd);
2927
2928   watch->readfd.fd = watch->conn->readfd->fd;
2929   watch->readfd.events = READ_COND;
2930   watch->readfd.revents = 0;
2931
2932   watch->writefd.fd = watch->conn->writefd->fd;
2933   watch->writefd.events = WRITE_COND;
2934   watch->writefd.revents = 0;
2935   watch->write_added = FALSE;
2936
2937   g_source_add_poll ((GSource *) watch, &watch->readfd);
2938 }
2939
2940 /**
2941  * gst_rtsp_watch_attach:
2942  * @watch: a #GstRTSPWatch
2943  * @context: a GMainContext (if NULL, the default context will be used)
2944  *
2945  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
2946  *
2947  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
2948  *
2949  * Since: 0.10.23
2950  */
2951 guint
2952 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
2953 {
2954   g_return_val_if_fail (watch != NULL, 0);
2955
2956   return g_source_attach ((GSource *) watch, context);
2957 }
2958
2959 /**
2960  * gst_rtsp_watch_unref:
2961  * @watch: a #GstRTSPWatch
2962  *
2963  * Decreases the reference count of @watch by one. If the resulting reference
2964  * count is zero the watch and associated memory will be destroyed.
2965  *
2966  * Since: 0.10.23
2967  */
2968 void
2969 gst_rtsp_watch_unref (GstRTSPWatch * watch)
2970 {
2971   g_return_if_fail (watch != NULL);
2972
2973   g_source_unref ((GSource *) watch);
2974 }
2975
2976 /**
2977  * gst_rtsp_watch_queue_data:
2978  * @watch: a #GstRTSPWatch
2979  * @data: the data to queue
2980  * @size: the size of @data
2981  *
2982  * Queue @data for transmission in @watch. It will be transmitted when the
2983  * connection of the @watch becomes writable.
2984  *
2985  * This function will take ownership of @data and g_free() it after use.
2986  *
2987  * The return value of this function will be used as the id argument in the
2988  * message_sent callback.
2989  *
2990  * Returns: an id.
2991  *
2992  * Since: 0.10.24
2993  */
2994 guint
2995 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
2996     guint size)
2997 {
2998   GstRTSPRec *rec;
2999
3000   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3001   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
3002   g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
3003
3004   /* make a record with the data and id */
3005   rec = g_slice_new (GstRTSPRec);
3006   rec->data = (guint8 *) data;
3007   rec->size = size;
3008   do {
3009     /* make sure rec->id is never 0 */
3010     rec->id = ++watch->id;
3011   } while (G_UNLIKELY (rec->id == 0));
3012
3013   /* add the record to a queue. FIXME we would like to have an upper limit here */
3014   g_async_queue_push (watch->messages, rec);
3015
3016   /* FIXME: does the following need to be made thread-safe? (this might be
3017    * called from a streaming thread, like appsink's render function) */
3018   /* make sure the main context will now also check for writability on the
3019    * socket */
3020   if (!watch->write_added) {
3021     g_source_add_poll ((GSource *) watch, &watch->writefd);
3022     watch->write_added = TRUE;
3023   }
3024
3025   return rec->id;
3026 }
3027
3028 /**
3029  * gst_rtsp_watch_queue_message:
3030  * @watch: a #GstRTSPWatch
3031  * @message: a #GstRTSPMessage
3032  *
3033  * Queue a @message for transmission in @watch. The contents of this
3034  * message will be serialized and transmitted when the connection of the
3035  * @watch becomes writable.
3036  *
3037  * The return value of this function will be used as the id argument in the
3038  * message_sent callback.
3039  *
3040  * Returns: an id.
3041  *
3042  * Since: 0.10.23
3043  */
3044 guint
3045 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
3046 {
3047   GString *str;
3048   guint size;
3049
3050   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
3051   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
3052
3053   /* make a record with the message as a string and id */
3054   str = message_to_string (watch->conn, message);
3055   size = str->len;
3056   return gst_rtsp_watch_queue_data (watch,
3057       (guint8 *) g_string_free (str, FALSE), size);
3058 }