rtsp: add _get_url method and separate sockets
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @short_description: manage RTSP connections
46  * @see_also: gstrtspurl
47  *  
48  * <refsect2>
49  * <para>
50  * This object manages the RTSP connection to the server. It provides function
51  * to receive and send bytes and messages.
52  * </para>
53  * </refsect2>
54  *  
55  * Last reviewed on 2007-07-24 (0.10.14)
56  */
57
58 #ifdef HAVE_CONFIG_H
59 #  include <config.h>
60 #endif
61
62 #include <stdio.h>
63 #include <errno.h>
64 #include <stdlib.h>
65 #include <string.h>
66 #include <time.h>
67
68 #ifdef HAVE_UNISTD_H
69 #include <unistd.h>
70 #endif
71
72
73 /* we include this here to get the G_OS_* defines */
74 #include <glib.h>
75 #include <gst/gst.h>
76
77 #ifdef G_OS_WIN32
78 #include <winsock2.h>
79 #include <ws2tcpip.h>
80 #define EINPROGRESS WSAEINPROGRESS
81 #else
82 #include <sys/ioctl.h>
83 #include <netdb.h>
84 #include <sys/socket.h>
85 #include <netinet/in.h>
86 #include <arpa/inet.h>
87 #include <fcntl.h>
88 #endif
89
90 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
91 #include <sys/filio.h>
92 #endif
93
94 #include "gstrtspconnection.h"
95 #include "gstrtspbase64.h"
96 #include "md5.h"
97
98 #ifdef G_OS_WIN32
99 #define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
100 #define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
101 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
102 #define CLOSE_SOCKET(sock) closesocket (sock)
103 #define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
104 #define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
105 /* According to Microsoft's connect() documentation this one returns
106  * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
107 #define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
108 #else
109 #define READ_SOCKET(fd, buf, len) read (fd, buf, len)
110 #define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
111 #define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
112 #define CLOSE_SOCKET(sock) close (sock)
113 #define ERRNO_IS_EAGAIN (errno == EAGAIN)
114 #define ERRNO_IS_EINTR (errno == EINTR)
115 #define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
116 #endif
117
118 #define ADD_POLLFD(fdset, pfd, fd)       \
119 G_STMT_START {                           \
120   pfd.fd = fd;                           \
121   gst_poll_add_fd (fdset, &pfd);         \
122 } G_STMT_END
123
124 #define REMOVE_POLLFD(fdset, pfd)        \
125 G_STMT_START {                           \
126   if (pfd.fd != -1) {                    \
127     GST_DEBUG ("remove fd %d", pfd.fd);  \
128     gst_poll_remove_fd (fdset, &pfd);    \
129     CLOSE_SOCKET (pfd.fd);               \
130     pfd.fd = -1;                         \
131   }                                      \
132 } G_STMT_END
133
134 struct _GstRTSPConnection
135 {
136   /*< private > */
137   /* URL for the connection */
138   GstRTSPUrl *url;
139
140   /* connection state */
141   GstPollFD fd0;
142   GstPollFD fd1;
143
144   GstPollFD *readfd;
145   GstPollFD *writefd;
146
147   GstPoll *fdset;
148   gchar *ip;
149
150   /* Session state */
151   gint cseq;                    /* sequence number */
152   gchar session_id[512];        /* session id */
153   gint timeout;                 /* session timeout in seconds */
154   GTimer *timer;                /* timeout timer */
155
156   /* Authentication */
157   GstRTSPAuthMethod auth_method;
158   gchar *username;
159   gchar *passwd;
160   GHashTable *auth_params;
161 };
162
163 #ifdef G_OS_WIN32
164 static int
165 inet_aton (const char *c, struct in_addr *paddr)
166 {
167   /* note that inet_addr is deprecated on unix because
168    * inet_addr returns -1 (INADDR_NONE) for the valid 255.255.255.255
169    * address. */
170   paddr->s_addr = inet_addr (c);
171
172   if (paddr->s_addr == INADDR_NONE)
173     return 0;
174
175   return 1;
176 }
177 #endif
178
179 enum
180 {
181   STATE_START = 0,
182   STATE_DATA_HEADER,
183   STATE_DATA_BODY,
184   STATE_READ_LINES,
185   STATE_END,
186   STATE_LAST
187 };
188
189 /* a structure for constructing RTSPMessages */
190 typedef struct
191 {
192   gint state;
193   guint8 buffer[4096];
194   guint offset;
195
196   guint line;
197   guint8 *body_data;
198   glong body_len;
199 } GstRTSPBuilder;
200
201 static void
202 build_reset (GstRTSPBuilder * builder)
203 {
204   g_free (builder->body_data);
205   memset (builder, 0, sizeof (builder));
206 }
207
208 /**
209  * gst_rtsp_connection_create:
210  * @url: a #GstRTSPUrl 
211  * @conn: storage for a #GstRTSPConnection
212  *
213  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
214  * The connection will not yet attempt to connect to @url, use
215  * gst_rtsp_connection_connect().
216  *
217  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
218  */
219 GstRTSPResult
220 gst_rtsp_connection_create (GstRTSPUrl * url, GstRTSPConnection ** conn)
221 {
222   GstRTSPConnection *newconn;
223 #ifdef G_OS_WIN32
224   WSADATA w;
225   int error;
226 #endif
227
228   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
229
230 #ifdef G_OS_WIN32
231   error = WSAStartup (0x0202, &w);
232
233   if (error)
234     goto startup_error;
235
236   if (w.wVersion != 0x0202)
237     goto version_error;
238 #endif
239
240   newconn = g_new0 (GstRTSPConnection, 1);
241
242   if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
243     goto no_fdset;
244
245   newconn->url = url;
246   newconn->fd0.fd = -1;
247   newconn->fd1.fd = -1;
248   newconn->timer = g_timer_new ();
249   newconn->timeout = 60;
250
251   newconn->auth_method = GST_RTSP_AUTH_NONE;
252   newconn->username = NULL;
253   newconn->passwd = NULL;
254   newconn->auth_params = NULL;
255
256   *conn = newconn;
257
258   return GST_RTSP_OK;
259
260   /* ERRORS */
261 #ifdef G_OS_WIN32
262 startup_error:
263   {
264     g_warning ("Error %d on WSAStartup", error);
265     return GST_RTSP_EWSASTART;
266   }
267 version_error:
268   {
269     g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
270         w.wVersion);
271     WSACleanup ();
272     return GST_RTSP_EWSAVERSION;
273   }
274 #endif
275 no_fdset:
276   {
277     g_free (newconn);
278 #ifdef G_OS_WIN32
279     WSACleanup ();
280 #endif
281     return GST_RTSP_ESYS;
282   }
283 }
284
285 /**
286  * gst_rtsp_connection_accept:
287  * @sock: a socket
288  * @conn: storage for a #GstRTSPConnection
289  *
290  * Accept a new connection on @sock and create a new #GstRTSPConnection for
291  * handling communication on new socket.
292  *
293  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
294  *
295  * Since: 0.10.23
296  */
297 GstRTSPResult
298 gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
299 {
300   int fd;
301   unsigned int address_len;
302   GstRTSPConnection *newconn = NULL;
303   struct sockaddr_in address;
304   GstRTSPUrl *url;
305 #ifdef G_OS_WIN32
306   gulong flags = 1;
307 #endif
308
309   address_len = sizeof (address);
310   memset (&address, 0, address_len);
311
312 #ifndef G_OS_WIN32
313   fd = accept (sock, (struct sockaddr *) &address, &address_len);
314 #else
315   fd = accept (sock, (struct sockaddr *) &address, (gint *) & address_len);
316 #endif /* G_OS_WIN32 */
317   if (fd == -1)
318     goto accept_failed;
319
320   /* set to non-blocking mode so that we can cancel the communication */
321 #ifndef G_OS_WIN32
322   fcntl (fd, F_SETFL, O_NONBLOCK);
323 #else
324   ioctlsocket (fd, FIONBIO, &flags);
325 #endif /* G_OS_WIN32 */
326
327   /* create a url for the client address */
328   url = g_new0 (GstRTSPUrl, 1);
329   url->host = g_strdup_printf ("%s", inet_ntoa (address.sin_addr));
330   url->port = address.sin_port;
331
332   /* now create the connection object */
333   gst_rtsp_connection_create (url, &newconn);
334   ADD_POLLFD (newconn->fdset, newconn->fd0, fd);
335
336   newconn->readfd = &newconn->fd0;
337   newconn->writefd = &newconn->fd0;
338
339   *conn = newconn;
340
341   return GST_RTSP_OK;
342
343   /* ERRORS */
344 accept_failed:
345   {
346     return GST_RTSP_ESYS;
347   }
348 }
349
350 /**
351  * gst_rtsp_connection_connect:
352  * @conn: a #GstRTSPConnection 
353  * @timeout: a #GTimeVal timeout
354  *
355  * Attempt to connect to the url of @conn made with
356  * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
357  * forever. If @timeout contains a valid timeout, this function will return
358  * #GST_RTSP_ETIMEOUT after the timeout expired.
359  *
360  * This function can be cancelled with gst_rtsp_connection_flush().
361  *
362  * Returns: #GST_RTSP_OK when a connection could be made.
363  */
364 GstRTSPResult
365 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
366 {
367   gint fd;
368   struct sockaddr_in sa_in;
369   struct hostent *hostinfo;
370   const gchar *ip;
371   struct in_addr addr;
372   gint ret;
373   guint16 port;
374   GstRTSPUrl *url;
375   GstClockTime to;
376   gint retval;
377
378 #ifdef G_OS_WIN32
379   unsigned long flags = 1;
380   struct in_addr *addrp;
381 #else
382   char **addrs;
383   gchar ipbuf[INET_ADDRSTRLEN];
384 #endif /* G_OS_WIN32 */
385
386   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
387   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
388   g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);
389
390   url = conn->url;
391
392   /* first check if it already is an IP address */
393   if (inet_aton (url->host, &addr)) {
394     ip = url->host;
395   } else {
396     hostinfo = gethostbyname (url->host);
397     if (!hostinfo)
398       goto not_resolved;        /* h_errno set */
399
400     if (hostinfo->h_addrtype != AF_INET)
401       goto not_ip;              /* host not an IP host */
402 #ifdef G_OS_WIN32
403     addrp = (struct in_addr *) hostinfo->h_addr_list[0];
404     /* this is not threadsafe */
405     ip = inet_ntoa (*addrp);
406 #else
407     addrs = hostinfo->h_addr_list;
408     ip = inet_ntop (AF_INET, (struct in_addr *) addrs[0], ipbuf,
409         sizeof (ipbuf));
410 #endif /* G_OS_WIN32 */
411   }
412
413   /* get the port from the url */
414   gst_rtsp_url_get_port (url, &port);
415
416   memset (&sa_in, 0, sizeof (sa_in));
417   sa_in.sin_family = AF_INET;   /* network socket */
418   sa_in.sin_port = htons (port);        /* on port */
419   sa_in.sin_addr.s_addr = inet_addr (ip);       /* on host ip */
420
421   fd = socket (AF_INET, SOCK_STREAM, 0);
422   if (fd == -1)
423     goto sys_error;
424
425   /* set to non-blocking mode so that we can cancel the connect */
426 #ifndef G_OS_WIN32
427   fcntl (fd, F_SETFL, O_NONBLOCK);
428 #else
429   ioctlsocket (fd, FIONBIO, &flags);
430 #endif /* G_OS_WIN32 */
431
432   /* add the socket to our fdset */
433   ADD_POLLFD (conn->fdset, conn->fd0, fd);
434
435   conn->readfd = &conn->fd0;
436   conn->writefd = &conn->fd0;
437
438   /* we are going to connect ASYNC now */
439   ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in));
440   if (ret == 0)
441     goto done;
442   if (!ERRNO_IS_EINPROGRESS)
443     goto sys_error;
444
445   /* wait for connect to complete up to the specified timeout or until we got
446    * interrupted. */
447   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
448
449   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
450
451   do {
452     retval = gst_poll_wait (conn->fdset, to);
453   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
454
455   if (retval == 0)
456     goto timeout;
457   else if (retval == -1)
458     goto sys_error;
459
460   /* we can still have an error connecting on windows */
461   if (gst_poll_fd_has_error (conn->fdset, conn->writefd)) {
462     socklen_t len = sizeof (errno);
463 #ifndef G_OS_WIN32
464     getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
465 #else
466     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
467 #endif
468     goto sys_error;
469   }
470
471   gst_poll_fd_ignored (conn->fdset, conn->writefd);
472   gst_poll_fd_ignored (conn->fdset, conn->readfd);
473
474 done:
475   conn->ip = g_strdup (ip);
476
477   return GST_RTSP_OK;
478
479 sys_error:
480   {
481     GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
482     REMOVE_POLLFD (conn->fdset, conn->fd0);
483     REMOVE_POLLFD (conn->fdset, conn->fd1);
484     return GST_RTSP_ESYS;
485   }
486 not_resolved:
487   {
488     GST_ERROR ("could not resolve %s", url->host);
489     return GST_RTSP_ENET;
490   }
491 not_ip:
492   {
493     GST_ERROR ("not an IP address");
494     return GST_RTSP_ENOTIP;
495   }
496 timeout:
497   {
498     GST_ERROR ("timeout");
499     REMOVE_POLLFD (conn->fdset, conn->fd0);
500     REMOVE_POLLFD (conn->fdset, conn->fd1);
501     return GST_RTSP_ETIMEOUT;
502   }
503 }
504
505 static void
506 md5_digest_to_hex_string (unsigned char digest[16], char string[33])
507 {
508   static const char hexdigits[] = "0123456789abcdef";
509   int i;
510
511   for (i = 0; i < 16; i++) {
512     string[i * 2] = hexdigits[(digest[i] >> 4) & 0x0f];
513     string[i * 2 + 1] = hexdigits[digest[i] & 0x0f];
514   }
515   string[32] = 0;
516 }
517
518 static void
519 auth_digest_compute_hex_urp (const gchar * username,
520     const gchar * realm, const gchar * password, gchar hex_urp[33])
521 {
522   struct MD5Context md5_context;
523   unsigned char digest[16];
524
525   MD5Init (&md5_context);
526   MD5Update (&md5_context, username, strlen (username));
527   MD5Update (&md5_context, ":", 1);
528   MD5Update (&md5_context, realm, strlen (realm));
529   MD5Update (&md5_context, ":", 1);
530   MD5Update (&md5_context, password, strlen (password));
531   MD5Final (digest, &md5_context);
532   md5_digest_to_hex_string (digest, hex_urp);
533 }
534
535 static void
536 auth_digest_compute_response (const gchar * method,
537     const gchar * uri, const gchar * hex_a1, const gchar * nonce,
538     gchar response[33])
539 {
540   char hex_a2[33];
541   struct MD5Context md5_context;
542   unsigned char digest[16];
543
544   /* compute A2 */
545   MD5Init (&md5_context);
546   MD5Update (&md5_context, method, strlen (method));
547   MD5Update (&md5_context, ":", 1);
548   MD5Update (&md5_context, uri, strlen (uri));
549   MD5Final (digest, &md5_context);
550   md5_digest_to_hex_string (digest, hex_a2);
551
552   /* compute KD */
553   MD5Init (&md5_context);
554   MD5Update (&md5_context, hex_a1, strlen (hex_a1));
555   MD5Update (&md5_context, ":", 1);
556   MD5Update (&md5_context, nonce, strlen (nonce));
557   MD5Update (&md5_context, ":", 1);
558
559   MD5Update (&md5_context, hex_a2, 32);
560   MD5Final (digest, &md5_context);
561   md5_digest_to_hex_string (digest, response);
562 }
563
564 static void
565 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
566 {
567   switch (conn->auth_method) {
568     case GST_RTSP_AUTH_BASIC:{
569       gchar *user_pass =
570           g_strdup_printf ("%s:%s", conn->username, conn->passwd);
571       gchar *user_pass64 =
572           gst_rtsp_base64_encode (user_pass, strlen (user_pass));
573       gchar *auth_string = g_strdup_printf ("Basic %s", user_pass64);
574
575       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
576           auth_string);
577
578       g_free (user_pass);
579       g_free (user_pass64);
580       break;
581     }
582     case GST_RTSP_AUTH_DIGEST:{
583       gchar response[33], hex_urp[33];
584       gchar *auth_string, *auth_string2;
585       gchar *realm;
586       gchar *nonce;
587       gchar *opaque;
588       const gchar *uri;
589       const gchar *method;
590
591       /* we need to have some params set */
592       if (conn->auth_params == NULL)
593         break;
594
595       /* we need the realm and nonce */
596       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
597       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
598       if (realm == NULL || nonce == NULL)
599         break;
600
601       auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
602           hex_urp);
603
604       method = gst_rtsp_method_as_text (message->type_data.request.method);
605       uri = message->type_data.request.uri;
606
607       /* Assume no qop, algorithm=md5, stale=false */
608       /* For algorithm MD5, a1 = urp. */
609       auth_digest_compute_response (method, uri, hex_urp, nonce, response);
610       auth_string = g_strdup_printf ("Digest username=\"%s\", "
611           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
612           conn->username, realm, nonce, uri, response);
613
614       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
615       if (opaque) {
616         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
617             opaque);
618         g_free (auth_string);
619         auth_string = auth_string2;
620       }
621       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
622           auth_string);
623       break;
624     }
625     default:
626       /* Nothing to do */
627       break;
628   }
629 }
630
631 static void
632 add_date_header (GstRTSPMessage * message)
633 {
634   GTimeVal tv;
635   gchar date_string[100];
636   time_t t;
637
638 #ifdef HAVE_GMTIME_R
639   struct tm tm_;
640 #endif
641
642   g_get_current_time (&tv);
643   t = (time_t) tv.tv_sec;
644
645 #ifdef HAVE_GMTIME_R
646   strftime (date_string, sizeof (date_string), "%a, %d %b %Y %H:%M:%S GMT",
647       gmtime_r (&t, &tm_));
648 #else
649   strftime (date_string, sizeof (date_string), "%a, %d %b %Y %H:%M:%S GMT",
650       gmtime (&t));
651 #endif
652
653   gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
654 }
655
656 static GstRTSPResult
657 write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
658 {
659   guint left;
660
661   if (*idx > size)
662     return GST_RTSP_ERROR;
663
664   left = size - *idx;
665
666   while (left) {
667     gint r;
668
669     r = WRITE_SOCKET (fd, &buffer[*idx], left);
670     if (r == 0) {
671       return GST_RTSP_EINTR;
672     } else if (r < 0) {
673       if (ERRNO_IS_EAGAIN)
674         return GST_RTSP_EINTR;
675       if (!ERRNO_IS_EINTR)
676         return GST_RTSP_ESYS;
677     } else {
678       left -= r;
679       *idx += r;
680     }
681   }
682   return GST_RTSP_OK;
683 }
684
685 static GstRTSPResult
686 read_bytes (gint fd, guint8 * buffer, guint * idx, guint size)
687 {
688   guint left;
689
690   if (*idx > size)
691     return GST_RTSP_ERROR;
692
693   left = size - *idx;
694
695   while (left) {
696     gint r;
697
698     r = READ_SOCKET (fd, &buffer[*idx], left);
699     if (r == 0) {
700       return GST_RTSP_EEOF;
701     } else if (r < 0) {
702       if (ERRNO_IS_EAGAIN)
703         return GST_RTSP_EINTR;
704       if (!ERRNO_IS_EINTR)
705         return GST_RTSP_ESYS;
706     } else {
707       left -= r;
708       *idx += r;
709     }
710   }
711   return GST_RTSP_OK;
712 }
713
714 static GstRTSPResult
715 read_line (gint fd, guint8 * buffer, guint * idx, guint size)
716 {
717   while (TRUE) {
718     guint8 c;
719     gint r;
720
721     r = READ_SOCKET (fd, &c, 1);
722     if (r == 0) {
723       return GST_RTSP_EEOF;
724     } else if (r < 0) {
725       if (ERRNO_IS_EAGAIN)
726         return GST_RTSP_EINTR;
727       if (!ERRNO_IS_EINTR)
728         return GST_RTSP_ESYS;
729     } else {
730       if (c == '\n')            /* end on \n */
731         break;
732       if (c == '\r')            /* ignore \r */
733         continue;
734
735       if (*idx < size - 1)
736         buffer[(*idx)++] = c;
737     }
738   }
739   buffer[*idx] = '\0';
740
741   return GST_RTSP_OK;
742 }
743
744 /**
745  * gst_rtsp_connection_write:
746  * @conn: a #GstRTSPConnection
747  * @data: the data to write
748  * @size: the size of @data
749  * @timeout: a timeout value or #NULL
750  *
751  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
752  * the specified @timeout. @timeout can be #NULL, in which case this function
753  * might block forever.
754  * 
755  * This function can be cancelled with gst_rtsp_connection_flush().
756  *
757  * Returns: #GST_RTSP_OK on success.
758  */
759 GstRTSPResult
760 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
761     guint size, GTimeVal * timeout)
762 {
763   guint offset;
764   gint retval;
765   GstClockTime to;
766   GstRTSPResult res;
767
768   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
769   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
770   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
771
772   gst_poll_set_controllable (conn->fdset, TRUE);
773   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
774   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
775   /* clear all previous poll results */
776   gst_poll_fd_ignored (conn->fdset, conn->writefd);
777   gst_poll_fd_ignored (conn->fdset, conn->readfd);
778
779   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
780
781   offset = 0;
782
783   while (TRUE) {
784     /* try to write */
785     res = write_bytes (conn->writefd->fd, data, &offset, size);
786     if (res == GST_RTSP_OK)
787       break;
788     if (res != GST_RTSP_EINTR)
789       goto write_error;
790
791     /* not all is written, wait until we can write more */
792     do {
793       retval = gst_poll_wait (conn->fdset, to);
794     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
795
796     if (retval == 0)
797       goto timeout;
798
799     if (retval == -1) {
800       if (errno == EBUSY)
801         goto stopped;
802       else
803         goto select_error;
804     }
805   }
806   return GST_RTSP_OK;
807
808   /* ERRORS */
809 timeout:
810   {
811     return GST_RTSP_ETIMEOUT;
812   }
813 select_error:
814   {
815     return GST_RTSP_ESYS;
816   }
817 stopped:
818   {
819     return GST_RTSP_EINTR;
820   }
821 write_error:
822   {
823     return res;
824   }
825 }
826
827 static GString *
828 message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
829 {
830   GString *str = NULL;
831
832   str = g_string_new ("");
833
834   switch (message->type) {
835     case GST_RTSP_MESSAGE_REQUEST:
836       /* create request string, add CSeq */
837       g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
838           "CSeq: %d\r\n",
839           gst_rtsp_method_as_text (message->type_data.request.method),
840           message->type_data.request.uri, conn->cseq++);
841       /* add session id if we have one */
842       if (conn->session_id[0] != '\0') {
843         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
844             conn->session_id);
845       }
846       /* add any authentication headers */
847       add_auth_header (conn, message);
848       break;
849     case GST_RTSP_MESSAGE_RESPONSE:
850       /* create response string */
851       g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
852           message->type_data.response.code, message->type_data.response.reason);
853       break;
854     case GST_RTSP_MESSAGE_DATA:
855     {
856       guint8 data_header[4];
857
858       /* prepare data header */
859       data_header[0] = '$';
860       data_header[1] = message->type_data.data.channel;
861       data_header[2] = (message->body_size >> 8) & 0xff;
862       data_header[3] = message->body_size & 0xff;
863
864       /* create string with header and data */
865       str = g_string_append_len (str, (gchar *) data_header, 4);
866       str =
867           g_string_append_len (str, (gchar *) message->body,
868           message->body_size);
869       break;
870     }
871     default:
872       g_string_free (str, TRUE);
873       g_return_val_if_reached (NULL);
874       break;
875   }
876
877   /* append headers and body */
878   if (message->type != GST_RTSP_MESSAGE_DATA) {
879     /* add date header */
880     add_date_header (message);
881
882     /* append headers */
883     gst_rtsp_message_append_headers (message, str);
884
885     /* append Content-Length and body if needed */
886     if (message->body != NULL && message->body_size > 0) {
887       gchar *len;
888
889       len = g_strdup_printf ("%d", message->body_size);
890       g_string_append_printf (str, "%s: %s\r\n",
891           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
892       g_free (len);
893       /* header ends here */
894       g_string_append (str, "\r\n");
895       str =
896           g_string_append_len (str, (gchar *) message->body,
897           message->body_size);
898     } else {
899       /* just end headers */
900       g_string_append (str, "\r\n");
901     }
902   }
903
904   return str;
905 }
906
907 /**
908  * gst_rtsp_connection_send:
909  * @conn: a #GstRTSPConnection
910  * @message: the message to send
911  * @timeout: a timeout value or #NULL
912  *
913  * Attempt to send @message to the connected @conn, blocking up to
914  * the specified @timeout. @timeout can be #NULL, in which case this function
915  * might block forever.
916  * 
917  * This function can be cancelled with gst_rtsp_connection_flush().
918  *
919  * Returns: #GST_RTSP_OK on success.
920  */
921 GstRTSPResult
922 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
923     GTimeVal * timeout)
924 {
925   GString *str = NULL;
926   GstRTSPResult res;
927
928   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
929   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
930
931   if (!(str = message_to_string (conn, message)))
932     goto no_message;
933
934   /* write request */
935   res =
936       gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len, timeout);
937
938   g_string_free (str, TRUE);
939
940   return res;
941
942 no_message:
943   {
944     g_warning ("Wrong message");
945     return GST_RTSP_EINVAL;
946   }
947 }
948
949 static void
950 parse_string (gchar * dest, gint size, gchar ** src)
951 {
952   gint idx;
953
954   idx = 0;
955   /* skip spaces */
956   while (g_ascii_isspace (**src))
957     (*src)++;
958
959   while (!g_ascii_isspace (**src) && **src != '\0') {
960     if (idx < size - 1)
961       dest[idx++] = **src;
962     (*src)++;
963   }
964   if (size > 0)
965     dest[idx] = '\0';
966 }
967
968 static void
969 parse_key (gchar * dest, gint size, gchar ** src)
970 {
971   gint idx;
972
973   idx = 0;
974   while (**src != ':' && **src != '\0') {
975     if (idx < size - 1)
976       dest[idx++] = **src;
977     (*src)++;
978   }
979   if (size > 0)
980     dest[idx] = '\0';
981 }
982
983 static GstRTSPResult
984 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
985 {
986   GstRTSPResult res;
987   gchar versionstr[20];
988   gchar codestr[4];
989   gint code;
990   gchar *bptr;
991
992   bptr = (gchar *) buffer;
993
994   parse_string (versionstr, sizeof (versionstr), &bptr);
995   parse_string (codestr, sizeof (codestr), &bptr);
996   code = atoi (codestr);
997
998   while (g_ascii_isspace (*bptr))
999     bptr++;
1000
1001   if (strcmp (versionstr, "RTSP/1.0") == 0)
1002     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1003         parse_error);
1004   else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1005     GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
1006         parse_error);
1007     msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
1008   } else
1009     goto parse_error;
1010
1011   return GST_RTSP_OK;
1012
1013 parse_error:
1014   {
1015     return GST_RTSP_EPARSE;
1016   }
1017 }
1018
1019 static GstRTSPResult
1020 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
1021 {
1022   GstRTSPResult res = GST_RTSP_OK;
1023   gchar versionstr[20];
1024   gchar methodstr[20];
1025   gchar urlstr[4096];
1026   gchar *bptr;
1027   GstRTSPMethod method;
1028
1029   bptr = (gchar *) buffer;
1030
1031   parse_string (methodstr, sizeof (methodstr), &bptr);
1032   method = gst_rtsp_find_method (methodstr);
1033
1034   parse_string (urlstr, sizeof (urlstr), &bptr);
1035   if (*urlstr == '\0')
1036     res = GST_RTSP_EPARSE;
1037
1038   parse_string (versionstr, sizeof (versionstr), &bptr);
1039
1040   if (*bptr != '\0')
1041     res = GST_RTSP_EPARSE;
1042
1043   if (strcmp (versionstr, "RTSP/1.0") == 0) {
1044     if (gst_rtsp_message_init_request (msg, method, urlstr) != GST_RTSP_OK)
1045       res = GST_RTSP_EPARSE;
1046   } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
1047     if (gst_rtsp_message_init_request (msg, method, urlstr) != GST_RTSP_OK)
1048       res = GST_RTSP_EPARSE;
1049     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1050   } else {
1051     gst_rtsp_message_init_request (msg, method, urlstr);
1052     msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
1053     res = GST_RTSP_EPARSE;
1054   }
1055
1056   return res;
1057 }
1058
1059 /* parsing lines means reading a Key: Value pair */
1060 static GstRTSPResult
1061 parse_line (guint8 * buffer, GstRTSPMessage * msg)
1062 {
1063   gchar key[32];
1064   gchar *bptr;
1065   GstRTSPHeaderField field;
1066
1067   bptr = (gchar *) buffer;
1068
1069   /* read key */
1070   parse_key (key, sizeof (key), &bptr);
1071   if (*bptr != ':')
1072     goto no_column;
1073
1074   bptr++;
1075
1076   field = gst_rtsp_find_header_field (key);
1077   if (field != GST_RTSP_HDR_INVALID) {
1078     while (g_ascii_isspace (*bptr))
1079       bptr++;
1080     gst_rtsp_message_add_header (msg, field, bptr);
1081   }
1082
1083   return GST_RTSP_OK;
1084
1085 no_column:
1086   {
1087     return GST_RTSP_EPARSE;
1088   }
1089 }
1090
1091 /* returns:
1092  *  GST_RTSP_OK when a complete message was read.
1093  *  GST_RTSP_EEOF: when the socket is closed
1094  *  GST_RTSP_EINTR: when more data is needed.
1095  *  GST_RTSP_..: some other error occured.
1096  */
1097 static GstRTSPResult
1098 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
1099     GstRTSPConnection * conn)
1100 {
1101   GstRTSPResult res;
1102
1103   while (TRUE) {
1104     switch (builder->state) {
1105       case STATE_START:
1106         builder->offset = 0;
1107         res =
1108             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1109             &builder->offset, 1);
1110         if (res != GST_RTSP_OK)
1111           goto done;
1112
1113         /* we have 1 bytes now and we can see if this is a data message or
1114          * not */
1115         if (builder->buffer[0] == '$') {
1116           /* data message, prepare for the header */
1117           builder->state = STATE_DATA_HEADER;
1118         } else {
1119           builder->line = 0;
1120           builder->state = STATE_READ_LINES;
1121         }
1122         break;
1123       case STATE_DATA_HEADER:
1124       {
1125         res =
1126             read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
1127             &builder->offset, 4);
1128         if (res != GST_RTSP_OK)
1129           goto done;
1130
1131         gst_rtsp_message_init_data (message, builder->buffer[1]);
1132
1133         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
1134         builder->body_data = g_malloc (builder->body_len + 1);
1135         builder->body_data[builder->body_len] = '\0';
1136         builder->offset = 0;
1137         builder->state = STATE_DATA_BODY;
1138         break;
1139       }
1140       case STATE_DATA_BODY:
1141       {
1142         res =
1143             read_bytes (conn->readfd->fd, builder->body_data, &builder->offset,
1144             builder->body_len);
1145         if (res != GST_RTSP_OK)
1146           goto done;
1147
1148         /* we have the complete body now, store in the message adjusting the
1149          * length to include the traling '\0' */
1150         gst_rtsp_message_take_body (message,
1151             (guint8 *) builder->body_data, builder->body_len + 1);
1152         builder->body_data = NULL;
1153         builder->body_len = 0;
1154
1155         builder->state = STATE_END;
1156         break;
1157       }
1158       case STATE_READ_LINES:
1159       {
1160         res = read_line (conn->readfd->fd, builder->buffer, &builder->offset,
1161             sizeof (builder->buffer));
1162         if (res != GST_RTSP_OK)
1163           goto done;
1164
1165         /* we have a regular response */
1166         if (builder->buffer[0] == '\r') {
1167           builder->buffer[0] = '\0';
1168         }
1169
1170         if (builder->buffer[0] == '\0') {
1171           gchar *hdrval;
1172
1173           /* empty line, end of message header */
1174           /* see if there is a Content-Length header */
1175           if (gst_rtsp_message_get_header (message,
1176                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
1177             /* there is, prepare to read the body */
1178             builder->body_len = atol (hdrval);
1179             builder->body_data = g_malloc (builder->body_len + 1);
1180             builder->body_data[builder->body_len] = '\0';
1181             builder->offset = 0;
1182             builder->state = STATE_DATA_BODY;
1183           } else {
1184             builder->state = STATE_END;
1185           }
1186           break;
1187         }
1188
1189         /* we have a line */
1190         if (builder->line == 0) {
1191           /* first line, check for response status */
1192           if (memcmp (builder->buffer, "RTSP", 4) == 0) {
1193             res = parse_response_status (builder->buffer, message);
1194           } else {
1195             res = parse_request_line (builder->buffer, message);
1196           }
1197         } else {
1198           /* else just parse the line */
1199           parse_line (builder->buffer, message);
1200         }
1201         builder->line++;
1202         builder->offset = 0;
1203         break;
1204       }
1205       case STATE_END:
1206       {
1207         gchar *session_id;
1208
1209         if (message->type == GST_RTSP_MESSAGE_DATA) {
1210           /* data messages don't have headers */
1211           res = GST_RTSP_OK;
1212           goto done;
1213         }
1214
1215         /* save session id in the connection for further use */
1216         if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
1217                 &session_id, 0) == GST_RTSP_OK) {
1218           gint maxlen, i;
1219
1220           maxlen = sizeof (conn->session_id) - 1;
1221           /* the sessionid can have attributes marked with ;
1222            * Make sure we strip them */
1223           for (i = 0; session_id[i] != '\0'; i++) {
1224             if (session_id[i] == ';') {
1225               maxlen = i;
1226               /* parse timeout */
1227               do {
1228                 i++;
1229               } while (g_ascii_isspace (session_id[i]));
1230               if (g_str_has_prefix (&session_id[i], "timeout=")) {
1231                 gint to;
1232
1233                 /* if we parsed something valid, configure */
1234                 if ((to = atoi (&session_id[i + 9])) > 0)
1235                   conn->timeout = to;
1236               }
1237               break;
1238             }
1239           }
1240
1241           /* make sure to not overflow */
1242           strncpy (conn->session_id, session_id, maxlen);
1243           conn->session_id[maxlen] = '\0';
1244         }
1245         res = GST_RTSP_OK;
1246         goto done;
1247       }
1248       default:
1249         res = GST_RTSP_ERROR;
1250         break;
1251     }
1252   }
1253 done:
1254   return res;
1255 }
1256
1257 /**
1258  * gst_rtsp_connection_read:
1259  * @conn: a #GstRTSPConnection
1260  * @data: the data to read
1261  * @size: the size of @data
1262  * @timeout: a timeout value or #NULL
1263  *
1264  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
1265  * the specified @timeout. @timeout can be #NULL, in which case this function
1266  * might block forever.
1267  *
1268  * This function can be cancelled with gst_rtsp_connection_flush().
1269  *
1270  * Returns: #GST_RTSP_OK on success.
1271  */
1272 GstRTSPResult
1273 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
1274     GTimeVal * timeout)
1275 {
1276   guint offset;
1277   gint retval;
1278   GstClockTime to;
1279   GstRTSPResult res;
1280
1281   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1282   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
1283   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1284
1285   if (size == 0)
1286     return GST_RTSP_OK;
1287
1288   offset = 0;
1289
1290   /* configure timeout if any */
1291   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1292
1293   gst_poll_set_controllable (conn->fdset, TRUE);
1294   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1295   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1296
1297   while (TRUE) {
1298     res = read_bytes (conn->readfd->fd, data, &offset, size);
1299     if (res == GST_RTSP_EEOF)
1300       goto eof;
1301     if (res == GST_RTSP_OK)
1302       break;
1303     if (res != GST_RTSP_EINTR)
1304       goto read_error;
1305
1306     do {
1307       retval = gst_poll_wait (conn->fdset, to);
1308     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1309
1310     /* check for timeout */
1311     if (retval == 0)
1312       goto select_timeout;
1313
1314     if (retval == -1) {
1315       if (errno == EBUSY)
1316         goto stopped;
1317       else
1318         goto select_error;
1319     }
1320     gst_poll_set_controllable (conn->fdset, FALSE);
1321   }
1322   return GST_RTSP_OK;
1323
1324   /* ERRORS */
1325 select_error:
1326   {
1327     return GST_RTSP_ESYS;
1328   }
1329 select_timeout:
1330   {
1331     return GST_RTSP_ETIMEOUT;
1332   }
1333 stopped:
1334   {
1335     return GST_RTSP_EINTR;
1336   }
1337 eof:
1338   {
1339     return GST_RTSP_EEOF;
1340   }
1341 read_error:
1342   {
1343     return res;
1344   }
1345 }
1346
1347
1348 /**
1349  * gst_rtsp_connection_receive:
1350  * @conn: a #GstRTSPConnection
1351  * @message: the message to read
1352  * @timeout: a timeout value or #NULL
1353  *
1354  * Attempt to read into @message from the connected @conn, blocking up to
1355  * the specified @timeout. @timeout can be #NULL, in which case this function
1356  * might block forever.
1357  * 
1358  * This function can be cancelled with gst_rtsp_connection_flush().
1359  *
1360  * Returns: #GST_RTSP_OK on success.
1361  */
1362 GstRTSPResult
1363 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
1364     GTimeVal * timeout)
1365 {
1366   GstRTSPResult res;
1367   GstRTSPBuilder builder = { 0 };
1368   gint retval;
1369   GstClockTime to;
1370
1371   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1372   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1373   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1374
1375   /* configure timeout if any */
1376   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1377
1378   gst_poll_set_controllable (conn->fdset, TRUE);
1379   gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
1380   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
1381
1382   while (TRUE) {
1383     res = build_next (&builder, message, conn);
1384     if (res == GST_RTSP_EEOF)
1385       goto eof;
1386     if (res == GST_RTSP_OK)
1387       break;
1388     if (res != GST_RTSP_EINTR)
1389       goto read_error;
1390
1391     do {
1392       retval = gst_poll_wait (conn->fdset, to);
1393     } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1394
1395     /* check for timeout */
1396     if (retval == 0)
1397       goto select_timeout;
1398
1399     if (retval == -1) {
1400       if (errno == EBUSY)
1401         goto stopped;
1402       else
1403         goto select_error;
1404     }
1405     gst_poll_set_controllable (conn->fdset, FALSE);
1406   }
1407
1408   /* we have a message here */
1409   build_reset (&builder);
1410
1411   return GST_RTSP_OK;
1412
1413   /* ERRORS */
1414 select_error:
1415   {
1416     res = GST_RTSP_ESYS;
1417     goto cleanup;
1418   }
1419 select_timeout:
1420   {
1421     res = GST_RTSP_ETIMEOUT;
1422     goto cleanup;
1423   }
1424 stopped:
1425   {
1426     res = GST_RTSP_EINTR;
1427     goto cleanup;
1428   }
1429 eof:
1430   {
1431     res = GST_RTSP_EEOF;
1432     goto cleanup;
1433   }
1434 read_error:
1435 cleanup:
1436   {
1437     build_reset (&builder);
1438     gst_rtsp_message_unset (message);
1439     return res;
1440   }
1441 }
1442
1443 /**
1444  * gst_rtsp_connection_close:
1445  * @conn: a #GstRTSPConnection
1446  *
1447  * Close the connected @conn.
1448  * 
1449  * Returns: #GST_RTSP_OK on success.
1450  */
1451 GstRTSPResult
1452 gst_rtsp_connection_close (GstRTSPConnection * conn)
1453 {
1454   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1455
1456   g_free (conn->ip);
1457   conn->ip = NULL;
1458
1459   REMOVE_POLLFD (conn->fdset, conn->fd0);
1460   REMOVE_POLLFD (conn->fdset, conn->fd1);
1461   conn->writefd = NULL;
1462   conn->readfd = NULL;
1463
1464   return GST_RTSP_OK;
1465 }
1466
1467 /**
1468  * gst_rtsp_connection_free:
1469  * @conn: a #GstRTSPConnection
1470  *
1471  * Close and free @conn.
1472  * 
1473  * Returns: #GST_RTSP_OK on success.
1474  */
1475 GstRTSPResult
1476 gst_rtsp_connection_free (GstRTSPConnection * conn)
1477 {
1478   GstRTSPResult res;
1479
1480   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1481
1482   res = gst_rtsp_connection_close (conn);
1483   gst_poll_free (conn->fdset);
1484   g_timer_destroy (conn->timer);
1485   g_free (conn->username);
1486   g_free (conn->passwd);
1487   gst_rtsp_connection_clear_auth_params (conn);
1488   g_free (conn);
1489 #ifdef G_OS_WIN32
1490   WSACleanup ();
1491 #endif
1492
1493   return res;
1494 }
1495
1496 /**
1497  * gst_rtsp_connection_poll:
1498  * @conn: a #GstRTSPConnection
1499  * @events: a bitmask of #GstRTSPEvent flags to check
1500  * @revents: location for result flags 
1501  * @timeout: a timeout
1502  *
1503  * Wait up to the specified @timeout for the connection to become available for
1504  * at least one of the operations specified in @events. When the function returns
1505  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
1506  * @conn.
1507  *
1508  * @timeout can be #NULL, in which case this function might block forever.
1509  *
1510  * This function can be cancelled with gst_rtsp_connection_flush().
1511  * 
1512  * Returns: #GST_RTSP_OK on success.
1513  *
1514  * Since: 0.10.15
1515  */
1516 GstRTSPResult
1517 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
1518     GstRTSPEvent * revents, GTimeVal * timeout)
1519 {
1520   GstClockTime to;
1521   gint retval;
1522
1523   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1524   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
1525   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
1526   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1527   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1528
1529   gst_poll_set_controllable (conn->fdset, TRUE);
1530
1531   /* add fd to writer set when asked to */
1532   gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
1533       events & GST_RTSP_EV_WRITE);
1534
1535   /* add fd to reader set when asked to */
1536   gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);
1537
1538   /* configure timeout if any */
1539   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
1540
1541   do {
1542     retval = gst_poll_wait (conn->fdset, to);
1543   } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
1544
1545   if (retval == 0)
1546     goto select_timeout;
1547
1548   if (retval == -1) {
1549     if (errno == EBUSY)
1550       goto stopped;
1551     else
1552       goto select_error;
1553   }
1554
1555   *revents = 0;
1556   if (events & GST_RTSP_EV_READ) {
1557     if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
1558       *revents |= GST_RTSP_EV_READ;
1559   }
1560   if (events & GST_RTSP_EV_WRITE) {
1561     if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
1562       *revents |= GST_RTSP_EV_WRITE;
1563   }
1564   return GST_RTSP_OK;
1565
1566   /* ERRORS */
1567 select_timeout:
1568   {
1569     return GST_RTSP_ETIMEOUT;
1570   }
1571 select_error:
1572   {
1573     return GST_RTSP_ESYS;
1574   }
1575 stopped:
1576   {
1577     return GST_RTSP_EINTR;
1578   }
1579 }
1580
1581 /**
1582  * gst_rtsp_connection_next_timeout:
1583  * @conn: a #GstRTSPConnection
1584  * @timeout: a timeout
1585  *
1586  * Calculate the next timeout for @conn, storing the result in @timeout.
1587  * 
1588  * Returns: #GST_RTSP_OK.
1589  */
1590 GstRTSPResult
1591 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
1592 {
1593   gdouble elapsed;
1594   glong sec;
1595   gulong usec;
1596
1597   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1598   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
1599
1600   elapsed = g_timer_elapsed (conn->timer, &usec);
1601   if (elapsed >= conn->timeout) {
1602     sec = 0;
1603     usec = 0;
1604   } else {
1605     sec = conn->timeout - elapsed;
1606   }
1607
1608   timeout->tv_sec = sec;
1609   timeout->tv_usec = usec;
1610
1611   return GST_RTSP_OK;
1612 }
1613
1614 /**
1615  * gst_rtsp_connection_reset_timeout:
1616  * @conn: a #GstRTSPConnection
1617  *
1618  * Reset the timeout of @conn.
1619  * 
1620  * Returns: #GST_RTSP_OK.
1621  */
1622 GstRTSPResult
1623 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
1624 {
1625   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1626
1627   g_timer_start (conn->timer);
1628
1629   return GST_RTSP_OK;
1630 }
1631
1632 /**
1633  * gst_rtsp_connection_flush:
1634  * @conn: a #GstRTSPConnection
1635  * @flush: start or stop the flush
1636  *
1637  * Start or stop the flushing action on @conn. When flushing, all current
1638  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
1639  * is set to non-flushing mode again.
1640  * 
1641  * Returns: #GST_RTSP_OK.
1642  */
1643 GstRTSPResult
1644 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
1645 {
1646   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1647
1648   gst_poll_set_flushing (conn->fdset, flush);
1649
1650   return GST_RTSP_OK;
1651 }
1652
1653 /**
1654  * gst_rtsp_connection_set_auth:
1655  * @conn: a #GstRTSPConnection
1656  * @method: authentication method
1657  * @user: the user
1658  * @pass: the password
1659  *
1660  * Configure @conn for authentication mode @method with @user and @pass as the
1661  * user and password respectively.
1662  * 
1663  * Returns: #GST_RTSP_OK.
1664  */
1665 GstRTSPResult
1666 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
1667     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
1668 {
1669   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1670
1671   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
1672           || g_strrstr (user, ":") != NULL))
1673     return GST_RTSP_EINVAL;
1674
1675   /* Make sure the username and passwd are being set for authentication */
1676   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
1677     return GST_RTSP_EINVAL;
1678
1679   /* ":" chars are not allowed in usernames for basic auth */
1680   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
1681     return GST_RTSP_EINVAL;
1682
1683   g_free (conn->username);
1684   g_free (conn->passwd);
1685
1686   conn->auth_method = method;
1687   conn->username = g_strdup (user);
1688   conn->passwd = g_strdup (pass);
1689
1690   return GST_RTSP_OK;
1691 }
1692
1693 /**
1694  * str_case_hash:
1695  * @key: ASCII string to hash
1696  *
1697  * Hashes @key in a case-insensitive manner.
1698  *
1699  * Returns: the hash code.
1700  **/
1701 static guint
1702 str_case_hash (gconstpointer key)
1703 {
1704   const char *p = key;
1705   guint h = g_ascii_toupper (*p);
1706
1707   if (h)
1708     for (p += 1; *p != '\0'; p++)
1709       h = (h << 5) - h + g_ascii_toupper (*p);
1710
1711   return h;
1712 }
1713
1714 /**
1715  * str_case_equal:
1716  * @v1: an ASCII string
1717  * @v2: another ASCII string
1718  *
1719  * Compares @v1 and @v2 in a case-insensitive manner
1720  *
1721  * Returns: %TRUE if they are equal (modulo case)
1722  **/
1723 static gboolean
1724 str_case_equal (gconstpointer v1, gconstpointer v2)
1725 {
1726   const char *string1 = v1;
1727   const char *string2 = v2;
1728
1729   return g_ascii_strcasecmp (string1, string2) == 0;
1730 }
1731
1732 /**
1733  * gst_rtsp_connection_set_auth_param:
1734  * @conn: a #GstRTSPConnection
1735  * @param: authentication directive
1736  * @value: value
1737  *
1738  * Setup @conn with authentication directives. This is not necesary for
1739  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
1740  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
1741  * in the WWW-Authenticate response header and can include realm, domain,
1742  * nonce, opaque, stale, algorithm, qop as per RFC2617.
1743  * 
1744  * Since: 0.10.20
1745  */
1746 void
1747 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
1748     const gchar * param, const gchar * value)
1749 {
1750   g_return_if_fail (conn != NULL);
1751   g_return_if_fail (param != NULL);
1752
1753   if (conn->auth_params == NULL) {
1754     conn->auth_params =
1755         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
1756   }
1757   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
1758 }
1759
1760 /**
1761  * gst_rtsp_connection_clear_auth_params:
1762  * @conn: a #GstRTSPConnection
1763  *
1764  * Clear the list of authentication directives stored in @conn.
1765  *
1766  * Since: 0.10.20
1767  */
1768 void
1769 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
1770 {
1771   g_return_if_fail (conn != NULL);
1772
1773   if (conn->auth_params != NULL) {
1774     g_hash_table_destroy (conn->auth_params);
1775     conn->auth_params = NULL;
1776   }
1777 }
1778
1779 static GstRTSPResult
1780 set_qos_dscp (gint fd, guint qos_dscp)
1781 {
1782   union gst_sockaddr
1783   {
1784     struct sockaddr sa;
1785     struct sockaddr_in6 sa_in6;
1786     struct sockaddr_storage sa_stor;
1787   } sa;
1788   socklen_t slen = sizeof (sa);
1789   gint af;
1790   gint tos;
1791
1792   if (fd == -1)
1793     return GST_RTSP_OK;
1794
1795   if (getsockname (fd, &sa.sa, &slen) < 0)
1796     goto no_getsockname;
1797
1798   af = sa.sa.sa_family;
1799
1800   /* if this is an IPv4-mapped address then do IPv4 QoS */
1801   if (af == AF_INET6) {
1802     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
1803       af = AF_INET;
1804   }
1805
1806   /* extract and shift 6 bits of the DSCP */
1807   tos = (qos_dscp & 0x3f) << 2;
1808
1809   switch (af) {
1810     case AF_INET:
1811       if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
1812         goto no_setsockopt;
1813       break;
1814     case AF_INET6:
1815 #ifdef IPV6_TCLASS
1816       if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
1817         goto no_setsockopt;
1818       break;
1819 #endif
1820     default:
1821       goto wrong_family;
1822   }
1823
1824   return GST_RTSP_OK;
1825
1826   /* ERRORS */
1827 no_getsockname:
1828 no_setsockopt:
1829   {
1830     return GST_RTSP_ESYS;
1831   }
1832
1833 wrong_family:
1834   {
1835     return GST_RTSP_ERROR;
1836   }
1837 }
1838
1839 /**
1840  * gst_rtsp_connection_set_qos_dscp:
1841  * @conn: a #GstRTSPConnection
1842  * @qos_dscp: DSCP value
1843  *
1844  * Configure @conn to use the specified DSCP value.
1845  *
1846  * Returns: #GST_RTSP_OK on success.
1847  *
1848  * Since: 0.10.20
1849  */
1850 GstRTSPResult
1851 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
1852 {
1853   GstRTSPResult res;
1854
1855   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1856   g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
1857   g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);
1858
1859   res = set_qos_dscp (conn->fd0.fd, qos_dscp);
1860   if (res == GST_RTSP_OK)
1861     res = set_qos_dscp (conn->fd1.fd, qos_dscp);
1862
1863   return res;
1864 }
1865
1866
1867 /**
1868  * gst_rtsp_connection_get_url:
1869  * @conn: a #GstRTSPConnection
1870  *
1871  * Retrieve the URL of the other end of @conn.
1872  *
1873  * Returns: The URL. This value remains valid until the
1874  * connection is closed.
1875  *
1876  * Since: 0.10.23
1877  */
1878 GstRTSPUrl *
1879 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
1880 {
1881   g_return_val_if_fail (conn != NULL, NULL);
1882
1883   return conn->url;
1884 }
1885
1886 /**
1887  * gst_rtsp_connection_get_ip:
1888  * @conn: a #GstRTSPConnection
1889  *
1890  * Retrieve the IP address of the other end of @conn.
1891  *
1892  * Returns: The IP address as a string. this value remains valid until the
1893  * connection is closed.
1894  *
1895  * Since: 0.10.20
1896  */
1897 const gchar *
1898 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
1899 {
1900   g_return_val_if_fail (conn != NULL, NULL);
1901
1902   return conn->ip;
1903 }
1904
1905 #define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
1906 #define WRITE_COND  (G_IO_OUT | G_IO_ERR)
1907
1908 typedef struct
1909 {
1910   GString *str;
1911   guint cseq;
1912 } GstRTSPRec;
1913
1914 /* async functions */
1915 struct _GstRTSPWatch
1916 {
1917   GSource source;
1918
1919   GstRTSPConnection *conn;
1920
1921   GstRTSPBuilder builder;
1922   GstRTSPMessage message;
1923
1924   GPollFD readfd;
1925   GPollFD writefd;
1926   gboolean write_added;
1927
1928   /* queued message for transmission */
1929   GList *messages;
1930   guint8 *write_data;
1931   guint write_off;
1932   guint write_len;
1933   guint write_cseq;
1934
1935   GstRTSPWatchFuncs funcs;
1936
1937   gpointer user_data;
1938   GDestroyNotify notify;
1939 };
1940
1941 static gboolean
1942 gst_rtsp_source_prepare (GSource * source, gint * timeout)
1943 {
1944   GstRTSPWatch *watch = (GstRTSPWatch *) source;
1945
1946   *timeout = (watch->conn->timeout * 1000);
1947
1948   return FALSE;
1949 }
1950
1951 static gboolean
1952 gst_rtsp_source_check (GSource * source)
1953 {
1954   GstRTSPWatch *watch = (GstRTSPWatch *) source;
1955
1956   if (watch->readfd.revents & READ_COND)
1957     return TRUE;
1958
1959   if (watch->writefd.revents & WRITE_COND)
1960     return TRUE;
1961
1962   return FALSE;
1963 }
1964
1965 static gboolean
1966 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
1967     gpointer user_data)
1968 {
1969   GstRTSPWatch *watch = (GstRTSPWatch *) source;
1970   GstRTSPResult res;
1971
1972   /* first read as much as we can */
1973   if (watch->readfd.revents & READ_COND) {
1974     do {
1975       res = build_next (&watch->builder, &watch->message, watch->conn);
1976       if (res == GST_RTSP_EINTR)
1977         break;
1978       if (res == GST_RTSP_EEOF)
1979         goto eof;
1980       if (res != GST_RTSP_OK)
1981         goto error;
1982
1983       if (watch->funcs.message_received)
1984         watch->funcs.message_received (watch, &watch->message,
1985             watch->user_data);
1986
1987       gst_rtsp_message_unset (&watch->message);
1988       build_reset (&watch->builder);
1989     } while (FALSE);
1990   }
1991
1992   if (watch->writefd.revents & WRITE_COND) {
1993     do {
1994       if (watch->write_data == NULL) {
1995         GstRTSPRec *data;
1996
1997         if (!watch->messages)
1998           goto done;
1999
2000         /* no data, get a new message from the queue */
2001         data = watch->messages->data;
2002         watch->messages = g_list_delete_link (watch->messages, watch->messages);
2003
2004         watch->write_off = 0;
2005         watch->write_len = data->str->len;
2006         watch->write_data = (guint8 *) g_string_free (data->str, FALSE);
2007         watch->write_cseq = data->cseq;
2008
2009         g_slice_free (GstRTSPRec, data);
2010       }
2011
2012       res = write_bytes (watch->writefd.fd, watch->write_data,
2013           &watch->write_off, watch->write_len);
2014       if (res == GST_RTSP_EINTR)
2015         break;
2016       if (res != GST_RTSP_OK)
2017         goto error;
2018
2019       if (watch->funcs.message_sent)
2020         watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data);
2021
2022     done:
2023       if (watch->messages == NULL && watch->write_added) {
2024         g_source_remove_poll ((GSource *) watch, &watch->writefd);
2025         watch->write_added = FALSE;
2026         watch->writefd.revents = 0;
2027       }
2028       g_free (watch->write_data);
2029       watch->write_data = NULL;
2030     } while (FALSE);
2031   }
2032
2033   return TRUE;
2034
2035   /* ERRORS */
2036 eof:
2037   {
2038     if (watch->funcs.closed)
2039       watch->funcs.closed (watch, watch->user_data);
2040     return FALSE;
2041   }
2042 error:
2043   {
2044     if (watch->funcs.error)
2045       watch->funcs.error (watch, res, watch->user_data);
2046     return FALSE;
2047   }
2048 }
2049
2050 static void
2051 gst_rtsp_source_finalize (GSource * source)
2052 {
2053   GstRTSPWatch *watch = (GstRTSPWatch *) source;
2054   GList *walk;
2055
2056   build_reset (&watch->builder);
2057
2058   for (walk = watch->messages; walk; walk = g_list_next (walk)) {
2059     GstRTSPRec *data = walk->data;
2060
2061     g_string_free (data->str, TRUE);
2062     g_slice_free (GstRTSPRec, data);
2063   }
2064   g_list_free (watch->messages);
2065   g_free (watch->write_data);
2066
2067   if (watch->notify)
2068     watch->notify (watch->user_data);
2069 }
2070
2071 static GSourceFuncs gst_rtsp_source_funcs = {
2072   gst_rtsp_source_prepare,
2073   gst_rtsp_source_check,
2074   gst_rtsp_source_dispatch,
2075   gst_rtsp_source_finalize
2076 };
2077
2078 /**
2079  * gst_rtsp_watch_new:
2080  * @conn: a #GstRTSPConnection
2081  * @funcs: watch functions
2082  * @user_data: user data to pass to @funcs
2083  *
2084  * Create a watch object for @conn. The functions provided in @funcs will be
2085  * called with @user_data when activity happened on the watch.
2086  *
2087  * The new watch is usually created so that it can be attached to a
2088  * maincontext with gst_rtsp_watch_attach(). 
2089  *
2090  * @conn must exist for the entire lifetime of the watch.
2091  *
2092  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
2093  * communication. Free with gst_rtsp_watch_unref () after usage.
2094  *
2095  * Since: 0.10.23
2096  */
2097 GstRTSPWatch *
2098 gst_rtsp_watch_new (GstRTSPConnection * conn,
2099     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
2100 {
2101   GstRTSPWatch *result;
2102
2103   g_return_val_if_fail (conn != NULL, NULL);
2104   g_return_val_if_fail (funcs != NULL, NULL);
2105   g_return_val_if_fail (conn->readfd != NULL, NULL);
2106   g_return_val_if_fail (conn->writefd != NULL, NULL);
2107
2108   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
2109       sizeof (GstRTSPWatch));
2110
2111   result->conn = conn;
2112   result->builder.state = STATE_START;
2113
2114   result->readfd.fd = conn->readfd->fd;
2115   result->readfd.events = READ_COND;
2116   result->readfd.revents = 0;
2117
2118   result->writefd.fd = conn->writefd->fd;
2119   result->writefd.events = WRITE_COND;
2120   result->writefd.revents = 0;
2121   result->write_added = FALSE;
2122
2123   result->funcs = *funcs;
2124   result->user_data = user_data;
2125   result->notify = notify;
2126
2127   /* only add the read fd, the write fd is only added when we have data
2128    * to send. */
2129   g_source_add_poll ((GSource *) result, &result->readfd);
2130
2131   return result;
2132 }
2133
2134 /**
2135  * gst_rtsp_watch_attach:
2136  * @watch: a #GstRTSPWatch
2137  * @context: a GMainContext (if NULL, the default context will be used)
2138  *
2139  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
2140  *
2141  * Returns: the ID (greater than 0) for the watch within the GMainContext. 
2142  *
2143  * Since: 0.10.23
2144  */
2145 guint
2146 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
2147 {
2148   g_return_val_if_fail (watch != NULL, 0);
2149
2150   return g_source_attach ((GSource *) watch, context);
2151 }
2152
2153 /**
2154  * gst_rtsp_watch_free:
2155  * @watch: a #GstRTSPWatch
2156  *
2157  * Decreases the reference count of @watch by one. If the resulting reference
2158  * count is zero the watch and associated memory will be destroyed.
2159  *
2160  * Since: 0.10.23
2161  */
2162 void
2163 gst_rtsp_watch_unref (GstRTSPWatch * watch)
2164 {
2165   g_return_if_fail (watch != NULL);
2166
2167   g_source_unref ((GSource *) watch);
2168 }
2169
2170 /**
2171  * gst_rtsp_watch_queue_message:
2172  * @watch: a #GstRTSPWatch
2173  * @message: a #GstRTSPMessage
2174  *
2175  * Queue a @message for transmission in @watch. The contents of this 
2176  * message will be serialized and transmitted when the connection of the
2177  * watch becomes writable.
2178  *
2179  * The return value of this function will be returned as the cseq argument in
2180  * the message_sent callback.
2181  *
2182  * Returns: the sequence number of the message or -1 if the cseq could not be
2183  * determined.
2184  *
2185  * Since: 0.10.23
2186  */
2187 guint
2188 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
2189 {
2190   GstRTSPRec *data;
2191   gchar *header;
2192   guint cseq;
2193
2194   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
2195   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2196
2197   /* get the cseq from the message, when we finish writing this message on the
2198    * socket we will have to pass the cseq to the callback. */
2199   if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header,
2200           0) == GST_RTSP_OK) {
2201     cseq = atoi (header);
2202   } else {
2203     cseq = -1;
2204   }
2205
2206   /* make a record with the message as a string ans cseq */
2207   data = g_slice_new (GstRTSPRec);
2208   data->str = message_to_string (watch->conn, message);
2209   data->cseq = cseq;
2210
2211   /* add the record to a queue */
2212   watch->messages = g_list_append (watch->messages, data);
2213
2214   /* make sure the main context will now also check for writability on the
2215    * socket */
2216   if (!watch->write_added) {
2217     g_source_add_poll ((GSource *) watch, &watch->writefd);
2218     watch->write_added = TRUE;
2219   }
2220   return cseq;
2221 }