Merging gst-libav
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @title: GstRTSPConnection
46  * @short_description: manage RTSP connections
47  * @see_also: gstrtspurl
48  *
49  * This object manages the RTSP connection to the server. It provides function
50  * to receive and send bytes and messages.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #  include <config.h>
55 #endif
56
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70
71 #include "gstrtspconnection.h"
72
73 #ifdef IP_TOS
74 union gst_sockaddr
75 {
76   struct sockaddr sa;
77   struct sockaddr_in sa_in;
78   struct sockaddr_in6 sa_in6;
79   struct sockaddr_storage sa_stor;
80 };
81 #endif
82
83 typedef struct
84 {
85   gint state;
86   guint save;
87   guchar out[3];                /* the size must be evenly divisible by 3 */
88   guint cout;
89   guint coutl;
90 } DecodeCtx;
91
92 typedef struct
93 {
94   /* If %TRUE we only own data and none of the
95    * other fields
96    */
97   gboolean borrowed;
98
99   /* Header or full message */
100   guint8 *data;
101   guint data_size;
102   gboolean data_is_data_header;
103
104   /* Payload following data, if any */
105   guint8 *body_data;
106   guint body_data_size;
107   /* or */
108   GstBuffer *body_buffer;
109
110   /* DATA packet header statically allocated for above */
111   guint8 data_header[4];
112
113   /* all below only for async writing */
114
115   guint data_offset;            /* == data_size when done */
116   guint body_offset;            /* into body_data or the buffer */
117
118   /* ID of the message for notification */
119   guint id;
120 } GstRTSPSerializedMessage;
121
122 static void
123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125   if (!msg->borrowed) {
126     g_free (msg->body_data);
127     gst_buffer_replace (&msg->body_buffer, NULL);
128   }
129   g_free (msg->data);
130 }
131
132 #ifdef MSG_NOSIGNAL
133 #define SEND_FLAGS MSG_NOSIGNAL
134 #else
135 #define SEND_FLAGS 0
136 #endif
137
138 typedef enum
139 {
140   TUNNEL_STATE_NONE,
141   TUNNEL_STATE_GET,
142   TUNNEL_STATE_POST,
143   TUNNEL_STATE_COMPLETE
144 } GstRTSPTunnelState;
145
146 #define TUNNELID_LEN   24
147
148 struct _GstRTSPConnection
149 {
150   /*< private > */
151   /* URL for the remote connection */
152   GstRTSPUrl *url;
153   GstRTSPVersion version;
154
155   gboolean server;
156   GSocketClient *client;
157   GIOStream *stream0;
158   GIOStream *stream1;
159
160   GInputStream *input_stream;
161   GOutputStream *output_stream;
162   /* this is a read source we add on the write socket in tunneled mode to be
163    * able to detect when client disconnects the GET channel */
164   GInputStream *control_stream;
165
166   /* connection state */
167   GSocket *read_socket;
168   GSocket *write_socket;
169   GSocket *socket0, *socket1;
170   gboolean read_socket_used;
171   gboolean write_socket_used;
172   GMutex socket_use_mutex;
173   gboolean manual_http;
174   gboolean may_cancel;
175   GCancellable *cancellable;
176
177   gchar tunnelid[TUNNELID_LEN];
178   gboolean tunneled;
179   gboolean ignore_x_server_reply;
180   GstRTSPTunnelState tstate;
181
182   /* the remote and local ip */
183   gchar *remote_ip;
184   gchar *local_ip;
185
186   gint read_ahead;
187
188   gchar *initial_buffer;
189   gsize initial_buffer_offset;
190
191   gboolean remember_session_id; /* remember the session id or not */
192
193   /* Session state */
194   gint cseq;                    /* sequence number */
195   gchar session_id[512];        /* session id */
196   gint timeout;                 /* session timeout in seconds */
197   GTimer *timer;                /* timeout timer */
198
199   /* Authentication */
200   GstRTSPAuthMethod auth_method;
201   gchar *username;
202   gchar *passwd;
203   GHashTable *auth_params;
204
205   guint content_length_limit;
206
207   /* TLS */
208   GTlsDatabase *tls_database;
209   GTlsInteraction *tls_interaction;
210
211   GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
212   GDestroyNotify accept_certificate_destroy_notify;
213   gpointer accept_certificate_user_data;
214
215   DecodeCtx ctx;
216   DecodeCtx *ctxp;
217
218   gchar *proxy_host;
219   guint proxy_port;
220 };
221
222 enum
223 {
224   STATE_START = 0,
225   STATE_DATA_HEADER,
226   STATE_DATA_BODY,
227   STATE_READ_LINES,
228   STATE_END,
229   STATE_LAST
230 };
231
232 enum
233 {
234   READ_AHEAD_EOH = -1,          /* end of headers */
235   READ_AHEAD_CRLF = -2,
236   READ_AHEAD_CRLFCR = -3
237 };
238
239 /* a structure for constructing RTSPMessages */
240 typedef struct
241 {
242   gint state;
243   GstRTSPResult status;
244   guint8 buffer[4096];
245   guint offset;
246
247   guint line;
248   guint8 *body_data;
249   guint body_len;
250 } GstRTSPBuilder;
251
252 /* function prototypes */
253 static void add_auth_header (GstRTSPConnection * conn,
254     GstRTSPMessage * message);
255
256 static void
257 build_reset (GstRTSPBuilder * builder)
258 {
259   g_free (builder->body_data);
260   memset (builder, 0, sizeof (GstRTSPBuilder));
261 }
262
263 static GstRTSPResult
264 gst_rtsp_result_from_g_io_error (GError * error, GstRTSPResult default_res)
265 {
266   if (error == NULL)
267     return GST_RTSP_OK;
268
269   if (error->domain != G_IO_ERROR)
270     return default_res;
271
272   switch (error->code) {
273     case G_IO_ERROR_TIMED_OUT:
274       return GST_RTSP_ETIMEOUT;
275     case G_IO_ERROR_INVALID_ARGUMENT:
276       return GST_RTSP_EINVAL;
277     case G_IO_ERROR_CANCELLED:
278     case G_IO_ERROR_WOULD_BLOCK:
279       return GST_RTSP_EINTR;
280     default:
281       return default_res;
282   }
283 }
284
285 static gboolean
286 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
287     GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
288 {
289   GError *error = NULL;
290   gboolean accept = FALSE;
291
292   if (rtspconn->tls_database) {
293     GSocketConnectable *peer_identity;
294     GTlsCertificateFlags validation_flags;
295
296     GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
297
298     peer_identity =
299         g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
300         (conn));
301
302     errors =
303         g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
304         G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
305         g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
306         NULL, &error);
307
308     if (error)
309       goto verify_error;
310
311     validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
312
313     accept = ((errors & validation_flags) == 0);
314     if (accept)
315       GST_DEBUG ("Peer certificate accepted");
316     else
317       GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
318   }
319
320   if (!accept && rtspconn->accept_certificate_func) {
321     accept =
322         rtspconn->accept_certificate_func (conn, peer_cert, errors,
323         rtspconn->accept_certificate_user_data);
324     GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
325         accept ? "" : "not ");
326   }
327
328   return accept;
329
330 /* ERRORS */
331 verify_error:
332   {
333     GST_ERROR ("An error occurred while verifying the peer certificate: %s",
334         error->message);
335     g_clear_error (&error);
336     return FALSE;
337   }
338 }
339
340 static void
341 socket_client_event (GSocketClient * client, GSocketClientEvent event,
342     GSocketConnectable * connectable, GTlsConnection * connection,
343     GstRTSPConnection * rtspconn)
344 {
345   if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
346     GST_DEBUG ("TLS handshaking about to start...");
347
348     g_signal_connect (connection, "accept-certificate",
349         (GCallback) tls_accept_certificate, rtspconn);
350
351     g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
352   }
353 }
354
355 /**
356  * gst_rtsp_connection_create:
357  * @url: a #GstRTSPUrl
358  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
359  *
360  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
361  * The connection will not yet attempt to connect to @url, use
362  * gst_rtsp_connection_connect().
363  *
364  * A copy of @url will be made.
365  *
366  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
367  */
368 GstRTSPResult
369 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
370 {
371   GstRTSPConnection *newconn;
372
373   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
374   g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
375
376   newconn = g_new0 (GstRTSPConnection, 1);
377
378   newconn->may_cancel = TRUE;
379   newconn->cancellable = g_cancellable_new ();
380   newconn->client = g_socket_client_new ();
381
382   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
383     g_socket_client_set_tls (newconn->client, TRUE);
384
385   g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
386       newconn);
387
388   newconn->url = gst_rtsp_url_copy (url);
389   newconn->timer = g_timer_new ();
390   newconn->timeout = 60;
391   newconn->cseq = 1;            /* RFC 7826: "it is RECOMMENDED to start at 0.",
392                                    but some servers don't copy values <1 due to bugs. */
393
394   newconn->remember_session_id = TRUE;
395
396   newconn->auth_method = GST_RTSP_AUTH_NONE;
397   newconn->username = NULL;
398   newconn->passwd = NULL;
399   newconn->auth_params = NULL;
400   newconn->version = 0;
401
402   newconn->content_length_limit = G_MAXUINT;
403
404   *conn = newconn;
405
406   return GST_RTSP_OK;
407 }
408
409 static gboolean
410 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
411     gboolean remote, GError ** error)
412 {
413   GSocketAddress *addr;
414
415   if (remote)
416     addr = g_socket_get_remote_address (socket, error);
417   else
418     addr = g_socket_get_local_address (socket, error);
419   if (!addr)
420     return FALSE;
421
422   if (ip)
423     *ip = g_inet_address_to_string (g_inet_socket_address_get_address
424         (G_INET_SOCKET_ADDRESS (addr)));
425   if (port)
426     *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
427
428   g_object_unref (addr);
429
430   return TRUE;
431 }
432
433
434 /**
435  * gst_rtsp_connection_create_from_socket:
436  * @socket: a #GSocket
437  * @ip: the IP address of the other end
438  * @port: the port used by the other end
439  * @initial_buffer: data already read from @fd
440  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
441  *
442  * Create a new #GstRTSPConnection for handling communication on the existing
443  * socket @socket. The @initial_buffer contains zero terminated data already
444  * read from @socket which should be used before starting to read new data.
445  *
446  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
447  */
448 /* FIXME 2.0 We don't need the ip and port since they can be got from the
449  * GSocket */
450 GstRTSPResult
451 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
452     guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
453 {
454   GstRTSPConnection *newconn = NULL;
455   GstRTSPUrl *url;
456   GstRTSPResult res;
457   GError *err = NULL;
458   gchar *local_ip;
459   GIOStream *stream;
460
461   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
462   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
463   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
464
465   if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
466     goto getnameinfo_failed;
467
468   /* create a url for the client address */
469   url = g_new0 (GstRTSPUrl, 1);
470   url->host = g_strdup (ip);
471   url->port = port;
472
473   /* now create the connection object */
474   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
475   gst_rtsp_url_free (url);
476
477   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
478
479   /* both read and write initially */
480   newconn->server = TRUE;
481   newconn->socket0 = socket;
482   newconn->stream0 = stream;
483   newconn->write_socket = newconn->read_socket = newconn->socket0;
484   newconn->read_socket_used = FALSE;
485   newconn->write_socket_used = FALSE;
486   g_mutex_init (&newconn->socket_use_mutex);
487   newconn->input_stream = g_io_stream_get_input_stream (stream);
488   newconn->output_stream = g_io_stream_get_output_stream (stream);
489   newconn->control_stream = NULL;
490   newconn->remote_ip = g_strdup (ip);
491   newconn->local_ip = local_ip;
492   newconn->initial_buffer = g_strdup (initial_buffer);
493
494   *conn = newconn;
495
496   return GST_RTSP_OK;
497
498   /* ERRORS */
499 getnameinfo_failed:
500   {
501     GST_ERROR ("failed to get local address: %s", err->message);
502     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
503     g_clear_error (&err);
504     return res;
505   }
506 newconn_failed:
507   {
508     GST_ERROR ("failed to make connection");
509     g_free (local_ip);
510     gst_rtsp_url_free (url);
511     return res;
512   }
513 }
514
515 /**
516  * gst_rtsp_connection_accept:
517  * @socket: a socket
518  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
519  * @cancellable: a #GCancellable to cancel the operation
520  *
521  * Accept a new connection on @socket and create a new #GstRTSPConnection for
522  * handling communication on new socket.
523  *
524  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
525  */
526 GstRTSPResult
527 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
528     GCancellable * cancellable)
529 {
530   GError *err = NULL;
531   gchar *ip;
532   guint16 port;
533   GSocket *client_sock;
534   GstRTSPResult ret;
535
536   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
537   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
538
539   client_sock = g_socket_accept (socket, cancellable, &err);
540   if (!client_sock)
541     goto accept_failed;
542
543   /* get the remote ip address and port */
544   if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
545     goto getnameinfo_failed;
546
547   ret =
548       gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
549       conn);
550   g_object_unref (client_sock);
551   g_free (ip);
552
553   return ret;
554
555   /* ERRORS */
556 accept_failed:
557   {
558     GST_DEBUG ("Accepting client failed: %s", err->message);
559     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
560     g_clear_error (&err);
561     return ret;
562   }
563 getnameinfo_failed:
564   {
565     GST_DEBUG ("getnameinfo failed: %s", err->message);
566     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
567     g_clear_error (&err);
568     if (!g_socket_close (client_sock, &err)) {
569       GST_DEBUG ("Closing socket failed: %s", err->message);
570       g_clear_error (&err);
571     }
572     g_object_unref (client_sock);
573     return ret;
574   }
575 }
576
577 /**
578  * gst_rtsp_connection_get_tls:
579  * @conn: a #GstRTSPConnection
580  * @error: #GError for error reporting, or NULL to ignore.
581  *
582  * Get the TLS connection of @conn.
583  *
584  * For client side this will return the #GTlsClientConnection when connected
585  * over TLS.
586  *
587  * For server side connections, this function will create a GTlsServerConnection
588  * when called the first time and will return that same connection on subsequent
589  * calls. The server is then responsible for configuring the TLS connection.
590  *
591  * Returns: (transfer none): the TLS connection for @conn.
592  *
593  * Since: 1.2
594  */
595 GTlsConnection *
596 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
597 {
598   GTlsConnection *result;
599
600   if (G_IS_TLS_CONNECTION (conn->stream0)) {
601     /* we already had one, return it */
602     result = G_TLS_CONNECTION (conn->stream0);
603   } else if (conn->server) {
604     /* no TLS connection but we are server, make one */
605     result = (GTlsConnection *)
606         g_tls_server_connection_new (conn->stream0, NULL, error);
607     if (result) {
608       g_object_unref (conn->stream0);
609       conn->stream0 = G_IO_STREAM (result);
610       conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
611       conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
612     }
613   } else {
614     /* client */
615     result = NULL;
616     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
617         "client not connected with TLS");
618   }
619   return result;
620 }
621
622 /**
623  * gst_rtsp_connection_set_tls_validation_flags:
624  * @conn: a #GstRTSPConnection
625  * @flags: the validation flags.
626  *
627  * Sets the TLS validation flags to be used to verify the peer
628  * certificate when a TLS connection is established.
629  *
630  * Returns: TRUE if the validation flags are set correctly, or FALSE if
631  * @conn is NULL or is not a TLS connection.
632  *
633  * Since: 1.2.1
634  */
635 gboolean
636 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
637     GTlsCertificateFlags flags)
638 {
639   gboolean res = FALSE;
640
641   g_return_val_if_fail (conn != NULL, FALSE);
642
643   res = g_socket_client_get_tls (conn->client);
644   if (res)
645     g_socket_client_set_tls_validation_flags (conn->client, flags);
646
647   return res;
648 }
649
650 /**
651  * gst_rtsp_connection_get_tls_validation_flags:
652  * @conn: a #GstRTSPConnection
653  *
654  * Gets the TLS validation flags used to verify the peer certificate
655  * when a TLS connection is established.
656  *
657  * Returns: the validationg flags.
658  *
659  * Since: 1.2.1
660  */
661 GTlsCertificateFlags
662 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
663 {
664   g_return_val_if_fail (conn != NULL, 0);
665
666   return g_socket_client_get_tls_validation_flags (conn->client);
667 }
668
669 /**
670  * gst_rtsp_connection_set_tls_database:
671  * @conn: a #GstRTSPConnection
672  * @database: a #GTlsDatabase
673  *
674  * Sets the anchor certificate authorities database. This certificate
675  * database will be used to verify the server's certificate in case it
676  * can't be verified with the default certificate database first.
677  *
678  * Since: 1.4
679  */
680 void
681 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
682     GTlsDatabase * database)
683 {
684   GTlsDatabase *old_db;
685
686   g_return_if_fail (conn != NULL);
687
688   if (database)
689     g_object_ref (database);
690
691   old_db = conn->tls_database;
692   conn->tls_database = database;
693
694   if (old_db)
695     g_object_unref (old_db);
696 }
697
698 /**
699  * gst_rtsp_connection_get_tls_database:
700  * @conn: a #GstRTSPConnection
701  *
702  * Gets the anchor certificate authorities database that will be used
703  * after a server certificate can't be verified with the default
704  * certificate database.
705  *
706  * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
707  * database has been previously set. Use g_object_unref() to release the
708  * certificate database.
709  *
710  * Since: 1.4
711  */
712 GTlsDatabase *
713 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
714 {
715   GTlsDatabase *result;
716
717   g_return_val_if_fail (conn != NULL, NULL);
718
719   if ((result = conn->tls_database))
720     g_object_ref (result);
721
722   return result;
723 }
724
725 /**
726  * gst_rtsp_connection_set_tls_interaction:
727  * @conn: a #GstRTSPConnection
728  * @interaction: a #GTlsInteraction
729  *
730  * Sets a #GTlsInteraction object to be used when the connection or certificate
731  * database need to interact with the user. This will be used to prompt the
732  * user for passwords where necessary.
733  *
734  * Since: 1.6
735  */
736 void
737 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
738     GTlsInteraction * interaction)
739 {
740   GTlsInteraction *old_interaction;
741
742   g_return_if_fail (conn != NULL);
743
744   if (interaction)
745     g_object_ref (interaction);
746
747   old_interaction = conn->tls_interaction;
748   conn->tls_interaction = interaction;
749
750   if (old_interaction)
751     g_object_unref (old_interaction);
752 }
753
754 /**
755  * gst_rtsp_connection_get_tls_interaction:
756  * @conn: a #GstRTSPConnection
757  *
758  * Gets a #GTlsInteraction object to be used when the connection or certificate
759  * database need to interact with the user. This will be used to prompt the
760  * user for passwords where necessary.
761  *
762  * Returns: (transfer full): a reference on the #GTlsInteraction. Use
763  * g_object_unref() to release.
764  *
765  * Since: 1.6
766  */
767 GTlsInteraction *
768 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
769 {
770   GTlsInteraction *result;
771
772   g_return_val_if_fail (conn != NULL, NULL);
773
774   if ((result = conn->tls_interaction))
775     g_object_ref (result);
776
777   return result;
778 }
779
780 /**
781  * gst_rtsp_connection_set_accept_certificate_func:
782  * @conn: a #GstRTSPConnection
783  * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
784  * @destroy_notify: #GDestroyNotify for @user_data
785  * @user_data: User data passed to @func
786  *
787  * Sets a custom accept-certificate function for checking certificates for
788  * validity. This will directly map to #GTlsConnection 's "accept-certificate"
789  * signal and be performed after the default checks of #GstRTSPConnection
790  * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
791  * have failed. If no #GTlsDatabase is set on this connection, only @func will
792  * be called.
793  *
794  * Since: 1.14
795  */
796 void
797 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
798     GstRTSPConnectionAcceptCertificateFunc func,
799     gpointer user_data, GDestroyNotify destroy_notify)
800 {
801   if (conn->accept_certificate_destroy_notify)
802     conn->
803         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
804   conn->accept_certificate_func = func;
805   conn->accept_certificate_user_data = user_data;
806   conn->accept_certificate_destroy_notify = destroy_notify;
807 }
808
809 static gchar *
810 get_tunneled_connection_uri_strdup (GstRTSPUrl * url, guint16 port)
811 {
812   const gchar *pre_host = "";
813   const gchar *post_host = "";
814
815   if (url->family == GST_RTSP_FAM_INET6) {
816     pre_host = "[";
817     post_host = "]";
818   }
819
820   return g_strdup_printf ("http://%s%s%s:%d%s%s%s", pre_host, url->host,
821       post_host, port, url->abspath, url->query ? "?" : "",
822       url->query ? url->query : "");
823 }
824
825 static GstRTSPResult
826 setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
827     GstRTSPMessage * response)
828 {
829   gint i;
830   GstRTSPResult res;
831   gchar *value;
832   guint16 url_port;
833   GstRTSPMessage *msg;
834   gboolean old_http;
835   GstRTSPUrl *url;
836   GError *error = NULL;
837   GSocketConnection *connection;
838   GSocket *socket;
839   gchar *connection_uri = NULL;
840   gchar *request_uri = NULL;
841   gchar *host = NULL;
842
843   url = conn->url;
844
845   gst_rtsp_url_get_port (url, &url_port);
846   host = g_strdup_printf ("%s:%d", url->host, url_port);
847
848   /* create a random sessionid */
849   for (i = 0; i < TUNNELID_LEN; i++)
850     conn->tunnelid[i] = g_random_int_range ('a', 'z');
851   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
852
853   /* create the GET request for the read connection */
854   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
855       no_message);
856   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
857
858   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
859       conn->tunnelid);
860   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
861       "application/x-rtsp-tunnelled");
862   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
863   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
864   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
865
866   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
867    * request from being base64 encoded */
868   conn->tunneled = FALSE;
869   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
870       write_failed);
871   gst_rtsp_message_free (msg);
872   conn->tunneled = TRUE;
873
874   /* receive the response to the GET request */
875   /* we need to temporarily set manual_http to TRUE since
876    * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
877    * failure otherwise */
878   old_http = conn->manual_http;
879   conn->manual_http = TRUE;
880   GST_RTSP_CHECK (gst_rtsp_connection_receive_usec (conn, response, timeout),
881       read_failed);
882   conn->manual_http = old_http;
883
884   if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
885       response->type_data.response.code != GST_RTSP_STS_OK)
886     goto wrong_result;
887
888   if (!conn->ignore_x_server_reply &&
889       gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
890           &value, 0) == GST_RTSP_OK) {
891     g_free (url->host);
892     url->host = g_strdup (value);
893     g_free (conn->remote_ip);
894     conn->remote_ip = g_strdup (value);
895   }
896
897   connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
898
899   /* connect to the host/port */
900   if (conn->proxy_host) {
901     connection = g_socket_client_connect_to_host (conn->client,
902         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
903     request_uri = g_strdup (connection_uri);
904   } else {
905     connection = g_socket_client_connect_to_uri (conn->client,
906         connection_uri, 0, conn->cancellable, &error);
907     request_uri =
908         g_strdup_printf ("%s%s%s", url->abspath,
909         url->query ? "?" : "", url->query ? url->query : "");
910   }
911   if (connection == NULL)
912     goto connect_failed;
913
914   socket = g_socket_connection_get_socket (connection);
915
916   /* get remote address */
917   g_free (conn->remote_ip);
918   conn->remote_ip = NULL;
919
920   if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
921     goto remote_address_failed;
922
923   /* this is now our writing socket */
924   conn->stream1 = G_IO_STREAM (connection);
925   conn->socket1 = socket;
926   conn->write_socket = conn->socket1;
927   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
928   conn->control_stream = NULL;
929
930   /* create the POST request for the write connection */
931   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
932           request_uri), no_message);
933   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
934
935   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
936       conn->tunnelid);
937   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
938       "application/x-rtsp-tunnelled");
939   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
940       "application/x-rtsp-tunnelled");
941   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
942   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
943   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
944       "Sun, 9 Jan 1972 00:00:00 GMT");
945   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
946   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
947
948   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
949    * request from being base64 encoded */
950   conn->tunneled = FALSE;
951   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
952       write_failed);
953   gst_rtsp_message_free (msg);
954   conn->tunneled = TRUE;
955
956 exit:
957   g_free (connection_uri);
958   g_free (request_uri);
959   g_free (host);
960
961   return res;
962
963   /* ERRORS */
964 no_message:
965   {
966     GST_ERROR ("failed to create request (%d)", res);
967     goto exit;
968   }
969 write_failed:
970   {
971     GST_ERROR ("write failed (%d)", res);
972     gst_rtsp_message_free (msg);
973     conn->tunneled = TRUE;
974     goto exit;
975   }
976 read_failed:
977   {
978     GST_ERROR ("read failed (%d)", res);
979     conn->manual_http = FALSE;
980     goto exit;
981   }
982 wrong_result:
983   {
984     GST_ERROR ("got failure response %d %s",
985         response->type_data.response.code, response->type_data.response.reason);
986     res = GST_RTSP_ERROR;
987     goto exit;
988   }
989 connect_failed:
990   {
991     GST_ERROR ("failed to connect: %s", error->message);
992     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
993     g_clear_error (&error);
994     goto exit;
995   }
996 remote_address_failed:
997   {
998     GST_ERROR ("failed to resolve address: %s", error->message);
999     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1000     g_object_unref (connection);
1001     g_clear_error (&error);
1002     return res;
1003   }
1004 }
1005
1006 /**
1007  * gst_rtsp_connection_connect_with_response_usec:
1008  * @conn: a #GstRTSPConnection
1009  * @timeout: a timeout in microseconds
1010  * @response: a #GstRTSPMessage
1011  *
1012  * Attempt to connect to the url of @conn made with
1013  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1014  * forever. If @timeout contains a valid timeout, this function will return
1015  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
1016  * @response will contain a response to the tunneling request messages.
1017  *
1018  * This function can be cancelled with gst_rtsp_connection_flush().
1019  *
1020  * Returns: #GST_RTSP_OK when a connection could be made.
1021  *
1022  * Since: 1.18
1023  */
1024 GstRTSPResult
1025 gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
1026     gint64 timeout, GstRTSPMessage * response)
1027 {
1028   GstRTSPResult res;
1029   GSocketConnection *connection;
1030   GSocket *socket;
1031   GError *error = NULL;
1032   gchar *connection_uri, *request_uri, *remote_ip;
1033   GstClockTime to;
1034   guint16 url_port;
1035   GstRTSPUrl *url;
1036
1037   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1038   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
1039   g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
1040
1041   to = timeout * 1000;
1042   g_socket_client_set_timeout (conn->client,
1043       (to + GST_SECOND - 1) / GST_SECOND);
1044
1045   url = conn->url;
1046
1047   gst_rtsp_url_get_port (url, &url_port);
1048
1049   if (conn->tunneled) {
1050     connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
1051   } else {
1052     connection_uri = gst_rtsp_url_get_request_uri (url);
1053   }
1054
1055   if (conn->proxy_host) {
1056     connection = g_socket_client_connect_to_host (conn->client,
1057         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1058     request_uri = g_strdup (connection_uri);
1059   } else {
1060     connection = g_socket_client_connect_to_uri (conn->client,
1061         connection_uri, url_port, conn->cancellable, &error);
1062
1063     /* use the relative component of the uri for non-proxy connections */
1064     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1065         url->query ? "?" : "", url->query ? url->query : "");
1066   }
1067   if (connection == NULL)
1068     goto connect_failed;
1069
1070   /* get remote address */
1071   socket = g_socket_connection_get_socket (connection);
1072
1073   if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1074     goto remote_address_failed;
1075
1076   g_free (conn->remote_ip);
1077   conn->remote_ip = remote_ip;
1078   conn->stream0 = G_IO_STREAM (connection);
1079   conn->socket0 = socket;
1080   /* this is our read socket */
1081   conn->read_socket = conn->socket0;
1082   conn->write_socket = conn->socket0;
1083   conn->read_socket_used = FALSE;
1084   conn->write_socket_used = FALSE;
1085   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1086   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1087   conn->control_stream = NULL;
1088
1089   if (conn->tunneled) {
1090     res = setup_tunneling (conn, timeout, request_uri, response);
1091     if (res != GST_RTSP_OK)
1092       goto tunneling_failed;
1093   }
1094   g_free (connection_uri);
1095   g_free (request_uri);
1096
1097   return GST_RTSP_OK;
1098
1099   /* ERRORS */
1100 connect_failed:
1101   {
1102     GST_ERROR ("failed to connect: %s", error->message);
1103     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1104     g_clear_error (&error);
1105     g_free (connection_uri);
1106     g_free (request_uri);
1107     return res;
1108   }
1109 remote_address_failed:
1110   {
1111     GST_ERROR ("failed to connect: %s", error->message);
1112     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1113     g_object_unref (connection);
1114     g_clear_error (&error);
1115     g_free (connection_uri);
1116     g_free (request_uri);
1117     return res;
1118   }
1119 tunneling_failed:
1120   {
1121     GST_ERROR ("failed to setup tunneling");
1122     g_free (connection_uri);
1123     g_free (request_uri);
1124     return res;
1125   }
1126 }
1127
1128 static void
1129 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1130 {
1131   switch (conn->auth_method) {
1132     case GST_RTSP_AUTH_BASIC:{
1133       gchar *user_pass;
1134       gchar *user_pass64;
1135       gchar *auth_string;
1136
1137       if (conn->username == NULL || conn->passwd == NULL)
1138         break;
1139
1140       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1141       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1142       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1143
1144       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1145           auth_string);
1146
1147       g_free (user_pass);
1148       g_free (user_pass64);
1149       break;
1150     }
1151     case GST_RTSP_AUTH_DIGEST:{
1152       gchar *response;
1153       gchar *auth_string, *auth_string2;
1154       gchar *realm;
1155       gchar *nonce;
1156       gchar *opaque;
1157       const gchar *uri;
1158       const gchar *method;
1159
1160       /* we need to have some params set */
1161       if (conn->auth_params == NULL || conn->username == NULL ||
1162           conn->passwd == NULL)
1163         break;
1164
1165       /* we need the realm and nonce */
1166       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1167       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1168       if (realm == NULL || nonce == NULL)
1169         break;
1170
1171       method = gst_rtsp_method_as_text (message->type_data.request.method);
1172       uri = message->type_data.request.uri;
1173
1174       response =
1175           gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1176           conn->username, conn->passwd, uri, nonce);
1177       auth_string =
1178           g_strdup_printf ("Digest username=\"%s\", "
1179           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1180           conn->username, realm, nonce, uri, response);
1181       g_free (response);
1182
1183       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1184       if (opaque) {
1185         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1186             opaque);
1187         g_free (auth_string);
1188         auth_string = auth_string2;
1189       }
1190       /* Do not keep any old Authorization headers */
1191       gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1192       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1193           auth_string);
1194       break;
1195     }
1196     default:
1197       /* Nothing to do */
1198       break;
1199   }
1200 }
1201
1202 /**
1203  * gst_rtsp_connection_connect_usec:
1204  * @conn: a #GstRTSPConnection
1205  * @timeout: a timeout in microseconds
1206  *
1207  * Attempt to connect to the url of @conn made with
1208  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1209  * forever. If @timeout contains a valid timeout, this function will return
1210  * #GST_RTSP_ETIMEOUT after the timeout expired.
1211  *
1212  * This function can be cancelled with gst_rtsp_connection_flush().
1213  *
1214  * Returns: #GST_RTSP_OK when a connection could be made.
1215  *
1216  * Since: 1.18
1217  */
1218 GstRTSPResult
1219 gst_rtsp_connection_connect_usec (GstRTSPConnection * conn, gint64 timeout)
1220 {
1221   GstRTSPResult result;
1222   GstRTSPMessage response;
1223
1224   memset (&response, 0, sizeof (response));
1225   gst_rtsp_message_init (&response);
1226
1227   result = gst_rtsp_connection_connect_with_response_usec (conn, timeout,
1228       &response);
1229
1230   gst_rtsp_message_unset (&response);
1231
1232   return result;
1233 }
1234
1235 static void
1236 gen_date_string (gchar * date_string, guint len)
1237 {
1238   static const char wkdays[7][4] =
1239       { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1240   static const char months[12][4] =
1241       { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1242     "Nov", "Dec"
1243   };
1244   struct tm tm;
1245   time_t t;
1246
1247   time (&t);
1248
1249 #ifdef HAVE_GMTIME_R
1250   gmtime_r (&t, &tm);
1251 #else
1252   tm = *gmtime (&t);
1253 #endif
1254
1255   g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1256       wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1257       tm.tm_hour, tm.tm_min, tm.tm_sec);
1258 }
1259
1260 static GstRTSPResult
1261 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1262     guint size, gboolean block, GCancellable * cancellable)
1263 {
1264   guint left;
1265   gssize r;
1266   GstRTSPResult res;
1267   GError *err = NULL;
1268
1269   if (G_UNLIKELY (*idx > size))
1270     return GST_RTSP_ERROR;
1271
1272   left = size - *idx;
1273
1274   while (left) {
1275     if (block)
1276       r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1277           cancellable, &err);
1278     else
1279       r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1280           (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1281     if (G_UNLIKELY (r < 0))
1282       goto error;
1283
1284     left -= r;
1285     *idx += r;
1286   }
1287   return GST_RTSP_OK;
1288
1289   /* ERRORS */
1290 error:
1291   {
1292     if (G_UNLIKELY (r == 0))
1293       return GST_RTSP_EEOF;
1294
1295     if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1296       GST_WARNING ("%s", err->message);
1297     else
1298       GST_DEBUG ("%s", err->message);
1299
1300     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1301     g_clear_error (&err);
1302     return res;
1303   }
1304 }
1305
1306 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1307 #if GLIB_CHECK_VERSION(2, 59, 1)
1308 static GstRTSPResult
1309 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1310     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1311 {
1312   gsize _bytes_written = 0;
1313   gsize written;
1314   GstRTSPResult ret;
1315   GError *err = NULL;
1316   GPollableReturn res = G_POLLABLE_RETURN_OK;
1317
1318   while (n_vectors > 0) {
1319     if (block) {
1320       if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1321                   &written, cancellable, &err))) {
1322         /* This will never return G_IO_ERROR_WOULD_BLOCK */
1323         res = G_POLLABLE_RETURN_FAILED;
1324         goto error;
1325       }
1326     } else {
1327       res =
1328           g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1329           (stream), vectors, n_vectors, &written, cancellable, &err);
1330
1331       if (res != G_POLLABLE_RETURN_OK) {
1332         g_assert (written == 0);
1333         goto error;
1334       }
1335     }
1336     _bytes_written += written;
1337
1338     /* skip vectors that have been written in full */
1339     while (written > 0 && written >= vectors[0].size) {
1340       written -= vectors[0].size;
1341       ++vectors;
1342       --n_vectors;
1343     }
1344
1345     /* skip partially written vector data */
1346     if (written > 0) {
1347       vectors[0].size -= written;
1348       vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1349     }
1350   }
1351
1352   *bytes_written = _bytes_written;
1353
1354   return GST_RTSP_OK;
1355
1356   /* ERRORS */
1357 error:
1358   {
1359     *bytes_written = _bytes_written;
1360
1361     if (err)
1362       GST_WARNING ("%s", err->message);
1363     if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1364       g_assert (!err);
1365       return GST_RTSP_EINTR;
1366     } else if (G_UNLIKELY (written == 0)) {
1367       g_clear_error (&err);
1368       return GST_RTSP_EEOF;
1369     }
1370
1371     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1372     g_clear_error (&err);
1373     return ret;
1374   }
1375 }
1376 #else
1377 static GstRTSPResult
1378 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1379     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1380 {
1381   gsize _bytes_written = 0;
1382   guint written;
1383   gint i;
1384   GstRTSPResult res = GST_RTSP_OK;
1385
1386   for (i = 0; i < n_vectors; i++) {
1387     written = 0;
1388     res =
1389         write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
1390         block, cancellable);
1391     _bytes_written += written;
1392     if (G_UNLIKELY (res != GST_RTSP_OK))
1393       break;
1394   }
1395
1396   *bytes_written = _bytes_written;
1397
1398   return res;
1399 }
1400 #endif
1401
1402 static gint
1403 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1404     gboolean block, GError ** err)
1405 {
1406   gint out = 0;
1407
1408   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1409     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1410
1411     out = MIN (left, size);
1412     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1413
1414     if (left == (gsize) out) {
1415       g_free (conn->initial_buffer);
1416       conn->initial_buffer = NULL;
1417       conn->initial_buffer_offset = 0;
1418     } else
1419       conn->initial_buffer_offset += out;
1420   }
1421
1422   if (G_LIKELY (size > (guint) out)) {
1423     gssize r;
1424     gsize count = size - out;
1425     if (block)
1426       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1427           count, conn->may_cancel ? conn->cancellable : NULL, err);
1428     else
1429       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1430           (conn->input_stream), (gchar *) & buffer[out], count,
1431           conn->may_cancel ? conn->cancellable : NULL, err);
1432
1433     if (G_UNLIKELY (r < 0)) {
1434       if (out == 0) {
1435         /* propagate the error */
1436         out = r;
1437       } else {
1438         /* we have some data ignore error */
1439         g_clear_error (err);
1440       }
1441     } else
1442       out += r;
1443   }
1444
1445   return out;
1446 }
1447
1448 static gint
1449 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1450     gboolean block, GError ** err)
1451 {
1452   DecodeCtx *ctx = conn->ctxp;
1453   gint out = 0;
1454
1455   if (ctx) {
1456     while (size > 0) {
1457       guint8 in[sizeof (ctx->out) * 4 / 3];
1458       gint r;
1459
1460       while (size > 0 && ctx->cout < ctx->coutl) {
1461         /* we have some leftover bytes */
1462         *buffer++ = ctx->out[ctx->cout++];
1463         size--;
1464         out++;
1465       }
1466
1467       /* got what we needed? */
1468       if (size == 0)
1469         break;
1470
1471       /* try to read more bytes */
1472       r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1473       if (r <= 0) {
1474         if (out == 0) {
1475           out = r;
1476         } else {
1477           /* we have some data ignore error */
1478           g_clear_error (err);
1479         }
1480         break;
1481       }
1482
1483       ctx->cout = 0;
1484       ctx->coutl =
1485           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1486           &ctx->save);
1487     }
1488   } else {
1489     out = fill_raw_bytes (conn, buffer, size, block, err);
1490   }
1491
1492   return out;
1493 }
1494
1495 static GstRTSPResult
1496 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1497     gboolean block)
1498 {
1499   guint left;
1500   gint r;
1501   GstRTSPResult res;
1502   GError *err = NULL;
1503
1504   if (G_UNLIKELY (*idx > size))
1505     return GST_RTSP_ERROR;
1506
1507   left = size - *idx;
1508
1509   while (left) {
1510     r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1511     if (G_UNLIKELY (r <= 0))
1512       goto error;
1513
1514     left -= r;
1515     *idx += r;
1516   }
1517   return GST_RTSP_OK;
1518
1519   /* ERRORS */
1520 error:
1521   {
1522     if (G_UNLIKELY (r == 0))
1523       return GST_RTSP_EEOF;
1524
1525     GST_DEBUG ("%s", err->message);
1526     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1527     g_clear_error (&err);
1528     return res;
1529   }
1530 }
1531
1532 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1533  * end of a line. It even does its best to handle clients which mix them (even
1534  * though this is a really stupid idea (tm).) It also handles Line White Space
1535  * (LWS), where a line end followed by whitespace is considered LWS. This is
1536  * the method used in RTSP (and HTTP) to break long lines.
1537  */
1538 static GstRTSPResult
1539 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1540     gboolean block)
1541 {
1542   GstRTSPResult res;
1543
1544   while (TRUE) {
1545     guint8 c;
1546     guint i;
1547
1548     if (conn->read_ahead == READ_AHEAD_EOH) {
1549       /* the last call to read_line() already determined that we have reached
1550        * the end of the headers, so convey that information now */
1551       conn->read_ahead = 0;
1552       break;
1553     } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1554       /* the last call to read_line() left off after having read \r\n */
1555       c = '\n';
1556     } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1557       /* the last call to read_line() left off after having read \r\n\r */
1558       c = '\r';
1559     } else if (conn->read_ahead != 0) {
1560       /* the last call to read_line() left us with a character to start with */
1561       c = (guint8) conn->read_ahead;
1562       conn->read_ahead = 0;
1563     } else {
1564       /* read the next character */
1565       i = 0;
1566       res = read_bytes (conn, &c, &i, 1, block);
1567       if (G_UNLIKELY (res != GST_RTSP_OK))
1568         return res;
1569     }
1570
1571     /* special treatment of line endings */
1572     if (c == '\r' || c == '\n') {
1573       guint8 read_ahead;
1574
1575     retry:
1576       /* need to read ahead one more character to know what to do... */
1577       i = 0;
1578       res = read_bytes (conn, &read_ahead, &i, 1, block);
1579       if (G_UNLIKELY (res != GST_RTSP_OK))
1580         return res;
1581
1582       if (read_ahead == ' ' || read_ahead == '\t') {
1583         if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1584           /* got \r\n\r followed by whitespace, treat it as a normal line
1585            * followed by one starting with LWS */
1586           conn->read_ahead = read_ahead;
1587           break;
1588         } else {
1589           /* got LWS, change the line ending to a space and continue */
1590           c = ' ';
1591           conn->read_ahead = read_ahead;
1592         }
1593       } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1594         if (read_ahead == '\r' || read_ahead == '\n') {
1595           /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1596           conn->read_ahead = READ_AHEAD_EOH;
1597           break;
1598         } else {
1599           /* got \r\n\r followed by something else, this is not really
1600            * supported since we have probably just eaten the first character
1601            * of the body or the next message, so just ignore the second \r
1602            * and live with it... */
1603           conn->read_ahead = read_ahead;
1604           break;
1605         }
1606       } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1607         if (read_ahead == '\r') {
1608           /* got \r\n\r so far, need one more character... */
1609           conn->read_ahead = READ_AHEAD_CRLFCR;
1610           goto retry;
1611         } else if (read_ahead == '\n') {
1612           /* got \r\n\n, treat it as the end of the headers */
1613           conn->read_ahead = READ_AHEAD_EOH;
1614           break;
1615         } else {
1616           /* found the end of a line, keep read_ahead for the next line */
1617           conn->read_ahead = read_ahead;
1618           break;
1619         }
1620       } else if (c == read_ahead) {
1621         /* got double \r or \n, treat it as the end of the headers */
1622         conn->read_ahead = READ_AHEAD_EOH;
1623         break;
1624       } else if (c == '\r' && read_ahead == '\n') {
1625         /* got \r\n so far, still need more to know what to do... */
1626         conn->read_ahead = READ_AHEAD_CRLF;
1627         goto retry;
1628       } else {
1629         /* found the end of a line, keep read_ahead for the next line */
1630         conn->read_ahead = read_ahead;
1631         break;
1632       }
1633     }
1634
1635     if (G_LIKELY (*idx < size - 1))
1636       buffer[(*idx)++] = c;
1637   }
1638   buffer[*idx] = '\0';
1639
1640   return GST_RTSP_OK;
1641 }
1642
1643 static void
1644 set_read_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
1645 {
1646   GstClockTime to_nsecs;
1647   guint to_secs;
1648
1649   g_mutex_lock (&conn->socket_use_mutex);
1650
1651   g_assert (!conn->read_socket_used);
1652   conn->read_socket_used = TRUE;
1653
1654   to_nsecs = timeout * 1000;
1655   to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
1656
1657   if (to_secs > g_socket_get_timeout (conn->read_socket)) {
1658     g_socket_set_timeout (conn->read_socket, to_secs);
1659   }
1660
1661   g_mutex_unlock (&conn->socket_use_mutex);
1662 }
1663
1664 static void
1665 set_write_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
1666 {
1667   GstClockTime to_nsecs;
1668   guint to_secs;
1669
1670   g_mutex_lock (&conn->socket_use_mutex);
1671
1672   g_assert (!conn->write_socket_used);
1673   conn->write_socket_used = TRUE;
1674
1675   to_nsecs = timeout * 1000;
1676   to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
1677
1678   if (to_secs > g_socket_get_timeout (conn->write_socket)) {
1679     g_socket_set_timeout (conn->write_socket, to_secs);
1680   }
1681
1682   g_mutex_unlock (&conn->socket_use_mutex);
1683 }
1684
1685 static void
1686 clear_read_socket_timeout (GstRTSPConnection * conn)
1687 {
1688   g_mutex_lock (&conn->socket_use_mutex);
1689
1690   conn->read_socket_used = FALSE;
1691   if (conn->read_socket != conn->write_socket || !conn->write_socket_used) {
1692     g_socket_set_timeout (conn->read_socket, 0);
1693   }
1694
1695   g_mutex_unlock (&conn->socket_use_mutex);
1696 }
1697
1698 static void
1699 clear_write_socket_timeout (GstRTSPConnection * conn)
1700 {
1701   g_mutex_lock (&conn->socket_use_mutex);
1702
1703   conn->write_socket_used = FALSE;
1704   if (conn->write_socket != conn->read_socket || !conn->read_socket_used) {
1705     g_socket_set_timeout (conn->write_socket, 0);
1706   }
1707
1708   g_mutex_unlock (&conn->socket_use_mutex);
1709 }
1710
1711 /**
1712  * gst_rtsp_connection_write_usec:
1713  * @conn: a #GstRTSPConnection
1714  * @data: the data to write
1715  * @size: the size of @data
1716  * @timeout: a timeout value or 0
1717  *
1718  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1719  * the specified @timeout. @timeout can be 0, in which case this function
1720  * might block forever.
1721  *
1722  * This function can be cancelled with gst_rtsp_connection_flush().
1723  *
1724  * Returns: #GST_RTSP_OK on success.
1725  *
1726  * Since: 1.18
1727  */
1728 /* FIXME 2.0: This should've been static! */
1729 GstRTSPResult
1730 gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
1731     guint size, gint64 timeout)
1732 {
1733   guint offset;
1734   GstRTSPResult res;
1735
1736   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1737   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1738   g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1739
1740   offset = 0;
1741
1742   set_write_socket_timeout (conn, timeout);
1743
1744   res =
1745       write_bytes (conn->output_stream, data, &offset, size, TRUE,
1746       conn->cancellable);
1747
1748   clear_write_socket_timeout (conn);
1749
1750   return res;
1751 }
1752
1753 static gboolean
1754 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1755     GstRTSPSerializedMessage * serialized_message)
1756 {
1757   GString *str = NULL;
1758
1759   memset (serialized_message, 0, sizeof (*serialized_message));
1760
1761   /* Initially we borrow the body_data / body_buffer fields from
1762    * the message */
1763   serialized_message->borrowed = TRUE;
1764
1765   switch (message->type) {
1766     case GST_RTSP_MESSAGE_REQUEST:
1767       str = g_string_new ("");
1768
1769       /* create request string, add CSeq */
1770       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1771           "CSeq: %d\r\n",
1772           gst_rtsp_method_as_text (message->type_data.request.method),
1773           message->type_data.request.uri,
1774           gst_rtsp_version_as_text (message->type_data.request.version),
1775           conn->cseq++);
1776       /* add session id if we have one */
1777       if (conn->session_id[0] != '\0') {
1778         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1779         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1780             conn->session_id);
1781       }
1782       /* add any authentication headers */
1783       add_auth_header (conn, message);
1784       break;
1785     case GST_RTSP_MESSAGE_RESPONSE:
1786       str = g_string_new ("");
1787
1788       /* create response string */
1789       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1790           gst_rtsp_version_as_text (message->type_data.response.version),
1791           message->type_data.response.code, message->type_data.response.reason);
1792       break;
1793     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1794       str = g_string_new ("");
1795
1796       /* create request string */
1797       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1798           gst_rtsp_method_as_text (message->type_data.request.method),
1799           message->type_data.request.uri,
1800           gst_rtsp_version_as_text (message->type_data.request.version));
1801       /* add any authentication headers */
1802       add_auth_header (conn, message);
1803       break;
1804     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1805       str = g_string_new ("");
1806
1807       /* create response string */
1808       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1809           gst_rtsp_version_as_text (message->type_data.request.version),
1810           message->type_data.response.code, message->type_data.response.reason);
1811       break;
1812     case GST_RTSP_MESSAGE_DATA:
1813     {
1814       guint8 *data_header = serialized_message->data_header;
1815
1816       /* prepare data header */
1817       data_header[0] = '$';
1818       data_header[1] = message->type_data.data.channel;
1819       data_header[2] = (message->body_size >> 8) & 0xff;
1820       data_header[3] = message->body_size & 0xff;
1821
1822       /* create serialized message with header and data */
1823       serialized_message->data_is_data_header = TRUE;
1824       serialized_message->data_size = 4;
1825
1826       if (message->body) {
1827         serialized_message->body_data = message->body;
1828         serialized_message->body_data_size = message->body_size;
1829       } else {
1830         g_assert (message->body_buffer != NULL);
1831         serialized_message->body_buffer = message->body_buffer;
1832       }
1833       break;
1834     }
1835     default:
1836       g_string_free (str, TRUE);
1837       g_return_val_if_reached (FALSE);
1838       break;
1839   }
1840
1841   /* append headers and body */
1842   if (message->type != GST_RTSP_MESSAGE_DATA) {
1843     gchar date_string[100];
1844
1845     g_assert (str != NULL);
1846
1847     gen_date_string (date_string, sizeof (date_string));
1848
1849     /* add date header */
1850     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1851     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1852
1853     /* append headers */
1854     gst_rtsp_message_append_headers (message, str);
1855
1856     /* append Content-Length and body if needed */
1857     if (message->body_size > 0) {
1858       gchar *len;
1859
1860       len = g_strdup_printf ("%d", message->body_size);
1861       g_string_append_printf (str, "%s: %s\r\n",
1862           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1863       g_free (len);
1864       /* header ends here */
1865       g_string_append (str, "\r\n");
1866
1867       if (message->body) {
1868         serialized_message->body_data = message->body;
1869         serialized_message->body_data_size = message->body_size;
1870       } else {
1871         g_assert (message->body_buffer != NULL);
1872         serialized_message->body_buffer = message->body_buffer;
1873       }
1874     } else {
1875       /* just end headers */
1876       g_string_append (str, "\r\n");
1877     }
1878
1879     serialized_message->data_size = str->len;
1880     serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1881   }
1882
1883   return TRUE;
1884 }
1885
1886 /**
1887  * gst_rtsp_connection_send_usec:
1888  * @conn: a #GstRTSPConnection
1889  * @message: the message to send
1890  * @timeout: a timeout value in microseconds
1891  *
1892  * Attempt to send @message to the connected @conn, blocking up to
1893  * the specified @timeout. @timeout can be 0, in which case this function
1894  * might block forever.
1895  *
1896  * This function can be cancelled with gst_rtsp_connection_flush().
1897  *
1898  * Returns: #GST_RTSP_OK on success.
1899  *
1900  * Since: 1.18
1901  */
1902 GstRTSPResult
1903 gst_rtsp_connection_send_usec (GstRTSPConnection * conn,
1904     GstRTSPMessage * message, gint64 timeout)
1905 {
1906   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1907   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1908
1909   return gst_rtsp_connection_send_messages_usec (conn, message, 1, timeout);
1910 }
1911
1912 /**
1913  * gst_rtsp_connection_send_messages_usec:
1914  * @conn: a #GstRTSPConnection
1915  * @messages: (array length=n_messages): the messages to send
1916  * @n_messages: the number of messages to send
1917  * @timeout: a timeout value in microseconds
1918  *
1919  * Attempt to send @messages to the connected @conn, blocking up to
1920  * the specified @timeout. @timeout can be 0, in which case this function
1921  * might block forever.
1922  *
1923  * This function can be cancelled with gst_rtsp_connection_flush().
1924  *
1925  * Returns: #GST_RTSP_OK on Since.
1926  *
1927  * Since: 1.18
1928  */
1929 GstRTSPResult
1930 gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
1931     GstRTSPMessage * messages, guint n_messages, gint64 timeout)
1932 {
1933   GstRTSPResult res;
1934   GstRTSPSerializedMessage *serialized_messages;
1935   GOutputVector *vectors;
1936   GstMapInfo *map_infos;
1937   guint n_vectors, n_memories;
1938   gint i, j, k;
1939   gsize bytes_to_write, bytes_written;
1940
1941   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1942   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1943
1944   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1945   memset (serialized_messages, 0,
1946       sizeof (GstRTSPSerializedMessage) * n_messages);
1947
1948   for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1949       i++) {
1950     if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1951                 &serialized_messages[i])))
1952       goto no_message;
1953
1954     if (conn->tunneled) {
1955       gint state = 0, save = 0;
1956       gchar *base64_buffer, *out_buffer;
1957       gsize written = 0;
1958       gsize in_length;
1959
1960       in_length = serialized_messages[i].data_size;
1961       if (serialized_messages[i].body_data)
1962         in_length += serialized_messages[i].body_data_size;
1963       else if (serialized_messages[i].body_buffer)
1964         in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1965
1966       in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1967       base64_buffer = out_buffer = g_malloc0 (in_length);
1968
1969       written =
1970           g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1971           serialized_messages[i].data_header : serialized_messages[i].data,
1972           serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1973       out_buffer += written;
1974
1975       if (serialized_messages[i].body_data) {
1976         written =
1977             g_base64_encode_step (serialized_messages[i].body_data,
1978             serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1979             &save);
1980         out_buffer += written;
1981       } else if (serialized_messages[i].body_buffer) {
1982         guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1983
1984         for (j = 0; j < n; j++) {
1985           GstMemory *mem =
1986               gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1987           GstMapInfo map;
1988
1989           gst_memory_map (mem, &map, GST_MAP_READ);
1990
1991           written = g_base64_encode_step (map.data, map.size,
1992               FALSE, out_buffer, &state, &save);
1993           out_buffer += written;
1994
1995           gst_memory_unmap (mem, &map);
1996         }
1997       }
1998
1999       written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
2000       out_buffer += written;
2001
2002       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
2003       memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
2004
2005       serialized_messages[i].data = (guint8 *) base64_buffer;
2006       serialized_messages[i].data_size = (out_buffer - base64_buffer);
2007       n_vectors++;
2008     } else {
2009       n_vectors++;
2010       if (serialized_messages[i].body_data) {
2011         n_vectors++;
2012       } else if (serialized_messages[i].body_buffer) {
2013         n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
2014         n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
2015       }
2016     }
2017   }
2018
2019   vectors = g_newa (GOutputVector, n_vectors);
2020   map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
2021
2022   for (i = 0, j = 0, k = 0; i < n_messages; i++) {
2023     vectors[j].buffer = serialized_messages[i].data_is_data_header ?
2024         serialized_messages[i].data_header : serialized_messages[i].data;
2025     vectors[j].size = serialized_messages[i].data_size;
2026     bytes_to_write += vectors[j].size;
2027     j++;
2028
2029     if (serialized_messages[i].body_data) {
2030       vectors[j].buffer = serialized_messages[i].body_data;
2031       vectors[j].size = serialized_messages[i].body_data_size;
2032       bytes_to_write += vectors[j].size;
2033       j++;
2034     } else if (serialized_messages[i].body_buffer) {
2035       gint l, n;
2036
2037       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
2038       for (l = 0; l < n; l++) {
2039         GstMemory *mem =
2040             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
2041
2042         gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
2043         vectors[j].buffer = map_infos[k].data;
2044         vectors[j].size = map_infos[k].size;
2045         bytes_to_write += vectors[j].size;
2046
2047         k++;
2048         j++;
2049       }
2050     }
2051   }
2052
2053   /* write request: this is synchronous */
2054   set_write_socket_timeout (conn, timeout);
2055
2056   res =
2057       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
2058       TRUE, conn->cancellable);
2059
2060   clear_write_socket_timeout (conn);
2061
2062   g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
2063
2064   /* free everything */
2065   for (i = 0, k = 0; i < n_messages; i++) {
2066     if (serialized_messages[i].body_buffer) {
2067       gint l, n;
2068
2069       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
2070       for (l = 0; l < n; l++) {
2071         GstMemory *mem =
2072             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
2073
2074         gst_memory_unmap (mem, &map_infos[k]);
2075         k++;
2076       }
2077     }
2078
2079     g_free (serialized_messages[i].data);
2080   }
2081
2082   return res;
2083
2084 no_message:
2085   {
2086     for (i = 0; i < n_messages; i++) {
2087       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
2088     }
2089     g_warning ("Wrong message");
2090     return GST_RTSP_EINVAL;
2091   }
2092 }
2093
2094 static GstRTSPResult
2095 parse_string (gchar * dest, gint size, gchar ** src)
2096 {
2097   GstRTSPResult res = GST_RTSP_OK;
2098   gint idx;
2099
2100   idx = 0;
2101   /* skip spaces */
2102   while (g_ascii_isspace (**src))
2103     (*src)++;
2104
2105   while (!g_ascii_isspace (**src) && **src != '\0') {
2106     if (idx < size - 1)
2107       dest[idx++] = **src;
2108     else
2109       res = GST_RTSP_EPARSE;
2110     (*src)++;
2111   }
2112   if (size > 0)
2113     dest[idx] = '\0';
2114
2115   return res;
2116 }
2117
2118 static GstRTSPResult
2119 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2120     GstRTSPVersion * version)
2121 {
2122   GstRTSPVersion rversion;
2123   GstRTSPResult res = GST_RTSP_OK;
2124   gchar *ver;
2125
2126   if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2127     guint major;
2128     guint minor;
2129     gchar dummychar;
2130
2131     *ver++ = '\0';
2132
2133     /* the version number must be formatted as X.Y with nothing following */
2134     if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2135       res = GST_RTSP_EPARSE;
2136
2137     rversion = major * 0x10 + minor;
2138     if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2139
2140       if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2141         *version = GST_RTSP_VERSION_INVALID;
2142         res = GST_RTSP_ERROR;
2143       }
2144     } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2145       if (*type == GST_RTSP_MESSAGE_REQUEST)
2146         *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2147       else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2148         *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2149
2150       if (rversion != GST_RTSP_VERSION_1_0 &&
2151           rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2152         res = GST_RTSP_ERROR;
2153     } else
2154       res = GST_RTSP_EPARSE;
2155   } else
2156     res = GST_RTSP_EPARSE;
2157
2158   if (res == GST_RTSP_OK)
2159     *version = rversion;
2160
2161   return res;
2162 }
2163
2164 static GstRTSPResult
2165 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2166 {
2167   GstRTSPResult res = GST_RTSP_OK;
2168   GstRTSPResult res2;
2169   gchar versionstr[20];
2170   gchar codestr[4];
2171   gint code;
2172   gchar *bptr;
2173
2174   bptr = (gchar *) buffer;
2175
2176   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2177     res = GST_RTSP_EPARSE;
2178
2179   if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2180     res = GST_RTSP_EPARSE;
2181   code = atoi (codestr);
2182   if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2183     res = GST_RTSP_EPARSE;
2184
2185   while (g_ascii_isspace (*bptr))
2186     bptr++;
2187
2188   if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2189               NULL) != GST_RTSP_OK))
2190     res = GST_RTSP_EPARSE;
2191
2192   res2 = parse_protocol_version (versionstr, &msg->type,
2193       &msg->type_data.response.version);
2194   if (G_LIKELY (res == GST_RTSP_OK))
2195     res = res2;
2196
2197   return res;
2198 }
2199
2200 static GstRTSPResult
2201 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2202 {
2203   GstRTSPResult res = GST_RTSP_OK;
2204   GstRTSPResult res2;
2205   gchar versionstr[20];
2206   gchar methodstr[20];
2207   gchar urlstr[4096];
2208   gchar *bptr;
2209   GstRTSPMethod method;
2210
2211   bptr = (gchar *) buffer;
2212
2213   if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2214     res = GST_RTSP_EPARSE;
2215   method = gst_rtsp_find_method (methodstr);
2216
2217   if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2218     res = GST_RTSP_EPARSE;
2219   if (G_UNLIKELY (*urlstr == '\0'))
2220     res = GST_RTSP_EPARSE;
2221
2222   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2223     res = GST_RTSP_EPARSE;
2224
2225   if (G_UNLIKELY (*bptr != '\0'))
2226     res = GST_RTSP_EPARSE;
2227
2228   if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2229               urlstr) != GST_RTSP_OK))
2230     res = GST_RTSP_EPARSE;
2231
2232   res2 = parse_protocol_version (versionstr, &msg->type,
2233       &msg->type_data.request.version);
2234   if (G_LIKELY (res == GST_RTSP_OK))
2235     res = res2;
2236
2237   if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2238     /* GET and POST are not allowed as RTSP methods */
2239     if (msg->type_data.request.method == GST_RTSP_GET ||
2240         msg->type_data.request.method == GST_RTSP_POST) {
2241       msg->type_data.request.method = GST_RTSP_INVALID;
2242       if (res == GST_RTSP_OK)
2243         res = GST_RTSP_ERROR;
2244     }
2245   } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2246     /* only GET and POST are allowed as HTTP methods */
2247     if (msg->type_data.request.method != GST_RTSP_GET &&
2248         msg->type_data.request.method != GST_RTSP_POST) {
2249       msg->type_data.request.method = GST_RTSP_INVALID;
2250       if (res == GST_RTSP_OK)
2251         res = GST_RTSP_ERROR;
2252     }
2253   }
2254
2255   return res;
2256 }
2257
2258 /* parsing lines means reading a Key: Value pair */
2259 static GstRTSPResult
2260 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2261 {
2262   GstRTSPHeaderField field;
2263   gchar *line = (gchar *) buffer;
2264   gchar *field_name = NULL;
2265   gchar *value;
2266
2267   if ((value = strchr (line, ':')) == NULL || value == line)
2268     goto parse_error;
2269
2270   /* trim space before the colon */
2271   if (value[-1] == ' ')
2272     value[-1] = '\0';
2273
2274   /* replace the colon with a NUL */
2275   *value++ = '\0';
2276
2277   /* find the header */
2278   field = gst_rtsp_find_header_field (line);
2279   /* custom header not present in the list of pre-defined headers */
2280   if (field == GST_RTSP_HDR_INVALID)
2281     field_name = line;
2282
2283   /* split up the value in multiple key:value pairs if it contains comma(s) */
2284   while (*value != '\0') {
2285     gchar *next_value;
2286     gchar *comma = NULL;
2287     gboolean quoted = FALSE;
2288     guint comment = 0;
2289
2290     /* trim leading space */
2291     if (*value == ' ')
2292       value++;
2293
2294     /* for headers which may not appear multiple times, and thus may not
2295      * contain multiple values on the same line, we can short-circuit the loop
2296      * below and the entire value results in just one key:value pair*/
2297     if (!gst_rtsp_header_allow_multiple (field))
2298       next_value = value + strlen (value);
2299     else
2300       next_value = value;
2301
2302     /* find the next value, taking special care of quotes and comments */
2303     while (*next_value != '\0') {
2304       if ((quoted || comment != 0) && *next_value == '\\' &&
2305           next_value[1] != '\0')
2306         next_value++;
2307       else if (comment == 0 && *next_value == '"')
2308         quoted = !quoted;
2309       else if (!quoted && *next_value == '(')
2310         comment++;
2311       else if (comment != 0 && *next_value == ')')
2312         comment--;
2313       else if (!quoted && comment == 0) {
2314         /* To quote RFC 2068: "User agents MUST take special care in parsing
2315          * the WWW-Authenticate field value if it contains more than one
2316          * challenge, or if more than one WWW-Authenticate header field is
2317          * provided, since the contents of a challenge may itself contain a
2318          * comma-separated list of authentication parameters."
2319          *
2320          * What this means is that we cannot just look for an unquoted comma
2321          * when looking for multiple values in Proxy-Authenticate and
2322          * WWW-Authenticate headers. Instead we need to look for the sequence
2323          * "comma [space] token space token" before we can split after the
2324          * comma...
2325          */
2326         if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2327             field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2328           if (*next_value == ',') {
2329             if (next_value[1] == ' ') {
2330               /* skip any space following the comma so we do not mistake it for
2331                * separating between two tokens */
2332               next_value++;
2333             }
2334             comma = next_value;
2335           } else if (*next_value == ' ' && next_value[1] != ',' &&
2336               next_value[1] != '=' && comma != NULL) {
2337             next_value = comma;
2338             comma = NULL;
2339             break;
2340           }
2341         } else if (*next_value == ',')
2342           break;
2343       }
2344
2345       next_value++;
2346     }
2347
2348     if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2349       /* The timeout parameter is only allowed in a session response header
2350        * but some clients send it as part of the session request header.
2351        * Ignore everything from the semicolon to the end of the line. */
2352       next_value = value;
2353       while (*next_value != '\0') {
2354         if (*next_value == ';') {
2355           break;
2356         }
2357         next_value++;
2358       }
2359     }
2360
2361     /* trim space */
2362     if (value != next_value && next_value[-1] == ' ')
2363       next_value[-1] = '\0';
2364
2365     if (*next_value != '\0')
2366       *next_value++ = '\0';
2367
2368     /* add the key:value pair */
2369     if (*value != '\0') {
2370       if (field != GST_RTSP_HDR_INVALID)
2371         gst_rtsp_message_add_header (msg, field, value);
2372       else
2373         gst_rtsp_message_add_header_by_name (msg, field_name, value);
2374     }
2375
2376     value = next_value;
2377   }
2378
2379   return GST_RTSP_OK;
2380
2381   /* ERRORS */
2382 parse_error:
2383   {
2384     return GST_RTSP_EPARSE;
2385   }
2386 }
2387
2388 /* convert all consecutive whitespace to a single space */
2389 static void
2390 normalize_line (guint8 * buffer)
2391 {
2392   while (*buffer) {
2393     if (g_ascii_isspace (*buffer)) {
2394       guint8 *tmp;
2395
2396       *buffer++ = ' ';
2397       for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2398       }
2399       if (buffer != tmp)
2400         memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2401     } else {
2402       buffer++;
2403     }
2404   }
2405 }
2406
2407 static gboolean
2408 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2409 {
2410   gchar *cseq_header;
2411   gint64 cseq = 0;
2412   GstRTSPResult res;
2413
2414   if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2415       message->type == GST_RTSP_MESSAGE_REQUEST) {
2416     if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2417                 &cseq_header, 0)) != GST_RTSP_OK) {
2418       /* rfc2326 This field MUST be present in all RTSP req and resp */
2419       goto invalid_format;
2420     }
2421
2422     errno = 0;
2423     cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2424     if (errno != 0 || cseq < 0) {
2425       /* CSeq has no valid value */
2426       goto invalid_format;
2427     }
2428
2429     if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2430         (conn->cseq == 0 || conn->cseq < cseq)) {
2431       /* Response CSeq can't be higher than the number of outgoing requests
2432        * neither is a response valid if no request has been made */
2433       goto invalid_format;
2434     }
2435   }
2436   return GST_RTSP_OK;
2437
2438 invalid_format:
2439   {
2440     return GST_RTSP_EPARSE;
2441   }
2442 }
2443
2444 /* returns:
2445  *  GST_RTSP_OK when a complete message was read.
2446  *  GST_RTSP_EEOF: when the read socket is closed
2447  *  GST_RTSP_EINTR: when more data is needed.
2448  *  GST_RTSP_..: some other error occurred.
2449  */
2450 static GstRTSPResult
2451 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2452     GstRTSPConnection * conn, gboolean block)
2453 {
2454   GstRTSPResult res;
2455
2456   while (TRUE) {
2457     switch (builder->state) {
2458       case STATE_START:
2459       {
2460         guint8 c;
2461
2462         builder->offset = 0;
2463         res =
2464             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2465             block);
2466         if (res != GST_RTSP_OK)
2467           goto done;
2468
2469         c = builder->buffer[0];
2470
2471         /* we have 1 bytes now and we can see if this is a data message or
2472          * not */
2473         if (c == '$') {
2474           /* data message, prepare for the header */
2475           builder->state = STATE_DATA_HEADER;
2476           conn->may_cancel = FALSE;
2477         } else if (c == '\n' || c == '\r') {
2478           /* skip \n and \r */
2479           builder->offset = 0;
2480         } else {
2481           builder->line = 0;
2482           builder->state = STATE_READ_LINES;
2483           conn->may_cancel = FALSE;
2484         }
2485         break;
2486       }
2487       case STATE_DATA_HEADER:
2488       {
2489         res =
2490             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2491             block);
2492         if (res != GST_RTSP_OK)
2493           goto done;
2494
2495         gst_rtsp_message_init_data (message, builder->buffer[1]);
2496
2497         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2498         builder->body_data = g_malloc (builder->body_len + 1);
2499         builder->body_data[builder->body_len] = '\0';
2500         builder->offset = 0;
2501         builder->state = STATE_DATA_BODY;
2502         break;
2503       }
2504       case STATE_DATA_BODY:
2505       {
2506         res =
2507             read_bytes (conn, builder->body_data, &builder->offset,
2508             builder->body_len, block);
2509         if (res != GST_RTSP_OK)
2510           goto done;
2511
2512         /* we have the complete body now, store in the message adjusting the
2513          * length to include the trailing '\0' */
2514         gst_rtsp_message_take_body (message,
2515             (guint8 *) builder->body_data, builder->body_len + 1);
2516         builder->body_data = NULL;
2517         builder->body_len = 0;
2518
2519         builder->state = STATE_END;
2520         break;
2521       }
2522       case STATE_READ_LINES:
2523       {
2524         res = read_line (conn, builder->buffer, &builder->offset,
2525             sizeof (builder->buffer), block);
2526         if (res != GST_RTSP_OK)
2527           goto done;
2528
2529         /* we have a regular response */
2530         if (builder->buffer[0] == '\0') {
2531           gchar *hdrval;
2532           gint64 content_length_parsed = 0;
2533
2534           /* empty line, end of message header */
2535           /* see if there is a Content-Length header, but ignore it if this
2536            * is a POST request with an x-sessioncookie header */
2537           if (gst_rtsp_message_get_header (message,
2538                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2539               (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2540                   message->type_data.request.method != GST_RTSP_POST ||
2541                   gst_rtsp_message_get_header (message,
2542                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2543             /* there is, prepare to read the body */
2544             errno = 0;
2545             content_length_parsed = g_ascii_strtoll (hdrval, NULL, 10);
2546             if (errno != 0 || content_length_parsed < 0) {
2547               res = GST_RTSP_EPARSE;
2548               goto invalid_body_len;
2549             } else if (content_length_parsed > conn->content_length_limit) {
2550               res = GST_RTSP_ENOMEM;
2551               goto invalid_body_len;
2552             }
2553             builder->body_len = content_length_parsed;
2554             builder->body_data = g_try_malloc (builder->body_len + 1);
2555             /* we can't do much here, we need the length to know how many bytes
2556              * we need to read next and when allocation fails, we can't read the payload. */
2557             if (builder->body_data == NULL) {
2558               res = GST_RTSP_ENOMEM;
2559               goto invalid_body_len;
2560             }
2561
2562             builder->body_data[builder->body_len] = '\0';
2563             builder->offset = 0;
2564             builder->state = STATE_DATA_BODY;
2565           } else {
2566             builder->state = STATE_END;
2567           }
2568           break;
2569         }
2570
2571         /* we have a line */
2572         normalize_line (builder->buffer);
2573         if (builder->line == 0) {
2574           /* first line, check for response status */
2575           if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2576               memcmp (builder->buffer, "HTTP", 4) == 0) {
2577             builder->status = parse_response_status (builder->buffer, message);
2578           } else {
2579             builder->status = parse_request_line (builder->buffer, message);
2580           }
2581         } else {
2582           /* else just parse the line */
2583           res = parse_line (builder->buffer, message);
2584           if (res != GST_RTSP_OK)
2585             builder->status = res;
2586         }
2587         if (builder->status != GST_RTSP_OK) {
2588           res = builder->status;
2589           goto invalid_format;
2590         }
2591
2592         builder->line++;
2593         builder->offset = 0;
2594         break;
2595       }
2596       case STATE_END:
2597       {
2598         gchar *session_cookie;
2599         gchar *session_id;
2600
2601         conn->may_cancel = TRUE;
2602
2603         if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2604           /* message don't comply with rfc2326 regarding CSeq */
2605           goto invalid_format;
2606         }
2607
2608         if (message->type == GST_RTSP_MESSAGE_DATA) {
2609           /* data messages don't have headers */
2610           res = GST_RTSP_OK;
2611           goto done;
2612         }
2613
2614         /* save the tunnel session in the connection */
2615         if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2616             !conn->manual_http &&
2617             conn->tstate == TUNNEL_STATE_NONE &&
2618             gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2619                 &session_cookie, 0) == GST_RTSP_OK) {
2620           strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2621           conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2622           conn->tunneled = TRUE;
2623         }
2624
2625         /* save session id in the connection for further use */
2626         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2627             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2628                 &session_id, 0) == GST_RTSP_OK) {
2629           gint maxlen, i;
2630
2631           maxlen = sizeof (conn->session_id) - 1;
2632           /* the sessionid can have attributes marked with ;
2633            * Make sure we strip them */
2634           for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2635             if (session_id[i] == ';') {
2636               maxlen = i;
2637               /* parse timeout */
2638               do {
2639                 i++;
2640               } while (g_ascii_isspace (session_id[i]));
2641               if (g_str_has_prefix (&session_id[i], "timeout=")) {
2642                 gint to;
2643
2644                 /* if we parsed something valid, configure */
2645                 if ((to = atoi (&session_id[i + 8])) > 0)
2646                   conn->timeout = to;
2647               }
2648               break;
2649             }
2650           }
2651
2652           /* make sure to not overflow */
2653           if (conn->remember_session_id) {
2654             strncpy (conn->session_id, session_id, maxlen);
2655             conn->session_id[maxlen] = '\0';
2656           }
2657         }
2658         res = builder->status;
2659         goto done;
2660       }
2661       default:
2662         res = GST_RTSP_ERROR;
2663         goto done;
2664     }
2665   }
2666 done:
2667   conn->may_cancel = TRUE;
2668   return res;
2669
2670   /* ERRORS */
2671 invalid_body_len:
2672   {
2673     conn->may_cancel = TRUE;
2674     GST_DEBUG ("could not allocate body");
2675     return res;
2676   }
2677 invalid_format:
2678   {
2679     conn->may_cancel = TRUE;
2680     GST_DEBUG ("could not parse");
2681     return res;
2682   }
2683 }
2684
2685 /**
2686  * gst_rtsp_connection_read_usec:
2687  * @conn: a #GstRTSPConnection
2688  * @data: the data to read
2689  * @size: the size of @data
2690  * @timeout: a timeout value in microseconds
2691  *
2692  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2693  * the specified @timeout. @timeout can be 0, in which case this function
2694  * might block forever.
2695  *
2696  * This function can be cancelled with gst_rtsp_connection_flush().
2697  *
2698  * Returns: #GST_RTSP_OK on success.
2699  *
2700  * Since: 1.18
2701  */
2702 GstRTSPResult
2703 gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
2704     guint size, gint64 timeout)
2705 {
2706   guint offset;
2707   GstRTSPResult res;
2708
2709   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2710   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2711   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2712
2713   if (G_UNLIKELY (size == 0))
2714     return GST_RTSP_OK;
2715
2716   offset = 0;
2717
2718   /* configure timeout if any */
2719   set_read_socket_timeout (conn, timeout);
2720
2721   res = read_bytes (conn, data, &offset, size, TRUE);
2722
2723   clear_read_socket_timeout (conn);
2724
2725   return res;
2726 }
2727
2728 static GstRTSPMessage *
2729 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2730     const GstRTSPMessage * request)
2731 {
2732   GstRTSPMessage *msg;
2733   GstRTSPResult res;
2734
2735   if (gst_rtsp_status_as_text (code) == NULL)
2736     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2737
2738   GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2739       no_message);
2740
2741   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2742       "GStreamer RTSP Server");
2743   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2744   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2745   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2746
2747   if (code == GST_RTSP_STS_OK) {
2748     /* add the local ip address to the tunnel reply, this is where the client
2749      * should send the POST request to */
2750     if (conn->local_ip)
2751       gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2752           conn->local_ip);
2753     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2754         "application/x-rtsp-tunnelled");
2755   }
2756
2757   return msg;
2758
2759   /* ERRORS */
2760 no_message:
2761   {
2762     return NULL;
2763   }
2764 }
2765
2766 /**
2767  * gst_rtsp_connection_receive_usec:
2768  * @conn: a #GstRTSPConnection
2769  * @message: the message to read
2770  * @timeout: a timeout value or 0
2771  *
2772  * Attempt to read into @message from the connected @conn, blocking up to
2773  * the specified @timeout. @timeout can be 0, in which case this function
2774  * might block forever.
2775  *
2776  * This function can be cancelled with gst_rtsp_connection_flush().
2777  *
2778  * Returns: #GST_RTSP_OK on success.
2779  *
2780  * Since: 1.18
2781  */
2782 GstRTSPResult
2783 gst_rtsp_connection_receive_usec (GstRTSPConnection * conn,
2784     GstRTSPMessage * message, gint64 timeout)
2785 {
2786   GstRTSPResult res;
2787   GstRTSPBuilder builder;
2788
2789   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2790   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2791   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2792
2793   /* configure timeout if any */
2794   set_read_socket_timeout (conn, timeout);
2795
2796   memset (&builder, 0, sizeof (GstRTSPBuilder));
2797   res = build_next (&builder, message, conn, TRUE);
2798
2799   clear_read_socket_timeout (conn);
2800
2801   if (G_UNLIKELY (res != GST_RTSP_OK))
2802     goto read_error;
2803
2804   if (!conn->manual_http) {
2805     if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2806       if (conn->tstate == TUNNEL_STATE_NONE &&
2807           message->type_data.request.method == GST_RTSP_GET) {
2808         GstRTSPMessage *response;
2809
2810         conn->tstate = TUNNEL_STATE_GET;
2811
2812         /* tunnel GET request, we can reply now */
2813         response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2814         res = gst_rtsp_connection_send_usec (conn, response, timeout);
2815         gst_rtsp_message_free (response);
2816         if (res == GST_RTSP_OK)
2817           res = GST_RTSP_ETGET;
2818         goto cleanup;
2819       } else if (conn->tstate == TUNNEL_STATE_NONE &&
2820           message->type_data.request.method == GST_RTSP_POST) {
2821         conn->tstate = TUNNEL_STATE_POST;
2822
2823         /* tunnel POST request, the caller now has to link the two
2824          * connections. */
2825         res = GST_RTSP_ETPOST;
2826         goto cleanup;
2827       } else {
2828         res = GST_RTSP_EPARSE;
2829         goto cleanup;
2830       }
2831     } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2832       res = GST_RTSP_EPARSE;
2833       goto cleanup;
2834     }
2835   }
2836
2837   /* we have a message here */
2838   build_reset (&builder);
2839
2840   return GST_RTSP_OK;
2841
2842   /* ERRORS */
2843 read_error:
2844 cleanup:
2845   {
2846     build_reset (&builder);
2847     gst_rtsp_message_unset (message);
2848     return res;
2849   }
2850 }
2851
2852 /**
2853  * gst_rtsp_connection_close:
2854  * @conn: a #GstRTSPConnection
2855  *
2856  * Close the connected @conn. After this call, the connection is in the same
2857  * state as when it was first created.
2858  *
2859  * Returns: #GST_RTSP_OK on success.
2860  */
2861 GstRTSPResult
2862 gst_rtsp_connection_close (GstRTSPConnection * conn)
2863 {
2864   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2865
2866   /* last unref closes the connection we don't want to explicitly close here
2867    * because these sockets might have been provided at construction */
2868   if (conn->stream0) {
2869     g_object_unref (conn->stream0);
2870     conn->stream0 = NULL;
2871     conn->socket0 = NULL;
2872   }
2873   if (conn->stream1) {
2874     g_object_unref (conn->stream1);
2875     conn->stream1 = NULL;
2876     conn->socket1 = NULL;
2877   }
2878
2879   /* these were owned by the stream */
2880   conn->input_stream = NULL;
2881   conn->output_stream = NULL;
2882   conn->control_stream = NULL;
2883
2884   g_free (conn->remote_ip);
2885   conn->remote_ip = NULL;
2886   g_free (conn->local_ip);
2887   conn->local_ip = NULL;
2888
2889   conn->read_ahead = 0;
2890
2891   g_free (conn->initial_buffer);
2892   conn->initial_buffer = NULL;
2893   conn->initial_buffer_offset = 0;
2894
2895   conn->write_socket = NULL;
2896   conn->read_socket = NULL;
2897   conn->write_socket_used = FALSE;
2898   conn->read_socket_used = FALSE;
2899   conn->tunneled = FALSE;
2900   conn->tstate = TUNNEL_STATE_NONE;
2901   conn->ctxp = NULL;
2902   g_free (conn->username);
2903   conn->username = NULL;
2904   g_free (conn->passwd);
2905   conn->passwd = NULL;
2906   gst_rtsp_connection_clear_auth_params (conn);
2907   conn->timeout = 60;
2908   conn->cseq = 0;
2909   conn->session_id[0] = '\0';
2910
2911   return GST_RTSP_OK;
2912 }
2913
2914 /**
2915  * gst_rtsp_connection_free:
2916  * @conn: a #GstRTSPConnection
2917  *
2918  * Close and free @conn.
2919  *
2920  * Returns: #GST_RTSP_OK on success.
2921  */
2922 GstRTSPResult
2923 gst_rtsp_connection_free (GstRTSPConnection * conn)
2924 {
2925   GstRTSPResult res;
2926
2927   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2928
2929   res = gst_rtsp_connection_close (conn);
2930
2931   if (conn->cancellable)
2932     g_object_unref (conn->cancellable);
2933   if (conn->client)
2934     g_object_unref (conn->client);
2935   if (conn->tls_database)
2936     g_object_unref (conn->tls_database);
2937   if (conn->tls_interaction)
2938     g_object_unref (conn->tls_interaction);
2939   if (conn->accept_certificate_destroy_notify)
2940     conn->
2941         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2942
2943   g_timer_destroy (conn->timer);
2944   gst_rtsp_url_free (conn->url);
2945   g_free (conn->proxy_host);
2946   g_free (conn);
2947
2948   return res;
2949 }
2950
2951 /**
2952  * gst_rtsp_connection_poll_usec:
2953  * @conn: a #GstRTSPConnection
2954  * @events: a bitmask of #GstRTSPEvent flags to check
2955  * @revents: location for result flags
2956  * @timeout: a timeout in microseconds
2957  *
2958  * Wait up to the specified @timeout for the connection to become available for
2959  * at least one of the operations specified in @events. When the function returns
2960  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2961  * @conn.
2962  *
2963  * @timeout can be 0, in which case this function might block forever.
2964  *
2965  * This function can be cancelled with gst_rtsp_connection_flush().
2966  *
2967  * Returns: #GST_RTSP_OK on success.
2968  *
2969  * Since: 1.18
2970  */
2971 GstRTSPResult
2972 gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
2973     GstRTSPEvent * revents, gint64 timeout)
2974 {
2975   GMainContext *ctx;
2976   GSource *rs, *ws, *ts;
2977   GIOCondition condition;
2978
2979   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2980   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2981   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2982   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2983   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2984
2985   ctx = g_main_context_new ();
2986
2987   /* configure timeout if any */
2988   if (timeout) {
2989     ts = g_timeout_source_new (timeout / 1000);
2990     g_source_set_dummy_callback (ts);
2991     g_source_attach (ts, ctx);
2992     g_source_unref (ts);
2993   }
2994
2995   if (events & GST_RTSP_EV_READ) {
2996     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2997         conn->cancellable);
2998     g_source_set_dummy_callback (rs);
2999     g_source_attach (rs, ctx);
3000     g_source_unref (rs);
3001   }
3002
3003   if (events & GST_RTSP_EV_WRITE) {
3004     ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
3005         conn->cancellable);
3006     g_source_set_dummy_callback (ws);
3007     g_source_attach (ws, ctx);
3008     g_source_unref (ws);
3009   }
3010
3011   /* Returns after handling all pending events */
3012   while (!g_main_context_iteration (ctx, TRUE));
3013
3014   g_main_context_unref (ctx);
3015
3016   *revents = 0;
3017   if (events & GST_RTSP_EV_READ) {
3018     condition = g_socket_condition_check (conn->read_socket,
3019         G_IO_IN | G_IO_PRI);
3020     if ((condition & G_IO_IN) || (condition & G_IO_PRI))
3021       *revents |= GST_RTSP_EV_READ;
3022   }
3023   if (events & GST_RTSP_EV_WRITE) {
3024     condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
3025     if ((condition & G_IO_OUT))
3026       *revents |= GST_RTSP_EV_WRITE;
3027   }
3028
3029   if (*revents == 0)
3030     return GST_RTSP_ETIMEOUT;
3031
3032   return GST_RTSP_OK;
3033 }
3034
3035 /**
3036  * gst_rtsp_connection_next_timeout_usec:
3037  * @conn: a #GstRTSPConnection
3038  *
3039  * Calculate the next timeout for @conn
3040  *
3041  * Returns: #the next timeout in microseconds
3042  *
3043  * Since: 1.18
3044  */
3045 gint64
3046 gst_rtsp_connection_next_timeout_usec (GstRTSPConnection * conn)
3047 {
3048   gdouble elapsed;
3049   gulong usec;
3050   gint ctimeout;
3051   gint64 timeout = 0;
3052
3053   g_return_val_if_fail (conn != NULL, 1);
3054
3055   ctimeout = conn->timeout;
3056   if (ctimeout >= 20) {
3057     /* Because we should act before the timeout we timeout 5
3058      * seconds in advance. */
3059     ctimeout -= 5;
3060   } else if (ctimeout >= 5) {
3061     /* else timeout 20% earlier */
3062     ctimeout -= ctimeout / 5;
3063   } else if (ctimeout >= 1) {
3064     /* else timeout 1 second earlier */
3065     ctimeout -= 1;
3066   }
3067
3068   elapsed = g_timer_elapsed (conn->timer, &usec);
3069   if (elapsed >= ctimeout) {
3070     timeout = 0;
3071   } else {
3072     gint64 sec = ctimeout - elapsed;
3073     if (usec <= G_USEC_PER_SEC)
3074       usec = G_USEC_PER_SEC - usec;
3075     else
3076       usec = 0;
3077     timeout = usec + sec * G_USEC_PER_SEC;
3078   }
3079
3080   return timeout;
3081 }
3082
3083 /**
3084  * gst_rtsp_connection_reset_timeout:
3085  * @conn: a #GstRTSPConnection
3086  *
3087  * Reset the timeout of @conn.
3088  *
3089  * Returns: #GST_RTSP_OK.
3090  */
3091 GstRTSPResult
3092 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
3093 {
3094   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3095
3096   g_timer_start (conn->timer);
3097
3098   return GST_RTSP_OK;
3099 }
3100
3101 /**
3102  * gst_rtsp_connection_flush:
3103  * @conn: a #GstRTSPConnection
3104  * @flush: start or stop the flush
3105  *
3106  * Start or stop the flushing action on @conn. When flushing, all current
3107  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
3108  * is set to non-flushing mode again.
3109  *
3110  * Returns: #GST_RTSP_OK.
3111  */
3112 GstRTSPResult
3113 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
3114 {
3115   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3116
3117   if (flush) {
3118     g_cancellable_cancel (conn->cancellable);
3119   } else {
3120     g_object_unref (conn->cancellable);
3121     conn->cancellable = g_cancellable_new ();
3122   }
3123
3124   return GST_RTSP_OK;
3125 }
3126
3127 /**
3128  * gst_rtsp_connection_set_proxy:
3129  * @conn: a #GstRTSPConnection
3130  * @host: the proxy host
3131  * @port: the proxy port
3132  *
3133  * Set the proxy host and port.
3134  *
3135  * Returns: #GST_RTSP_OK.
3136  */
3137 GstRTSPResult
3138 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3139     const gchar * host, guint port)
3140 {
3141   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3142
3143   g_free (conn->proxy_host);
3144   conn->proxy_host = g_strdup (host);
3145   conn->proxy_port = port;
3146
3147   return GST_RTSP_OK;
3148 }
3149
3150 /**
3151  * gst_rtsp_connection_set_auth:
3152  * @conn: a #GstRTSPConnection
3153  * @method: authentication method
3154  * @user: the user
3155  * @pass: the password
3156  *
3157  * Configure @conn for authentication mode @method with @user and @pass as the
3158  * user and password respectively.
3159  *
3160  * Returns: #GST_RTSP_OK.
3161  */
3162 GstRTSPResult
3163 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3164     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3165 {
3166   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3167
3168   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3169           || g_strrstr (user, ":") != NULL))
3170     return GST_RTSP_EINVAL;
3171
3172   /* Make sure the username and passwd are being set for authentication */
3173   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3174     return GST_RTSP_EINVAL;
3175
3176   /* ":" chars are not allowed in usernames for basic auth */
3177   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3178     return GST_RTSP_EINVAL;
3179
3180   g_free (conn->username);
3181   g_free (conn->passwd);
3182
3183   conn->auth_method = method;
3184   conn->username = g_strdup (user);
3185   conn->passwd = g_strdup (pass);
3186
3187   return GST_RTSP_OK;
3188 }
3189
3190 /**
3191  * str_case_hash:
3192  * @key: ASCII string to hash
3193  *
3194  * Hashes @key in a case-insensitive manner.
3195  *
3196  * Returns: the hash code.
3197  **/
3198 static guint
3199 str_case_hash (gconstpointer key)
3200 {
3201   const char *p = key;
3202   guint h = g_ascii_toupper (*p);
3203
3204   if (h)
3205     for (p += 1; *p != '\0'; p++)
3206       h = (h << 5) - h + g_ascii_toupper (*p);
3207
3208   return h;
3209 }
3210
3211 /**
3212  * str_case_equal:
3213  * @v1: an ASCII string
3214  * @v2: another ASCII string
3215  *
3216  * Compares @v1 and @v2 in a case-insensitive manner
3217  *
3218  * Returns: %TRUE if they are equal (modulo case)
3219  **/
3220 static gboolean
3221 str_case_equal (gconstpointer v1, gconstpointer v2)
3222 {
3223   const char *string1 = v1;
3224   const char *string2 = v2;
3225
3226   return g_ascii_strcasecmp (string1, string2) == 0;
3227 }
3228
3229 /**
3230  * gst_rtsp_connection_set_auth_param:
3231  * @conn: a #GstRTSPConnection
3232  * @param: authentication directive
3233  * @value: value
3234  *
3235  * Setup @conn with authentication directives. This is not necessary for
3236  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3237  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3238  * in the WWW-Authenticate response header and can include realm, domain,
3239  * nonce, opaque, stale, algorithm, qop as per RFC2617.
3240  */
3241 void
3242 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3243     const gchar * param, const gchar * value)
3244 {
3245   g_return_if_fail (conn != NULL);
3246   g_return_if_fail (param != NULL);
3247
3248   if (conn->auth_params == NULL) {
3249     conn->auth_params =
3250         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3251   }
3252   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3253 }
3254
3255 /**
3256  * gst_rtsp_connection_clear_auth_params:
3257  * @conn: a #GstRTSPConnection
3258  *
3259  * Clear the list of authentication directives stored in @conn.
3260  */
3261 void
3262 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3263 {
3264   g_return_if_fail (conn != NULL);
3265
3266   if (conn->auth_params != NULL) {
3267     g_hash_table_destroy (conn->auth_params);
3268     conn->auth_params = NULL;
3269   }
3270 }
3271
3272 static GstRTSPResult
3273 set_qos_dscp (GSocket * socket, guint qos_dscp)
3274 {
3275 #ifndef IP_TOS
3276   GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3277   return GST_RTSP_OK;
3278 #else
3279   gint fd;
3280   union gst_sockaddr sa;
3281   socklen_t slen = sizeof (sa);
3282   gint af;
3283   gint tos;
3284
3285   if (!socket)
3286     return GST_RTSP_OK;
3287
3288   fd = g_socket_get_fd (socket);
3289   if (getsockname (fd, &sa.sa, &slen) < 0)
3290     goto no_getsockname;
3291
3292   af = sa.sa.sa_family;
3293
3294   /* if this is an IPv4-mapped address then do IPv4 QoS */
3295   if (af == AF_INET6) {
3296     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3297       af = AF_INET;
3298   }
3299
3300   /* extract and shift 6 bits of the DSCP */
3301   tos = (qos_dscp & 0x3f) << 2;
3302
3303 #ifdef G_OS_WIN32
3304 #  define SETSOCKOPT_ARG4_TYPE const char *
3305 #else
3306 #  define SETSOCKOPT_ARG4_TYPE const void *
3307 #endif
3308
3309   switch (af) {
3310     case AF_INET:
3311       if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3312               sizeof (tos)) < 0)
3313         goto no_setsockopt;
3314       break;
3315     case AF_INET6:
3316 #ifdef IPV6_TCLASS
3317       if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3318               (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3319         goto no_setsockopt;
3320       break;
3321 #endif
3322     default:
3323       goto wrong_family;
3324   }
3325
3326   return GST_RTSP_OK;
3327
3328   /* ERRORS */
3329 no_getsockname:
3330 no_setsockopt:
3331   {
3332     return GST_RTSP_ESYS;
3333   }
3334 wrong_family:
3335   {
3336     return GST_RTSP_ERROR;
3337   }
3338 #endif
3339 }
3340
3341 /**
3342  * gst_rtsp_connection_set_qos_dscp:
3343  * @conn: a #GstRTSPConnection
3344  * @qos_dscp: DSCP value
3345  *
3346  * Configure @conn to use the specified DSCP value.
3347  *
3348  * Returns: #GST_RTSP_OK on success.
3349  */
3350 GstRTSPResult
3351 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3352 {
3353   GstRTSPResult res;
3354
3355   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3356   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3357   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3358
3359   res = set_qos_dscp (conn->socket0, qos_dscp);
3360   if (res == GST_RTSP_OK)
3361     res = set_qos_dscp (conn->socket1, qos_dscp);
3362
3363   return res;
3364 }
3365
3366 /**
3367  * gst_rtsp_connection_set_content_length_limit:
3368  * @conn: a #GstRTSPConnection
3369  * @limit: Content-Length limit
3370  *
3371  * Configure @conn to use the specified Content-Length limit.
3372  * Both requests and responses are validated. If content-length is
3373  * exceeded, ENOMEM error will be returned.
3374  *
3375  * Since: 1.18
3376  */
3377 void
3378 gst_rtsp_connection_set_content_length_limit (GstRTSPConnection * conn,
3379     guint limit)
3380 {
3381   g_return_if_fail (conn != NULL);
3382
3383   conn->content_length_limit = limit;
3384 }
3385
3386 /**
3387  * gst_rtsp_connection_get_url:
3388  * @conn: a #GstRTSPConnection
3389  *
3390  * Retrieve the URL of the other end of @conn.
3391  *
3392  * Returns: The URL. This value remains valid until the
3393  * connection is freed.
3394  */
3395 GstRTSPUrl *
3396 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3397 {
3398   g_return_val_if_fail (conn != NULL, NULL);
3399
3400   return conn->url;
3401 }
3402
3403 /**
3404  * gst_rtsp_connection_get_ip:
3405  * @conn: a #GstRTSPConnection
3406  *
3407  * Retrieve the IP address of the other end of @conn.
3408  *
3409  * Returns: The IP address as a string. this value remains valid until the
3410  * connection is closed.
3411  */
3412 const gchar *
3413 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3414 {
3415   g_return_val_if_fail (conn != NULL, NULL);
3416
3417   return conn->remote_ip;
3418 }
3419
3420 /**
3421  * gst_rtsp_connection_set_ip:
3422  * @conn: a #GstRTSPConnection
3423  * @ip: an ip address
3424  *
3425  * Set the IP address of the server.
3426  */
3427 void
3428 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3429 {
3430   g_return_if_fail (conn != NULL);
3431
3432   g_free (conn->remote_ip);
3433   conn->remote_ip = g_strdup (ip);
3434 }
3435
3436 /**
3437  * gst_rtsp_connection_get_read_socket:
3438  * @conn: a #GstRTSPConnection
3439  *
3440  * Get the file descriptor for reading.
3441  *
3442  * Returns: (transfer none): the file descriptor used for reading or %NULL on
3443  * error. The file descriptor remains valid until the connection is closed.
3444  */
3445 GSocket *
3446 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3447 {
3448   g_return_val_if_fail (conn != NULL, NULL);
3449   g_return_val_if_fail (conn->read_socket != NULL, NULL);
3450
3451   return conn->read_socket;
3452 }
3453
3454 /**
3455  * gst_rtsp_connection_get_write_socket:
3456  * @conn: a #GstRTSPConnection
3457  *
3458  * Get the file descriptor for writing.
3459  *
3460  * Returns: (transfer none): the file descriptor used for writing or NULL on
3461  * error. The file descriptor remains valid until the connection is closed.
3462  */
3463 GSocket *
3464 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3465 {
3466   g_return_val_if_fail (conn != NULL, NULL);
3467   g_return_val_if_fail (conn->write_socket != NULL, NULL);
3468
3469   return conn->write_socket;
3470 }
3471
3472 /**
3473  * gst_rtsp_connection_set_http_mode:
3474  * @conn: a #GstRTSPConnection
3475  * @enable: %TRUE to enable manual HTTP mode
3476  *
3477  * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3478  * messages in addition to the RTSP messages. It will also disable the
3479  * automatic handling of setting up an HTTP tunnel.
3480  */
3481 void
3482 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3483 {
3484   g_return_if_fail (conn != NULL);
3485
3486   conn->manual_http = enable;
3487 }
3488
3489 /**
3490  * gst_rtsp_connection_set_tunneled:
3491  * @conn: a #GstRTSPConnection
3492  * @tunneled: the new state
3493  *
3494  * Set the HTTP tunneling state of the connection. This must be configured before
3495  * the @conn is connected.
3496  */
3497 void
3498 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3499 {
3500   g_return_if_fail (conn != NULL);
3501   g_return_if_fail (conn->read_socket == NULL);
3502   g_return_if_fail (conn->write_socket == NULL);
3503
3504   conn->tunneled = tunneled;
3505 }
3506
3507 /**
3508  * gst_rtsp_connection_is_tunneled:
3509  * @conn: a #GstRTSPConnection
3510  *
3511  * Get the tunneling state of the connection.
3512  *
3513  * Returns: if @conn is using HTTP tunneling.
3514  */
3515 gboolean
3516 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3517 {
3518   g_return_val_if_fail (conn != NULL, FALSE);
3519
3520   return conn->tunneled;
3521 }
3522
3523 /**
3524  * gst_rtsp_connection_get_tunnelid:
3525  * @conn: a #GstRTSPConnection
3526  *
3527  * Get the tunnel session id the connection.
3528  *
3529  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3530  */
3531 const gchar *
3532 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3533 {
3534   g_return_val_if_fail (conn != NULL, NULL);
3535
3536   if (!conn->tunneled)
3537     return NULL;
3538
3539   return conn->tunnelid;
3540 }
3541
3542 /**
3543  * gst_rtsp_connection_set_ignore_x_server_reply:
3544  * @conn: a #GstRTSPConnection
3545  * @ignore: %TRUE to ignore the x-server-ip-address header reply or %FALSE to
3546  *          comply with it (%FALSE is the default).
3547  *
3548  * Set whether to ignore the x-server-ip-address header reply or not. If the
3549  * header is ignored, the original address will be used instead.
3550  *
3551  * Since: 1.20
3552  */
3553 void
3554 gst_rtsp_connection_set_ignore_x_server_reply (GstRTSPConnection * conn,
3555     gboolean ignore)
3556 {
3557   g_return_if_fail (conn != NULL);
3558
3559   conn->ignore_x_server_reply = ignore;
3560 }
3561
3562 /**
3563  * gst_rtsp_connection_get_ignore_x_server_reply:
3564  * @conn: a #GstRTSPConnection
3565  *
3566  * Get the ignore_x_server_reply value.
3567  *
3568  * Returns: returns %TRUE if the x-server-ip-address header reply will be
3569  *          ignored, else returns %FALSE
3570  *
3571  * Since: 1.20
3572  */
3573 gboolean
3574 gst_rtsp_connection_get_ignore_x_server_reply (const GstRTSPConnection * conn)
3575 {
3576   g_return_val_if_fail (conn != NULL, FALSE);
3577
3578   return conn->ignore_x_server_reply;
3579 }
3580
3581 /**
3582  * gst_rtsp_connection_do_tunnel:
3583  * @conn: a #GstRTSPConnection
3584  * @conn2: a #GstRTSPConnection or %NULL
3585  *
3586  * If @conn received the first tunnel connection and @conn2 received
3587  * the second tunnel connection, link the two connections together so that
3588  * @conn manages the tunneled connection.
3589  *
3590  * After this call, @conn2 cannot be used anymore and must be freed with
3591  * gst_rtsp_connection_free().
3592  *
3593  * If @conn2 is %NULL then only the base64 decoding context will be setup for
3594  * @conn.
3595  *
3596  * Returns: return GST_RTSP_OK on success.
3597  */
3598 GstRTSPResult
3599 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3600     GstRTSPConnection * conn2)
3601 {
3602   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3603
3604   if (conn2 != NULL) {
3605     GstRTSPTunnelState ts1 = conn->tstate;
3606     GstRTSPTunnelState ts2 = conn2->tstate;
3607
3608     g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3609         || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3610         GST_RTSP_EINVAL);
3611     g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3612             TUNNELID_LEN), GST_RTSP_EINVAL);
3613
3614     /* both connections have socket0 as the read/write socket */
3615     if (ts1 == TUNNEL_STATE_GET) {
3616       /* conn2 is the HTTP POST channel. take its socket and set it as read
3617        * socket in conn */
3618       conn->socket1 = conn2->socket0;
3619       conn->stream1 = conn2->stream0;
3620       conn->input_stream = conn2->input_stream;
3621       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3622       conn2->output_stream = NULL;
3623     } else {
3624       /* conn2 is the HTTP GET channel. take its socket and set it as write
3625        * socket in conn */
3626       conn->socket1 = conn->socket0;
3627       conn->stream1 = conn->stream0;
3628       conn->socket0 = conn2->socket0;
3629       conn->stream0 = conn2->stream0;
3630       conn->output_stream = conn2->output_stream;
3631       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3632     }
3633
3634     /* clean up some of the state of conn2 */
3635     g_cancellable_cancel (conn2->cancellable);
3636     conn2->write_socket = conn2->read_socket = NULL;
3637     conn2->socket0 = NULL;
3638     conn2->stream0 = NULL;
3639     conn2->socket1 = NULL;
3640     conn2->stream1 = NULL;
3641     conn2->input_stream = NULL;
3642     conn2->control_stream = NULL;
3643     g_object_unref (conn2->cancellable);
3644     conn2->cancellable = NULL;
3645
3646     /* We make socket0 the write socket and socket1 the read socket. */
3647     conn->write_socket = conn->socket0;
3648     conn->read_socket = conn->socket1;
3649
3650     conn->tstate = TUNNEL_STATE_COMPLETE;
3651
3652     g_free (conn->initial_buffer);
3653     conn->initial_buffer = conn2->initial_buffer;
3654     conn2->initial_buffer = NULL;
3655     conn->initial_buffer_offset = conn2->initial_buffer_offset;
3656   }
3657
3658   /* we need base64 decoding for the readfd */
3659   conn->ctx.state = 0;
3660   conn->ctx.save = 0;
3661   conn->ctx.cout = 0;
3662   conn->ctx.coutl = 0;
3663   conn->ctxp = &conn->ctx;
3664
3665   return GST_RTSP_OK;
3666 }
3667
3668 /**
3669  * gst_rtsp_connection_set_remember_session_id:
3670  * @conn: a #GstRTSPConnection
3671  * @remember: %TRUE if the connection should remember the session id
3672  *
3673  * Sets if the #GstRTSPConnection should remember the session id from the last
3674  * response received and force it onto any further requests.
3675  *
3676  * The default value is %TRUE
3677  */
3678
3679 void
3680 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3681     gboolean remember)
3682 {
3683   conn->remember_session_id = remember;
3684   if (!remember)
3685     conn->session_id[0] = '\0';
3686 }
3687
3688 /**
3689  * gst_rtsp_connection_get_remember_session_id:
3690  * @conn: a #GstRTSPConnection
3691  *
3692  * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3693  * last response to set it on any further request.
3694  */
3695
3696 gboolean
3697 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3698 {
3699   return conn->remember_session_id;
3700 }
3701
3702
3703 #define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3704 #define READ_COND   (G_IO_IN | READ_ERR)
3705 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3706 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
3707
3708 /* async functions */
3709 struct _GstRTSPWatch
3710 {
3711   GSource source;
3712
3713   GstRTSPConnection *conn;
3714
3715   GstRTSPBuilder builder;
3716   GstRTSPMessage message;
3717
3718   GSource *readsrc;
3719   GSource *writesrc;
3720   GSource *controlsrc;
3721
3722   gboolean keep_running;
3723
3724   /* queued message for transmission */
3725   guint id;
3726   GMutex mutex;
3727   GstQueueArray *messages;
3728   gsize messages_bytes;
3729   guint messages_count;
3730
3731   gsize max_bytes;
3732   guint max_messages;
3733   GCond queue_not_full;
3734   gboolean flushing;
3735
3736   GstRTSPWatchFuncs funcs;
3737
3738   gpointer user_data;
3739   GDestroyNotify notify;
3740 };
3741
3742 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3743       ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3744
3745 static gboolean
3746 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3747 {
3748   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3749
3750   if (watch->conn->initial_buffer != NULL)
3751     return TRUE;
3752
3753   *timeout = (watch->conn->timeout * 1000);
3754
3755   return FALSE;
3756 }
3757
3758 static gboolean
3759 gst_rtsp_source_check (GSource * source)
3760 {
3761   return FALSE;
3762 }
3763
3764 static gboolean
3765 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3766     GstRTSPWatch * watch)
3767 {
3768   gssize count;
3769   guint8 buffer[1024];
3770   GError *error = NULL;
3771
3772   /* try to read in order to be able to detect errors, we read 1k in case some
3773    * client actually decides to send data on the GET channel */
3774   count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3775       &error);
3776   if (count == 0) {
3777     /* other end closed the socket */
3778     goto eof;
3779   }
3780
3781   if (count < 0) {
3782     GST_DEBUG ("%s", error->message);
3783     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3784         g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3785       g_clear_error (&error);
3786       goto done;
3787     }
3788     g_clear_error (&error);
3789     goto read_error;
3790   }
3791
3792   /* client sent data on the GET channel, ignore it */
3793
3794 done:
3795   return TRUE;
3796
3797   /* ERRORS */
3798 eof:
3799   {
3800     if (watch->funcs.closed)
3801       watch->funcs.closed (watch, watch->user_data);
3802
3803     /* the read connection was closed, stop the watch now */
3804     watch->keep_running = FALSE;
3805
3806     return FALSE;
3807   }
3808 read_error:
3809   {
3810     if (watch->funcs.error_full)
3811       watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3812           0, watch->user_data);
3813     else if (watch->funcs.error)
3814       watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3815
3816     goto eof;
3817   }
3818 }
3819
3820 static gboolean
3821 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3822     GstRTSPWatch * watch)
3823 {
3824   GstRTSPResult res = GST_RTSP_ERROR;
3825   GstRTSPConnection *conn = watch->conn;
3826
3827   /* if this connection was already closed, stop now */
3828   if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3829     goto eof;
3830
3831   res = build_next (&watch->builder, &watch->message, conn, FALSE);
3832   if (res == GST_RTSP_EINTR)
3833     goto done;
3834   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3835     g_mutex_lock (&watch->mutex);
3836     if (watch->readsrc) {
3837       if (!g_source_is_destroyed ((GSource *) watch))
3838         g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3839       g_source_unref (watch->readsrc);
3840       watch->readsrc = NULL;
3841     }
3842
3843     if (conn->stream1) {
3844       g_object_unref (conn->stream1);
3845       conn->stream1 = NULL;
3846       conn->socket1 = NULL;
3847       conn->input_stream = NULL;
3848     }
3849     g_mutex_unlock (&watch->mutex);
3850
3851     /* When we are in tunnelled mode, the read socket can be closed and we
3852      * should be prepared for a new POST method to reopen it */
3853     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3854       /* remove the read connection for the tunnel */
3855       /* we accept a new POST request */
3856       conn->tstate = TUNNEL_STATE_GET;
3857       /* and signal that we lost our tunnel */
3858       if (watch->funcs.tunnel_lost)
3859         res = watch->funcs.tunnel_lost (watch, watch->user_data);
3860       /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3861       g_mutex_lock (&watch->mutex);
3862       if (watch->conn->control_stream && !watch->controlsrc) {
3863         watch->controlsrc =
3864             g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3865             (watch->conn->control_stream), NULL);
3866         g_source_set_callback (watch->controlsrc,
3867             (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3868             NULL);
3869         g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3870       }
3871       g_mutex_unlock (&watch->mutex);
3872       goto read_done;
3873     } else
3874       goto eof;
3875   } else if (G_LIKELY (res == GST_RTSP_OK)) {
3876     if (!conn->manual_http &&
3877         watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3878       if (conn->tstate == TUNNEL_STATE_NONE &&
3879           watch->message.type_data.request.method == GST_RTSP_GET) {
3880         GstRTSPMessage *response;
3881         GstRTSPStatusCode code;
3882
3883         conn->tstate = TUNNEL_STATE_GET;
3884
3885         if (watch->funcs.tunnel_start)
3886           code = watch->funcs.tunnel_start (watch, watch->user_data);
3887         else
3888           code = GST_RTSP_STS_OK;
3889
3890         /* queue the response */
3891         response = gen_tunnel_reply (conn, code, &watch->message);
3892         if (watch->funcs.tunnel_http_response)
3893           watch->funcs.tunnel_http_response (watch, &watch->message, response,
3894               watch->user_data);
3895         gst_rtsp_watch_send_message (watch, response, NULL);
3896         gst_rtsp_message_free (response);
3897         goto read_done;
3898       } else if (conn->tstate == TUNNEL_STATE_NONE &&
3899           watch->message.type_data.request.method == GST_RTSP_POST) {
3900         conn->tstate = TUNNEL_STATE_POST;
3901
3902         /* in the callback the connection should be tunneled with the
3903          * GET connection */
3904         if (watch->funcs.tunnel_complete) {
3905           watch->funcs.tunnel_complete (watch, watch->user_data);
3906         }
3907         goto read_done;
3908       }
3909     }
3910   } else
3911     goto read_error;
3912
3913   if (!conn->manual_http) {
3914     /* if manual HTTP support is not enabled, then restore the message to
3915      * what it would have looked like without the support for parsing HTTP
3916      * messages being present */
3917     if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3918       watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3919       watch->message.type_data.request.method = GST_RTSP_INVALID;
3920       if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3921         watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3922       res = GST_RTSP_EPARSE;
3923     } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3924       watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3925       if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3926         watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3927       res = GST_RTSP_EPARSE;
3928     }
3929   }
3930   if (G_LIKELY (res != GST_RTSP_OK))
3931     goto read_error;
3932
3933   if (watch->funcs.message_received)
3934     watch->funcs.message_received (watch, &watch->message, watch->user_data);
3935
3936 read_done:
3937   gst_rtsp_message_unset (&watch->message);
3938   build_reset (&watch->builder);
3939
3940 done:
3941   return TRUE;
3942
3943   /* ERRORS */
3944 eof:
3945   {
3946     if (watch->funcs.closed)
3947       watch->funcs.closed (watch, watch->user_data);
3948
3949     /* we closed the read connection, stop the watch now */
3950     watch->keep_running = FALSE;
3951
3952     /* always stop when the input returns EOF in non-tunneled mode */
3953     return FALSE;
3954   }
3955 read_error:
3956   {
3957     if (watch->funcs.error_full)
3958       watch->funcs.error_full (watch, res, &watch->message,
3959           0, watch->user_data);
3960     else if (watch->funcs.error)
3961       watch->funcs.error (watch, res, watch->user_data);
3962
3963     goto eof;
3964   }
3965 }
3966
3967 static gboolean
3968 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3969     gpointer user_data G_GNUC_UNUSED)
3970 {
3971   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3972   GstRTSPConnection *conn = watch->conn;
3973
3974   if (conn->initial_buffer != NULL) {
3975     gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3976         watch);
3977   }
3978   return watch->keep_running;
3979 }
3980
3981 static gboolean
3982 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3983     GstRTSPWatch * watch)
3984 {
3985   GstRTSPResult res = GST_RTSP_ERROR;
3986   GstRTSPConnection *conn = watch->conn;
3987
3988   /* if this connection was already closed, stop now */
3989   if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3990       !watch->messages)
3991     goto eof;
3992
3993   g_mutex_lock (&watch->mutex);
3994   do {
3995     guint n_messages = gst_queue_array_get_length (watch->messages);
3996     GOutputVector *vectors;
3997     GstMapInfo *map_infos;
3998     guint *ids;
3999     gsize bytes_to_write, bytes_written;
4000     guint n_vectors, n_memories, n_ids, drop_messages;
4001     gint i, j, l, n_mmap;
4002     GstRTSPSerializedMessage *msg;
4003
4004     /* if this connection was already closed, stop now */
4005     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
4006         !watch->messages) {
4007       g_mutex_unlock (&watch->mutex);
4008       goto eof;
4009     }
4010
4011     if (n_messages == 0) {
4012       if (watch->writesrc) {
4013         if (!g_source_is_destroyed ((GSource *) watch))
4014           g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4015         g_source_unref (watch->writesrc);
4016         watch->writesrc = NULL;
4017         /* we create and add the write source again when we actually have
4018          * something to write */
4019
4020         /* since write source is now removed we add read source on the write
4021          * socket instead to be able to detect when client closes get channel
4022          * in tunneled mode */
4023         if (watch->conn->control_stream) {
4024           watch->controlsrc =
4025               g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4026               (watch->conn->control_stream), NULL);
4027           g_source_set_callback (watch->controlsrc,
4028               (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
4029               NULL);
4030           g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4031         } else {
4032           watch->controlsrc = NULL;
4033         }
4034       }
4035       break;
4036     }
4037
4038     for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
4039       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4040       if (msg->id != 0)
4041         n_ids++;
4042
4043       if (msg->data_offset < msg->data_size)
4044         n_vectors++;
4045
4046       if (msg->body_data && msg->body_offset < msg->body_data_size) {
4047         n_vectors++;
4048       } else if (msg->body_buffer) {
4049         guint m, n;
4050         guint offset = 0;
4051
4052         n = gst_buffer_n_memory (msg->body_buffer);
4053         for (m = 0; m < n; m++) {
4054           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
4055
4056           /* Skip all memories we already wrote */
4057           if (offset + mem->size <= msg->body_offset) {
4058             offset += mem->size;
4059             continue;
4060           }
4061           offset += mem->size;
4062
4063           n_memories++;
4064           n_vectors++;
4065         }
4066       }
4067     }
4068
4069     vectors = g_newa (GOutputVector, n_vectors);
4070     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4071     ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
4072     if (ids)
4073       memset (ids, 0, sizeof (guint) * (n_ids + 1));
4074
4075     for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
4076         i++) {
4077       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4078
4079       if (msg->data_offset < msg->data_size) {
4080         vectors[j].buffer = (msg->data_is_data_header ?
4081             msg->data_header : msg->data) + msg->data_offset;
4082         vectors[j].size = msg->data_size - msg->data_offset;
4083         bytes_to_write += vectors[j].size;
4084         j++;
4085       }
4086
4087       if (msg->body_data) {
4088         if (msg->body_offset < msg->body_data_size) {
4089           vectors[j].buffer = msg->body_data + msg->body_offset;
4090           vectors[j].size = msg->body_data_size - msg->body_offset;
4091           bytes_to_write += vectors[j].size;
4092           j++;
4093         }
4094       } else if (msg->body_buffer) {
4095         guint m, n;
4096         guint offset = 0;
4097         n = gst_buffer_n_memory (msg->body_buffer);
4098         for (m = 0; m < n; m++) {
4099           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
4100           guint off;
4101
4102           /* Skip all memories we already wrote */
4103           if (offset + mem->size <= msg->body_offset) {
4104             offset += mem->size;
4105             continue;
4106           }
4107
4108           if (offset < msg->body_offset)
4109             off = msg->body_offset - offset;
4110           else
4111             off = 0;
4112
4113           offset += mem->size;
4114
4115           g_assert (off < mem->size);
4116
4117           gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
4118           vectors[j].buffer = map_infos[n_mmap].data + off;
4119           vectors[j].size = map_infos[n_mmap].size - off;
4120           bytes_to_write += vectors[j].size;
4121
4122           n_mmap++;
4123           j++;
4124         }
4125       }
4126     }
4127
4128     res =
4129         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4130         &bytes_written, FALSE, watch->conn->cancellable);
4131     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4132
4133     /* First unmap all memories here, this simplifies the code below
4134      * as we don't have to skip all memories that were already written
4135      * before */
4136     for (i = 0; i < n_mmap; i++) {
4137       gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
4138     }
4139
4140     if (bytes_written == bytes_to_write) {
4141       /* fast path, just unmap all memories, free memory, drop all messages and notify them */
4142       l = 0;
4143       while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4144         if (msg->id) {
4145           ids[l] = msg->id;
4146           l++;
4147         }
4148
4149         gst_rtsp_serialized_message_clear (msg);
4150       }
4151
4152       g_assert (watch->messages_bytes >= bytes_written);
4153       watch->messages_bytes -= bytes_written;
4154     } else if (bytes_written > 0) {
4155       /* not done, let's skip all messages that were sent already and free them */
4156       for (i = 0, drop_messages = 0; i < n_messages; i++) {
4157         msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4158
4159         if (bytes_written >= msg->data_size - msg->data_offset) {
4160           guint body_size;
4161
4162           /* all data of this message is sent, check body and otherwise
4163            * skip the whole message for next time */
4164           bytes_written -= (msg->data_size - msg->data_offset);
4165           watch->messages_bytes -= (msg->data_size - msg->data_offset);
4166           msg->data_offset = msg->data_size;
4167
4168           if (msg->body_data) {
4169             body_size = msg->body_data_size;
4170           } else if (msg->body_buffer) {
4171             body_size = gst_buffer_get_size (msg->body_buffer);
4172           } else {
4173             body_size = 0;
4174           }
4175
4176           if (bytes_written + msg->body_offset >= body_size) {
4177             /* body written, drop this message */
4178             bytes_written -= body_size - msg->body_offset;
4179             watch->messages_bytes -= body_size - msg->body_offset;
4180             msg->body_offset = body_size;
4181             drop_messages++;
4182
4183             if (msg->id) {
4184               ids[l] = msg->id;
4185               l++;
4186             }
4187
4188             gst_rtsp_serialized_message_clear (msg);
4189           } else {
4190             msg->body_offset += bytes_written;
4191             watch->messages_bytes -= bytes_written;
4192             bytes_written = 0;
4193           }
4194         } else {
4195           /* Need to continue sending from the data of this message */
4196           msg->data_offset += bytes_written;
4197           watch->messages_bytes -= bytes_written;
4198           bytes_written = 0;
4199         }
4200       }
4201
4202       while (drop_messages > 0) {
4203         msg = gst_queue_array_pop_head_struct (watch->messages);
4204         g_assert (msg);
4205         drop_messages--;
4206       }
4207
4208       g_assert (watch->messages_bytes >= bytes_written);
4209       watch->messages_bytes -= bytes_written;
4210     }
4211
4212     if (!IS_BACKLOG_FULL (watch))
4213       g_cond_signal (&watch->queue_not_full);
4214     g_mutex_unlock (&watch->mutex);
4215
4216     /* notify all messages that were successfully written */
4217     if (ids) {
4218       while (*ids) {
4219         /* only decrease the counter for messages that have an id. Only
4220          * the last message of a messages chunk is counted */
4221         watch->messages_count--;
4222
4223         if (watch->funcs.message_sent)
4224           watch->funcs.message_sent (watch, *ids, watch->user_data);
4225         ids++;
4226       }
4227     }
4228
4229     if (res == GST_RTSP_EINTR) {
4230       goto write_blocked;
4231     } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4232       goto write_error;
4233     }
4234     g_mutex_lock (&watch->mutex);
4235   } while (TRUE);
4236   g_mutex_unlock (&watch->mutex);
4237
4238 write_blocked:
4239   return TRUE;
4240
4241   /* ERRORS */
4242 eof:
4243   {
4244     return FALSE;
4245   }
4246 write_error:
4247   {
4248     if (watch->funcs.error_full) {
4249       guint i, n_messages;
4250
4251       n_messages = gst_queue_array_get_length (watch->messages);
4252       for (i = 0; i < n_messages; i++) {
4253         GstRTSPSerializedMessage *msg =
4254             gst_queue_array_peek_nth_struct (watch->messages, i);
4255         if (msg->id)
4256           watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4257       }
4258     } else if (watch->funcs.error) {
4259       watch->funcs.error (watch, res, watch->user_data);
4260     }
4261
4262     return FALSE;
4263   }
4264 }
4265
4266 static void
4267 gst_rtsp_source_finalize (GSource * source)
4268 {
4269   GstRTSPWatch *watch = (GstRTSPWatch *) source;
4270   GstRTSPSerializedMessage *msg;
4271
4272   if (watch->notify)
4273     watch->notify (watch->user_data);
4274
4275   build_reset (&watch->builder);
4276   gst_rtsp_message_unset (&watch->message);
4277
4278   while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4279     gst_rtsp_serialized_message_clear (msg);
4280   }
4281   gst_queue_array_free (watch->messages);
4282   watch->messages = NULL;
4283   watch->messages_bytes = 0;
4284   watch->messages_count = 0;
4285
4286   g_cond_clear (&watch->queue_not_full);
4287
4288   if (watch->readsrc)
4289     g_source_unref (watch->readsrc);
4290   if (watch->writesrc)
4291     g_source_unref (watch->writesrc);
4292   if (watch->controlsrc)
4293     g_source_unref (watch->controlsrc);
4294
4295   g_mutex_clear (&watch->mutex);
4296 }
4297
4298 static GSourceFuncs gst_rtsp_source_funcs = {
4299   gst_rtsp_source_prepare,
4300   gst_rtsp_source_check,
4301   gst_rtsp_source_dispatch,
4302   gst_rtsp_source_finalize,
4303   NULL,
4304   NULL
4305 };
4306
4307 /**
4308  * gst_rtsp_watch_new: (skip)
4309  * @conn: a #GstRTSPConnection
4310  * @funcs: watch functions
4311  * @user_data: user data to pass to @funcs
4312  * @notify: notify when @user_data is not referenced anymore
4313  *
4314  * Create a watch object for @conn. The functions provided in @funcs will be
4315  * called with @user_data when activity happened on the watch.
4316  *
4317  * The new watch is usually created so that it can be attached to a
4318  * maincontext with gst_rtsp_watch_attach().
4319  *
4320  * @conn must exist for the entire lifetime of the watch.
4321  *
4322  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4323  * communication. Free with gst_rtsp_watch_unref () after usage.
4324  */
4325 GstRTSPWatch *
4326 gst_rtsp_watch_new (GstRTSPConnection * conn,
4327     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4328 {
4329   GstRTSPWatch *result;
4330
4331   g_return_val_if_fail (conn != NULL, NULL);
4332   g_return_val_if_fail (funcs != NULL, NULL);
4333   g_return_val_if_fail (conn->read_socket != NULL, NULL);
4334   g_return_val_if_fail (conn->write_socket != NULL, NULL);
4335
4336   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4337       sizeof (GstRTSPWatch));
4338
4339   result->conn = conn;
4340   result->builder.state = STATE_START;
4341
4342   g_mutex_init (&result->mutex);
4343   result->messages =
4344       gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4345   g_cond_init (&result->queue_not_full);
4346
4347   gst_rtsp_watch_reset (result);
4348   result->keep_running = TRUE;
4349   result->flushing = FALSE;
4350
4351   result->funcs = *funcs;
4352   result->user_data = user_data;
4353   result->notify = notify;
4354
4355   return result;
4356 }
4357
4358 /**
4359  * gst_rtsp_watch_reset:
4360  * @watch: a #GstRTSPWatch
4361  *
4362  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4363  * when the file descriptors of the connection might have changed.
4364  */
4365 void
4366 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4367 {
4368   g_mutex_lock (&watch->mutex);
4369   if (watch->readsrc) {
4370     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4371     g_source_unref (watch->readsrc);
4372   }
4373   if (watch->writesrc) {
4374     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4375     g_source_unref (watch->writesrc);
4376     watch->writesrc = NULL;
4377   }
4378   if (watch->controlsrc) {
4379     g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4380     g_source_unref (watch->controlsrc);
4381     watch->controlsrc = NULL;
4382   }
4383
4384   if (watch->conn->input_stream) {
4385     watch->readsrc =
4386         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4387         (watch->conn->input_stream), NULL);
4388     g_source_set_callback (watch->readsrc,
4389         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4390     g_source_add_child_source ((GSource *) watch, watch->readsrc);
4391   } else {
4392     watch->readsrc = NULL;
4393   }
4394
4395   /* we create and add the write source when we actually have something to
4396    * write */
4397
4398   /* when write source is not added we add read source on the write socket
4399    * instead to be able to detect when client closes get channel in tunneled
4400    * mode */
4401   if (watch->conn->control_stream) {
4402     watch->controlsrc =
4403         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4404         (watch->conn->control_stream), NULL);
4405     g_source_set_callback (watch->controlsrc,
4406         (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4407     g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4408   } else {
4409     watch->controlsrc = NULL;
4410   }
4411   g_mutex_unlock (&watch->mutex);
4412 }
4413
4414 /**
4415  * gst_rtsp_watch_attach:
4416  * @watch: a #GstRTSPWatch
4417  * @context: a GMainContext (if NULL, the default context will be used)
4418  *
4419  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4420  *
4421  * Returns: the ID (greater than 0) for the watch within the GMainContext.
4422  */
4423 guint
4424 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4425 {
4426   g_return_val_if_fail (watch != NULL, 0);
4427
4428   return g_source_attach ((GSource *) watch, context);
4429 }
4430
4431 /**
4432  * gst_rtsp_watch_unref:
4433  * @watch: a #GstRTSPWatch
4434  *
4435  * Decreases the reference count of @watch by one. If the resulting reference
4436  * count is zero the watch and associated memory will be destroyed.
4437  */
4438 void
4439 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4440 {
4441   g_return_if_fail (watch != NULL);
4442
4443   g_source_unref ((GSource *) watch);
4444 }
4445
4446 /**
4447  * gst_rtsp_watch_set_send_backlog:
4448  * @watch: a #GstRTSPWatch
4449  * @bytes: maximum bytes
4450  * @messages: maximum messages
4451  *
4452  * Set the maximum amount of bytes and messages that will be queued in @watch.
4453  * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4454  * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4455  *
4456  * A value of 0 for @bytes or @messages means no limits.
4457  *
4458  * Since: 1.2
4459  */
4460 void
4461 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4462     gsize bytes, guint messages)
4463 {
4464   g_return_if_fail (watch != NULL);
4465
4466   g_mutex_lock (&watch->mutex);
4467   watch->max_bytes = bytes;
4468   watch->max_messages = messages;
4469   if (!IS_BACKLOG_FULL (watch))
4470     g_cond_signal (&watch->queue_not_full);
4471   g_mutex_unlock (&watch->mutex);
4472
4473   GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4474       bytes, messages);
4475 }
4476
4477 /**
4478  * gst_rtsp_watch_get_send_backlog:
4479  * @watch: a #GstRTSPWatch
4480  * @bytes: (out) (allow-none): maximum bytes
4481  * @messages: (out) (allow-none): maximum messages
4482  *
4483  * Get the maximum amount of bytes and messages that will be queued in @watch.
4484  * See gst_rtsp_watch_set_send_backlog().
4485  *
4486  * Since: 1.2
4487  */
4488 void
4489 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4490     gsize * bytes, guint * messages)
4491 {
4492   g_return_if_fail (watch != NULL);
4493
4494   g_mutex_lock (&watch->mutex);
4495   if (bytes)
4496     *bytes = watch->max_bytes;
4497   if (messages)
4498     *messages = watch->max_messages;
4499   g_mutex_unlock (&watch->mutex);
4500 }
4501
4502 static GstRTSPResult
4503 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4504     GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4505 {
4506   GstRTSPResult res;
4507   GMainContext *context = NULL;
4508   gint i;
4509
4510   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4511   g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4512
4513   g_mutex_lock (&watch->mutex);
4514   if (watch->flushing)
4515     goto flushing;
4516
4517   /* try to send the message synchronously first */
4518   if (gst_queue_array_get_length (watch->messages) == 0) {
4519     gint j, k;
4520     GOutputVector *vectors;
4521     GstMapInfo *map_infos;
4522     gsize bytes_to_write, bytes_written;
4523     guint n_vectors, n_memories, drop_messages;
4524
4525     for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4526       n_vectors++;
4527       if (messages[i].body_data) {
4528         n_vectors++;
4529       } else if (messages[i].body_buffer) {
4530         n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4531         n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4532       }
4533     }
4534
4535     vectors = g_newa (GOutputVector, n_vectors);
4536     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4537
4538     for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4539       vectors[j].buffer = messages[i].data_is_data_header ?
4540           messages[i].data_header : messages[i].data;
4541       vectors[j].size = messages[i].data_size;
4542       bytes_to_write += vectors[j].size;
4543       j++;
4544
4545       if (messages[i].body_data) {
4546         vectors[j].buffer = messages[i].body_data;
4547         vectors[j].size = messages[i].body_data_size;
4548         bytes_to_write += vectors[j].size;
4549         j++;
4550       } else if (messages[i].body_buffer) {
4551         gint l, n;
4552
4553         n = gst_buffer_n_memory (messages[i].body_buffer);
4554         for (l = 0; l < n; l++) {
4555           GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4556
4557           gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4558           vectors[j].buffer = map_infos[k].data;
4559           vectors[j].size = map_infos[k].size;
4560           bytes_to_write += vectors[j].size;
4561
4562           k++;
4563           j++;
4564         }
4565       }
4566     }
4567
4568     res =
4569         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4570         &bytes_written, FALSE, watch->conn->cancellable);
4571     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4572
4573     /* At this point we sent everything we could without blocking or
4574      * error and updated the offsets inside the message accordingly */
4575
4576     /* First of all unmap all memories. This simplifies the code below */
4577     for (k = 0; k < n_memories; k++) {
4578       gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4579     }
4580
4581     if (res != GST_RTSP_EINTR) {
4582       /* actual error or done completely */
4583       if (id != NULL)
4584         *id = 0;
4585
4586       /* free everything */
4587       for (i = 0, k = 0; i < n_messages; i++) {
4588         gst_rtsp_serialized_message_clear (&messages[i]);
4589       }
4590
4591       goto done;
4592     }
4593
4594     /* not done, let's skip all messages that were sent already and free them */
4595     for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4596       if (bytes_written >= messages[i].data_size) {
4597         guint body_size;
4598
4599         /* all data of this message is sent, check body and otherwise
4600          * skip the whole message for next time */
4601         messages[i].data_offset = messages[i].data_size;
4602         bytes_written -= messages[i].data_size;
4603
4604         if (messages[i].body_data) {
4605           body_size = messages[i].body_data_size;
4606
4607         } else if (messages[i].body_buffer) {
4608           body_size = gst_buffer_get_size (messages[i].body_buffer);
4609         } else {
4610           body_size = 0;
4611         }
4612
4613         if (bytes_written >= body_size) {
4614           /* body written, drop this message */
4615           messages[i].body_offset = body_size;
4616           bytes_written -= body_size;
4617           drop_messages++;
4618
4619           gst_rtsp_serialized_message_clear (&messages[i]);
4620         } else {
4621           messages[i].body_offset = bytes_written;
4622           bytes_written = 0;
4623         }
4624       } else {
4625         /* Need to continue sending from the data of this message */
4626         messages[i].data_offset = bytes_written;
4627         bytes_written = 0;
4628       }
4629     }
4630
4631     g_assert (n_messages > drop_messages);
4632
4633     messages += drop_messages;
4634     n_messages -= drop_messages;
4635   }
4636
4637   /* check limits */
4638   if (IS_BACKLOG_FULL (watch))
4639     goto too_much_backlog;
4640
4641   for (i = 0; i < n_messages; i++) {
4642     GstRTSPSerializedMessage local_message;
4643
4644     /* make a record with the data and id for sending async */
4645     local_message = messages[i];
4646
4647     /* copy the body data or take an additional reference to the body buffer
4648      * we don't own them here */
4649     if (local_message.body_data) {
4650       local_message.body_data =
4651           g_memdup2 (local_message.body_data, local_message.body_data_size);
4652     } else if (local_message.body_buffer) {
4653       gst_buffer_ref (local_message.body_buffer);
4654     }
4655     local_message.borrowed = FALSE;
4656
4657     /* set an id for the very last message */
4658     if (i == n_messages - 1) {
4659       do {
4660         /* make sure rec->id is never 0 */
4661         local_message.id = ++watch->id;
4662       } while (G_UNLIKELY (local_message.id == 0));
4663
4664       if (id != NULL)
4665         *id = local_message.id;
4666     } else {
4667       local_message.id = 0;
4668     }
4669
4670     /* add the record to a queue. */
4671     gst_queue_array_push_tail_struct (watch->messages, &local_message);
4672     watch->messages_bytes +=
4673         (local_message.data_size - local_message.data_offset);
4674     if (local_message.body_data)
4675       watch->messages_bytes +=
4676           (local_message.body_data_size - local_message.body_offset);
4677     else if (local_message.body_buffer)
4678       watch->messages_bytes +=
4679           (gst_buffer_get_size (local_message.body_buffer) -
4680           local_message.body_offset);
4681   }
4682   /* each message chunks is one unit */
4683   watch->messages_count++;
4684
4685   /* make sure the main context will now also check for writability on the
4686    * socket */
4687   context = ((GSource *) watch)->context;
4688   if (!watch->writesrc) {
4689     /* remove the read source on the write socket, we will be able to detect
4690      * errors while writing */
4691     if (watch->controlsrc) {
4692       g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4693       g_source_unref (watch->controlsrc);
4694       watch->controlsrc = NULL;
4695     }
4696
4697     watch->writesrc =
4698         g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4699         (watch->conn->output_stream), NULL);
4700     g_source_set_callback (watch->writesrc,
4701         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4702     g_source_add_child_source ((GSource *) watch, watch->writesrc);
4703   }
4704   res = GST_RTSP_OK;
4705
4706 done:
4707   g_mutex_unlock (&watch->mutex);
4708
4709   if (context)
4710     g_main_context_wakeup (context);
4711
4712   return res;
4713
4714   /* ERRORS */
4715 flushing:
4716   {
4717     GST_DEBUG ("we are flushing");
4718     g_mutex_unlock (&watch->mutex);
4719     for (i = 0; i < n_messages; i++) {
4720       gst_rtsp_serialized_message_clear (&messages[i]);
4721     }
4722     return GST_RTSP_EINTR;
4723   }
4724 too_much_backlog:
4725   {
4726     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4727         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4728         watch->messages_bytes, watch->max_messages, watch->messages_count);
4729     g_mutex_unlock (&watch->mutex);
4730     for (i = 0; i < n_messages; i++) {
4731       gst_rtsp_serialized_message_clear (&messages[i]);
4732     }
4733     return GST_RTSP_ENOMEM;
4734   }
4735
4736   return GST_RTSP_OK;
4737 }
4738
4739 /**
4740  * gst_rtsp_watch_write_data:
4741  * @watch: a #GstRTSPWatch
4742  * @data: (array length=size) (transfer full): the data to queue
4743  * @size: the size of @data
4744  * @id: (out) (allow-none): location for a message ID or %NULL
4745  *
4746  * Write @data using the connection of the @watch. If it cannot be sent
4747  * immediately, it will be queued for transmission in @watch. The contents of
4748  * @message will then be serialized and transmitted when the connection of the
4749  * @watch becomes writable. In case the @message is queued, the ID returned in
4750  * @id will be non-zero and used as the ID argument in the message_sent
4751  * callback.
4752  *
4753  * This function will take ownership of @data and g_free() it after use.
4754  *
4755  * If the amount of queued data exceeds the limits set with
4756  * gst_rtsp_watch_set_send_backlog(), this function will return
4757  * #GST_RTSP_ENOMEM.
4758  *
4759  * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4760  * are reached. #GST_RTSP_EINTR when @watch was flushing.
4761  */
4762 /* FIXME 2.0: This should've been static! */
4763 GstRTSPResult
4764 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4765     guint size, guint * id)
4766 {
4767   GstRTSPSerializedMessage serialized_message;
4768
4769   memset (&serialized_message, 0, sizeof (serialized_message));
4770   serialized_message.data = (guint8 *) data;
4771   serialized_message.data_size = size;
4772
4773   return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4774       1, id);
4775 }
4776
4777 /**
4778  * gst_rtsp_watch_send_message:
4779  * @watch: a #GstRTSPWatch
4780  * @message: a #GstRTSPMessage
4781  * @id: (out) (allow-none): location for a message ID or %NULL
4782  *
4783  * Send a @message using the connection of the @watch. If it cannot be sent
4784  * immediately, it will be queued for transmission in @watch. The contents of
4785  * @message will then be serialized and transmitted when the connection of the
4786  * @watch becomes writable. In case the @message is queued, the ID returned in
4787  * @id will be non-zero and used as the ID argument in the message_sent
4788  * callback.
4789  *
4790  * Returns: #GST_RTSP_OK on success.
4791  */
4792 GstRTSPResult
4793 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4794     guint * id)
4795 {
4796   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4797   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4798
4799   return gst_rtsp_watch_send_messages (watch, message, 1, id);
4800 }
4801
4802 /**
4803  * gst_rtsp_watch_send_messages:
4804  * @watch: a #GstRTSPWatch
4805  * @messages: (array length=n_messages): the messages to send
4806  * @n_messages: the number of messages to send
4807  * @id: (out) (allow-none): location for a message ID or %NULL
4808  *
4809  * Sends @messages using the connection of the @watch. If they cannot be sent
4810  * immediately, they will be queued for transmission in @watch. The contents of
4811  * @messages will then be serialized and transmitted when the connection of the
4812  * @watch becomes writable. In case the @messages are queued, the ID returned in
4813  * @id will be non-zero and used as the ID argument in the message_sent
4814  * callback once the last message is sent. The callback will only be called
4815  * once for the last message.
4816  *
4817  * Returns: #GST_RTSP_OK on success.
4818  *
4819  * Since: 1.16
4820  */
4821 GstRTSPResult
4822 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4823     guint n_messages, guint * id)
4824 {
4825   GstRTSPSerializedMessage *serialized_messages;
4826   gint i;
4827
4828   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4829   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4830
4831   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4832   memset (serialized_messages, 0,
4833       sizeof (GstRTSPSerializedMessage) * n_messages);
4834
4835   for (i = 0; i < n_messages; i++) {
4836     if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4837       goto error;
4838   }
4839
4840   return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4841       n_messages, id);
4842
4843 error:
4844   for (i = 0; i < n_messages; i++) {
4845     gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4846   }
4847
4848   return GST_RTSP_EINVAL;
4849 }
4850
4851 /**
4852  * gst_rtsp_watch_wait_backlog_usec:
4853  * @watch: a #GstRTSPWatch
4854  * @timeout: a timeout in microseconds
4855  *
4856  * Wait until there is place in the backlog queue, @timeout is reached
4857  * or @watch is set to flushing.
4858  *
4859  * If @timeout is 0 this function can block forever. If @timeout
4860  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4861  * after the timeout expired.
4862  *
4863  * The typically use of this function is when gst_rtsp_watch_write_data
4864  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4865  * free space in the backlog queue and try again.
4866  *
4867  * Returns: %GST_RTSP_OK when if there is room in queue.
4868  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
4869  *          %GST_RTSP_EINTR when @watch is flushing
4870  *          %GST_RTSP_EINVAL when called with invalid parameters.
4871  *
4872  * Since: 1.18
4873  */
4874 GstRTSPResult
4875 gst_rtsp_watch_wait_backlog_usec (GstRTSPWatch * watch, gint64 timeout)
4876 {
4877   gint64 end_time;
4878
4879   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4880
4881   end_time = g_get_monotonic_time () + timeout;
4882
4883   g_mutex_lock (&watch->mutex);
4884   if (watch->flushing)
4885     goto flushing;
4886
4887   while (IS_BACKLOG_FULL (watch)) {
4888     gboolean res;
4889
4890     res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4891     if (watch->flushing)
4892       goto flushing;
4893
4894     if (!res)
4895       goto timeout;
4896   }
4897   g_mutex_unlock (&watch->mutex);
4898
4899   return GST_RTSP_OK;
4900
4901   /* ERRORS */
4902 flushing:
4903   {
4904     GST_DEBUG ("we are flushing");
4905     g_mutex_unlock (&watch->mutex);
4906     return GST_RTSP_EINTR;
4907   }
4908 timeout:
4909   {
4910     GST_DEBUG ("we timed out");
4911     g_mutex_unlock (&watch->mutex);
4912     return GST_RTSP_ETIMEOUT;
4913   }
4914 }
4915
4916 /**
4917  * gst_rtsp_watch_set_flushing:
4918  * @watch: a #GstRTSPWatch
4919  * @flushing: new flushing state
4920  *
4921  * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4922  * and make sure gst_rtsp_watch_write_data() returns immediately with
4923  * #GST_RTSP_EINTR. And empty the queue.
4924  *
4925  * Since: 1.4
4926  */
4927 void
4928 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4929 {
4930   g_return_if_fail (watch != NULL);
4931
4932   g_mutex_lock (&watch->mutex);
4933   watch->flushing = flushing;
4934   g_cond_signal (&watch->queue_not_full);
4935   if (flushing) {
4936     GstRTSPSerializedMessage *msg;
4937
4938     while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4939       gst_rtsp_serialized_message_clear (msg);
4940     }
4941   }
4942   g_mutex_unlock (&watch->mutex);
4943 }
4944
4945
4946 #ifndef GST_DISABLE_DEPRECATED
4947 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
4948 /* Deprecated */
4949 #define TV_TO_USEC(tv) ((tv) ? ((tv)->tv_sec * G_USEC_PER_SEC + (tv)->tv_usec) : 0)
4950 /**
4951  * gst_rtsp_connection_connect:
4952  * @conn: a #GstRTSPConnection
4953  * @timeout: a GTimeVal timeout
4954  *
4955  * Attempt to connect to the url of @conn made with
4956  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4957  * forever. If @timeout contains a valid timeout, this function will return
4958  * #GST_RTSP_ETIMEOUT after the timeout expired.
4959  *
4960  * This function can be cancelled with gst_rtsp_connection_flush().
4961  *
4962  * Returns: #GST_RTSP_OK when a connection could be made.
4963  *
4964  * Deprecated: 1.18
4965  */
4966     GstRTSPResult
4967 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
4968 {
4969   return gst_rtsp_connection_connect_usec (conn, TV_TO_USEC (timeout));
4970 }
4971
4972 /**
4973  * gst_rtsp_connection_connect_with_response:
4974  * @conn: a #GstRTSPConnection
4975  * @timeout: a GTimeVal timeout
4976  * @response: a #GstRTSPMessage
4977  *
4978  * Attempt to connect to the url of @conn made with
4979  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4980  * forever. If @timeout contains a valid timeout, this function will return
4981  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
4982  * @response will contain a response to the tunneling request messages.
4983  *
4984  * This function can be cancelled with gst_rtsp_connection_flush().
4985  *
4986  * Returns: #GST_RTSP_OK when a connection could be made.
4987  *
4988  * Since: 1.8
4989  * Deprecated: 1.18
4990  */
4991 GstRTSPResult
4992 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
4993     GTimeVal * timeout, GstRTSPMessage * response)
4994 {
4995   return gst_rtsp_connection_connect_with_response_usec (conn,
4996       TV_TO_USEC (timeout), response);
4997 }
4998
4999 /**
5000  * gst_rtsp_connection_read:
5001  * @conn: a #GstRTSPConnection
5002  * @data: the data to read
5003  * @size: the size of @data
5004  * @timeout: a timeout value or %NULL
5005  *
5006  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
5007  * the specified @timeout. @timeout can be %NULL, in which case this function
5008  * might block forever.
5009  *
5010  * This function can be cancelled with gst_rtsp_connection_flush().
5011  *
5012  * Returns: #GST_RTSP_OK on success.
5013  *
5014  * Deprecated: 1.18
5015  */
5016 GstRTSPResult
5017 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
5018     GTimeVal * timeout)
5019 {
5020   return gst_rtsp_connection_read_usec (conn, data, size, TV_TO_USEC (timeout));
5021 }
5022
5023 /**
5024  * gst_rtsp_connection_write:
5025  * @conn: a #GstRTSPConnection
5026  * @data: the data to write
5027  * @size: the size of @data
5028  * @timeout: a timeout value or %NULL
5029  *
5030  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
5031  * the specified @timeout. @timeout can be %NULL, in which case this function
5032  * might block forever.
5033  *
5034  * This function can be cancelled with gst_rtsp_connection_flush().
5035  *
5036  * Returns: #GST_RTSP_OK on success.
5037  *
5038  * Deprecated: 1.18
5039  */
5040 GstRTSPResult
5041 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
5042     guint size, GTimeVal * timeout)
5043 {
5044   return gst_rtsp_connection_write_usec (conn, data, size,
5045       TV_TO_USEC (timeout));
5046 }
5047
5048 /**
5049  * gst_rtsp_connection_send:
5050  * @conn: a #GstRTSPConnection
5051  * @message: the message to send
5052  * @timeout: a timeout value or %NULL
5053  *
5054  * Attempt to send @message to the connected @conn, blocking up to
5055  * the specified @timeout. @timeout can be %NULL, in which case this function
5056  * might block forever.
5057  *
5058  * This function can be cancelled with gst_rtsp_connection_flush().
5059  *
5060  * Returns: #GST_RTSP_OK on success.
5061  *
5062  * Deprecated: 1.18
5063  */
5064 GstRTSPResult
5065 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
5066     GTimeVal * timeout)
5067 {
5068   return gst_rtsp_connection_send_usec (conn, message, TV_TO_USEC (timeout));
5069 }
5070
5071 /**
5072  * gst_rtsp_connection_send_messages:
5073  * @conn: a #GstRTSPConnection
5074  * @messages: (array length=n_messages): the messages to send
5075  * @n_messages: the number of messages to send
5076  * @timeout: a timeout value or %NULL
5077  *
5078  * Attempt to send @messages to the connected @conn, blocking up to
5079  * the specified @timeout. @timeout can be %NULL, in which case this function
5080  * might block forever.
5081  *
5082  * This function can be cancelled with gst_rtsp_connection_flush().
5083  *
5084  * Returns: #GST_RTSP_OK on success.
5085  *
5086  * Since: 1.16
5087  * Deprecated: 1.18
5088  */
5089 GstRTSPResult
5090 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
5091     GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
5092 {
5093   return gst_rtsp_connection_send_messages_usec (conn, messages, n_messages,
5094       TV_TO_USEC (timeout));
5095 }
5096
5097 /**
5098  * gst_rtsp_connection_receive:
5099  * @conn: a #GstRTSPConnection
5100  * @message: the message to read
5101  * @timeout: a timeout value or %NULL
5102  *
5103  * Attempt to read into @message from the connected @conn, blocking up to
5104  * the specified @timeout. @timeout can be %NULL, in which case this function
5105  * might block forever.
5106  *
5107  * This function can be cancelled with gst_rtsp_connection_flush().
5108  *
5109  * Returns: #GST_RTSP_OK on success.
5110  *
5111  * Deprecated: 1.18
5112  */
5113 GstRTSPResult
5114 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
5115     GTimeVal * timeout)
5116 {
5117   return gst_rtsp_connection_receive_usec (conn, message, TV_TO_USEC (timeout));
5118 }
5119
5120 /**
5121  * gst_rtsp_connection_poll:
5122  * @conn: a #GstRTSPConnection
5123  * @events: a bitmask of #GstRTSPEvent flags to check
5124  * @revents: location for result flags
5125  * @timeout: a timeout
5126  *
5127  * Wait up to the specified @timeout for the connection to become available for
5128  * at least one of the operations specified in @events. When the function returns
5129  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
5130  * @conn.
5131  *
5132  * @timeout can be %NULL, in which case this function might block forever.
5133  *
5134  * This function can be cancelled with gst_rtsp_connection_flush().
5135  *
5136  * Returns: #GST_RTSP_OK on success.
5137  *
5138  * Deprecated: 1.18
5139  */
5140 GstRTSPResult
5141 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
5142     GstRTSPEvent * revents, GTimeVal * timeout)
5143 {
5144   return gst_rtsp_connection_poll_usec (conn, events, revents,
5145       TV_TO_USEC (timeout));
5146 }
5147
5148 /**
5149  * gst_rtsp_connection_next_timeout:
5150  * @conn: a #GstRTSPConnection
5151  * @timeout: a timeout
5152  *
5153  * Calculate the next timeout for @conn, storing the result in @timeout.
5154  *
5155  * Returns: #GST_RTSP_OK.
5156  *
5157  * Deprecated: 1.18
5158  */
5159 GstRTSPResult
5160 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
5161 {
5162   gint64 tmptimeout = 0;
5163
5164   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
5165
5166   tmptimeout = gst_rtsp_connection_next_timeout_usec (conn);
5167
5168   timeout->tv_sec = tmptimeout / G_USEC_PER_SEC;
5169   timeout->tv_usec = tmptimeout % G_USEC_PER_SEC;
5170
5171   return GST_RTSP_OK;
5172 }
5173
5174
5175 /**
5176  * gst_rtsp_watch_wait_backlog:
5177  * @watch: a #GstRTSPWatch
5178  * @timeout: a GTimeVal timeout
5179  *
5180  * Wait until there is place in the backlog queue, @timeout is reached
5181  * or @watch is set to flushing.
5182  *
5183  * If @timeout is %NULL this function can block forever. If @timeout
5184  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
5185  * after the timeout expired.
5186  *
5187  * The typically use of this function is when gst_rtsp_watch_write_data
5188  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
5189  * free space in the backlog queue and try again.
5190  *
5191  * Returns: %GST_RTSP_OK when if there is room in queue.
5192  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
5193  *          %GST_RTSP_EINTR when @watch is flushing
5194  *          %GST_RTSP_EINVAL when called with invalid parameters.
5195  *
5196  * Since: 1.4
5197  * Deprecated: 1.18
5198  */
5199 GstRTSPResult
5200 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
5201 {
5202   return gst_rtsp_watch_wait_backlog_usec (watch, TV_TO_USEC (timeout));
5203 }
5204
5205 G_GNUC_END_IGNORE_DEPRECATIONS
5206 #endif /* GST_DISABLE_DEPRECATED */