Use g_memdup2() where available and add fallback for older GLib versions
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @title: GstRTSPConnection
46  * @short_description: manage RTSP connections
47  * @see_also: gstrtspurl
48  *
49  * This object manages the RTSP connection to the server. It provides function
50  * to receive and send bytes and messages.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #  include <config.h>
55 #endif
56
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70
71 #include "gstrtspconnection.h"
72
73 #ifdef IP_TOS
74 union gst_sockaddr
75 {
76   struct sockaddr sa;
77   struct sockaddr_in sa_in;
78   struct sockaddr_in6 sa_in6;
79   struct sockaddr_storage sa_stor;
80 };
81 #endif
82
83 typedef struct
84 {
85   gint state;
86   guint save;
87   guchar out[3];                /* the size must be evenly divisible by 3 */
88   guint cout;
89   guint coutl;
90 } DecodeCtx;
91
92 typedef struct
93 {
94   /* If %TRUE we only own data and none of the
95    * other fields
96    */
97   gboolean borrowed;
98
99   /* Header or full message */
100   guint8 *data;
101   guint data_size;
102   gboolean data_is_data_header;
103
104   /* Payload following data, if any */
105   guint8 *body_data;
106   guint body_data_size;
107   /* or */
108   GstBuffer *body_buffer;
109
110   /* DATA packet header statically allocated for above */
111   guint8 data_header[4];
112
113   /* all below only for async writing */
114
115   guint data_offset;            /* == data_size when done */
116   guint body_offset;            /* into body_data or the buffer */
117
118   /* ID of the message for notification */
119   guint id;
120 } GstRTSPSerializedMessage;
121
122 static void
123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125   if (!msg->borrowed) {
126     g_free (msg->body_data);
127     gst_buffer_replace (&msg->body_buffer, NULL);
128   }
129   g_free (msg->data);
130 }
131
132 #ifdef MSG_NOSIGNAL
133 #define SEND_FLAGS MSG_NOSIGNAL
134 #else
135 #define SEND_FLAGS 0
136 #endif
137
138 typedef enum
139 {
140   TUNNEL_STATE_NONE,
141   TUNNEL_STATE_GET,
142   TUNNEL_STATE_POST,
143   TUNNEL_STATE_COMPLETE
144 } GstRTSPTunnelState;
145
146 #define TUNNELID_LEN   24
147
148 struct _GstRTSPConnection
149 {
150   /*< private > */
151   /* URL for the remote connection */
152   GstRTSPUrl *url;
153   GstRTSPVersion version;
154
155   gboolean server;
156   GSocketClient *client;
157   GIOStream *stream0;
158   GIOStream *stream1;
159
160   GInputStream *input_stream;
161   GOutputStream *output_stream;
162   /* this is a read source we add on the write socket in tunneled mode to be
163    * able to detect when client disconnects the GET channel */
164   GInputStream *control_stream;
165
166   /* connection state */
167   GSocket *read_socket;
168   GSocket *write_socket;
169   GSocket *socket0, *socket1;
170   gboolean manual_http;
171   gboolean may_cancel;
172   GCancellable *cancellable;
173
174   gchar tunnelid[TUNNELID_LEN];
175   gboolean tunneled;
176   GstRTSPTunnelState tstate;
177
178   /* the remote and local ip */
179   gchar *remote_ip;
180   gchar *local_ip;
181
182   gint read_ahead;
183
184   gchar *initial_buffer;
185   gsize initial_buffer_offset;
186
187   gboolean remember_session_id; /* remember the session id or not */
188
189   /* Session state */
190   gint cseq;                    /* sequence number */
191   gchar session_id[512];        /* session id */
192   gint timeout;                 /* session timeout in seconds */
193   GTimer *timer;                /* timeout timer */
194
195   /* Authentication */
196   GstRTSPAuthMethod auth_method;
197   gchar *username;
198   gchar *passwd;
199   GHashTable *auth_params;
200
201   guint content_length_limit;
202
203   /* TLS */
204   GTlsDatabase *tls_database;
205   GTlsInteraction *tls_interaction;
206
207   GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
208   GDestroyNotify accept_certificate_destroy_notify;
209   gpointer accept_certificate_user_data;
210
211   DecodeCtx ctx;
212   DecodeCtx *ctxp;
213
214   gchar *proxy_host;
215   guint proxy_port;
216 };
217
218 enum
219 {
220   STATE_START = 0,
221   STATE_DATA_HEADER,
222   STATE_DATA_BODY,
223   STATE_READ_LINES,
224   STATE_END,
225   STATE_LAST
226 };
227
228 enum
229 {
230   READ_AHEAD_EOH = -1,          /* end of headers */
231   READ_AHEAD_CRLF = -2,
232   READ_AHEAD_CRLFCR = -3
233 };
234
235 /* a structure for constructing RTSPMessages */
236 typedef struct
237 {
238   gint state;
239   GstRTSPResult status;
240   guint8 buffer[4096];
241   guint offset;
242
243   guint line;
244   guint8 *body_data;
245   guint body_len;
246 } GstRTSPBuilder;
247
248 /* function prototypes */
249 static void add_auth_header (GstRTSPConnection * conn,
250     GstRTSPMessage * message);
251
252 static void
253 build_reset (GstRTSPBuilder * builder)
254 {
255   g_free (builder->body_data);
256   memset (builder, 0, sizeof (GstRTSPBuilder));
257 }
258
259 static GstRTSPResult
260 gst_rtsp_result_from_g_io_error (GError * error, GstRTSPResult default_res)
261 {
262   if (error == NULL)
263     return GST_RTSP_OK;
264
265   if (error->domain != G_IO_ERROR)
266     return default_res;
267
268   switch (error->code) {
269     case G_IO_ERROR_TIMED_OUT:
270       return GST_RTSP_ETIMEOUT;
271     case G_IO_ERROR_INVALID_ARGUMENT:
272       return GST_RTSP_EINVAL;
273     case G_IO_ERROR_CANCELLED:
274     case G_IO_ERROR_WOULD_BLOCK:
275       return GST_RTSP_EINTR;
276     default:
277       return default_res;
278   }
279 }
280
281 static gboolean
282 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
283     GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
284 {
285   GError *error = NULL;
286   gboolean accept = FALSE;
287
288   if (rtspconn->tls_database) {
289     GSocketConnectable *peer_identity;
290     GTlsCertificateFlags validation_flags;
291
292     GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
293
294     peer_identity =
295         g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
296         (conn));
297
298     errors =
299         g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
300         G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
301         g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
302         NULL, &error);
303
304     if (error)
305       goto verify_error;
306
307     validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
308
309     accept = ((errors & validation_flags) == 0);
310     if (accept)
311       GST_DEBUG ("Peer certificate accepted");
312     else
313       GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
314   }
315
316   if (!accept && rtspconn->accept_certificate_func) {
317     accept =
318         rtspconn->accept_certificate_func (conn, peer_cert, errors,
319         rtspconn->accept_certificate_user_data);
320     GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
321         accept ? "" : "not ");
322   }
323
324   return accept;
325
326 /* ERRORS */
327 verify_error:
328   {
329     GST_ERROR ("An error occurred while verifying the peer certificate: %s",
330         error->message);
331     g_clear_error (&error);
332     return FALSE;
333   }
334 }
335
336 static void
337 socket_client_event (GSocketClient * client, GSocketClientEvent event,
338     GSocketConnectable * connectable, GTlsConnection * connection,
339     GstRTSPConnection * rtspconn)
340 {
341   if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
342     GST_DEBUG ("TLS handshaking about to start...");
343
344     g_signal_connect (connection, "accept-certificate",
345         (GCallback) tls_accept_certificate, rtspconn);
346
347     g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
348   }
349 }
350
351 /**
352  * gst_rtsp_connection_create:
353  * @url: a #GstRTSPUrl
354  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
355  *
356  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
357  * The connection will not yet attempt to connect to @url, use
358  * gst_rtsp_connection_connect().
359  *
360  * A copy of @url will be made.
361  *
362  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
363  */
364 GstRTSPResult
365 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
366 {
367   GstRTSPConnection *newconn;
368
369   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
370   g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
371
372   newconn = g_new0 (GstRTSPConnection, 1);
373
374   newconn->may_cancel = TRUE;
375   newconn->cancellable = g_cancellable_new ();
376   newconn->client = g_socket_client_new ();
377
378   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
379     g_socket_client_set_tls (newconn->client, TRUE);
380
381   g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
382       newconn);
383
384   newconn->url = gst_rtsp_url_copy (url);
385   newconn->timer = g_timer_new ();
386   newconn->timeout = 60;
387   newconn->cseq = 1;            /* RFC 7826: "it is RECOMMENDED to start at 0.",
388                                    but some servers don't copy values <1 due to bugs. */
389
390   newconn->remember_session_id = TRUE;
391
392   newconn->auth_method = GST_RTSP_AUTH_NONE;
393   newconn->username = NULL;
394   newconn->passwd = NULL;
395   newconn->auth_params = NULL;
396   newconn->version = 0;
397
398   newconn->content_length_limit = G_MAXUINT;
399
400   *conn = newconn;
401
402   return GST_RTSP_OK;
403 }
404
405 static gboolean
406 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
407     gboolean remote, GError ** error)
408 {
409   GSocketAddress *addr;
410
411   if (remote)
412     addr = g_socket_get_remote_address (socket, error);
413   else
414     addr = g_socket_get_local_address (socket, error);
415   if (!addr)
416     return FALSE;
417
418   if (ip)
419     *ip = g_inet_address_to_string (g_inet_socket_address_get_address
420         (G_INET_SOCKET_ADDRESS (addr)));
421   if (port)
422     *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
423
424   g_object_unref (addr);
425
426   return TRUE;
427 }
428
429
430 /**
431  * gst_rtsp_connection_create_from_socket:
432  * @socket: a #GSocket
433  * @ip: the IP address of the other end
434  * @port: the port used by the other end
435  * @initial_buffer: data already read from @fd
436  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
437  *
438  * Create a new #GstRTSPConnection for handling communication on the existing
439  * socket @socket. The @initial_buffer contains zero terminated data already
440  * read from @socket which should be used before starting to read new data.
441  *
442  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
443  */
444 /* FIXME 2.0 We don't need the ip and port since they can be got from the
445  * GSocket */
446 GstRTSPResult
447 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
448     guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
449 {
450   GstRTSPConnection *newconn = NULL;
451   GstRTSPUrl *url;
452   GstRTSPResult res;
453   GError *err = NULL;
454   gchar *local_ip;
455   GIOStream *stream;
456
457   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
458   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
459   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
460
461   if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
462     goto getnameinfo_failed;
463
464   /* create a url for the client address */
465   url = g_new0 (GstRTSPUrl, 1);
466   url->host = g_strdup (ip);
467   url->port = port;
468
469   /* now create the connection object */
470   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
471   gst_rtsp_url_free (url);
472
473   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
474
475   /* both read and write initially */
476   newconn->server = TRUE;
477   newconn->socket0 = socket;
478   newconn->stream0 = stream;
479   newconn->write_socket = newconn->read_socket = newconn->socket0;
480   newconn->input_stream = g_io_stream_get_input_stream (stream);
481   newconn->output_stream = g_io_stream_get_output_stream (stream);
482   newconn->control_stream = NULL;
483   newconn->remote_ip = g_strdup (ip);
484   newconn->local_ip = local_ip;
485   newconn->initial_buffer = g_strdup (initial_buffer);
486
487   *conn = newconn;
488
489   return GST_RTSP_OK;
490
491   /* ERRORS */
492 getnameinfo_failed:
493   {
494     GST_ERROR ("failed to get local address: %s", err->message);
495     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
496     g_clear_error (&err);
497     return res;
498   }
499 newconn_failed:
500   {
501     GST_ERROR ("failed to make connection");
502     g_free (local_ip);
503     gst_rtsp_url_free (url);
504     return res;
505   }
506 }
507
508 /**
509  * gst_rtsp_connection_accept:
510  * @socket: a socket
511  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
512  * @cancellable: a #GCancellable to cancel the operation
513  *
514  * Accept a new connection on @socket and create a new #GstRTSPConnection for
515  * handling communication on new socket.
516  *
517  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
518  */
519 GstRTSPResult
520 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
521     GCancellable * cancellable)
522 {
523   GError *err = NULL;
524   gchar *ip;
525   guint16 port;
526   GSocket *client_sock;
527   GstRTSPResult ret;
528
529   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
530   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
531
532   client_sock = g_socket_accept (socket, cancellable, &err);
533   if (!client_sock)
534     goto accept_failed;
535
536   /* get the remote ip address and port */
537   if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
538     goto getnameinfo_failed;
539
540   ret =
541       gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
542       conn);
543   g_object_unref (client_sock);
544   g_free (ip);
545
546   return ret;
547
548   /* ERRORS */
549 accept_failed:
550   {
551     GST_DEBUG ("Accepting client failed: %s", err->message);
552     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
553     g_clear_error (&err);
554     return ret;
555   }
556 getnameinfo_failed:
557   {
558     GST_DEBUG ("getnameinfo failed: %s", err->message);
559     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
560     g_clear_error (&err);
561     if (!g_socket_close (client_sock, &err)) {
562       GST_DEBUG ("Closing socket failed: %s", err->message);
563       g_clear_error (&err);
564     }
565     g_object_unref (client_sock);
566     return ret;
567   }
568 }
569
570 /**
571  * gst_rtsp_connection_get_tls:
572  * @conn: a #GstRTSPConnection
573  * @error: #GError for error reporting, or NULL to ignore.
574  *
575  * Get the TLS connection of @conn.
576  *
577  * For client side this will return the #GTlsClientConnection when connected
578  * over TLS.
579  *
580  * For server side connections, this function will create a GTlsServerConnection
581  * when called the first time and will return that same connection on subsequent
582  * calls. The server is then responsible for configuring the TLS connection.
583  *
584  * Returns: (transfer none): the TLS connection for @conn.
585  *
586  * Since: 1.2
587  */
588 GTlsConnection *
589 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
590 {
591   GTlsConnection *result;
592
593   if (G_IS_TLS_CONNECTION (conn->stream0)) {
594     /* we already had one, return it */
595     result = G_TLS_CONNECTION (conn->stream0);
596   } else if (conn->server) {
597     /* no TLS connection but we are server, make one */
598     result = (GTlsConnection *)
599         g_tls_server_connection_new (conn->stream0, NULL, error);
600     if (result) {
601       g_object_unref (conn->stream0);
602       conn->stream0 = G_IO_STREAM (result);
603       conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
604       conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
605     }
606   } else {
607     /* client */
608     result = NULL;
609     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
610         "client not connected with TLS");
611   }
612   return result;
613 }
614
615 /**
616  * gst_rtsp_connection_set_tls_validation_flags:
617  * @conn: a #GstRTSPConnection
618  * @flags: the validation flags.
619  *
620  * Sets the TLS validation flags to be used to verify the peer
621  * certificate when a TLS connection is established.
622  *
623  * Returns: TRUE if the validation flags are set correctly, or FALSE if
624  * @conn is NULL or is not a TLS connection.
625  *
626  * Since: 1.2.1
627  */
628 gboolean
629 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
630     GTlsCertificateFlags flags)
631 {
632   gboolean res = FALSE;
633
634   g_return_val_if_fail (conn != NULL, FALSE);
635
636   res = g_socket_client_get_tls (conn->client);
637   if (res)
638     g_socket_client_set_tls_validation_flags (conn->client, flags);
639
640   return res;
641 }
642
643 /**
644  * gst_rtsp_connection_get_tls_validation_flags:
645  * @conn: a #GstRTSPConnection
646  *
647  * Gets the TLS validation flags used to verify the peer certificate
648  * when a TLS connection is established.
649  *
650  * Returns: the validationg flags.
651  *
652  * Since: 1.2.1
653  */
654 GTlsCertificateFlags
655 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
656 {
657   g_return_val_if_fail (conn != NULL, 0);
658
659   return g_socket_client_get_tls_validation_flags (conn->client);
660 }
661
662 /**
663  * gst_rtsp_connection_set_tls_database:
664  * @conn: a #GstRTSPConnection
665  * @database: a #GTlsDatabase
666  *
667  * Sets the anchor certificate authorities database. This certificate
668  * database will be used to verify the server's certificate in case it
669  * can't be verified with the default certificate database first.
670  *
671  * Since: 1.4
672  */
673 void
674 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
675     GTlsDatabase * database)
676 {
677   GTlsDatabase *old_db;
678
679   g_return_if_fail (conn != NULL);
680
681   if (database)
682     g_object_ref (database);
683
684   old_db = conn->tls_database;
685   conn->tls_database = database;
686
687   if (old_db)
688     g_object_unref (old_db);
689 }
690
691 /**
692  * gst_rtsp_connection_get_tls_database:
693  * @conn: a #GstRTSPConnection
694  *
695  * Gets the anchor certificate authorities database that will be used
696  * after a server certificate can't be verified with the default
697  * certificate database.
698  *
699  * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
700  * database has been previously set. Use g_object_unref() to release the
701  * certificate database.
702  *
703  * Since: 1.4
704  */
705 GTlsDatabase *
706 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
707 {
708   GTlsDatabase *result;
709
710   g_return_val_if_fail (conn != NULL, NULL);
711
712   if ((result = conn->tls_database))
713     g_object_ref (result);
714
715   return result;
716 }
717
718 /**
719  * gst_rtsp_connection_set_tls_interaction:
720  * @conn: a #GstRTSPConnection
721  * @interaction: a #GTlsInteraction
722  *
723  * Sets a #GTlsInteraction object to be used when the connection or certificate
724  * database need to interact with the user. This will be used to prompt the
725  * user for passwords where necessary.
726  *
727  * Since: 1.6
728  */
729 void
730 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
731     GTlsInteraction * interaction)
732 {
733   GTlsInteraction *old_interaction;
734
735   g_return_if_fail (conn != NULL);
736
737   if (interaction)
738     g_object_ref (interaction);
739
740   old_interaction = conn->tls_interaction;
741   conn->tls_interaction = interaction;
742
743   if (old_interaction)
744     g_object_unref (old_interaction);
745 }
746
747 /**
748  * gst_rtsp_connection_get_tls_interaction:
749  * @conn: a #GstRTSPConnection
750  *
751  * Gets a #GTlsInteraction object to be used when the connection or certificate
752  * database need to interact with the user. This will be used to prompt the
753  * user for passwords where necessary.
754  *
755  * Returns: (transfer full): a reference on the #GTlsInteraction. Use
756  * g_object_unref() to release.
757  *
758  * Since: 1.6
759  */
760 GTlsInteraction *
761 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
762 {
763   GTlsInteraction *result;
764
765   g_return_val_if_fail (conn != NULL, NULL);
766
767   if ((result = conn->tls_interaction))
768     g_object_ref (result);
769
770   return result;
771 }
772
773 /**
774  * gst_rtsp_connection_set_accept_certificate_func:
775  * @conn: a #GstRTSPConnection
776  * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
777  * @destroy_notify: #GDestroyNotify for @user_data
778  * @user_data: User data passed to @func
779  *
780  * Sets a custom accept-certificate function for checking certificates for
781  * validity. This will directly map to #GTlsConnection 's "accept-certificate"
782  * signal and be performed after the default checks of #GstRTSPConnection
783  * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
784  * have failed. If no #GTlsDatabase is set on this connection, only @func will
785  * be called.
786  *
787  * Since: 1.14
788  */
789 void
790 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
791     GstRTSPConnectionAcceptCertificateFunc func,
792     gpointer user_data, GDestroyNotify destroy_notify)
793 {
794   if (conn->accept_certificate_destroy_notify)
795     conn->
796         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
797   conn->accept_certificate_func = func;
798   conn->accept_certificate_user_data = user_data;
799   conn->accept_certificate_destroy_notify = destroy_notify;
800 }
801
802 static GstRTSPResult
803 setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
804     GstRTSPMessage * response)
805 {
806   gint i;
807   GstRTSPResult res;
808   gchar *value;
809   guint16 url_port;
810   GstRTSPMessage *msg;
811   gboolean old_http;
812   GstRTSPUrl *url;
813   GError *error = NULL;
814   GSocketConnection *connection;
815   GSocket *socket;
816   gchar *connection_uri = NULL;
817   gchar *request_uri = NULL;
818   gchar *host = NULL;
819
820   url = conn->url;
821
822   gst_rtsp_url_get_port (url, &url_port);
823   host = g_strdup_printf ("%s:%d", url->host, url_port);
824
825   /* create a random sessionid */
826   for (i = 0; i < TUNNELID_LEN; i++)
827     conn->tunnelid[i] = g_random_int_range ('a', 'z');
828   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
829
830   /* create the GET request for the read connection */
831   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
832       no_message);
833   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
834
835   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
836       conn->tunnelid);
837   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
838       "application/x-rtsp-tunnelled");
839   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
840   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
841   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
842
843   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
844    * request from being base64 encoded */
845   conn->tunneled = FALSE;
846   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
847       write_failed);
848   gst_rtsp_message_free (msg);
849   conn->tunneled = TRUE;
850
851   /* receive the response to the GET request */
852   /* we need to temporarily set manual_http to TRUE since
853    * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
854    * failure otherwise */
855   old_http = conn->manual_http;
856   conn->manual_http = TRUE;
857   GST_RTSP_CHECK (gst_rtsp_connection_receive_usec (conn, response, timeout),
858       read_failed);
859   conn->manual_http = old_http;
860
861   if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
862       response->type_data.response.code != GST_RTSP_STS_OK)
863     goto wrong_result;
864
865   if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
866           &value, 0) == GST_RTSP_OK) {
867     g_free (url->host);
868     url->host = g_strdup (value);
869     g_free (conn->remote_ip);
870     conn->remote_ip = g_strdup (value);
871   }
872
873   connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
874       url->abspath, url->query ? "?" : "", url->query ? url->query : "");
875
876   /* connect to the host/port */
877   if (conn->proxy_host) {
878     connection = g_socket_client_connect_to_host (conn->client,
879         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
880     request_uri = g_strdup (connection_uri);
881   } else {
882     connection = g_socket_client_connect_to_uri (conn->client,
883         connection_uri, 0, conn->cancellable, &error);
884     request_uri =
885         g_strdup_printf ("%s%s%s", url->abspath,
886         url->query ? "?" : "", url->query ? url->query : "");
887   }
888   if (connection == NULL)
889     goto connect_failed;
890
891   socket = g_socket_connection_get_socket (connection);
892
893   /* get remote address */
894   g_free (conn->remote_ip);
895   conn->remote_ip = NULL;
896
897   if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
898     goto remote_address_failed;
899
900   /* this is now our writing socket */
901   conn->stream1 = G_IO_STREAM (connection);
902   conn->socket1 = socket;
903   conn->write_socket = conn->socket1;
904   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
905   conn->control_stream = NULL;
906
907   /* create the POST request for the write connection */
908   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
909           request_uri), no_message);
910   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
911
912   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
913       conn->tunnelid);
914   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
915       "application/x-rtsp-tunnelled");
916   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
917       "application/x-rtsp-tunnelled");
918   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
919   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
920   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
921       "Sun, 9 Jan 1972 00:00:00 GMT");
922   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
923   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
924
925   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
926    * request from being base64 encoded */
927   conn->tunneled = FALSE;
928   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
929       write_failed);
930   gst_rtsp_message_free (msg);
931   conn->tunneled = TRUE;
932
933 exit:
934   g_free (connection_uri);
935   g_free (request_uri);
936   g_free (host);
937
938   return res;
939
940   /* ERRORS */
941 no_message:
942   {
943     GST_ERROR ("failed to create request (%d)", res);
944     goto exit;
945   }
946 write_failed:
947   {
948     GST_ERROR ("write failed (%d)", res);
949     gst_rtsp_message_free (msg);
950     conn->tunneled = TRUE;
951     goto exit;
952   }
953 read_failed:
954   {
955     GST_ERROR ("read failed (%d)", res);
956     conn->manual_http = FALSE;
957     goto exit;
958   }
959 wrong_result:
960   {
961     GST_ERROR ("got failure response %d %s",
962         response->type_data.response.code, response->type_data.response.reason);
963     res = GST_RTSP_ERROR;
964     goto exit;
965   }
966 connect_failed:
967   {
968     GST_ERROR ("failed to connect: %s", error->message);
969     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
970     g_clear_error (&error);
971     goto exit;
972   }
973 remote_address_failed:
974   {
975     GST_ERROR ("failed to resolve address: %s", error->message);
976     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
977     g_object_unref (connection);
978     g_clear_error (&error);
979     return res;
980   }
981 }
982
983 /**
984  * gst_rtsp_connection_connect_with_response_usec:
985  * @conn: a #GstRTSPConnection
986  * @timeout: a timeout in microseconds
987  * @response: a #GstRTSPMessage
988  *
989  * Attempt to connect to the url of @conn made with
990  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
991  * forever. If @timeout contains a valid timeout, this function will return
992  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
993  * @response will contain a response to the tunneling request messages.
994  *
995  * This function can be cancelled with gst_rtsp_connection_flush().
996  *
997  * Returns: #GST_RTSP_OK when a connection could be made.
998  *
999  * Since: 1.18
1000  */
1001 GstRTSPResult
1002 gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
1003     gint64 timeout, GstRTSPMessage * response)
1004 {
1005   GstRTSPResult res;
1006   GSocketConnection *connection;
1007   GSocket *socket;
1008   GError *error = NULL;
1009   gchar *connection_uri, *request_uri, *remote_ip;
1010   GstClockTime to;
1011   guint16 url_port;
1012   GstRTSPUrl *url;
1013
1014   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1015   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
1016   g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
1017
1018   to = timeout * 1000;
1019   g_socket_client_set_timeout (conn->client,
1020       (to + GST_SECOND - 1) / GST_SECOND);
1021
1022   url = conn->url;
1023
1024   gst_rtsp_url_get_port (url, &url_port);
1025
1026   if (conn->tunneled) {
1027     connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
1028         url->abspath, url->query ? "?" : "", url->query ? url->query : "");
1029   } else {
1030     connection_uri = gst_rtsp_url_get_request_uri (url);
1031   }
1032
1033   if (conn->proxy_host) {
1034     connection = g_socket_client_connect_to_host (conn->client,
1035         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1036     request_uri = g_strdup (connection_uri);
1037   } else {
1038     connection = g_socket_client_connect_to_uri (conn->client,
1039         connection_uri, url_port, conn->cancellable, &error);
1040
1041     /* use the relative component of the uri for non-proxy connections */
1042     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1043         url->query ? "?" : "", url->query ? url->query : "");
1044   }
1045   if (connection == NULL)
1046     goto connect_failed;
1047
1048   /* get remote address */
1049   socket = g_socket_connection_get_socket (connection);
1050
1051   if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1052     goto remote_address_failed;
1053
1054   g_free (conn->remote_ip);
1055   conn->remote_ip = remote_ip;
1056   conn->stream0 = G_IO_STREAM (connection);
1057   conn->socket0 = socket;
1058   /* this is our read socket */
1059   conn->read_socket = conn->socket0;
1060   conn->write_socket = conn->socket0;
1061   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1062   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1063   conn->control_stream = NULL;
1064
1065   if (conn->tunneled) {
1066     res = setup_tunneling (conn, timeout, request_uri, response);
1067     if (res != GST_RTSP_OK)
1068       goto tunneling_failed;
1069   }
1070   g_free (connection_uri);
1071   g_free (request_uri);
1072
1073   return GST_RTSP_OK;
1074
1075   /* ERRORS */
1076 connect_failed:
1077   {
1078     GST_ERROR ("failed to connect: %s", error->message);
1079     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1080     g_clear_error (&error);
1081     g_free (connection_uri);
1082     g_free (request_uri);
1083     return res;
1084   }
1085 remote_address_failed:
1086   {
1087     GST_ERROR ("failed to connect: %s", error->message);
1088     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1089     g_object_unref (connection);
1090     g_clear_error (&error);
1091     g_free (connection_uri);
1092     g_free (request_uri);
1093     return res;
1094   }
1095 tunneling_failed:
1096   {
1097     GST_ERROR ("failed to setup tunneling");
1098     g_free (connection_uri);
1099     g_free (request_uri);
1100     return res;
1101   }
1102 }
1103
1104 static void
1105 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1106 {
1107   switch (conn->auth_method) {
1108     case GST_RTSP_AUTH_BASIC:{
1109       gchar *user_pass;
1110       gchar *user_pass64;
1111       gchar *auth_string;
1112
1113       if (conn->username == NULL || conn->passwd == NULL)
1114         break;
1115
1116       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1117       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1118       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1119
1120       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1121           auth_string);
1122
1123       g_free (user_pass);
1124       g_free (user_pass64);
1125       break;
1126     }
1127     case GST_RTSP_AUTH_DIGEST:{
1128       gchar *response;
1129       gchar *auth_string, *auth_string2;
1130       gchar *realm;
1131       gchar *nonce;
1132       gchar *opaque;
1133       const gchar *uri;
1134       const gchar *method;
1135
1136       /* we need to have some params set */
1137       if (conn->auth_params == NULL || conn->username == NULL ||
1138           conn->passwd == NULL)
1139         break;
1140
1141       /* we need the realm and nonce */
1142       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1143       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1144       if (realm == NULL || nonce == NULL)
1145         break;
1146
1147       method = gst_rtsp_method_as_text (message->type_data.request.method);
1148       uri = message->type_data.request.uri;
1149
1150       response =
1151           gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1152           conn->username, conn->passwd, uri, nonce);
1153       auth_string =
1154           g_strdup_printf ("Digest username=\"%s\", "
1155           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1156           conn->username, realm, nonce, uri, response);
1157       g_free (response);
1158
1159       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1160       if (opaque) {
1161         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1162             opaque);
1163         g_free (auth_string);
1164         auth_string = auth_string2;
1165       }
1166       /* Do not keep any old Authorization headers */
1167       gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1168       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1169           auth_string);
1170       break;
1171     }
1172     default:
1173       /* Nothing to do */
1174       break;
1175   }
1176 }
1177
1178 /**
1179  * gst_rtsp_connection_connect_usec:
1180  * @conn: a #GstRTSPConnection
1181  * @timeout: a timeout in microseconds
1182  *
1183  * Attempt to connect to the url of @conn made with
1184  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1185  * forever. If @timeout contains a valid timeout, this function will return
1186  * #GST_RTSP_ETIMEOUT after the timeout expired.
1187  *
1188  * This function can be cancelled with gst_rtsp_connection_flush().
1189  *
1190  * Returns: #GST_RTSP_OK when a connection could be made.
1191  *
1192  * Since: 1.18
1193  */
1194 GstRTSPResult
1195 gst_rtsp_connection_connect_usec (GstRTSPConnection * conn, gint64 timeout)
1196 {
1197   GstRTSPResult result;
1198   GstRTSPMessage response;
1199
1200   memset (&response, 0, sizeof (response));
1201   gst_rtsp_message_init (&response);
1202
1203   result = gst_rtsp_connection_connect_with_response_usec (conn, timeout,
1204       &response);
1205
1206   gst_rtsp_message_unset (&response);
1207
1208   return result;
1209 }
1210
1211 static void
1212 gen_date_string (gchar * date_string, guint len)
1213 {
1214   static const char wkdays[7][4] =
1215       { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1216   static const char months[12][4] =
1217       { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1218     "Nov", "Dec"
1219   };
1220   struct tm tm;
1221   time_t t;
1222
1223   time (&t);
1224
1225 #ifdef HAVE_GMTIME_R
1226   gmtime_r (&t, &tm);
1227 #else
1228   tm = *gmtime (&t);
1229 #endif
1230
1231   g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1232       wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1233       tm.tm_hour, tm.tm_min, tm.tm_sec);
1234 }
1235
1236 static GstRTSPResult
1237 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1238     guint size, gboolean block, GCancellable * cancellable)
1239 {
1240   guint left;
1241   gssize r;
1242   GstRTSPResult res;
1243   GError *err = NULL;
1244
1245   if (G_UNLIKELY (*idx > size))
1246     return GST_RTSP_ERROR;
1247
1248   left = size - *idx;
1249
1250   while (left) {
1251     if (block)
1252       r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1253           cancellable, &err);
1254     else
1255       r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1256           (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1257     if (G_UNLIKELY (r < 0))
1258       goto error;
1259
1260     left -= r;
1261     *idx += r;
1262   }
1263   return GST_RTSP_OK;
1264
1265   /* ERRORS */
1266 error:
1267   {
1268     if (G_UNLIKELY (r == 0))
1269       return GST_RTSP_EEOF;
1270
1271     if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1272       GST_WARNING ("%s", err->message);
1273     else
1274       GST_DEBUG ("%s", err->message);
1275
1276     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1277     g_clear_error (&err);
1278     return res;
1279   }
1280 }
1281
1282 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1283 #if GLIB_CHECK_VERSION(2, 59, 1)
1284 static GstRTSPResult
1285 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1286     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1287 {
1288   gsize _bytes_written = 0;
1289   gsize written;
1290   GstRTSPResult ret;
1291   GError *err = NULL;
1292   GPollableReturn res = G_POLLABLE_RETURN_OK;
1293
1294   while (n_vectors > 0) {
1295     if (block) {
1296       if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1297                   &written, cancellable, &err))) {
1298         /* This will never return G_IO_ERROR_WOULD_BLOCK */
1299         res = G_POLLABLE_RETURN_FAILED;
1300         goto error;
1301       }
1302     } else {
1303       res =
1304           g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1305           (stream), vectors, n_vectors, &written, cancellable, &err);
1306
1307       if (res != G_POLLABLE_RETURN_OK) {
1308         g_assert (written == 0);
1309         goto error;
1310       }
1311     }
1312     _bytes_written += written;
1313
1314     /* skip vectors that have been written in full */
1315     while (written > 0 && written >= vectors[0].size) {
1316       written -= vectors[0].size;
1317       ++vectors;
1318       --n_vectors;
1319     }
1320
1321     /* skip partially written vector data */
1322     if (written > 0) {
1323       vectors[0].size -= written;
1324       vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1325     }
1326   }
1327
1328   *bytes_written = _bytes_written;
1329
1330   return GST_RTSP_OK;
1331
1332   /* ERRORS */
1333 error:
1334   {
1335     *bytes_written = _bytes_written;
1336
1337     if (err)
1338       GST_WARNING ("%s", err->message);
1339     if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1340       g_assert (!err);
1341       return GST_RTSP_EINTR;
1342     } else if (G_UNLIKELY (written == 0)) {
1343       g_clear_error (&err);
1344       return GST_RTSP_EEOF;
1345     }
1346
1347     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1348     g_clear_error (&err);
1349     return ret;
1350   }
1351 }
1352 #else
1353 static GstRTSPResult
1354 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1355     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1356 {
1357   gsize _bytes_written = 0;
1358   guint written;
1359   gint i;
1360   GstRTSPResult res = GST_RTSP_OK;
1361
1362   for (i = 0; i < n_vectors; i++) {
1363     written = 0;
1364     res =
1365         write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
1366         block, cancellable);
1367     _bytes_written += written;
1368     if (G_UNLIKELY (res != GST_RTSP_OK))
1369       break;
1370   }
1371
1372   *bytes_written = _bytes_written;
1373
1374   return res;
1375 }
1376 #endif
1377
1378 static gint
1379 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1380     gboolean block, GError ** err)
1381 {
1382   gint out = 0;
1383
1384   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1385     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1386
1387     out = MIN (left, size);
1388     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1389
1390     if (left == (gsize) out) {
1391       g_free (conn->initial_buffer);
1392       conn->initial_buffer = NULL;
1393       conn->initial_buffer_offset = 0;
1394     } else
1395       conn->initial_buffer_offset += out;
1396   }
1397
1398   if (G_LIKELY (size > (guint) out)) {
1399     gssize r;
1400     gsize count = size - out;
1401     if (block)
1402       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1403           count, conn->may_cancel ? conn->cancellable : NULL, err);
1404     else
1405       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1406           (conn->input_stream), (gchar *) & buffer[out], count,
1407           conn->may_cancel ? conn->cancellable : NULL, err);
1408
1409     if (G_UNLIKELY (r < 0)) {
1410       if (out == 0) {
1411         /* propagate the error */
1412         out = r;
1413       } else {
1414         /* we have some data ignore error */
1415         g_clear_error (err);
1416       }
1417     } else
1418       out += r;
1419   }
1420
1421   return out;
1422 }
1423
1424 static gint
1425 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1426     gboolean block, GError ** err)
1427 {
1428   DecodeCtx *ctx = conn->ctxp;
1429   gint out = 0;
1430
1431   if (ctx) {
1432     while (size > 0) {
1433       guint8 in[sizeof (ctx->out) * 4 / 3];
1434       gint r;
1435
1436       while (size > 0 && ctx->cout < ctx->coutl) {
1437         /* we have some leftover bytes */
1438         *buffer++ = ctx->out[ctx->cout++];
1439         size--;
1440         out++;
1441       }
1442
1443       /* got what we needed? */
1444       if (size == 0)
1445         break;
1446
1447       /* try to read more bytes */
1448       r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1449       if (r <= 0) {
1450         if (out == 0) {
1451           out = r;
1452         } else {
1453           /* we have some data ignore error */
1454           g_clear_error (err);
1455         }
1456         break;
1457       }
1458
1459       ctx->cout = 0;
1460       ctx->coutl =
1461           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1462           &ctx->save);
1463     }
1464   } else {
1465     out = fill_raw_bytes (conn, buffer, size, block, err);
1466   }
1467
1468   return out;
1469 }
1470
1471 static GstRTSPResult
1472 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1473     gboolean block)
1474 {
1475   guint left;
1476   gint r;
1477   GstRTSPResult res;
1478   GError *err = NULL;
1479
1480   if (G_UNLIKELY (*idx > size))
1481     return GST_RTSP_ERROR;
1482
1483   left = size - *idx;
1484
1485   while (left) {
1486     r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1487     if (G_UNLIKELY (r <= 0))
1488       goto error;
1489
1490     left -= r;
1491     *idx += r;
1492   }
1493   return GST_RTSP_OK;
1494
1495   /* ERRORS */
1496 error:
1497   {
1498     if (G_UNLIKELY (r == 0))
1499       return GST_RTSP_EEOF;
1500
1501     GST_DEBUG ("%s", err->message);
1502     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1503     g_clear_error (&err);
1504     return res;
1505   }
1506 }
1507
1508 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1509  * end of a line. It even does its best to handle clients which mix them (even
1510  * though this is a really stupid idea (tm).) It also handles Line White Space
1511  * (LWS), where a line end followed by whitespace is considered LWS. This is
1512  * the method used in RTSP (and HTTP) to break long lines.
1513  */
1514 static GstRTSPResult
1515 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1516     gboolean block)
1517 {
1518   GstRTSPResult res;
1519
1520   while (TRUE) {
1521     guint8 c;
1522     guint i;
1523
1524     if (conn->read_ahead == READ_AHEAD_EOH) {
1525       /* the last call to read_line() already determined that we have reached
1526        * the end of the headers, so convey that information now */
1527       conn->read_ahead = 0;
1528       break;
1529     } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1530       /* the last call to read_line() left off after having read \r\n */
1531       c = '\n';
1532     } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1533       /* the last call to read_line() left off after having read \r\n\r */
1534       c = '\r';
1535     } else if (conn->read_ahead != 0) {
1536       /* the last call to read_line() left us with a character to start with */
1537       c = (guint8) conn->read_ahead;
1538       conn->read_ahead = 0;
1539     } else {
1540       /* read the next character */
1541       i = 0;
1542       res = read_bytes (conn, &c, &i, 1, block);
1543       if (G_UNLIKELY (res != GST_RTSP_OK))
1544         return res;
1545     }
1546
1547     /* special treatment of line endings */
1548     if (c == '\r' || c == '\n') {
1549       guint8 read_ahead;
1550
1551     retry:
1552       /* need to read ahead one more character to know what to do... */
1553       i = 0;
1554       res = read_bytes (conn, &read_ahead, &i, 1, block);
1555       if (G_UNLIKELY (res != GST_RTSP_OK))
1556         return res;
1557
1558       if (read_ahead == ' ' || read_ahead == '\t') {
1559         if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1560           /* got \r\n\r followed by whitespace, treat it as a normal line
1561            * followed by one starting with LWS */
1562           conn->read_ahead = read_ahead;
1563           break;
1564         } else {
1565           /* got LWS, change the line ending to a space and continue */
1566           c = ' ';
1567           conn->read_ahead = read_ahead;
1568         }
1569       } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1570         if (read_ahead == '\r' || read_ahead == '\n') {
1571           /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1572           conn->read_ahead = READ_AHEAD_EOH;
1573           break;
1574         } else {
1575           /* got \r\n\r followed by something else, this is not really
1576            * supported since we have probably just eaten the first character
1577            * of the body or the next message, so just ignore the second \r
1578            * and live with it... */
1579           conn->read_ahead = read_ahead;
1580           break;
1581         }
1582       } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1583         if (read_ahead == '\r') {
1584           /* got \r\n\r so far, need one more character... */
1585           conn->read_ahead = READ_AHEAD_CRLFCR;
1586           goto retry;
1587         } else if (read_ahead == '\n') {
1588           /* got \r\n\n, treat it as the end of the headers */
1589           conn->read_ahead = READ_AHEAD_EOH;
1590           break;
1591         } else {
1592           /* found the end of a line, keep read_ahead for the next line */
1593           conn->read_ahead = read_ahead;
1594           break;
1595         }
1596       } else if (c == read_ahead) {
1597         /* got double \r or \n, treat it as the end of the headers */
1598         conn->read_ahead = READ_AHEAD_EOH;
1599         break;
1600       } else if (c == '\r' && read_ahead == '\n') {
1601         /* got \r\n so far, still need more to know what to do... */
1602         conn->read_ahead = READ_AHEAD_CRLF;
1603         goto retry;
1604       } else {
1605         /* found the end of a line, keep read_ahead for the next line */
1606         conn->read_ahead = read_ahead;
1607         break;
1608       }
1609     }
1610
1611     if (G_LIKELY (*idx < size - 1))
1612       buffer[(*idx)++] = c;
1613   }
1614   buffer[*idx] = '\0';
1615
1616   return GST_RTSP_OK;
1617 }
1618
1619 /**
1620  * gst_rtsp_connection_write_usec:
1621  * @conn: a #GstRTSPConnection
1622  * @data: the data to write
1623  * @size: the size of @data
1624  * @timeout: a timeout value or 0
1625  *
1626  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1627  * the specified @timeout. @timeout can be 0, in which case this function
1628  * might block forever.
1629  *
1630  * This function can be cancelled with gst_rtsp_connection_flush().
1631  *
1632  * Returns: #GST_RTSP_OK on success.
1633  *
1634  * Since: 1.18
1635  */
1636 /* FIXME 2.0: This should've been static! */
1637 GstRTSPResult
1638 gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
1639     guint size, gint64 timeout)
1640 {
1641   guint offset;
1642   GstClockTime to;
1643   GstRTSPResult res;
1644
1645   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1646   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1647   g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1648
1649   offset = 0;
1650
1651   to = timeout * 1000;
1652
1653   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1654   res =
1655       write_bytes (conn->output_stream, data, &offset, size, TRUE,
1656       conn->cancellable);
1657   g_socket_set_timeout (conn->write_socket, 0);
1658
1659   return res;
1660 }
1661
1662 static gboolean
1663 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1664     GstRTSPSerializedMessage * serialized_message)
1665 {
1666   GString *str = NULL;
1667
1668   memset (serialized_message, 0, sizeof (*serialized_message));
1669
1670   /* Initially we borrow the body_data / body_buffer fields from
1671    * the message */
1672   serialized_message->borrowed = TRUE;
1673
1674   switch (message->type) {
1675     case GST_RTSP_MESSAGE_REQUEST:
1676       str = g_string_new ("");
1677
1678       /* create request string, add CSeq */
1679       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1680           "CSeq: %d\r\n",
1681           gst_rtsp_method_as_text (message->type_data.request.method),
1682           message->type_data.request.uri,
1683           gst_rtsp_version_as_text (message->type_data.request.version),
1684           conn->cseq++);
1685       /* add session id if we have one */
1686       if (conn->session_id[0] != '\0') {
1687         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1688         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1689             conn->session_id);
1690       }
1691       /* add any authentication headers */
1692       add_auth_header (conn, message);
1693       break;
1694     case GST_RTSP_MESSAGE_RESPONSE:
1695       str = g_string_new ("");
1696
1697       /* create response string */
1698       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1699           gst_rtsp_version_as_text (message->type_data.response.version),
1700           message->type_data.response.code, message->type_data.response.reason);
1701       break;
1702     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1703       str = g_string_new ("");
1704
1705       /* create request string */
1706       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1707           gst_rtsp_method_as_text (message->type_data.request.method),
1708           message->type_data.request.uri,
1709           gst_rtsp_version_as_text (message->type_data.request.version));
1710       /* add any authentication headers */
1711       add_auth_header (conn, message);
1712       break;
1713     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1714       str = g_string_new ("");
1715
1716       /* create response string */
1717       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1718           gst_rtsp_version_as_text (message->type_data.request.version),
1719           message->type_data.response.code, message->type_data.response.reason);
1720       break;
1721     case GST_RTSP_MESSAGE_DATA:
1722     {
1723       guint8 *data_header = serialized_message->data_header;
1724
1725       /* prepare data header */
1726       data_header[0] = '$';
1727       data_header[1] = message->type_data.data.channel;
1728       data_header[2] = (message->body_size >> 8) & 0xff;
1729       data_header[3] = message->body_size & 0xff;
1730
1731       /* create serialized message with header and data */
1732       serialized_message->data_is_data_header = TRUE;
1733       serialized_message->data_size = 4;
1734
1735       if (message->body) {
1736         serialized_message->body_data = message->body;
1737         serialized_message->body_data_size = message->body_size;
1738       } else {
1739         g_assert (message->body_buffer != NULL);
1740         serialized_message->body_buffer = message->body_buffer;
1741       }
1742       break;
1743     }
1744     default:
1745       g_string_free (str, TRUE);
1746       g_return_val_if_reached (FALSE);
1747       break;
1748   }
1749
1750   /* append headers and body */
1751   if (message->type != GST_RTSP_MESSAGE_DATA) {
1752     gchar date_string[100];
1753
1754     g_assert (str != NULL);
1755
1756     gen_date_string (date_string, sizeof (date_string));
1757
1758     /* add date header */
1759     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1760     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1761
1762     /* append headers */
1763     gst_rtsp_message_append_headers (message, str);
1764
1765     /* append Content-Length and body if needed */
1766     if (message->body_size > 0) {
1767       gchar *len;
1768
1769       len = g_strdup_printf ("%d", message->body_size);
1770       g_string_append_printf (str, "%s: %s\r\n",
1771           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1772       g_free (len);
1773       /* header ends here */
1774       g_string_append (str, "\r\n");
1775
1776       if (message->body) {
1777         serialized_message->body_data = message->body;
1778         serialized_message->body_data_size = message->body_size;
1779       } else {
1780         g_assert (message->body_buffer != NULL);
1781         serialized_message->body_buffer = message->body_buffer;
1782       }
1783     } else {
1784       /* just end headers */
1785       g_string_append (str, "\r\n");
1786     }
1787
1788     serialized_message->data_size = str->len;
1789     serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1790   }
1791
1792   return TRUE;
1793 }
1794
1795 /**
1796  * gst_rtsp_connection_send_usec:
1797  * @conn: a #GstRTSPConnection
1798  * @message: the message to send
1799  * @timeout: a timeout value in microseconds
1800  *
1801  * Attempt to send @message to the connected @conn, blocking up to
1802  * the specified @timeout. @timeout can be 0, in which case this function
1803  * might block forever.
1804  *
1805  * This function can be cancelled with gst_rtsp_connection_flush().
1806  *
1807  * Returns: #GST_RTSP_OK on success.
1808  *
1809  * Since: 1.18
1810  */
1811 GstRTSPResult
1812 gst_rtsp_connection_send_usec (GstRTSPConnection * conn,
1813     GstRTSPMessage * message, gint64 timeout)
1814 {
1815   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1816   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1817
1818   return gst_rtsp_connection_send_messages_usec (conn, message, 1, timeout);
1819 }
1820
1821 /**
1822  * gst_rtsp_connection_send_messages_usec:
1823  * @conn: a #GstRTSPConnection
1824  * @messages: (array length=n_messages): the messages to send
1825  * @n_messages: the number of messages to send
1826  * @timeout: a timeout value in microseconds
1827  *
1828  * Attempt to send @messages to the connected @conn, blocking up to
1829  * the specified @timeout. @timeout can be 0, in which case this function
1830  * might block forever.
1831  *
1832  * This function can be cancelled with gst_rtsp_connection_flush().
1833  *
1834  * Returns: #GST_RTSP_OK on Since.
1835  *
1836  * Since: 1.18
1837  */
1838 GstRTSPResult
1839 gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
1840     GstRTSPMessage * messages, guint n_messages, gint64 timeout)
1841 {
1842   GstClockTime to;
1843   GstRTSPResult res;
1844   GstRTSPSerializedMessage *serialized_messages;
1845   GOutputVector *vectors;
1846   GstMapInfo *map_infos;
1847   guint n_vectors, n_memories;
1848   gint i, j, k;
1849   gsize bytes_to_write, bytes_written;
1850
1851   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1852   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1853
1854   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1855   memset (serialized_messages, 0,
1856       sizeof (GstRTSPSerializedMessage) * n_messages);
1857
1858   for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1859       i++) {
1860     if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1861                 &serialized_messages[i])))
1862       goto no_message;
1863
1864     if (conn->tunneled) {
1865       gint state = 0, save = 0;
1866       gchar *base64_buffer, *out_buffer;
1867       gsize written = 0;
1868       gsize in_length;
1869
1870       in_length = serialized_messages[i].data_size;
1871       if (serialized_messages[i].body_data)
1872         in_length += serialized_messages[i].body_data_size;
1873       else if (serialized_messages[i].body_buffer)
1874         in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1875
1876       in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1877       base64_buffer = out_buffer = g_malloc0 (in_length);
1878
1879       written =
1880           g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1881           serialized_messages[i].data_header : serialized_messages[i].data,
1882           serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1883       out_buffer += written;
1884
1885       if (serialized_messages[i].body_data) {
1886         written =
1887             g_base64_encode_step (serialized_messages[i].body_data,
1888             serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1889             &save);
1890         out_buffer += written;
1891       } else if (serialized_messages[i].body_buffer) {
1892         guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1893
1894         for (j = 0; j < n; j++) {
1895           GstMemory *mem =
1896               gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1897           GstMapInfo map;
1898
1899           gst_memory_map (mem, &map, GST_MAP_READ);
1900
1901           written = g_base64_encode_step (map.data, map.size,
1902               FALSE, out_buffer, &state, &save);
1903           out_buffer += written;
1904
1905           gst_memory_unmap (mem, &map);
1906         }
1907       }
1908
1909       written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
1910       out_buffer += written;
1911
1912       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1913       memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
1914
1915       serialized_messages[i].data = (guint8 *) base64_buffer;
1916       serialized_messages[i].data_size = (out_buffer - base64_buffer);
1917       n_vectors++;
1918     } else {
1919       n_vectors++;
1920       if (serialized_messages[i].body_data) {
1921         n_vectors++;
1922       } else if (serialized_messages[i].body_buffer) {
1923         n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1924         n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1925       }
1926     }
1927   }
1928
1929   vectors = g_newa (GOutputVector, n_vectors);
1930   map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
1931
1932   for (i = 0, j = 0, k = 0; i < n_messages; i++) {
1933     vectors[j].buffer = serialized_messages[i].data_is_data_header ?
1934         serialized_messages[i].data_header : serialized_messages[i].data;
1935     vectors[j].size = serialized_messages[i].data_size;
1936     bytes_to_write += vectors[j].size;
1937     j++;
1938
1939     if (serialized_messages[i].body_data) {
1940       vectors[j].buffer = serialized_messages[i].body_data;
1941       vectors[j].size = serialized_messages[i].body_data_size;
1942       bytes_to_write += vectors[j].size;
1943       j++;
1944     } else if (serialized_messages[i].body_buffer) {
1945       gint l, n;
1946
1947       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1948       for (l = 0; l < n; l++) {
1949         GstMemory *mem =
1950             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1951
1952         gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
1953         vectors[j].buffer = map_infos[k].data;
1954         vectors[j].size = map_infos[k].size;
1955         bytes_to_write += vectors[j].size;
1956
1957         k++;
1958         j++;
1959       }
1960     }
1961   }
1962
1963   /* write request: this is synchronous */
1964   to = timeout * 1000;
1965
1966   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1967   res =
1968       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
1969       TRUE, conn->cancellable);
1970   g_socket_set_timeout (conn->write_socket, 0);
1971
1972   g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
1973
1974   /* free everything */
1975   for (i = 0, k = 0; i < n_messages; i++) {
1976     if (serialized_messages[i].body_buffer) {
1977       gint l, n;
1978
1979       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1980       for (l = 0; l < n; l++) {
1981         GstMemory *mem =
1982             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1983
1984         gst_memory_unmap (mem, &map_infos[k]);
1985         k++;
1986       }
1987     }
1988
1989     g_free (serialized_messages[i].data);
1990   }
1991
1992   return res;
1993
1994 no_message:
1995   {
1996     for (i = 0; i < n_messages; i++) {
1997       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1998     }
1999     g_warning ("Wrong message");
2000     return GST_RTSP_EINVAL;
2001   }
2002 }
2003
2004 static GstRTSPResult
2005 parse_string (gchar * dest, gint size, gchar ** src)
2006 {
2007   GstRTSPResult res = GST_RTSP_OK;
2008   gint idx;
2009
2010   idx = 0;
2011   /* skip spaces */
2012   while (g_ascii_isspace (**src))
2013     (*src)++;
2014
2015   while (!g_ascii_isspace (**src) && **src != '\0') {
2016     if (idx < size - 1)
2017       dest[idx++] = **src;
2018     else
2019       res = GST_RTSP_EPARSE;
2020     (*src)++;
2021   }
2022   if (size > 0)
2023     dest[idx] = '\0';
2024
2025   return res;
2026 }
2027
2028 static GstRTSPResult
2029 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2030     GstRTSPVersion * version)
2031 {
2032   GstRTSPVersion rversion;
2033   GstRTSPResult res = GST_RTSP_OK;
2034   gchar *ver;
2035
2036   if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2037     guint major;
2038     guint minor;
2039     gchar dummychar;
2040
2041     *ver++ = '\0';
2042
2043     /* the version number must be formatted as X.Y with nothing following */
2044     if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2045       res = GST_RTSP_EPARSE;
2046
2047     rversion = major * 0x10 + minor;
2048     if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2049
2050       if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2051         *version = GST_RTSP_VERSION_INVALID;
2052         res = GST_RTSP_ERROR;
2053       }
2054     } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2055       if (*type == GST_RTSP_MESSAGE_REQUEST)
2056         *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2057       else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2058         *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2059
2060       if (rversion != GST_RTSP_VERSION_1_0 &&
2061           rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2062         res = GST_RTSP_ERROR;
2063     } else
2064       res = GST_RTSP_EPARSE;
2065   } else
2066     res = GST_RTSP_EPARSE;
2067
2068   if (res == GST_RTSP_OK)
2069     *version = rversion;
2070
2071   return res;
2072 }
2073
2074 static GstRTSPResult
2075 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2076 {
2077   GstRTSPResult res = GST_RTSP_OK;
2078   GstRTSPResult res2;
2079   gchar versionstr[20];
2080   gchar codestr[4];
2081   gint code;
2082   gchar *bptr;
2083
2084   bptr = (gchar *) buffer;
2085
2086   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2087     res = GST_RTSP_EPARSE;
2088
2089   if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2090     res = GST_RTSP_EPARSE;
2091   code = atoi (codestr);
2092   if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2093     res = GST_RTSP_EPARSE;
2094
2095   while (g_ascii_isspace (*bptr))
2096     bptr++;
2097
2098   if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2099               NULL) != GST_RTSP_OK))
2100     res = GST_RTSP_EPARSE;
2101
2102   res2 = parse_protocol_version (versionstr, &msg->type,
2103       &msg->type_data.response.version);
2104   if (G_LIKELY (res == GST_RTSP_OK))
2105     res = res2;
2106
2107   return res;
2108 }
2109
2110 static GstRTSPResult
2111 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2112 {
2113   GstRTSPResult res = GST_RTSP_OK;
2114   GstRTSPResult res2;
2115   gchar versionstr[20];
2116   gchar methodstr[20];
2117   gchar urlstr[4096];
2118   gchar *bptr;
2119   GstRTSPMethod method;
2120
2121   bptr = (gchar *) buffer;
2122
2123   if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2124     res = GST_RTSP_EPARSE;
2125   method = gst_rtsp_find_method (methodstr);
2126
2127   if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2128     res = GST_RTSP_EPARSE;
2129   if (G_UNLIKELY (*urlstr == '\0'))
2130     res = GST_RTSP_EPARSE;
2131
2132   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2133     res = GST_RTSP_EPARSE;
2134
2135   if (G_UNLIKELY (*bptr != '\0'))
2136     res = GST_RTSP_EPARSE;
2137
2138   if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2139               urlstr) != GST_RTSP_OK))
2140     res = GST_RTSP_EPARSE;
2141
2142   res2 = parse_protocol_version (versionstr, &msg->type,
2143       &msg->type_data.request.version);
2144   if (G_LIKELY (res == GST_RTSP_OK))
2145     res = res2;
2146
2147   if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2148     /* GET and POST are not allowed as RTSP methods */
2149     if (msg->type_data.request.method == GST_RTSP_GET ||
2150         msg->type_data.request.method == GST_RTSP_POST) {
2151       msg->type_data.request.method = GST_RTSP_INVALID;
2152       if (res == GST_RTSP_OK)
2153         res = GST_RTSP_ERROR;
2154     }
2155   } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2156     /* only GET and POST are allowed as HTTP methods */
2157     if (msg->type_data.request.method != GST_RTSP_GET &&
2158         msg->type_data.request.method != GST_RTSP_POST) {
2159       msg->type_data.request.method = GST_RTSP_INVALID;
2160       if (res == GST_RTSP_OK)
2161         res = GST_RTSP_ERROR;
2162     }
2163   }
2164
2165   return res;
2166 }
2167
2168 /* parsing lines means reading a Key: Value pair */
2169 static GstRTSPResult
2170 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2171 {
2172   GstRTSPHeaderField field;
2173   gchar *line = (gchar *) buffer;
2174   gchar *field_name = NULL;
2175   gchar *value;
2176
2177   if ((value = strchr (line, ':')) == NULL || value == line)
2178     goto parse_error;
2179
2180   /* trim space before the colon */
2181   if (value[-1] == ' ')
2182     value[-1] = '\0';
2183
2184   /* replace the colon with a NUL */
2185   *value++ = '\0';
2186
2187   /* find the header */
2188   field = gst_rtsp_find_header_field (line);
2189   /* custom header not present in the list of pre-defined headers */
2190   if (field == GST_RTSP_HDR_INVALID)
2191     field_name = line;
2192
2193   /* split up the value in multiple key:value pairs if it contains comma(s) */
2194   while (*value != '\0') {
2195     gchar *next_value;
2196     gchar *comma = NULL;
2197     gboolean quoted = FALSE;
2198     guint comment = 0;
2199
2200     /* trim leading space */
2201     if (*value == ' ')
2202       value++;
2203
2204     /* for headers which may not appear multiple times, and thus may not
2205      * contain multiple values on the same line, we can short-circuit the loop
2206      * below and the entire value results in just one key:value pair*/
2207     if (!gst_rtsp_header_allow_multiple (field))
2208       next_value = value + strlen (value);
2209     else
2210       next_value = value;
2211
2212     /* find the next value, taking special care of quotes and comments */
2213     while (*next_value != '\0') {
2214       if ((quoted || comment != 0) && *next_value == '\\' &&
2215           next_value[1] != '\0')
2216         next_value++;
2217       else if (comment == 0 && *next_value == '"')
2218         quoted = !quoted;
2219       else if (!quoted && *next_value == '(')
2220         comment++;
2221       else if (comment != 0 && *next_value == ')')
2222         comment--;
2223       else if (!quoted && comment == 0) {
2224         /* To quote RFC 2068: "User agents MUST take special care in parsing
2225          * the WWW-Authenticate field value if it contains more than one
2226          * challenge, or if more than one WWW-Authenticate header field is
2227          * provided, since the contents of a challenge may itself contain a
2228          * comma-separated list of authentication parameters."
2229          *
2230          * What this means is that we cannot just look for an unquoted comma
2231          * when looking for multiple values in Proxy-Authenticate and
2232          * WWW-Authenticate headers. Instead we need to look for the sequence
2233          * "comma [space] token space token" before we can split after the
2234          * comma...
2235          */
2236         if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2237             field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2238           if (*next_value == ',') {
2239             if (next_value[1] == ' ') {
2240               /* skip any space following the comma so we do not mistake it for
2241                * separating between two tokens */
2242               next_value++;
2243             }
2244             comma = next_value;
2245           } else if (*next_value == ' ' && next_value[1] != ',' &&
2246               next_value[1] != '=' && comma != NULL) {
2247             next_value = comma;
2248             comma = NULL;
2249             break;
2250           }
2251         } else if (*next_value == ',')
2252           break;
2253       }
2254
2255       next_value++;
2256     }
2257
2258     if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2259       /* The timeout parameter is only allowed in a session response header
2260        * but some clients send it as part of the session request header.
2261        * Ignore everything from the semicolon to the end of the line. */
2262       next_value = value;
2263       while (*next_value != '\0') {
2264         if (*next_value == ';') {
2265           break;
2266         }
2267         next_value++;
2268       }
2269     }
2270
2271     /* trim space */
2272     if (value != next_value && next_value[-1] == ' ')
2273       next_value[-1] = '\0';
2274
2275     if (*next_value != '\0')
2276       *next_value++ = '\0';
2277
2278     /* add the key:value pair */
2279     if (*value != '\0') {
2280       if (field != GST_RTSP_HDR_INVALID)
2281         gst_rtsp_message_add_header (msg, field, value);
2282       else
2283         gst_rtsp_message_add_header_by_name (msg, field_name, value);
2284     }
2285
2286     value = next_value;
2287   }
2288
2289   return GST_RTSP_OK;
2290
2291   /* ERRORS */
2292 parse_error:
2293   {
2294     return GST_RTSP_EPARSE;
2295   }
2296 }
2297
2298 /* convert all consecutive whitespace to a single space */
2299 static void
2300 normalize_line (guint8 * buffer)
2301 {
2302   while (*buffer) {
2303     if (g_ascii_isspace (*buffer)) {
2304       guint8 *tmp;
2305
2306       *buffer++ = ' ';
2307       for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2308       }
2309       if (buffer != tmp)
2310         memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2311     } else {
2312       buffer++;
2313     }
2314   }
2315 }
2316
2317 static gboolean
2318 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2319 {
2320   gchar *cseq_header;
2321   gint64 cseq = 0;
2322   GstRTSPResult res;
2323
2324   if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2325       message->type == GST_RTSP_MESSAGE_REQUEST) {
2326     if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2327                 &cseq_header, 0)) != GST_RTSP_OK) {
2328       /* rfc2326 This field MUST be present in all RTSP req and resp */
2329       goto invalid_format;
2330     }
2331
2332     errno = 0;
2333     cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2334     if (errno != 0 || cseq < 0) {
2335       /* CSeq has no valid value */
2336       goto invalid_format;
2337     }
2338
2339     if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2340         (conn->cseq == 0 || conn->cseq < cseq)) {
2341       /* Response CSeq can't be higher than the number of outgoing requests
2342        * neither is a response valid if no request has been made */
2343       goto invalid_format;
2344     }
2345   }
2346   return GST_RTSP_OK;
2347
2348 invalid_format:
2349   {
2350     return GST_RTSP_EPARSE;
2351   }
2352 }
2353
2354 /* returns:
2355  *  GST_RTSP_OK when a complete message was read.
2356  *  GST_RTSP_EEOF: when the read socket is closed
2357  *  GST_RTSP_EINTR: when more data is needed.
2358  *  GST_RTSP_..: some other error occurred.
2359  */
2360 static GstRTSPResult
2361 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2362     GstRTSPConnection * conn, gboolean block)
2363 {
2364   GstRTSPResult res;
2365
2366   while (TRUE) {
2367     switch (builder->state) {
2368       case STATE_START:
2369       {
2370         guint8 c;
2371
2372         builder->offset = 0;
2373         res =
2374             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2375             block);
2376         if (res != GST_RTSP_OK)
2377           goto done;
2378
2379         c = builder->buffer[0];
2380
2381         /* we have 1 bytes now and we can see if this is a data message or
2382          * not */
2383         if (c == '$') {
2384           /* data message, prepare for the header */
2385           builder->state = STATE_DATA_HEADER;
2386           conn->may_cancel = FALSE;
2387         } else if (c == '\n' || c == '\r') {
2388           /* skip \n and \r */
2389           builder->offset = 0;
2390         } else {
2391           builder->line = 0;
2392           builder->state = STATE_READ_LINES;
2393           conn->may_cancel = FALSE;
2394         }
2395         break;
2396       }
2397       case STATE_DATA_HEADER:
2398       {
2399         res =
2400             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2401             block);
2402         if (res != GST_RTSP_OK)
2403           goto done;
2404
2405         gst_rtsp_message_init_data (message, builder->buffer[1]);
2406
2407         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2408         builder->body_data = g_malloc (builder->body_len + 1);
2409         builder->body_data[builder->body_len] = '\0';
2410         builder->offset = 0;
2411         builder->state = STATE_DATA_BODY;
2412         break;
2413       }
2414       case STATE_DATA_BODY:
2415       {
2416         res =
2417             read_bytes (conn, builder->body_data, &builder->offset,
2418             builder->body_len, block);
2419         if (res != GST_RTSP_OK)
2420           goto done;
2421
2422         /* we have the complete body now, store in the message adjusting the
2423          * length to include the trailing '\0' */
2424         gst_rtsp_message_take_body (message,
2425             (guint8 *) builder->body_data, builder->body_len + 1);
2426         builder->body_data = NULL;
2427         builder->body_len = 0;
2428
2429         builder->state = STATE_END;
2430         break;
2431       }
2432       case STATE_READ_LINES:
2433       {
2434         res = read_line (conn, builder->buffer, &builder->offset,
2435             sizeof (builder->buffer), block);
2436         if (res != GST_RTSP_OK)
2437           goto done;
2438
2439         /* we have a regular response */
2440         if (builder->buffer[0] == '\0') {
2441           gchar *hdrval;
2442           gint64 content_length_parsed = 0;
2443
2444           /* empty line, end of message header */
2445           /* see if there is a Content-Length header, but ignore it if this
2446            * is a POST request with an x-sessioncookie header */
2447           if (gst_rtsp_message_get_header (message,
2448                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2449               (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2450                   message->type_data.request.method != GST_RTSP_POST ||
2451                   gst_rtsp_message_get_header (message,
2452                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2453             /* there is, prepare to read the body */
2454             errno = 0;
2455             content_length_parsed = g_ascii_strtoll (hdrval, NULL, 10);
2456             if (errno != 0 || content_length_parsed < 0) {
2457               res = GST_RTSP_EPARSE;
2458               goto invalid_body_len;
2459             } else if (content_length_parsed > conn->content_length_limit) {
2460               res = GST_RTSP_ENOMEM;
2461               goto invalid_body_len;
2462             }
2463             builder->body_len = content_length_parsed;
2464             builder->body_data = g_try_malloc (builder->body_len + 1);
2465             /* we can't do much here, we need the length to know how many bytes
2466              * we need to read next and when allocation fails, we can't read the payload. */
2467             if (builder->body_data == NULL) {
2468               res = GST_RTSP_ENOMEM;
2469               goto invalid_body_len;
2470             }
2471
2472             builder->body_data[builder->body_len] = '\0';
2473             builder->offset = 0;
2474             builder->state = STATE_DATA_BODY;
2475           } else {
2476             builder->state = STATE_END;
2477           }
2478           break;
2479         }
2480
2481         /* we have a line */
2482         normalize_line (builder->buffer);
2483         if (builder->line == 0) {
2484           /* first line, check for response status */
2485           if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2486               memcmp (builder->buffer, "HTTP", 4) == 0) {
2487             builder->status = parse_response_status (builder->buffer, message);
2488           } else {
2489             builder->status = parse_request_line (builder->buffer, message);
2490           }
2491         } else {
2492           /* else just parse the line */
2493           res = parse_line (builder->buffer, message);
2494           if (res != GST_RTSP_OK)
2495             builder->status = res;
2496         }
2497         if (builder->status != GST_RTSP_OK) {
2498           res = builder->status;
2499           goto invalid_format;
2500         }
2501
2502         builder->line++;
2503         builder->offset = 0;
2504         break;
2505       }
2506       case STATE_END:
2507       {
2508         gchar *session_cookie;
2509         gchar *session_id;
2510
2511         conn->may_cancel = TRUE;
2512
2513         if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2514           /* message don't comply with rfc2326 regarding CSeq */
2515           goto invalid_format;
2516         }
2517
2518         if (message->type == GST_RTSP_MESSAGE_DATA) {
2519           /* data messages don't have headers */
2520           res = GST_RTSP_OK;
2521           goto done;
2522         }
2523
2524         /* save the tunnel session in the connection */
2525         if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2526             !conn->manual_http &&
2527             conn->tstate == TUNNEL_STATE_NONE &&
2528             gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2529                 &session_cookie, 0) == GST_RTSP_OK) {
2530           strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2531           conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2532           conn->tunneled = TRUE;
2533         }
2534
2535         /* save session id in the connection for further use */
2536         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2537             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2538                 &session_id, 0) == GST_RTSP_OK) {
2539           gint maxlen, i;
2540
2541           maxlen = sizeof (conn->session_id) - 1;
2542           /* the sessionid can have attributes marked with ;
2543            * Make sure we strip them */
2544           for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2545             if (session_id[i] == ';') {
2546               maxlen = i;
2547               /* parse timeout */
2548               do {
2549                 i++;
2550               } while (g_ascii_isspace (session_id[i]));
2551               if (g_str_has_prefix (&session_id[i], "timeout=")) {
2552                 gint to;
2553
2554                 /* if we parsed something valid, configure */
2555                 if ((to = atoi (&session_id[i + 8])) > 0)
2556                   conn->timeout = to;
2557               }
2558               break;
2559             }
2560           }
2561
2562           /* make sure to not overflow */
2563           if (conn->remember_session_id) {
2564             strncpy (conn->session_id, session_id, maxlen);
2565             conn->session_id[maxlen] = '\0';
2566           }
2567         }
2568         res = builder->status;
2569         goto done;
2570       }
2571       default:
2572         res = GST_RTSP_ERROR;
2573         goto done;
2574     }
2575   }
2576 done:
2577   conn->may_cancel = TRUE;
2578   return res;
2579
2580   /* ERRORS */
2581 invalid_body_len:
2582   {
2583     conn->may_cancel = TRUE;
2584     GST_DEBUG ("could not allocate body");
2585     return res;
2586   }
2587 invalid_format:
2588   {
2589     conn->may_cancel = TRUE;
2590     GST_DEBUG ("could not parse");
2591     return res;
2592   }
2593 }
2594
2595 /**
2596  * gst_rtsp_connection_read_usec:
2597  * @conn: a #GstRTSPConnection
2598  * @data: the data to read
2599  * @size: the size of @data
2600  * @timeout: a timeout value in microseconds
2601  *
2602  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2603  * the specified @timeout. @timeout can be 0, in which case this function
2604  * might block forever.
2605  *
2606  * This function can be cancelled with gst_rtsp_connection_flush().
2607  *
2608  * Returns: #GST_RTSP_OK on success.
2609  *
2610  * Since: 1.18
2611  */
2612 GstRTSPResult
2613 gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
2614     guint size, gint64 timeout)
2615 {
2616   guint offset;
2617   GstClockTime to;
2618   GstRTSPResult res;
2619
2620   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2621   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2622   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2623
2624   if (G_UNLIKELY (size == 0))
2625     return GST_RTSP_OK;
2626
2627   offset = 0;
2628
2629   /* configure timeout if any */
2630   to = timeout * 1000;
2631
2632   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2633   res = read_bytes (conn, data, &offset, size, TRUE);
2634   g_socket_set_timeout (conn->read_socket, 0);
2635
2636   return res;
2637 }
2638
2639 static GstRTSPMessage *
2640 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2641     const GstRTSPMessage * request)
2642 {
2643   GstRTSPMessage *msg;
2644   GstRTSPResult res;
2645
2646   if (gst_rtsp_status_as_text (code) == NULL)
2647     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2648
2649   GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2650       no_message);
2651
2652   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2653       "GStreamer RTSP Server");
2654   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2655   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2656   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2657
2658   if (code == GST_RTSP_STS_OK) {
2659     /* add the local ip address to the tunnel reply, this is where the client
2660      * should send the POST request to */
2661     if (conn->local_ip)
2662       gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2663           conn->local_ip);
2664     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2665         "application/x-rtsp-tunnelled");
2666   }
2667
2668   return msg;
2669
2670   /* ERRORS */
2671 no_message:
2672   {
2673     return NULL;
2674   }
2675 }
2676
2677 /**
2678  * gst_rtsp_connection_receive_usec:
2679  * @conn: a #GstRTSPConnection
2680  * @message: the message to read
2681  * @timeout: a timeout value or 0
2682  *
2683  * Attempt to read into @message from the connected @conn, blocking up to
2684  * the specified @timeout. @timeout can be 0, in which case this function
2685  * might block forever.
2686  *
2687  * This function can be cancelled with gst_rtsp_connection_flush().
2688  *
2689  * Returns: #GST_RTSP_OK on success.
2690  *
2691  * Since: 1.18
2692  */
2693 GstRTSPResult
2694 gst_rtsp_connection_receive_usec (GstRTSPConnection * conn,
2695     GstRTSPMessage * message, gint64 timeout)
2696 {
2697   GstRTSPResult res;
2698   GstRTSPBuilder builder;
2699   GstClockTime to;
2700
2701   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2702   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2703   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2704
2705   /* configure timeout if any */
2706   to = timeout * 1000;
2707
2708   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2709   memset (&builder, 0, sizeof (GstRTSPBuilder));
2710   res = build_next (&builder, message, conn, TRUE);
2711   g_socket_set_timeout (conn->read_socket, 0);
2712
2713   if (G_UNLIKELY (res != GST_RTSP_OK))
2714     goto read_error;
2715
2716   if (!conn->manual_http) {
2717     if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2718       if (conn->tstate == TUNNEL_STATE_NONE &&
2719           message->type_data.request.method == GST_RTSP_GET) {
2720         GstRTSPMessage *response;
2721
2722         conn->tstate = TUNNEL_STATE_GET;
2723
2724         /* tunnel GET request, we can reply now */
2725         response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2726         res = gst_rtsp_connection_send_usec (conn, response, timeout);
2727         gst_rtsp_message_free (response);
2728         if (res == GST_RTSP_OK)
2729           res = GST_RTSP_ETGET;
2730         goto cleanup;
2731       } else if (conn->tstate == TUNNEL_STATE_NONE &&
2732           message->type_data.request.method == GST_RTSP_POST) {
2733         conn->tstate = TUNNEL_STATE_POST;
2734
2735         /* tunnel POST request, the caller now has to link the two
2736          * connections. */
2737         res = GST_RTSP_ETPOST;
2738         goto cleanup;
2739       } else {
2740         res = GST_RTSP_EPARSE;
2741         goto cleanup;
2742       }
2743     } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2744       res = GST_RTSP_EPARSE;
2745       goto cleanup;
2746     }
2747   }
2748
2749   /* we have a message here */
2750   build_reset (&builder);
2751
2752   return GST_RTSP_OK;
2753
2754   /* ERRORS */
2755 read_error:
2756 cleanup:
2757   {
2758     build_reset (&builder);
2759     gst_rtsp_message_unset (message);
2760     return res;
2761   }
2762 }
2763
2764 /**
2765  * gst_rtsp_connection_close:
2766  * @conn: a #GstRTSPConnection
2767  *
2768  * Close the connected @conn. After this call, the connection is in the same
2769  * state as when it was first created.
2770  *
2771  * Returns: #GST_RTSP_OK on success.
2772  */
2773 GstRTSPResult
2774 gst_rtsp_connection_close (GstRTSPConnection * conn)
2775 {
2776   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2777
2778   /* last unref closes the connection we don't want to explicitly close here
2779    * because these sockets might have been provided at construction */
2780   if (conn->stream0) {
2781     g_object_unref (conn->stream0);
2782     conn->stream0 = NULL;
2783     conn->socket0 = NULL;
2784   }
2785   if (conn->stream1) {
2786     g_object_unref (conn->stream1);
2787     conn->stream1 = NULL;
2788     conn->socket1 = NULL;
2789   }
2790
2791   /* these were owned by the stream */
2792   conn->input_stream = NULL;
2793   conn->output_stream = NULL;
2794   conn->control_stream = NULL;
2795
2796   g_free (conn->remote_ip);
2797   conn->remote_ip = NULL;
2798   g_free (conn->local_ip);
2799   conn->local_ip = NULL;
2800
2801   conn->read_ahead = 0;
2802
2803   g_free (conn->initial_buffer);
2804   conn->initial_buffer = NULL;
2805   conn->initial_buffer_offset = 0;
2806
2807   conn->write_socket = NULL;
2808   conn->read_socket = NULL;
2809   conn->tunneled = FALSE;
2810   conn->tstate = TUNNEL_STATE_NONE;
2811   conn->ctxp = NULL;
2812   g_free (conn->username);
2813   conn->username = NULL;
2814   g_free (conn->passwd);
2815   conn->passwd = NULL;
2816   gst_rtsp_connection_clear_auth_params (conn);
2817   conn->timeout = 60;
2818   conn->cseq = 0;
2819   conn->session_id[0] = '\0';
2820
2821   return GST_RTSP_OK;
2822 }
2823
2824 /**
2825  * gst_rtsp_connection_free:
2826  * @conn: a #GstRTSPConnection
2827  *
2828  * Close and free @conn.
2829  *
2830  * Returns: #GST_RTSP_OK on success.
2831  */
2832 GstRTSPResult
2833 gst_rtsp_connection_free (GstRTSPConnection * conn)
2834 {
2835   GstRTSPResult res;
2836
2837   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2838
2839   res = gst_rtsp_connection_close (conn);
2840
2841   if (conn->cancellable)
2842     g_object_unref (conn->cancellable);
2843   if (conn->client)
2844     g_object_unref (conn->client);
2845   if (conn->tls_database)
2846     g_object_unref (conn->tls_database);
2847   if (conn->tls_interaction)
2848     g_object_unref (conn->tls_interaction);
2849   if (conn->accept_certificate_destroy_notify)
2850     conn->
2851         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2852
2853   g_timer_destroy (conn->timer);
2854   gst_rtsp_url_free (conn->url);
2855   g_free (conn->proxy_host);
2856   g_free (conn);
2857
2858   return res;
2859 }
2860
2861 /**
2862  * gst_rtsp_connection_poll_usec:
2863  * @conn: a #GstRTSPConnection
2864  * @events: a bitmask of #GstRTSPEvent flags to check
2865  * @revents: location for result flags
2866  * @timeout: a timeout in microseconds
2867  *
2868  * Wait up to the specified @timeout for the connection to become available for
2869  * at least one of the operations specified in @events. When the function returns
2870  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2871  * @conn.
2872  *
2873  * @timeout can be 0, in which case this function might block forever.
2874  *
2875  * This function can be cancelled with gst_rtsp_connection_flush().
2876  *
2877  * Returns: #GST_RTSP_OK on success.
2878  *
2879  * Since: 1.18
2880  */
2881 GstRTSPResult
2882 gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
2883     GstRTSPEvent * revents, gint64 timeout)
2884 {
2885   GMainContext *ctx;
2886   GSource *rs, *ws, *ts;
2887   GIOCondition condition;
2888
2889   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2890   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2891   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2892   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2893   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2894
2895   ctx = g_main_context_new ();
2896
2897   /* configure timeout if any */
2898   if (timeout) {
2899     ts = g_timeout_source_new (timeout / 1000);
2900     g_source_set_dummy_callback (ts);
2901     g_source_attach (ts, ctx);
2902     g_source_unref (ts);
2903   }
2904
2905   if (events & GST_RTSP_EV_READ) {
2906     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2907         conn->cancellable);
2908     g_source_set_dummy_callback (rs);
2909     g_source_attach (rs, ctx);
2910     g_source_unref (rs);
2911   }
2912
2913   if (events & GST_RTSP_EV_WRITE) {
2914     ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
2915         conn->cancellable);
2916     g_source_set_dummy_callback (ws);
2917     g_source_attach (ws, ctx);
2918     g_source_unref (ws);
2919   }
2920
2921   /* Returns after handling all pending events */
2922   while (!g_main_context_iteration (ctx, TRUE));
2923
2924   g_main_context_unref (ctx);
2925
2926   *revents = 0;
2927   if (events & GST_RTSP_EV_READ) {
2928     condition = g_socket_condition_check (conn->read_socket,
2929         G_IO_IN | G_IO_PRI);
2930     if ((condition & G_IO_IN) || (condition & G_IO_PRI))
2931       *revents |= GST_RTSP_EV_READ;
2932   }
2933   if (events & GST_RTSP_EV_WRITE) {
2934     condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
2935     if ((condition & G_IO_OUT))
2936       *revents |= GST_RTSP_EV_WRITE;
2937   }
2938
2939   if (*revents == 0)
2940     return GST_RTSP_ETIMEOUT;
2941
2942   return GST_RTSP_OK;
2943 }
2944
2945 /**
2946  * gst_rtsp_connection_next_timeout_usec:
2947  * @conn: a #GstRTSPConnection
2948  *
2949  * Calculate the next timeout for @conn
2950  *
2951  * Returns: #the next timeout in microseconds
2952  *
2953  * Since: 1.18
2954  */
2955 gint64
2956 gst_rtsp_connection_next_timeout_usec (GstRTSPConnection * conn)
2957 {
2958   gdouble elapsed;
2959   gulong usec;
2960   gint ctimeout;
2961   gint64 timeout = 0;
2962
2963   g_return_val_if_fail (conn != NULL, 1);
2964
2965   ctimeout = conn->timeout;
2966   if (ctimeout >= 20) {
2967     /* Because we should act before the timeout we timeout 5
2968      * seconds in advance. */
2969     ctimeout -= 5;
2970   } else if (ctimeout >= 5) {
2971     /* else timeout 20% earlier */
2972     ctimeout -= ctimeout / 5;
2973   } else if (ctimeout >= 1) {
2974     /* else timeout 1 second earlier */
2975     ctimeout -= 1;
2976   }
2977
2978   elapsed = g_timer_elapsed (conn->timer, &usec);
2979   if (elapsed >= ctimeout) {
2980     timeout = 0;
2981   } else {
2982     gint64 sec = ctimeout - elapsed;
2983     if (usec <= G_USEC_PER_SEC)
2984       usec = G_USEC_PER_SEC - usec;
2985     else
2986       usec = 0;
2987     timeout = usec + sec * G_USEC_PER_SEC;
2988   }
2989
2990   return timeout;
2991 }
2992
2993 /**
2994  * gst_rtsp_connection_reset_timeout:
2995  * @conn: a #GstRTSPConnection
2996  *
2997  * Reset the timeout of @conn.
2998  *
2999  * Returns: #GST_RTSP_OK.
3000  */
3001 GstRTSPResult
3002 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
3003 {
3004   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3005
3006   g_timer_start (conn->timer);
3007
3008   return GST_RTSP_OK;
3009 }
3010
3011 /**
3012  * gst_rtsp_connection_flush:
3013  * @conn: a #GstRTSPConnection
3014  * @flush: start or stop the flush
3015  *
3016  * Start or stop the flushing action on @conn. When flushing, all current
3017  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
3018  * is set to non-flushing mode again.
3019  *
3020  * Returns: #GST_RTSP_OK.
3021  */
3022 GstRTSPResult
3023 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
3024 {
3025   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3026
3027   if (flush) {
3028     g_cancellable_cancel (conn->cancellable);
3029   } else {
3030     g_object_unref (conn->cancellable);
3031     conn->cancellable = g_cancellable_new ();
3032   }
3033
3034   return GST_RTSP_OK;
3035 }
3036
3037 /**
3038  * gst_rtsp_connection_set_proxy:
3039  * @conn: a #GstRTSPConnection
3040  * @host: the proxy host
3041  * @port: the proxy port
3042  *
3043  * Set the proxy host and port.
3044  *
3045  * Returns: #GST_RTSP_OK.
3046  */
3047 GstRTSPResult
3048 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3049     const gchar * host, guint port)
3050 {
3051   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3052
3053   g_free (conn->proxy_host);
3054   conn->proxy_host = g_strdup (host);
3055   conn->proxy_port = port;
3056
3057   return GST_RTSP_OK;
3058 }
3059
3060 /**
3061  * gst_rtsp_connection_set_auth:
3062  * @conn: a #GstRTSPConnection
3063  * @method: authentication method
3064  * @user: the user
3065  * @pass: the password
3066  *
3067  * Configure @conn for authentication mode @method with @user and @pass as the
3068  * user and password respectively.
3069  *
3070  * Returns: #GST_RTSP_OK.
3071  */
3072 GstRTSPResult
3073 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3074     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3075 {
3076   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3077
3078   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3079           || g_strrstr (user, ":") != NULL))
3080     return GST_RTSP_EINVAL;
3081
3082   /* Make sure the username and passwd are being set for authentication */
3083   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3084     return GST_RTSP_EINVAL;
3085
3086   /* ":" chars are not allowed in usernames for basic auth */
3087   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3088     return GST_RTSP_EINVAL;
3089
3090   g_free (conn->username);
3091   g_free (conn->passwd);
3092
3093   conn->auth_method = method;
3094   conn->username = g_strdup (user);
3095   conn->passwd = g_strdup (pass);
3096
3097   return GST_RTSP_OK;
3098 }
3099
3100 /**
3101  * str_case_hash:
3102  * @key: ASCII string to hash
3103  *
3104  * Hashes @key in a case-insensitive manner.
3105  *
3106  * Returns: the hash code.
3107  **/
3108 static guint
3109 str_case_hash (gconstpointer key)
3110 {
3111   const char *p = key;
3112   guint h = g_ascii_toupper (*p);
3113
3114   if (h)
3115     for (p += 1; *p != '\0'; p++)
3116       h = (h << 5) - h + g_ascii_toupper (*p);
3117
3118   return h;
3119 }
3120
3121 /**
3122  * str_case_equal:
3123  * @v1: an ASCII string
3124  * @v2: another ASCII string
3125  *
3126  * Compares @v1 and @v2 in a case-insensitive manner
3127  *
3128  * Returns: %TRUE if they are equal (modulo case)
3129  **/
3130 static gboolean
3131 str_case_equal (gconstpointer v1, gconstpointer v2)
3132 {
3133   const char *string1 = v1;
3134   const char *string2 = v2;
3135
3136   return g_ascii_strcasecmp (string1, string2) == 0;
3137 }
3138
3139 /**
3140  * gst_rtsp_connection_set_auth_param:
3141  * @conn: a #GstRTSPConnection
3142  * @param: authentication directive
3143  * @value: value
3144  *
3145  * Setup @conn with authentication directives. This is not necessary for
3146  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3147  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3148  * in the WWW-Authenticate response header and can include realm, domain,
3149  * nonce, opaque, stale, algorithm, qop as per RFC2617.
3150  */
3151 void
3152 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3153     const gchar * param, const gchar * value)
3154 {
3155   g_return_if_fail (conn != NULL);
3156   g_return_if_fail (param != NULL);
3157
3158   if (conn->auth_params == NULL) {
3159     conn->auth_params =
3160         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3161   }
3162   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3163 }
3164
3165 /**
3166  * gst_rtsp_connection_clear_auth_params:
3167  * @conn: a #GstRTSPConnection
3168  *
3169  * Clear the list of authentication directives stored in @conn.
3170  */
3171 void
3172 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3173 {
3174   g_return_if_fail (conn != NULL);
3175
3176   if (conn->auth_params != NULL) {
3177     g_hash_table_destroy (conn->auth_params);
3178     conn->auth_params = NULL;
3179   }
3180 }
3181
3182 static GstRTSPResult
3183 set_qos_dscp (GSocket * socket, guint qos_dscp)
3184 {
3185 #ifndef IP_TOS
3186   GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3187   return GST_RTSP_OK;
3188 #else
3189   gint fd;
3190   union gst_sockaddr sa;
3191   socklen_t slen = sizeof (sa);
3192   gint af;
3193   gint tos;
3194
3195   if (!socket)
3196     return GST_RTSP_OK;
3197
3198   fd = g_socket_get_fd (socket);
3199   if (getsockname (fd, &sa.sa, &slen) < 0)
3200     goto no_getsockname;
3201
3202   af = sa.sa.sa_family;
3203
3204   /* if this is an IPv4-mapped address then do IPv4 QoS */
3205   if (af == AF_INET6) {
3206     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3207       af = AF_INET;
3208   }
3209
3210   /* extract and shift 6 bits of the DSCP */
3211   tos = (qos_dscp & 0x3f) << 2;
3212
3213 #ifdef G_OS_WIN32
3214 #  define SETSOCKOPT_ARG4_TYPE const char *
3215 #else
3216 #  define SETSOCKOPT_ARG4_TYPE const void *
3217 #endif
3218
3219   switch (af) {
3220     case AF_INET:
3221       if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3222               sizeof (tos)) < 0)
3223         goto no_setsockopt;
3224       break;
3225     case AF_INET6:
3226 #ifdef IPV6_TCLASS
3227       if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3228               (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3229         goto no_setsockopt;
3230       break;
3231 #endif
3232     default:
3233       goto wrong_family;
3234   }
3235
3236   return GST_RTSP_OK;
3237
3238   /* ERRORS */
3239 no_getsockname:
3240 no_setsockopt:
3241   {
3242     return GST_RTSP_ESYS;
3243   }
3244 wrong_family:
3245   {
3246     return GST_RTSP_ERROR;
3247   }
3248 #endif
3249 }
3250
3251 /**
3252  * gst_rtsp_connection_set_qos_dscp:
3253  * @conn: a #GstRTSPConnection
3254  * @qos_dscp: DSCP value
3255  *
3256  * Configure @conn to use the specified DSCP value.
3257  *
3258  * Returns: #GST_RTSP_OK on success.
3259  */
3260 GstRTSPResult
3261 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3262 {
3263   GstRTSPResult res;
3264
3265   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3266   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3267   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3268
3269   res = set_qos_dscp (conn->socket0, qos_dscp);
3270   if (res == GST_RTSP_OK)
3271     res = set_qos_dscp (conn->socket1, qos_dscp);
3272
3273   return res;
3274 }
3275
3276 /**
3277  * gst_rtsp_connection_set_content_length_limit:
3278  * @conn: a #GstRTSPConnection
3279  * @limit: Content-Length limit
3280  *
3281  * Configure @conn to use the specified Content-Length limit.
3282  * Both requests and responses are validated. If content-length is
3283  * exceeded, ENOMEM error will be returned.
3284  *
3285  * Since: 1.18
3286  */
3287 void
3288 gst_rtsp_connection_set_content_length_limit (GstRTSPConnection * conn,
3289     guint limit)
3290 {
3291   g_return_if_fail (conn != NULL);
3292
3293   conn->content_length_limit = limit;
3294 }
3295
3296 /**
3297  * gst_rtsp_connection_get_url:
3298  * @conn: a #GstRTSPConnection
3299  *
3300  * Retrieve the URL of the other end of @conn.
3301  *
3302  * Returns: The URL. This value remains valid until the
3303  * connection is freed.
3304  */
3305 GstRTSPUrl *
3306 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3307 {
3308   g_return_val_if_fail (conn != NULL, NULL);
3309
3310   return conn->url;
3311 }
3312
3313 /**
3314  * gst_rtsp_connection_get_ip:
3315  * @conn: a #GstRTSPConnection
3316  *
3317  * Retrieve the IP address of the other end of @conn.
3318  *
3319  * Returns: The IP address as a string. this value remains valid until the
3320  * connection is closed.
3321  */
3322 const gchar *
3323 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3324 {
3325   g_return_val_if_fail (conn != NULL, NULL);
3326
3327   return conn->remote_ip;
3328 }
3329
3330 /**
3331  * gst_rtsp_connection_set_ip:
3332  * @conn: a #GstRTSPConnection
3333  * @ip: an ip address
3334  *
3335  * Set the IP address of the server.
3336  */
3337 void
3338 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3339 {
3340   g_return_if_fail (conn != NULL);
3341
3342   g_free (conn->remote_ip);
3343   conn->remote_ip = g_strdup (ip);
3344 }
3345
3346 /**
3347  * gst_rtsp_connection_get_read_socket:
3348  * @conn: a #GstRTSPConnection
3349  *
3350  * Get the file descriptor for reading.
3351  *
3352  * Returns: (transfer none): the file descriptor used for reading or %NULL on
3353  * error. The file descriptor remains valid until the connection is closed.
3354  */
3355 GSocket *
3356 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3357 {
3358   g_return_val_if_fail (conn != NULL, NULL);
3359   g_return_val_if_fail (conn->read_socket != NULL, NULL);
3360
3361   return conn->read_socket;
3362 }
3363
3364 /**
3365  * gst_rtsp_connection_get_write_socket:
3366  * @conn: a #GstRTSPConnection
3367  *
3368  * Get the file descriptor for writing.
3369  *
3370  * Returns: (transfer none): the file descriptor used for writing or NULL on
3371  * error. The file descriptor remains valid until the connection is closed.
3372  */
3373 GSocket *
3374 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3375 {
3376   g_return_val_if_fail (conn != NULL, NULL);
3377   g_return_val_if_fail (conn->write_socket != NULL, NULL);
3378
3379   return conn->write_socket;
3380 }
3381
3382 /**
3383  * gst_rtsp_connection_set_http_mode:
3384  * @conn: a #GstRTSPConnection
3385  * @enable: %TRUE to enable manual HTTP mode
3386  *
3387  * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3388  * messages in addition to the RTSP messages. It will also disable the
3389  * automatic handling of setting up an HTTP tunnel.
3390  */
3391 void
3392 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3393 {
3394   g_return_if_fail (conn != NULL);
3395
3396   conn->manual_http = enable;
3397 }
3398
3399 /**
3400  * gst_rtsp_connection_set_tunneled:
3401  * @conn: a #GstRTSPConnection
3402  * @tunneled: the new state
3403  *
3404  * Set the HTTP tunneling state of the connection. This must be configured before
3405  * the @conn is connected.
3406  */
3407 void
3408 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3409 {
3410   g_return_if_fail (conn != NULL);
3411   g_return_if_fail (conn->read_socket == NULL);
3412   g_return_if_fail (conn->write_socket == NULL);
3413
3414   conn->tunneled = tunneled;
3415 }
3416
3417 /**
3418  * gst_rtsp_connection_is_tunneled:
3419  * @conn: a #GstRTSPConnection
3420  *
3421  * Get the tunneling state of the connection.
3422  *
3423  * Returns: if @conn is using HTTP tunneling.
3424  */
3425 gboolean
3426 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3427 {
3428   g_return_val_if_fail (conn != NULL, FALSE);
3429
3430   return conn->tunneled;
3431 }
3432
3433 /**
3434  * gst_rtsp_connection_get_tunnelid:
3435  * @conn: a #GstRTSPConnection
3436  *
3437  * Get the tunnel session id the connection.
3438  *
3439  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3440  */
3441 const gchar *
3442 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3443 {
3444   g_return_val_if_fail (conn != NULL, NULL);
3445
3446   if (!conn->tunneled)
3447     return NULL;
3448
3449   return conn->tunnelid;
3450 }
3451
3452 /**
3453  * gst_rtsp_connection_do_tunnel:
3454  * @conn: a #GstRTSPConnection
3455  * @conn2: a #GstRTSPConnection or %NULL
3456  *
3457  * If @conn received the first tunnel connection and @conn2 received
3458  * the second tunnel connection, link the two connections together so that
3459  * @conn manages the tunneled connection.
3460  *
3461  * After this call, @conn2 cannot be used anymore and must be freed with
3462  * gst_rtsp_connection_free().
3463  *
3464  * If @conn2 is %NULL then only the base64 decoding context will be setup for
3465  * @conn.
3466  *
3467  * Returns: return GST_RTSP_OK on success.
3468  */
3469 GstRTSPResult
3470 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3471     GstRTSPConnection * conn2)
3472 {
3473   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3474
3475   if (conn2 != NULL) {
3476     GstRTSPTunnelState ts1 = conn->tstate;
3477     GstRTSPTunnelState ts2 = conn2->tstate;
3478
3479     g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3480         || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3481         GST_RTSP_EINVAL);
3482     g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3483             TUNNELID_LEN), GST_RTSP_EINVAL);
3484
3485     /* both connections have socket0 as the read/write socket */
3486     if (ts1 == TUNNEL_STATE_GET) {
3487       /* conn2 is the HTTP POST channel. take its socket and set it as read
3488        * socket in conn */
3489       conn->socket1 = conn2->socket0;
3490       conn->stream1 = conn2->stream0;
3491       conn->input_stream = conn2->input_stream;
3492       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3493       conn2->output_stream = NULL;
3494     } else {
3495       /* conn2 is the HTTP GET channel. take its socket and set it as write
3496        * socket in conn */
3497       conn->socket1 = conn->socket0;
3498       conn->stream1 = conn->stream0;
3499       conn->socket0 = conn2->socket0;
3500       conn->stream0 = conn2->stream0;
3501       conn->output_stream = conn2->output_stream;
3502       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3503     }
3504
3505     /* clean up some of the state of conn2 */
3506     g_cancellable_cancel (conn2->cancellable);
3507     conn2->write_socket = conn2->read_socket = NULL;
3508     conn2->socket0 = NULL;
3509     conn2->stream0 = NULL;
3510     conn2->socket1 = NULL;
3511     conn2->stream1 = NULL;
3512     conn2->input_stream = NULL;
3513     conn2->control_stream = NULL;
3514     g_object_unref (conn2->cancellable);
3515     conn2->cancellable = NULL;
3516
3517     /* We make socket0 the write socket and socket1 the read socket. */
3518     conn->write_socket = conn->socket0;
3519     conn->read_socket = conn->socket1;
3520
3521     conn->tstate = TUNNEL_STATE_COMPLETE;
3522
3523     g_free (conn->initial_buffer);
3524     conn->initial_buffer = conn2->initial_buffer;
3525     conn2->initial_buffer = NULL;
3526     conn->initial_buffer_offset = conn2->initial_buffer_offset;
3527   }
3528
3529   /* we need base64 decoding for the readfd */
3530   conn->ctx.state = 0;
3531   conn->ctx.save = 0;
3532   conn->ctx.cout = 0;
3533   conn->ctx.coutl = 0;
3534   conn->ctxp = &conn->ctx;
3535
3536   return GST_RTSP_OK;
3537 }
3538
3539 /**
3540  * gst_rtsp_connection_set_remember_session_id:
3541  * @conn: a #GstRTSPConnection
3542  * @remember: %TRUE if the connection should remember the session id
3543  *
3544  * Sets if the #GstRTSPConnection should remember the session id from the last
3545  * response received and force it onto any further requests.
3546  *
3547  * The default value is %TRUE
3548  */
3549
3550 void
3551 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3552     gboolean remember)
3553 {
3554   conn->remember_session_id = remember;
3555   if (!remember)
3556     conn->session_id[0] = '\0';
3557 }
3558
3559 /**
3560  * gst_rtsp_connection_get_remember_session_id:
3561  * @conn: a #GstRTSPConnection
3562  *
3563  * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3564  * last response to set it on any further request.
3565  */
3566
3567 gboolean
3568 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3569 {
3570   return conn->remember_session_id;
3571 }
3572
3573
3574 #define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3575 #define READ_COND   (G_IO_IN | READ_ERR)
3576 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3577 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
3578
3579 /* async functions */
3580 struct _GstRTSPWatch
3581 {
3582   GSource source;
3583
3584   GstRTSPConnection *conn;
3585
3586   GstRTSPBuilder builder;
3587   GstRTSPMessage message;
3588
3589   GSource *readsrc;
3590   GSource *writesrc;
3591   GSource *controlsrc;
3592
3593   gboolean keep_running;
3594
3595   /* queued message for transmission */
3596   guint id;
3597   GMutex mutex;
3598   GstQueueArray *messages;
3599   gsize messages_bytes;
3600   guint messages_count;
3601
3602   gsize max_bytes;
3603   guint max_messages;
3604   GCond queue_not_full;
3605   gboolean flushing;
3606
3607   GstRTSPWatchFuncs funcs;
3608
3609   gpointer user_data;
3610   GDestroyNotify notify;
3611 };
3612
3613 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3614       ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3615
3616 static gboolean
3617 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3618 {
3619   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3620
3621   if (watch->conn->initial_buffer != NULL)
3622     return TRUE;
3623
3624   *timeout = (watch->conn->timeout * 1000);
3625
3626   return FALSE;
3627 }
3628
3629 static gboolean
3630 gst_rtsp_source_check (GSource * source)
3631 {
3632   return FALSE;
3633 }
3634
3635 static gboolean
3636 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3637     GstRTSPWatch * watch)
3638 {
3639   gssize count;
3640   guint8 buffer[1024];
3641   GError *error = NULL;
3642
3643   /* try to read in order to be able to detect errors, we read 1k in case some
3644    * client actually decides to send data on the GET channel */
3645   count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3646       &error);
3647   if (count == 0) {
3648     /* other end closed the socket */
3649     goto eof;
3650   }
3651
3652   if (count < 0) {
3653     GST_DEBUG ("%s", error->message);
3654     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3655         g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3656       g_clear_error (&error);
3657       goto done;
3658     }
3659     g_clear_error (&error);
3660     goto read_error;
3661   }
3662
3663   /* client sent data on the GET channel, ignore it */
3664
3665 done:
3666   return TRUE;
3667
3668   /* ERRORS */
3669 eof:
3670   {
3671     if (watch->funcs.closed)
3672       watch->funcs.closed (watch, watch->user_data);
3673
3674     /* the read connection was closed, stop the watch now */
3675     watch->keep_running = FALSE;
3676
3677     return FALSE;
3678   }
3679 read_error:
3680   {
3681     if (watch->funcs.error_full)
3682       watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3683           0, watch->user_data);
3684     else if (watch->funcs.error)
3685       watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3686
3687     goto eof;
3688   }
3689 }
3690
3691 static gboolean
3692 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3693     GstRTSPWatch * watch)
3694 {
3695   GstRTSPResult res = GST_RTSP_ERROR;
3696   GstRTSPConnection *conn = watch->conn;
3697
3698   /* if this connection was already closed, stop now */
3699   if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3700     goto eof;
3701
3702   res = build_next (&watch->builder, &watch->message, conn, FALSE);
3703   if (res == GST_RTSP_EINTR)
3704     goto done;
3705   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3706     g_mutex_lock (&watch->mutex);
3707     if (watch->readsrc) {
3708       if (!g_source_is_destroyed ((GSource *) watch))
3709         g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3710       g_source_unref (watch->readsrc);
3711       watch->readsrc = NULL;
3712     }
3713
3714     if (conn->stream1) {
3715       g_object_unref (conn->stream1);
3716       conn->stream1 = NULL;
3717       conn->socket1 = NULL;
3718       conn->input_stream = NULL;
3719     }
3720     g_mutex_unlock (&watch->mutex);
3721
3722     /* When we are in tunnelled mode, the read socket can be closed and we
3723      * should be prepared for a new POST method to reopen it */
3724     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3725       /* remove the read connection for the tunnel */
3726       /* we accept a new POST request */
3727       conn->tstate = TUNNEL_STATE_GET;
3728       /* and signal that we lost our tunnel */
3729       if (watch->funcs.tunnel_lost)
3730         res = watch->funcs.tunnel_lost (watch, watch->user_data);
3731       /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3732       g_mutex_lock (&watch->mutex);
3733       if (watch->conn->control_stream && !watch->controlsrc) {
3734         watch->controlsrc =
3735             g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3736             (watch->conn->control_stream), NULL);
3737         g_source_set_callback (watch->controlsrc,
3738             (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3739             NULL);
3740         g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3741       }
3742       g_mutex_unlock (&watch->mutex);
3743       goto read_done;
3744     } else
3745       goto eof;
3746   } else if (G_LIKELY (res == GST_RTSP_OK)) {
3747     if (!conn->manual_http &&
3748         watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3749       if (conn->tstate == TUNNEL_STATE_NONE &&
3750           watch->message.type_data.request.method == GST_RTSP_GET) {
3751         GstRTSPMessage *response;
3752         GstRTSPStatusCode code;
3753
3754         conn->tstate = TUNNEL_STATE_GET;
3755
3756         if (watch->funcs.tunnel_start)
3757           code = watch->funcs.tunnel_start (watch, watch->user_data);
3758         else
3759           code = GST_RTSP_STS_OK;
3760
3761         /* queue the response */
3762         response = gen_tunnel_reply (conn, code, &watch->message);
3763         if (watch->funcs.tunnel_http_response)
3764           watch->funcs.tunnel_http_response (watch, &watch->message, response,
3765               watch->user_data);
3766         gst_rtsp_watch_send_message (watch, response, NULL);
3767         gst_rtsp_message_free (response);
3768         goto read_done;
3769       } else if (conn->tstate == TUNNEL_STATE_NONE &&
3770           watch->message.type_data.request.method == GST_RTSP_POST) {
3771         conn->tstate = TUNNEL_STATE_POST;
3772
3773         /* in the callback the connection should be tunneled with the
3774          * GET connection */
3775         if (watch->funcs.tunnel_complete) {
3776           watch->funcs.tunnel_complete (watch, watch->user_data);
3777         }
3778         goto read_done;
3779       }
3780     }
3781   } else
3782     goto read_error;
3783
3784   if (!conn->manual_http) {
3785     /* if manual HTTP support is not enabled, then restore the message to
3786      * what it would have looked like without the support for parsing HTTP
3787      * messages being present */
3788     if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3789       watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3790       watch->message.type_data.request.method = GST_RTSP_INVALID;
3791       if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3792         watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3793       res = GST_RTSP_EPARSE;
3794     } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3795       watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3796       if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3797         watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3798       res = GST_RTSP_EPARSE;
3799     }
3800   }
3801   if (G_LIKELY (res != GST_RTSP_OK))
3802     goto read_error;
3803
3804   if (watch->funcs.message_received)
3805     watch->funcs.message_received (watch, &watch->message, watch->user_data);
3806
3807 read_done:
3808   gst_rtsp_message_unset (&watch->message);
3809   build_reset (&watch->builder);
3810
3811 done:
3812   return TRUE;
3813
3814   /* ERRORS */
3815 eof:
3816   {
3817     if (watch->funcs.closed)
3818       watch->funcs.closed (watch, watch->user_data);
3819
3820     /* we closed the read connection, stop the watch now */
3821     watch->keep_running = FALSE;
3822
3823     /* always stop when the input returns EOF in non-tunneled mode */
3824     return FALSE;
3825   }
3826 read_error:
3827   {
3828     if (watch->funcs.error_full)
3829       watch->funcs.error_full (watch, res, &watch->message,
3830           0, watch->user_data);
3831     else if (watch->funcs.error)
3832       watch->funcs.error (watch, res, watch->user_data);
3833
3834     goto eof;
3835   }
3836 }
3837
3838 static gboolean
3839 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3840     gpointer user_data G_GNUC_UNUSED)
3841 {
3842   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3843   GstRTSPConnection *conn = watch->conn;
3844
3845   if (conn->initial_buffer != NULL) {
3846     gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3847         watch);
3848   }
3849   return watch->keep_running;
3850 }
3851
3852 static gboolean
3853 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3854     GstRTSPWatch * watch)
3855 {
3856   GstRTSPResult res = GST_RTSP_ERROR;
3857   GstRTSPConnection *conn = watch->conn;
3858
3859   /* if this connection was already closed, stop now */
3860   if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3861       !watch->messages)
3862     goto eof;
3863
3864   g_mutex_lock (&watch->mutex);
3865   do {
3866     guint n_messages = gst_queue_array_get_length (watch->messages);
3867     GOutputVector *vectors;
3868     GstMapInfo *map_infos;
3869     guint *ids;
3870     gsize bytes_to_write, bytes_written;
3871     guint n_vectors, n_memories, n_ids, drop_messages;
3872     gint i, j, l, n_mmap;
3873     GstRTSPSerializedMessage *msg;
3874
3875     /* if this connection was already closed, stop now */
3876     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3877         !watch->messages) {
3878       g_mutex_unlock (&watch->mutex);
3879       goto eof;
3880     }
3881
3882     if (n_messages == 0) {
3883       if (watch->writesrc) {
3884         if (!g_source_is_destroyed ((GSource *) watch))
3885           g_source_remove_child_source ((GSource *) watch, watch->writesrc);
3886         g_source_unref (watch->writesrc);
3887         watch->writesrc = NULL;
3888         /* we create and add the write source again when we actually have
3889          * something to write */
3890
3891         /* since write source is now removed we add read source on the write
3892          * socket instead to be able to detect when client closes get channel
3893          * in tunneled mode */
3894         if (watch->conn->control_stream) {
3895           watch->controlsrc =
3896               g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3897               (watch->conn->control_stream), NULL);
3898           g_source_set_callback (watch->controlsrc,
3899               (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3900               NULL);
3901           g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3902         } else {
3903           watch->controlsrc = NULL;
3904         }
3905       }
3906       break;
3907     }
3908
3909     for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
3910       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3911       if (msg->id != 0)
3912         n_ids++;
3913
3914       if (msg->data_offset < msg->data_size)
3915         n_vectors++;
3916
3917       if (msg->body_data && msg->body_offset < msg->body_data_size) {
3918         n_vectors++;
3919       } else if (msg->body_buffer) {
3920         guint m, n;
3921         guint offset = 0;
3922
3923         n = gst_buffer_n_memory (msg->body_buffer);
3924         for (m = 0; m < n; m++) {
3925           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3926
3927           /* Skip all memories we already wrote */
3928           if (offset + mem->size <= msg->body_offset) {
3929             offset += mem->size;
3930             continue;
3931           }
3932           offset += mem->size;
3933
3934           n_memories++;
3935           n_vectors++;
3936         }
3937       }
3938     }
3939
3940     vectors = g_newa (GOutputVector, n_vectors);
3941     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
3942     ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
3943     if (ids)
3944       memset (ids, 0, sizeof (guint) * (n_ids + 1));
3945
3946     for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
3947         i++) {
3948       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3949
3950       if (msg->data_offset < msg->data_size) {
3951         vectors[j].buffer = (msg->data_is_data_header ?
3952             msg->data_header : msg->data) + msg->data_offset;
3953         vectors[j].size = msg->data_size - msg->data_offset;
3954         bytes_to_write += vectors[j].size;
3955         j++;
3956       }
3957
3958       if (msg->body_data) {
3959         if (msg->body_offset < msg->body_data_size) {
3960           vectors[j].buffer = msg->body_data + msg->body_offset;
3961           vectors[j].size = msg->body_data_size - msg->body_offset;
3962           bytes_to_write += vectors[j].size;
3963           j++;
3964         }
3965       } else if (msg->body_buffer) {
3966         guint m, n;
3967         guint offset = 0;
3968         n = gst_buffer_n_memory (msg->body_buffer);
3969         for (m = 0; m < n; m++) {
3970           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3971           guint off;
3972
3973           /* Skip all memories we already wrote */
3974           if (offset + mem->size <= msg->body_offset) {
3975             offset += mem->size;
3976             continue;
3977           }
3978
3979           if (offset < msg->body_offset)
3980             off = msg->body_offset - offset;
3981           else
3982             off = 0;
3983
3984           offset += mem->size;
3985
3986           g_assert (off < mem->size);
3987
3988           gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
3989           vectors[j].buffer = map_infos[n_mmap].data + off;
3990           vectors[j].size = map_infos[n_mmap].size - off;
3991           bytes_to_write += vectors[j].size;
3992
3993           n_mmap++;
3994           j++;
3995         }
3996       }
3997     }
3998
3999     res =
4000         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4001         &bytes_written, FALSE, watch->conn->cancellable);
4002     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4003
4004     /* First unmap all memories here, this simplifies the code below
4005      * as we don't have to skip all memories that were already written
4006      * before */
4007     for (i = 0; i < n_mmap; i++) {
4008       gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
4009     }
4010
4011     if (bytes_written == bytes_to_write) {
4012       /* fast path, just unmap all memories, free memory, drop all messages and notify them */
4013       l = 0;
4014       while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4015         if (msg->id) {
4016           ids[l] = msg->id;
4017           l++;
4018         }
4019
4020         gst_rtsp_serialized_message_clear (msg);
4021       }
4022
4023       g_assert (watch->messages_bytes >= bytes_written);
4024       watch->messages_bytes -= bytes_written;
4025     } else if (bytes_written > 0) {
4026       /* not done, let's skip all messages that were sent already and free them */
4027       for (i = 0, drop_messages = 0; i < n_messages; i++) {
4028         msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4029
4030         if (bytes_written >= msg->data_size - msg->data_offset) {
4031           guint body_size;
4032
4033           /* all data of this message is sent, check body and otherwise
4034            * skip the whole message for next time */
4035           bytes_written -= (msg->data_size - msg->data_offset);
4036           watch->messages_bytes -= (msg->data_size - msg->data_offset);
4037           msg->data_offset = msg->data_size;
4038
4039           if (msg->body_data) {
4040             body_size = msg->body_data_size;
4041           } else if (msg->body_buffer) {
4042             body_size = gst_buffer_get_size (msg->body_buffer);
4043           } else {
4044             body_size = 0;
4045           }
4046
4047           if (bytes_written + msg->body_offset >= body_size) {
4048             /* body written, drop this message */
4049             bytes_written -= body_size - msg->body_offset;
4050             watch->messages_bytes -= body_size - msg->body_offset;
4051             msg->body_offset = body_size;
4052             drop_messages++;
4053
4054             if (msg->id) {
4055               ids[l] = msg->id;
4056               l++;
4057             }
4058
4059             gst_rtsp_serialized_message_clear (msg);
4060           } else {
4061             msg->body_offset += bytes_written;
4062             watch->messages_bytes -= bytes_written;
4063             bytes_written = 0;
4064           }
4065         } else {
4066           /* Need to continue sending from the data of this message */
4067           msg->data_offset += bytes_written;
4068           watch->messages_bytes -= bytes_written;
4069           bytes_written = 0;
4070         }
4071       }
4072
4073       while (drop_messages > 0) {
4074         msg = gst_queue_array_pop_head_struct (watch->messages);
4075         g_assert (msg);
4076         drop_messages--;
4077       }
4078
4079       g_assert (watch->messages_bytes >= bytes_written);
4080       watch->messages_bytes -= bytes_written;
4081     }
4082
4083     if (!IS_BACKLOG_FULL (watch))
4084       g_cond_signal (&watch->queue_not_full);
4085     g_mutex_unlock (&watch->mutex);
4086
4087     /* notify all messages that were successfully written */
4088     if (ids) {
4089       while (*ids) {
4090         /* only decrease the counter for messages that have an id. Only
4091          * the last message of a messages chunk is counted */
4092         watch->messages_count--;
4093
4094         if (watch->funcs.message_sent)
4095           watch->funcs.message_sent (watch, *ids, watch->user_data);
4096         ids++;
4097       }
4098     }
4099
4100     if (res == GST_RTSP_EINTR) {
4101       goto write_blocked;
4102     } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4103       goto write_error;
4104     }
4105     g_mutex_lock (&watch->mutex);
4106   } while (TRUE);
4107   g_mutex_unlock (&watch->mutex);
4108
4109 write_blocked:
4110   return TRUE;
4111
4112   /* ERRORS */
4113 eof:
4114   {
4115     return FALSE;
4116   }
4117 write_error:
4118   {
4119     if (watch->funcs.error_full) {
4120       guint i, n_messages;
4121
4122       n_messages = gst_queue_array_get_length (watch->messages);
4123       for (i = 0; i < n_messages; i++) {
4124         GstRTSPSerializedMessage *msg =
4125             gst_queue_array_peek_nth_struct (watch->messages, i);
4126         if (msg->id)
4127           watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4128       }
4129     } else if (watch->funcs.error) {
4130       watch->funcs.error (watch, res, watch->user_data);
4131     }
4132
4133     return FALSE;
4134   }
4135 }
4136
4137 static void
4138 gst_rtsp_source_finalize (GSource * source)
4139 {
4140   GstRTSPWatch *watch = (GstRTSPWatch *) source;
4141   GstRTSPSerializedMessage *msg;
4142
4143   if (watch->notify)
4144     watch->notify (watch->user_data);
4145
4146   build_reset (&watch->builder);
4147   gst_rtsp_message_unset (&watch->message);
4148
4149   while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4150     gst_rtsp_serialized_message_clear (msg);
4151   }
4152   gst_queue_array_free (watch->messages);
4153   watch->messages = NULL;
4154   watch->messages_bytes = 0;
4155   watch->messages_count = 0;
4156
4157   g_cond_clear (&watch->queue_not_full);
4158
4159   if (watch->readsrc)
4160     g_source_unref (watch->readsrc);
4161   if (watch->writesrc)
4162     g_source_unref (watch->writesrc);
4163   if (watch->controlsrc)
4164     g_source_unref (watch->controlsrc);
4165
4166   g_mutex_clear (&watch->mutex);
4167 }
4168
4169 static GSourceFuncs gst_rtsp_source_funcs = {
4170   gst_rtsp_source_prepare,
4171   gst_rtsp_source_check,
4172   gst_rtsp_source_dispatch,
4173   gst_rtsp_source_finalize,
4174   NULL,
4175   NULL
4176 };
4177
4178 /**
4179  * gst_rtsp_watch_new: (skip)
4180  * @conn: a #GstRTSPConnection
4181  * @funcs: watch functions
4182  * @user_data: user data to pass to @funcs
4183  * @notify: notify when @user_data is not referenced anymore
4184  *
4185  * Create a watch object for @conn. The functions provided in @funcs will be
4186  * called with @user_data when activity happened on the watch.
4187  *
4188  * The new watch is usually created so that it can be attached to a
4189  * maincontext with gst_rtsp_watch_attach().
4190  *
4191  * @conn must exist for the entire lifetime of the watch.
4192  *
4193  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4194  * communication. Free with gst_rtsp_watch_unref () after usage.
4195  */
4196 GstRTSPWatch *
4197 gst_rtsp_watch_new (GstRTSPConnection * conn,
4198     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4199 {
4200   GstRTSPWatch *result;
4201
4202   g_return_val_if_fail (conn != NULL, NULL);
4203   g_return_val_if_fail (funcs != NULL, NULL);
4204   g_return_val_if_fail (conn->read_socket != NULL, NULL);
4205   g_return_val_if_fail (conn->write_socket != NULL, NULL);
4206
4207   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4208       sizeof (GstRTSPWatch));
4209
4210   result->conn = conn;
4211   result->builder.state = STATE_START;
4212
4213   g_mutex_init (&result->mutex);
4214   result->messages =
4215       gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4216   g_cond_init (&result->queue_not_full);
4217
4218   gst_rtsp_watch_reset (result);
4219   result->keep_running = TRUE;
4220   result->flushing = FALSE;
4221
4222   result->funcs = *funcs;
4223   result->user_data = user_data;
4224   result->notify = notify;
4225
4226   return result;
4227 }
4228
4229 /**
4230  * gst_rtsp_watch_reset:
4231  * @watch: a #GstRTSPWatch
4232  *
4233  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4234  * when the file descriptors of the connection might have changed.
4235  */
4236 void
4237 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4238 {
4239   g_mutex_lock (&watch->mutex);
4240   if (watch->readsrc) {
4241     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4242     g_source_unref (watch->readsrc);
4243   }
4244   if (watch->writesrc) {
4245     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4246     g_source_unref (watch->writesrc);
4247     watch->writesrc = NULL;
4248   }
4249   if (watch->controlsrc) {
4250     g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4251     g_source_unref (watch->controlsrc);
4252     watch->controlsrc = NULL;
4253   }
4254
4255   if (watch->conn->input_stream) {
4256     watch->readsrc =
4257         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4258         (watch->conn->input_stream), NULL);
4259     g_source_set_callback (watch->readsrc,
4260         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4261     g_source_add_child_source ((GSource *) watch, watch->readsrc);
4262   } else {
4263     watch->readsrc = NULL;
4264   }
4265
4266   /* we create and add the write source when we actually have something to
4267    * write */
4268
4269   /* when write source is not added we add read source on the write socket
4270    * instead to be able to detect when client closes get channel in tunneled
4271    * mode */
4272   if (watch->conn->control_stream) {
4273     watch->controlsrc =
4274         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4275         (watch->conn->control_stream), NULL);
4276     g_source_set_callback (watch->controlsrc,
4277         (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4278     g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4279   } else {
4280     watch->controlsrc = NULL;
4281   }
4282   g_mutex_unlock (&watch->mutex);
4283 }
4284
4285 /**
4286  * gst_rtsp_watch_attach:
4287  * @watch: a #GstRTSPWatch
4288  * @context: a GMainContext (if NULL, the default context will be used)
4289  *
4290  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4291  *
4292  * Returns: the ID (greater than 0) for the watch within the GMainContext.
4293  */
4294 guint
4295 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4296 {
4297   g_return_val_if_fail (watch != NULL, 0);
4298
4299   return g_source_attach ((GSource *) watch, context);
4300 }
4301
4302 /**
4303  * gst_rtsp_watch_unref:
4304  * @watch: a #GstRTSPWatch
4305  *
4306  * Decreases the reference count of @watch by one. If the resulting reference
4307  * count is zero the watch and associated memory will be destroyed.
4308  */
4309 void
4310 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4311 {
4312   g_return_if_fail (watch != NULL);
4313
4314   g_source_unref ((GSource *) watch);
4315 }
4316
4317 /**
4318  * gst_rtsp_watch_set_send_backlog:
4319  * @watch: a #GstRTSPWatch
4320  * @bytes: maximum bytes
4321  * @messages: maximum messages
4322  *
4323  * Set the maximum amount of bytes and messages that will be queued in @watch.
4324  * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4325  * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4326  *
4327  * A value of 0 for @bytes or @messages means no limits.
4328  *
4329  * Since: 1.2
4330  */
4331 void
4332 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4333     gsize bytes, guint messages)
4334 {
4335   g_return_if_fail (watch != NULL);
4336
4337   g_mutex_lock (&watch->mutex);
4338   watch->max_bytes = bytes;
4339   watch->max_messages = messages;
4340   if (!IS_BACKLOG_FULL (watch))
4341     g_cond_signal (&watch->queue_not_full);
4342   g_mutex_unlock (&watch->mutex);
4343
4344   GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4345       bytes, messages);
4346 }
4347
4348 /**
4349  * gst_rtsp_watch_get_send_backlog:
4350  * @watch: a #GstRTSPWatch
4351  * @bytes: (out) (allow-none): maximum bytes
4352  * @messages: (out) (allow-none): maximum messages
4353  *
4354  * Get the maximum amount of bytes and messages that will be queued in @watch.
4355  * See gst_rtsp_watch_set_send_backlog().
4356  *
4357  * Since: 1.2
4358  */
4359 void
4360 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4361     gsize * bytes, guint * messages)
4362 {
4363   g_return_if_fail (watch != NULL);
4364
4365   g_mutex_lock (&watch->mutex);
4366   if (bytes)
4367     *bytes = watch->max_bytes;
4368   if (messages)
4369     *messages = watch->max_messages;
4370   g_mutex_unlock (&watch->mutex);
4371 }
4372
4373 static GstRTSPResult
4374 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4375     GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4376 {
4377   GstRTSPResult res;
4378   GMainContext *context = NULL;
4379   gint i;
4380
4381   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4382   g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4383
4384   g_mutex_lock (&watch->mutex);
4385   if (watch->flushing)
4386     goto flushing;
4387
4388   /* try to send the message synchronously first */
4389   if (gst_queue_array_get_length (watch->messages) == 0) {
4390     gint j, k;
4391     GOutputVector *vectors;
4392     GstMapInfo *map_infos;
4393     gsize bytes_to_write, bytes_written;
4394     guint n_vectors, n_memories, drop_messages;
4395
4396     for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4397       n_vectors++;
4398       if (messages[i].body_data) {
4399         n_vectors++;
4400       } else if (messages[i].body_buffer) {
4401         n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4402         n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4403       }
4404     }
4405
4406     vectors = g_newa (GOutputVector, n_vectors);
4407     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4408
4409     for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4410       vectors[j].buffer = messages[i].data_is_data_header ?
4411           messages[i].data_header : messages[i].data;
4412       vectors[j].size = messages[i].data_size;
4413       bytes_to_write += vectors[j].size;
4414       j++;
4415
4416       if (messages[i].body_data) {
4417         vectors[j].buffer = messages[i].body_data;
4418         vectors[j].size = messages[i].body_data_size;
4419         bytes_to_write += vectors[j].size;
4420         j++;
4421       } else if (messages[i].body_buffer) {
4422         gint l, n;
4423
4424         n = gst_buffer_n_memory (messages[i].body_buffer);
4425         for (l = 0; l < n; l++) {
4426           GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4427
4428           gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4429           vectors[j].buffer = map_infos[k].data;
4430           vectors[j].size = map_infos[k].size;
4431           bytes_to_write += vectors[j].size;
4432
4433           k++;
4434           j++;
4435         }
4436       }
4437     }
4438
4439     res =
4440         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4441         &bytes_written, FALSE, watch->conn->cancellable);
4442     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4443
4444     /* At this point we sent everything we could without blocking or
4445      * error and updated the offsets inside the message accordingly */
4446
4447     /* First of all unmap all memories. This simplifies the code below */
4448     for (k = 0; k < n_memories; k++) {
4449       gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4450     }
4451
4452     if (res != GST_RTSP_EINTR) {
4453       /* actual error or done completely */
4454       if (id != NULL)
4455         *id = 0;
4456
4457       /* free everything */
4458       for (i = 0, k = 0; i < n_messages; i++) {
4459         gst_rtsp_serialized_message_clear (&messages[i]);
4460       }
4461
4462       goto done;
4463     }
4464
4465     /* not done, let's skip all messages that were sent already and free them */
4466     for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4467       if (bytes_written >= messages[i].data_size) {
4468         guint body_size;
4469
4470         /* all data of this message is sent, check body and otherwise
4471          * skip the whole message for next time */
4472         messages[i].data_offset = messages[i].data_size;
4473         bytes_written -= messages[i].data_size;
4474
4475         if (messages[i].body_data) {
4476           body_size = messages[i].body_data_size;
4477
4478         } else if (messages[i].body_buffer) {
4479           body_size = gst_buffer_get_size (messages[i].body_buffer);
4480         } else {
4481           body_size = 0;
4482         }
4483
4484         if (bytes_written >= body_size) {
4485           /* body written, drop this message */
4486           messages[i].body_offset = body_size;
4487           bytes_written -= body_size;
4488           drop_messages++;
4489
4490           gst_rtsp_serialized_message_clear (&messages[i]);
4491         } else {
4492           messages[i].body_offset = bytes_written;
4493           bytes_written = 0;
4494         }
4495       } else {
4496         /* Need to continue sending from the data of this message */
4497         messages[i].data_offset = bytes_written;
4498         bytes_written = 0;
4499       }
4500     }
4501
4502     g_assert (n_messages > drop_messages);
4503
4504     messages += drop_messages;
4505     n_messages -= drop_messages;
4506   }
4507
4508   /* check limits */
4509   if (IS_BACKLOG_FULL (watch))
4510     goto too_much_backlog;
4511
4512   for (i = 0; i < n_messages; i++) {
4513     GstRTSPSerializedMessage local_message;
4514
4515     /* make a record with the data and id for sending async */
4516     local_message = messages[i];
4517
4518     /* copy the body data or take an additional reference to the body buffer
4519      * we don't own them here */
4520     if (local_message.body_data) {
4521       local_message.body_data =
4522           g_memdup2 (local_message.body_data, local_message.body_data_size);
4523     } else if (local_message.body_buffer) {
4524       gst_buffer_ref (local_message.body_buffer);
4525     }
4526     local_message.borrowed = FALSE;
4527
4528     /* set an id for the very last message */
4529     if (i == n_messages - 1) {
4530       do {
4531         /* make sure rec->id is never 0 */
4532         local_message.id = ++watch->id;
4533       } while (G_UNLIKELY (local_message.id == 0));
4534
4535       if (id != NULL)
4536         *id = local_message.id;
4537     } else {
4538       local_message.id = 0;
4539     }
4540
4541     /* add the record to a queue. */
4542     gst_queue_array_push_tail_struct (watch->messages, &local_message);
4543     watch->messages_bytes +=
4544         (local_message.data_size - local_message.data_offset);
4545     if (local_message.body_data)
4546       watch->messages_bytes +=
4547           (local_message.body_data_size - local_message.body_offset);
4548     else if (local_message.body_buffer)
4549       watch->messages_bytes +=
4550           (gst_buffer_get_size (local_message.body_buffer) -
4551           local_message.body_offset);
4552   }
4553   /* each message chunks is one unit */
4554   watch->messages_count++;
4555
4556   /* make sure the main context will now also check for writability on the
4557    * socket */
4558   context = ((GSource *) watch)->context;
4559   if (!watch->writesrc) {
4560     /* remove the read source on the write socket, we will be able to detect
4561      * errors while writing */
4562     if (watch->controlsrc) {
4563       g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4564       g_source_unref (watch->controlsrc);
4565       watch->controlsrc = NULL;
4566     }
4567
4568     watch->writesrc =
4569         g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4570         (watch->conn->output_stream), NULL);
4571     g_source_set_callback (watch->writesrc,
4572         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4573     g_source_add_child_source ((GSource *) watch, watch->writesrc);
4574   }
4575   res = GST_RTSP_OK;
4576
4577 done:
4578   g_mutex_unlock (&watch->mutex);
4579
4580   if (context)
4581     g_main_context_wakeup (context);
4582
4583   return res;
4584
4585   /* ERRORS */
4586 flushing:
4587   {
4588     GST_DEBUG ("we are flushing");
4589     g_mutex_unlock (&watch->mutex);
4590     for (i = 0; i < n_messages; i++) {
4591       gst_rtsp_serialized_message_clear (&messages[i]);
4592     }
4593     return GST_RTSP_EINTR;
4594   }
4595 too_much_backlog:
4596   {
4597     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4598         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4599         watch->messages_bytes, watch->max_messages, watch->messages_count);
4600     g_mutex_unlock (&watch->mutex);
4601     for (i = 0; i < n_messages; i++) {
4602       gst_rtsp_serialized_message_clear (&messages[i]);
4603     }
4604     return GST_RTSP_ENOMEM;
4605   }
4606
4607   return GST_RTSP_OK;
4608 }
4609
4610 /**
4611  * gst_rtsp_watch_write_data:
4612  * @watch: a #GstRTSPWatch
4613  * @data: (array length=size) (transfer full): the data to queue
4614  * @size: the size of @data
4615  * @id: (out) (allow-none): location for a message ID or %NULL
4616  *
4617  * Write @data using the connection of the @watch. If it cannot be sent
4618  * immediately, it will be queued for transmission in @watch. The contents of
4619  * @message will then be serialized and transmitted when the connection of the
4620  * @watch becomes writable. In case the @message is queued, the ID returned in
4621  * @id will be non-zero and used as the ID argument in the message_sent
4622  * callback.
4623  *
4624  * This function will take ownership of @data and g_free() it after use.
4625  *
4626  * If the amount of queued data exceeds the limits set with
4627  * gst_rtsp_watch_set_send_backlog(), this function will return
4628  * #GST_RTSP_ENOMEM.
4629  *
4630  * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4631  * are reached. #GST_RTSP_EINTR when @watch was flushing.
4632  */
4633 /* FIXME 2.0: This should've been static! */
4634 GstRTSPResult
4635 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4636     guint size, guint * id)
4637 {
4638   GstRTSPSerializedMessage serialized_message;
4639
4640   memset (&serialized_message, 0, sizeof (serialized_message));
4641   serialized_message.data = (guint8 *) data;
4642   serialized_message.data_size = size;
4643
4644   return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4645       1, id);
4646 }
4647
4648 /**
4649  * gst_rtsp_watch_send_message:
4650  * @watch: a #GstRTSPWatch
4651  * @message: a #GstRTSPMessage
4652  * @id: (out) (allow-none): location for a message ID or %NULL
4653  *
4654  * Send a @message using the connection of the @watch. If it cannot be sent
4655  * immediately, it will be queued for transmission in @watch. The contents of
4656  * @message will then be serialized and transmitted when the connection of the
4657  * @watch becomes writable. In case the @message is queued, the ID returned in
4658  * @id will be non-zero and used as the ID argument in the message_sent
4659  * callback.
4660  *
4661  * Returns: #GST_RTSP_OK on success.
4662  */
4663 GstRTSPResult
4664 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4665     guint * id)
4666 {
4667   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4668   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4669
4670   return gst_rtsp_watch_send_messages (watch, message, 1, id);
4671 }
4672
4673 /**
4674  * gst_rtsp_watch_send_messages:
4675  * @watch: a #GstRTSPWatch
4676  * @messages: (array length=n_messages): the messages to send
4677  * @n_messages: the number of messages to send
4678  * @id: (out) (allow-none): location for a message ID or %NULL
4679  *
4680  * Sends @messages using the connection of the @watch. If they cannot be sent
4681  * immediately, they will be queued for transmission in @watch. The contents of
4682  * @messages will then be serialized and transmitted when the connection of the
4683  * @watch becomes writable. In case the @messages are queued, the ID returned in
4684  * @id will be non-zero and used as the ID argument in the message_sent
4685  * callback once the last message is sent. The callback will only be called
4686  * once for the last message.
4687  *
4688  * Returns: #GST_RTSP_OK on success.
4689  *
4690  * Since: 1.16
4691  */
4692 GstRTSPResult
4693 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4694     guint n_messages, guint * id)
4695 {
4696   GstRTSPSerializedMessage *serialized_messages;
4697   gint i;
4698
4699   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4700   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4701
4702   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4703   memset (serialized_messages, 0,
4704       sizeof (GstRTSPSerializedMessage) * n_messages);
4705
4706   for (i = 0; i < n_messages; i++) {
4707     if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4708       goto error;
4709   }
4710
4711   return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4712       n_messages, id);
4713
4714 error:
4715   for (i = 0; i < n_messages; i++) {
4716     gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4717   }
4718
4719   return GST_RTSP_EINVAL;
4720 }
4721
4722 /**
4723  * gst_rtsp_watch_wait_backlog_usec:
4724  * @watch: a #GstRTSPWatch
4725  * @timeout: a timeout in microseconds
4726  *
4727  * Wait until there is place in the backlog queue, @timeout is reached
4728  * or @watch is set to flushing.
4729  *
4730  * If @timeout is 0 this function can block forever. If @timeout
4731  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4732  * after the timeout expired.
4733  *
4734  * The typically use of this function is when gst_rtsp_watch_write_data
4735  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4736  * free space in the backlog queue and try again.
4737  *
4738  * Returns: %GST_RTSP_OK when if there is room in queue.
4739  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
4740  *          %GST_RTSP_EINTR when @watch is flushing
4741  *          %GST_RTSP_EINVAL when called with invalid parameters.
4742  *
4743  * Since: 1.18
4744  */
4745 GstRTSPResult
4746 gst_rtsp_watch_wait_backlog_usec (GstRTSPWatch * watch, gint64 timeout)
4747 {
4748   gint64 end_time;
4749
4750   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4751
4752   end_time = g_get_monotonic_time () + timeout;
4753
4754   g_mutex_lock (&watch->mutex);
4755   if (watch->flushing)
4756     goto flushing;
4757
4758   while (IS_BACKLOG_FULL (watch)) {
4759     gboolean res;
4760
4761     res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4762     if (watch->flushing)
4763       goto flushing;
4764
4765     if (!res)
4766       goto timeout;
4767   }
4768   g_mutex_unlock (&watch->mutex);
4769
4770   return GST_RTSP_OK;
4771
4772   /* ERRORS */
4773 flushing:
4774   {
4775     GST_DEBUG ("we are flushing");
4776     g_mutex_unlock (&watch->mutex);
4777     return GST_RTSP_EINTR;
4778   }
4779 timeout:
4780   {
4781     GST_DEBUG ("we timed out");
4782     g_mutex_unlock (&watch->mutex);
4783     return GST_RTSP_ETIMEOUT;
4784   }
4785 }
4786
4787 /**
4788  * gst_rtsp_watch_set_flushing:
4789  * @watch: a #GstRTSPWatch
4790  * @flushing: new flushing state
4791  *
4792  * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4793  * and make sure gst_rtsp_watch_write_data() returns immediately with
4794  * #GST_RTSP_EINTR. And empty the queue.
4795  *
4796  * Since: 1.4
4797  */
4798 void
4799 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4800 {
4801   g_return_if_fail (watch != NULL);
4802
4803   g_mutex_lock (&watch->mutex);
4804   watch->flushing = flushing;
4805   g_cond_signal (&watch->queue_not_full);
4806   if (flushing) {
4807     GstRTSPSerializedMessage *msg;
4808
4809     while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4810       gst_rtsp_serialized_message_clear (msg);
4811     }
4812   }
4813   g_mutex_unlock (&watch->mutex);
4814 }
4815
4816
4817 #ifndef GST_DISABLE_DEPRECATED
4818 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
4819 /* Deprecated */
4820 #define TV_TO_USEC(tv) ((tv) ? ((tv)->tv_sec * G_USEC_PER_SEC + (tv)->tv_usec) : 0)
4821 /**
4822  * gst_rtsp_connection_connect:
4823  * @conn: a #GstRTSPConnection
4824  * @timeout: a GTimeVal timeout
4825  *
4826  * Attempt to connect to the url of @conn made with
4827  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4828  * forever. If @timeout contains a valid timeout, this function will return
4829  * #GST_RTSP_ETIMEOUT after the timeout expired.
4830  *
4831  * This function can be cancelled with gst_rtsp_connection_flush().
4832  *
4833  * Returns: #GST_RTSP_OK when a connection could be made.
4834  *
4835  * Deprecated: 1.18
4836  */
4837     GstRTSPResult
4838 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
4839 {
4840   return gst_rtsp_connection_connect_usec (conn, TV_TO_USEC (timeout));
4841 }
4842
4843 /**
4844  * gst_rtsp_connection_connect_with_response:
4845  * @conn: a #GstRTSPConnection
4846  * @timeout: a GTimeVal timeout
4847  * @response: a #GstRTSPMessage
4848  *
4849  * Attempt to connect to the url of @conn made with
4850  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4851  * forever. If @timeout contains a valid timeout, this function will return
4852  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
4853  * @response will contain a response to the tunneling request messages.
4854  *
4855  * This function can be cancelled with gst_rtsp_connection_flush().
4856  *
4857  * Returns: #GST_RTSP_OK when a connection could be made.
4858  *
4859  * Since: 1.8
4860  * Deprecated: 1.18
4861  */
4862 GstRTSPResult
4863 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
4864     GTimeVal * timeout, GstRTSPMessage * response)
4865 {
4866   return gst_rtsp_connection_connect_with_response_usec (conn,
4867       TV_TO_USEC (timeout), response);
4868 }
4869
4870 /**
4871  * gst_rtsp_connection_read:
4872  * @conn: a #GstRTSPConnection
4873  * @data: the data to read
4874  * @size: the size of @data
4875  * @timeout: a timeout value or %NULL
4876  *
4877  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
4878  * the specified @timeout. @timeout can be %NULL, in which case this function
4879  * might block forever.
4880  *
4881  * This function can be cancelled with gst_rtsp_connection_flush().
4882  *
4883  * Returns: #GST_RTSP_OK on success.
4884  *
4885  * Deprecated: 1.18
4886  */
4887 GstRTSPResult
4888 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
4889     GTimeVal * timeout)
4890 {
4891   return gst_rtsp_connection_read_usec (conn, data, size, TV_TO_USEC (timeout));
4892 }
4893
4894 /**
4895  * gst_rtsp_connection_write:
4896  * @conn: a #GstRTSPConnection
4897  * @data: the data to write
4898  * @size: the size of @data
4899  * @timeout: a timeout value or %NULL
4900  *
4901  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
4902  * the specified @timeout. @timeout can be %NULL, in which case this function
4903  * might block forever.
4904  *
4905  * This function can be cancelled with gst_rtsp_connection_flush().
4906  *
4907  * Returns: #GST_RTSP_OK on success.
4908  *
4909  * Deprecated: 1.18
4910  */
4911 GstRTSPResult
4912 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
4913     guint size, GTimeVal * timeout)
4914 {
4915   return gst_rtsp_connection_write_usec (conn, data, size,
4916       TV_TO_USEC (timeout));
4917 }
4918
4919 /**
4920  * gst_rtsp_connection_send:
4921  * @conn: a #GstRTSPConnection
4922  * @message: the message to send
4923  * @timeout: a timeout value or %NULL
4924  *
4925  * Attempt to send @message to the connected @conn, blocking up to
4926  * the specified @timeout. @timeout can be %NULL, in which case this function
4927  * might block forever.
4928  *
4929  * This function can be cancelled with gst_rtsp_connection_flush().
4930  *
4931  * Returns: #GST_RTSP_OK on success.
4932  *
4933  * Deprecated: 1.18
4934  */
4935 GstRTSPResult
4936 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
4937     GTimeVal * timeout)
4938 {
4939   return gst_rtsp_connection_send_usec (conn, message, TV_TO_USEC (timeout));
4940 }
4941
4942 /**
4943  * gst_rtsp_connection_send_messages:
4944  * @conn: a #GstRTSPConnection
4945  * @messages: (array length=n_messages): the messages to send
4946  * @n_messages: the number of messages to send
4947  * @timeout: a timeout value or %NULL
4948  *
4949  * Attempt to send @messages to the connected @conn, blocking up to
4950  * the specified @timeout. @timeout can be %NULL, in which case this function
4951  * might block forever.
4952  *
4953  * This function can be cancelled with gst_rtsp_connection_flush().
4954  *
4955  * Returns: #GST_RTSP_OK on success.
4956  *
4957  * Since: 1.16
4958  * Deprecated: 1.18
4959  */
4960 GstRTSPResult
4961 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
4962     GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
4963 {
4964   return gst_rtsp_connection_send_messages_usec (conn, messages, n_messages,
4965       TV_TO_USEC (timeout));
4966 }
4967
4968 /**
4969  * gst_rtsp_connection_receive:
4970  * @conn: a #GstRTSPConnection
4971  * @message: the message to read
4972  * @timeout: a timeout value or %NULL
4973  *
4974  * Attempt to read into @message from the connected @conn, blocking up to
4975  * the specified @timeout. @timeout can be %NULL, in which case this function
4976  * might block forever.
4977  *
4978  * This function can be cancelled with gst_rtsp_connection_flush().
4979  *
4980  * Returns: #GST_RTSP_OK on success.
4981  *
4982  * Deprecated: 1.18
4983  */
4984 GstRTSPResult
4985 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
4986     GTimeVal * timeout)
4987 {
4988   return gst_rtsp_connection_receive_usec (conn, message, TV_TO_USEC (timeout));
4989 }
4990
4991 /**
4992  * gst_rtsp_connection_poll:
4993  * @conn: a #GstRTSPConnection
4994  * @events: a bitmask of #GstRTSPEvent flags to check
4995  * @revents: location for result flags
4996  * @timeout: a timeout
4997  *
4998  * Wait up to the specified @timeout for the connection to become available for
4999  * at least one of the operations specified in @events. When the function returns
5000  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
5001  * @conn.
5002  *
5003  * @timeout can be %NULL, in which case this function might block forever.
5004  *
5005  * This function can be cancelled with gst_rtsp_connection_flush().
5006  *
5007  * Returns: #GST_RTSP_OK on success.
5008  *
5009  * Deprecated: 1.18
5010  */
5011 GstRTSPResult
5012 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
5013     GstRTSPEvent * revents, GTimeVal * timeout)
5014 {
5015   return gst_rtsp_connection_poll_usec (conn, events, revents,
5016       TV_TO_USEC (timeout));
5017 }
5018
5019 /**
5020  * gst_rtsp_connection_next_timeout:
5021  * @conn: a #GstRTSPConnection
5022  * @timeout: a timeout
5023  *
5024  * Calculate the next timeout for @conn, storing the result in @timeout.
5025  *
5026  * Returns: #GST_RTSP_OK.
5027  *
5028  * Deprecated: 1.18
5029  */
5030 GstRTSPResult
5031 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
5032 {
5033   gint64 tmptimeout = 0;
5034
5035   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
5036
5037   tmptimeout = gst_rtsp_connection_next_timeout_usec (conn);
5038
5039   timeout->tv_sec = tmptimeout / G_USEC_PER_SEC;
5040   timeout->tv_usec = tmptimeout % G_USEC_PER_SEC;
5041
5042   return GST_RTSP_OK;
5043 }
5044
5045
5046 /**
5047  * gst_rtsp_watch_wait_backlog:
5048  * @watch: a #GstRTSPWatch
5049  * @timeout: a GTimeVal timeout
5050  *
5051  * Wait until there is place in the backlog queue, @timeout is reached
5052  * or @watch is set to flushing.
5053  *
5054  * If @timeout is %NULL this function can block forever. If @timeout
5055  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
5056  * after the timeout expired.
5057  *
5058  * The typically use of this function is when gst_rtsp_watch_write_data
5059  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
5060  * free space in the backlog queue and try again.
5061  *
5062  * Returns: %GST_RTSP_OK when if there is room in queue.
5063  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
5064  *          %GST_RTSP_EINTR when @watch is flushing
5065  *          %GST_RTSP_EINVAL when called with invalid parameters.
5066  *
5067  * Since: 1.4
5068  * Deprecated: 1.18
5069  */
5070 GstRTSPResult
5071 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
5072 {
5073   return gst_rtsp_watch_wait_backlog_usec (watch, TV_TO_USEC (timeout));
5074 }
5075
5076 G_GNUC_END_IGNORE_DEPRECATIONS
5077 #endif /* GST_DISABLE_DEPRECATED */