Bump GLib requirement to >= 2.62
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @title: GstRTSPConnection
46  * @short_description: manage RTSP connections
47  * @see_also: gstrtspurl
48  *
49  * This object manages the RTSP connection to the server. It provides function
50  * to receive and send bytes and messages.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #  include <config.h>
55 #endif
56
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70
71 #include "gstrtspconnection.h"
72
73 #ifdef IP_TOS
74 union gst_sockaddr
75 {
76   struct sockaddr sa;
77   struct sockaddr_in sa_in;
78   struct sockaddr_in6 sa_in6;
79   struct sockaddr_storage sa_stor;
80 };
81 #endif
82
83 typedef struct
84 {
85   gint state;
86   guint save;
87   guchar out[3];                /* the size must be evenly divisible by 3 */
88   guint cout;
89   guint coutl;
90 } DecodeCtx;
91
92 typedef struct
93 {
94   /* If %TRUE we only own data and none of the
95    * other fields
96    */
97   gboolean borrowed;
98
99   /* Header or full message */
100   guint8 *data;
101   guint data_size;
102   gboolean data_is_data_header;
103
104   /* Payload following data, if any */
105   guint8 *body_data;
106   guint body_data_size;
107   /* or */
108   GstBuffer *body_buffer;
109
110   /* DATA packet header statically allocated for above */
111   guint8 data_header[4];
112
113   /* all below only for async writing */
114
115   guint data_offset;            /* == data_size when done */
116   guint body_offset;            /* into body_data or the buffer */
117
118   /* ID of the message for notification */
119   guint id;
120 } GstRTSPSerializedMessage;
121
122 static void
123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125   if (!msg->borrowed) {
126     g_free (msg->body_data);
127     gst_buffer_replace (&msg->body_buffer, NULL);
128   }
129   g_free (msg->data);
130 }
131
132 #ifdef MSG_NOSIGNAL
133 #define SEND_FLAGS MSG_NOSIGNAL
134 #else
135 #define SEND_FLAGS 0
136 #endif
137
138 typedef enum
139 {
140   TUNNEL_STATE_NONE,
141   TUNNEL_STATE_GET,
142   TUNNEL_STATE_POST,
143   TUNNEL_STATE_COMPLETE
144 } GstRTSPTunnelState;
145
146 #define TUNNELID_LEN   24
147
148 struct _GstRTSPConnection
149 {
150   /*< private > */
151   /* URL for the remote connection */
152   GstRTSPUrl *url;
153   GstRTSPVersion version;
154
155   gboolean server;
156   GSocketClient *client;
157   GIOStream *stream0;
158   GIOStream *stream1;
159
160   GInputStream *input_stream;
161   GOutputStream *output_stream;
162   /* this is a read source we add on the write socket in tunneled mode to be
163    * able to detect when client disconnects the GET channel */
164   GInputStream *control_stream;
165
166   /* connection state */
167   GSocket *read_socket;
168   GSocket *write_socket;
169   GSocket *socket0, *socket1;
170   gboolean read_socket_used;
171   gboolean write_socket_used;
172   GMutex socket_use_mutex;
173   gboolean manual_http;
174   gboolean may_cancel;
175   GCancellable *cancellable;
176
177   gchar tunnelid[TUNNELID_LEN];
178   gboolean tunneled;
179   gboolean ignore_x_server_reply;
180   GstRTSPTunnelState tstate;
181
182   /* the remote and local ip */
183   gchar *remote_ip;
184   gchar *local_ip;
185
186   gint read_ahead;
187
188   gchar *initial_buffer;
189   gsize initial_buffer_offset;
190
191   gboolean remember_session_id; /* remember the session id or not */
192
193   /* Session state */
194   gint cseq;                    /* sequence number */
195   gchar session_id[512];        /* session id */
196   gint timeout;                 /* session timeout in seconds */
197   GTimer *timer;                /* timeout timer */
198
199   /* Authentication */
200   GstRTSPAuthMethod auth_method;
201   gchar *username;
202   gchar *passwd;
203   GHashTable *auth_params;
204
205   guint content_length_limit;
206
207   /* TLS */
208   GTlsDatabase *tls_database;
209   GTlsInteraction *tls_interaction;
210
211   GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
212   GDestroyNotify accept_certificate_destroy_notify;
213   gpointer accept_certificate_user_data;
214
215   DecodeCtx ctx;
216   DecodeCtx *ctxp;
217
218   gchar *proxy_host;
219   guint proxy_port;
220 };
221
222 enum
223 {
224   STATE_START = 0,
225   STATE_DATA_HEADER,
226   STATE_DATA_BODY,
227   STATE_READ_LINES,
228   STATE_END,
229   STATE_LAST
230 };
231
232 enum
233 {
234   READ_AHEAD_EOH = -1,          /* end of headers */
235   READ_AHEAD_CRLF = -2,
236   READ_AHEAD_CRLFCR = -3
237 };
238
239 /* a structure for constructing RTSPMessages */
240 typedef struct
241 {
242   gint state;
243   GstRTSPResult status;
244   guint8 buffer[4096];
245   guint offset;
246
247   guint line;
248   guint8 *body_data;
249   guint body_len;
250 } GstRTSPBuilder;
251
252 /* function prototypes */
253 static void add_auth_header (GstRTSPConnection * conn,
254     GstRTSPMessage * message);
255
256 static void
257 build_reset (GstRTSPBuilder * builder)
258 {
259   g_free (builder->body_data);
260   memset (builder, 0, sizeof (GstRTSPBuilder));
261 }
262
263 static GstRTSPResult
264 gst_rtsp_result_from_g_io_error (GError * error, GstRTSPResult default_res)
265 {
266   if (error == NULL)
267     return GST_RTSP_OK;
268
269   if (error->domain != G_IO_ERROR)
270     return default_res;
271
272   switch (error->code) {
273     case G_IO_ERROR_TIMED_OUT:
274       return GST_RTSP_ETIMEOUT;
275     case G_IO_ERROR_INVALID_ARGUMENT:
276       return GST_RTSP_EINVAL;
277     case G_IO_ERROR_CANCELLED:
278     case G_IO_ERROR_WOULD_BLOCK:
279       return GST_RTSP_EINTR;
280     default:
281       return default_res;
282   }
283 }
284
285 static gboolean
286 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
287     GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
288 {
289   GError *error = NULL;
290   gboolean accept = FALSE;
291
292   if (rtspconn->tls_database) {
293     GSocketConnectable *peer_identity;
294     GTlsCertificateFlags validation_flags;
295
296     GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
297
298     peer_identity =
299         g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
300         (conn));
301
302     errors =
303         g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
304         G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
305         g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
306         NULL, &error);
307
308     if (error)
309       goto verify_error;
310
311     validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
312
313     accept = ((errors & validation_flags) == 0);
314     if (accept)
315       GST_DEBUG ("Peer certificate accepted");
316     else
317       GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
318   }
319
320   if (!accept && rtspconn->accept_certificate_func) {
321     accept =
322         rtspconn->accept_certificate_func (conn, peer_cert, errors,
323         rtspconn->accept_certificate_user_data);
324     GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
325         accept ? "" : "not ");
326   }
327
328   return accept;
329
330 /* ERRORS */
331 verify_error:
332   {
333     GST_ERROR ("An error occurred while verifying the peer certificate: %s",
334         error->message);
335     g_clear_error (&error);
336     return FALSE;
337   }
338 }
339
340 static void
341 socket_client_event (GSocketClient * client, GSocketClientEvent event,
342     GSocketConnectable * connectable, GTlsConnection * connection,
343     GstRTSPConnection * rtspconn)
344 {
345   if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
346     GST_DEBUG ("TLS handshaking about to start...");
347
348     g_signal_connect (connection, "accept-certificate",
349         (GCallback) tls_accept_certificate, rtspconn);
350
351     g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
352   }
353 }
354
355 /**
356  * gst_rtsp_connection_create:
357  * @url: a #GstRTSPUrl
358  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
359  *
360  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
361  * The connection will not yet attempt to connect to @url, use
362  * gst_rtsp_connection_connect().
363  *
364  * A copy of @url will be made.
365  *
366  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
367  */
368 GstRTSPResult
369 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
370 {
371   GstRTSPConnection *newconn;
372
373   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
374   g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
375
376   newconn = g_new0 (GstRTSPConnection, 1);
377
378   newconn->may_cancel = TRUE;
379   newconn->cancellable = g_cancellable_new ();
380   newconn->client = g_socket_client_new ();
381
382   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
383     g_socket_client_set_tls (newconn->client, TRUE);
384
385   g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
386       newconn);
387
388   newconn->url = gst_rtsp_url_copy (url);
389   newconn->timer = g_timer_new ();
390   newconn->timeout = 60;
391   newconn->cseq = 1;            /* RFC 7826: "it is RECOMMENDED to start at 0.",
392                                    but some servers don't copy values <1 due to bugs. */
393
394   newconn->remember_session_id = TRUE;
395
396   newconn->auth_method = GST_RTSP_AUTH_NONE;
397   newconn->username = NULL;
398   newconn->passwd = NULL;
399   newconn->auth_params = NULL;
400   newconn->version = 0;
401
402   newconn->content_length_limit = G_MAXUINT;
403
404   *conn = newconn;
405
406   return GST_RTSP_OK;
407 }
408
409 static gboolean
410 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
411     gboolean remote, GError ** error)
412 {
413   GSocketAddress *addr;
414
415   if (remote)
416     addr = g_socket_get_remote_address (socket, error);
417   else
418     addr = g_socket_get_local_address (socket, error);
419   if (!addr)
420     return FALSE;
421
422   if (ip)
423     *ip = g_inet_address_to_string (g_inet_socket_address_get_address
424         (G_INET_SOCKET_ADDRESS (addr)));
425   if (port)
426     *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
427
428   g_object_unref (addr);
429
430   return TRUE;
431 }
432
433
434 /**
435  * gst_rtsp_connection_create_from_socket:
436  * @socket: a #GSocket
437  * @ip: the IP address of the other end
438  * @port: the port used by the other end
439  * @initial_buffer: data already read from @fd
440  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
441  *
442  * Create a new #GstRTSPConnection for handling communication on the existing
443  * socket @socket. The @initial_buffer contains zero terminated data already
444  * read from @socket which should be used before starting to read new data.
445  *
446  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
447  */
448 /* FIXME 2.0 We don't need the ip and port since they can be got from the
449  * GSocket */
450 GstRTSPResult
451 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
452     guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
453 {
454   GstRTSPConnection *newconn = NULL;
455   GstRTSPUrl *url;
456   GstRTSPResult res;
457   GError *err = NULL;
458   gchar *local_ip;
459   GIOStream *stream;
460
461   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
462   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
463   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
464
465   if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
466     goto getnameinfo_failed;
467
468   /* create a url for the client address */
469   url = g_new0 (GstRTSPUrl, 1);
470   url->host = g_strdup (ip);
471   url->port = port;
472
473   /* now create the connection object */
474   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
475   gst_rtsp_url_free (url);
476
477   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
478
479   /* both read and write initially */
480   newconn->server = TRUE;
481   newconn->socket0 = socket;
482   newconn->stream0 = stream;
483   newconn->write_socket = newconn->read_socket = newconn->socket0;
484   newconn->read_socket_used = FALSE;
485   newconn->write_socket_used = FALSE;
486   g_mutex_init (&newconn->socket_use_mutex);
487   newconn->input_stream = g_io_stream_get_input_stream (stream);
488   newconn->output_stream = g_io_stream_get_output_stream (stream);
489   newconn->control_stream = NULL;
490   newconn->remote_ip = g_strdup (ip);
491   newconn->local_ip = local_ip;
492   newconn->initial_buffer = g_strdup (initial_buffer);
493
494   *conn = newconn;
495
496   return GST_RTSP_OK;
497
498   /* ERRORS */
499 getnameinfo_failed:
500   {
501     GST_ERROR ("failed to get local address: %s", err->message);
502     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
503     g_clear_error (&err);
504     return res;
505   }
506 newconn_failed:
507   {
508     GST_ERROR ("failed to make connection");
509     g_free (local_ip);
510     gst_rtsp_url_free (url);
511     return res;
512   }
513 }
514
515 /**
516  * gst_rtsp_connection_accept:
517  * @socket: a socket
518  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
519  * @cancellable: a #GCancellable to cancel the operation
520  *
521  * Accept a new connection on @socket and create a new #GstRTSPConnection for
522  * handling communication on new socket.
523  *
524  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
525  */
526 GstRTSPResult
527 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
528     GCancellable * cancellable)
529 {
530   GError *err = NULL;
531   gchar *ip;
532   guint16 port;
533   GSocket *client_sock;
534   GstRTSPResult ret;
535
536   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
537   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
538
539   client_sock = g_socket_accept (socket, cancellable, &err);
540   if (!client_sock)
541     goto accept_failed;
542
543   /* get the remote ip address and port */
544   if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
545     goto getnameinfo_failed;
546
547   ret =
548       gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
549       conn);
550   g_object_unref (client_sock);
551   g_free (ip);
552
553   return ret;
554
555   /* ERRORS */
556 accept_failed:
557   {
558     GST_DEBUG ("Accepting client failed: %s", err->message);
559     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
560     g_clear_error (&err);
561     return ret;
562   }
563 getnameinfo_failed:
564   {
565     GST_DEBUG ("getnameinfo failed: %s", err->message);
566     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
567     g_clear_error (&err);
568     if (!g_socket_close (client_sock, &err)) {
569       GST_DEBUG ("Closing socket failed: %s", err->message);
570       g_clear_error (&err);
571     }
572     g_object_unref (client_sock);
573     return ret;
574   }
575 }
576
577 /**
578  * gst_rtsp_connection_get_tls:
579  * @conn: a #GstRTSPConnection
580  * @error: #GError for error reporting, or NULL to ignore.
581  *
582  * Get the TLS connection of @conn.
583  *
584  * For client side this will return the #GTlsClientConnection when connected
585  * over TLS.
586  *
587  * For server side connections, this function will create a GTlsServerConnection
588  * when called the first time and will return that same connection on subsequent
589  * calls. The server is then responsible for configuring the TLS connection.
590  *
591  * Returns: (transfer none): the TLS connection for @conn.
592  *
593  * Since: 1.2
594  */
595 GTlsConnection *
596 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
597 {
598   GTlsConnection *result;
599
600   if (G_IS_TLS_CONNECTION (conn->stream0)) {
601     /* we already had one, return it */
602     result = G_TLS_CONNECTION (conn->stream0);
603   } else if (conn->server) {
604     /* no TLS connection but we are server, make one */
605     result = (GTlsConnection *)
606         g_tls_server_connection_new (conn->stream0, NULL, error);
607     if (result) {
608       g_object_unref (conn->stream0);
609       conn->stream0 = G_IO_STREAM (result);
610       conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
611       conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
612     }
613   } else {
614     /* client */
615     result = NULL;
616     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
617         "client not connected with TLS");
618   }
619   return result;
620 }
621
622 /**
623  * gst_rtsp_connection_set_tls_validation_flags:
624  * @conn: a #GstRTSPConnection
625  * @flags: the validation flags.
626  *
627  * Sets the TLS validation flags to be used to verify the peer
628  * certificate when a TLS connection is established.
629  *
630  * Returns: TRUE if the validation flags are set correctly, or FALSE if
631  * @conn is NULL or is not a TLS connection.
632  *
633  * Since: 1.2.1
634  */
635 gboolean
636 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
637     GTlsCertificateFlags flags)
638 {
639   gboolean res = FALSE;
640
641   g_return_val_if_fail (conn != NULL, FALSE);
642
643   res = g_socket_client_get_tls (conn->client);
644   if (res)
645     g_socket_client_set_tls_validation_flags (conn->client, flags);
646
647   return res;
648 }
649
650 /**
651  * gst_rtsp_connection_get_tls_validation_flags:
652  * @conn: a #GstRTSPConnection
653  *
654  * Gets the TLS validation flags used to verify the peer certificate
655  * when a TLS connection is established.
656  *
657  * Returns: the validationg flags.
658  *
659  * Since: 1.2.1
660  */
661 GTlsCertificateFlags
662 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
663 {
664   g_return_val_if_fail (conn != NULL, 0);
665
666   return g_socket_client_get_tls_validation_flags (conn->client);
667 }
668
669 /**
670  * gst_rtsp_connection_set_tls_database:
671  * @conn: a #GstRTSPConnection
672  * @database: a #GTlsDatabase
673  *
674  * Sets the anchor certificate authorities database. This certificate
675  * database will be used to verify the server's certificate in case it
676  * can't be verified with the default certificate database first.
677  *
678  * Since: 1.4
679  */
680 void
681 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
682     GTlsDatabase * database)
683 {
684   GTlsDatabase *old_db;
685
686   g_return_if_fail (conn != NULL);
687
688   if (database)
689     g_object_ref (database);
690
691   old_db = conn->tls_database;
692   conn->tls_database = database;
693
694   if (old_db)
695     g_object_unref (old_db);
696 }
697
698 /**
699  * gst_rtsp_connection_get_tls_database:
700  * @conn: a #GstRTSPConnection
701  *
702  * Gets the anchor certificate authorities database that will be used
703  * after a server certificate can't be verified with the default
704  * certificate database.
705  *
706  * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
707  * database has been previously set. Use g_object_unref() to release the
708  * certificate database.
709  *
710  * Since: 1.4
711  */
712 GTlsDatabase *
713 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
714 {
715   GTlsDatabase *result;
716
717   g_return_val_if_fail (conn != NULL, NULL);
718
719   if ((result = conn->tls_database))
720     g_object_ref (result);
721
722   return result;
723 }
724
725 /**
726  * gst_rtsp_connection_set_tls_interaction:
727  * @conn: a #GstRTSPConnection
728  * @interaction: a #GTlsInteraction
729  *
730  * Sets a #GTlsInteraction object to be used when the connection or certificate
731  * database need to interact with the user. This will be used to prompt the
732  * user for passwords where necessary.
733  *
734  * Since: 1.6
735  */
736 void
737 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
738     GTlsInteraction * interaction)
739 {
740   GTlsInteraction *old_interaction;
741
742   g_return_if_fail (conn != NULL);
743
744   if (interaction)
745     g_object_ref (interaction);
746
747   old_interaction = conn->tls_interaction;
748   conn->tls_interaction = interaction;
749
750   if (old_interaction)
751     g_object_unref (old_interaction);
752 }
753
754 /**
755  * gst_rtsp_connection_get_tls_interaction:
756  * @conn: a #GstRTSPConnection
757  *
758  * Gets a #GTlsInteraction object to be used when the connection or certificate
759  * database need to interact with the user. This will be used to prompt the
760  * user for passwords where necessary.
761  *
762  * Returns: (transfer full): a reference on the #GTlsInteraction. Use
763  * g_object_unref() to release.
764  *
765  * Since: 1.6
766  */
767 GTlsInteraction *
768 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
769 {
770   GTlsInteraction *result;
771
772   g_return_val_if_fail (conn != NULL, NULL);
773
774   if ((result = conn->tls_interaction))
775     g_object_ref (result);
776
777   return result;
778 }
779
780 /**
781  * gst_rtsp_connection_set_accept_certificate_func:
782  * @conn: a #GstRTSPConnection
783  * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
784  * @destroy_notify: #GDestroyNotify for @user_data
785  * @user_data: User data passed to @func
786  *
787  * Sets a custom accept-certificate function for checking certificates for
788  * validity. This will directly map to #GTlsConnection 's "accept-certificate"
789  * signal and be performed after the default checks of #GstRTSPConnection
790  * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
791  * have failed. If no #GTlsDatabase is set on this connection, only @func will
792  * be called.
793  *
794  * Since: 1.14
795  */
796 void
797 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
798     GstRTSPConnectionAcceptCertificateFunc func,
799     gpointer user_data, GDestroyNotify destroy_notify)
800 {
801   if (conn->accept_certificate_destroy_notify)
802     conn->
803         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
804   conn->accept_certificate_func = func;
805   conn->accept_certificate_user_data = user_data;
806   conn->accept_certificate_destroy_notify = destroy_notify;
807 }
808
809 static gchar *
810 get_tunneled_connection_uri_strdup (GstRTSPUrl * url, guint16 port)
811 {
812   const gchar *pre_host = "";
813   const gchar *post_host = "";
814
815   if (url->family == GST_RTSP_FAM_INET6) {
816     pre_host = "[";
817     post_host = "]";
818   }
819
820   return g_strdup_printf ("http://%s%s%s:%d%s%s%s", pre_host, url->host,
821       post_host, port, url->abspath, url->query ? "?" : "",
822       url->query ? url->query : "");
823 }
824
825 static GstRTSPResult
826 setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
827     GstRTSPMessage * response)
828 {
829   gint i;
830   GstRTSPResult res;
831   gchar *value;
832   guint16 url_port;
833   GstRTSPMessage *msg;
834   gboolean old_http;
835   GstRTSPUrl *url;
836   GError *error = NULL;
837   GSocketConnection *connection;
838   GSocket *socket;
839   gchar *connection_uri = NULL;
840   gchar *request_uri = NULL;
841   gchar *host = NULL;
842
843   url = conn->url;
844
845   gst_rtsp_url_get_port (url, &url_port);
846   host = g_strdup_printf ("%s:%d", url->host, url_port);
847
848   /* create a random sessionid */
849   for (i = 0; i < TUNNELID_LEN; i++)
850     conn->tunnelid[i] = g_random_int_range ('a', 'z');
851   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
852
853   /* create the GET request for the read connection */
854   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
855       no_message);
856   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
857
858   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
859       conn->tunnelid);
860   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
861       "application/x-rtsp-tunnelled");
862   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
863   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
864   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
865
866   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
867    * request from being base64 encoded */
868   conn->tunneled = FALSE;
869   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
870       write_failed);
871   gst_rtsp_message_free (msg);
872   conn->tunneled = TRUE;
873
874   /* receive the response to the GET request */
875   /* we need to temporarily set manual_http to TRUE since
876    * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
877    * failure otherwise */
878   old_http = conn->manual_http;
879   conn->manual_http = TRUE;
880   GST_RTSP_CHECK (gst_rtsp_connection_receive_usec (conn, response, timeout),
881       read_failed);
882   conn->manual_http = old_http;
883
884   if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
885       response->type_data.response.code != GST_RTSP_STS_OK)
886     goto wrong_result;
887
888   if (!conn->ignore_x_server_reply &&
889       gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
890           &value, 0) == GST_RTSP_OK) {
891     g_free (url->host);
892     url->host = g_strdup (value);
893     g_free (conn->remote_ip);
894     conn->remote_ip = g_strdup (value);
895   }
896
897   connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
898
899   /* connect to the host/port */
900   if (conn->proxy_host) {
901     connection = g_socket_client_connect_to_host (conn->client,
902         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
903     request_uri = g_strdup (connection_uri);
904   } else {
905     connection = g_socket_client_connect_to_uri (conn->client,
906         connection_uri, 0, conn->cancellable, &error);
907     request_uri =
908         g_strdup_printf ("%s%s%s", url->abspath,
909         url->query ? "?" : "", url->query ? url->query : "");
910   }
911   if (connection == NULL)
912     goto connect_failed;
913
914   socket = g_socket_connection_get_socket (connection);
915
916   /* get remote address */
917   g_free (conn->remote_ip);
918   conn->remote_ip = NULL;
919
920   if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
921     goto remote_address_failed;
922
923   /* this is now our writing socket */
924   conn->stream1 = G_IO_STREAM (connection);
925   conn->socket1 = socket;
926   conn->write_socket = conn->socket1;
927   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
928   conn->control_stream = NULL;
929
930   /* create the POST request for the write connection */
931   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
932           request_uri), no_message);
933   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
934
935   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
936       conn->tunnelid);
937   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
938       "application/x-rtsp-tunnelled");
939   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
940       "application/x-rtsp-tunnelled");
941   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
942   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
943   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
944       "Sun, 9 Jan 1972 00:00:00 GMT");
945   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
946   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
947
948   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
949    * request from being base64 encoded */
950   conn->tunneled = FALSE;
951   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
952       write_failed);
953   gst_rtsp_message_free (msg);
954   conn->tunneled = TRUE;
955
956 exit:
957   g_free (connection_uri);
958   g_free (request_uri);
959   g_free (host);
960
961   return res;
962
963   /* ERRORS */
964 no_message:
965   {
966     GST_ERROR ("failed to create request (%d)", res);
967     goto exit;
968   }
969 write_failed:
970   {
971     GST_ERROR ("write failed (%d)", res);
972     gst_rtsp_message_free (msg);
973     conn->tunneled = TRUE;
974     goto exit;
975   }
976 read_failed:
977   {
978     GST_ERROR ("read failed (%d)", res);
979     conn->manual_http = FALSE;
980     goto exit;
981   }
982 wrong_result:
983   {
984     GST_ERROR ("got failure response %d %s",
985         response->type_data.response.code, response->type_data.response.reason);
986     res = GST_RTSP_ERROR;
987     goto exit;
988   }
989 connect_failed:
990   {
991     GST_ERROR ("failed to connect: %s", error->message);
992     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
993     g_clear_error (&error);
994     goto exit;
995   }
996 remote_address_failed:
997   {
998     GST_ERROR ("failed to resolve address: %s", error->message);
999     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1000     g_object_unref (connection);
1001     g_clear_error (&error);
1002     return res;
1003   }
1004 }
1005
1006 /**
1007  * gst_rtsp_connection_connect_with_response_usec:
1008  * @conn: a #GstRTSPConnection
1009  * @timeout: a timeout in microseconds
1010  * @response: a #GstRTSPMessage
1011  *
1012  * Attempt to connect to the url of @conn made with
1013  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1014  * forever. If @timeout contains a valid timeout, this function will return
1015  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
1016  * @response will contain a response to the tunneling request messages.
1017  *
1018  * This function can be cancelled with gst_rtsp_connection_flush().
1019  *
1020  * Returns: #GST_RTSP_OK when a connection could be made.
1021  *
1022  * Since: 1.18
1023  */
1024 GstRTSPResult
1025 gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
1026     gint64 timeout, GstRTSPMessage * response)
1027 {
1028   GstRTSPResult res;
1029   GSocketConnection *connection;
1030   GSocket *socket;
1031   GError *error = NULL;
1032   gchar *connection_uri, *request_uri, *remote_ip;
1033   GstClockTime to;
1034   guint16 url_port;
1035   GstRTSPUrl *url;
1036
1037   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1038   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
1039   g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
1040
1041   to = timeout * 1000;
1042   g_socket_client_set_timeout (conn->client,
1043       (to + GST_SECOND - 1) / GST_SECOND);
1044
1045   url = conn->url;
1046
1047   gst_rtsp_url_get_port (url, &url_port);
1048
1049   if (conn->tunneled) {
1050     connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
1051   } else {
1052     connection_uri = gst_rtsp_url_get_request_uri (url);
1053   }
1054
1055   if (conn->proxy_host) {
1056     connection = g_socket_client_connect_to_host (conn->client,
1057         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1058     request_uri = g_strdup (connection_uri);
1059   } else {
1060     connection = g_socket_client_connect_to_uri (conn->client,
1061         connection_uri, url_port, conn->cancellable, &error);
1062
1063     /* use the relative component of the uri for non-proxy connections */
1064     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1065         url->query ? "?" : "", url->query ? url->query : "");
1066   }
1067   if (connection == NULL)
1068     goto connect_failed;
1069
1070   /* get remote address */
1071   socket = g_socket_connection_get_socket (connection);
1072
1073   if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1074     goto remote_address_failed;
1075
1076   g_free (conn->remote_ip);
1077   conn->remote_ip = remote_ip;
1078   conn->stream0 = G_IO_STREAM (connection);
1079   conn->socket0 = socket;
1080   /* this is our read socket */
1081   conn->read_socket = conn->socket0;
1082   conn->write_socket = conn->socket0;
1083   conn->read_socket_used = FALSE;
1084   conn->write_socket_used = FALSE;
1085   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1086   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1087   conn->control_stream = NULL;
1088
1089   if (conn->tunneled) {
1090     res = setup_tunneling (conn, timeout, request_uri, response);
1091     if (res != GST_RTSP_OK)
1092       goto tunneling_failed;
1093   }
1094   g_free (connection_uri);
1095   g_free (request_uri);
1096
1097   return GST_RTSP_OK;
1098
1099   /* ERRORS */
1100 connect_failed:
1101   {
1102     GST_ERROR ("failed to connect: %s", error->message);
1103     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1104     g_clear_error (&error);
1105     g_free (connection_uri);
1106     g_free (request_uri);
1107     return res;
1108   }
1109 remote_address_failed:
1110   {
1111     GST_ERROR ("failed to connect: %s", error->message);
1112     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1113     g_object_unref (connection);
1114     g_clear_error (&error);
1115     g_free (connection_uri);
1116     g_free (request_uri);
1117     return res;
1118   }
1119 tunneling_failed:
1120   {
1121     GST_ERROR ("failed to setup tunneling");
1122     g_free (connection_uri);
1123     g_free (request_uri);
1124     return res;
1125   }
1126 }
1127
1128 static void
1129 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1130 {
1131   switch (conn->auth_method) {
1132     case GST_RTSP_AUTH_BASIC:{
1133       gchar *user_pass;
1134       gchar *user_pass64;
1135       gchar *auth_string;
1136
1137       if (conn->username == NULL || conn->passwd == NULL)
1138         break;
1139
1140       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1141       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1142       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1143
1144       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1145           auth_string);
1146
1147       g_free (user_pass);
1148       g_free (user_pass64);
1149       break;
1150     }
1151     case GST_RTSP_AUTH_DIGEST:{
1152       gchar *response;
1153       gchar *auth_string, *auth_string2;
1154       gchar *realm;
1155       gchar *nonce;
1156       gchar *opaque;
1157       const gchar *uri;
1158       const gchar *method;
1159
1160       /* we need to have some params set */
1161       if (conn->auth_params == NULL || conn->username == NULL ||
1162           conn->passwd == NULL)
1163         break;
1164
1165       /* we need the realm and nonce */
1166       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1167       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1168       if (realm == NULL || nonce == NULL)
1169         break;
1170
1171       method = gst_rtsp_method_as_text (message->type_data.request.method);
1172       uri = message->type_data.request.uri;
1173
1174       response =
1175           gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1176           conn->username, conn->passwd, uri, nonce);
1177       auth_string =
1178           g_strdup_printf ("Digest username=\"%s\", "
1179           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1180           conn->username, realm, nonce, uri, response);
1181       g_free (response);
1182
1183       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1184       if (opaque) {
1185         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1186             opaque);
1187         g_free (auth_string);
1188         auth_string = auth_string2;
1189       }
1190       /* Do not keep any old Authorization headers */
1191       gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1192       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1193           auth_string);
1194       break;
1195     }
1196     default:
1197       /* Nothing to do */
1198       break;
1199   }
1200 }
1201
1202 /**
1203  * gst_rtsp_connection_connect_usec:
1204  * @conn: a #GstRTSPConnection
1205  * @timeout: a timeout in microseconds
1206  *
1207  * Attempt to connect to the url of @conn made with
1208  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1209  * forever. If @timeout contains a valid timeout, this function will return
1210  * #GST_RTSP_ETIMEOUT after the timeout expired.
1211  *
1212  * This function can be cancelled with gst_rtsp_connection_flush().
1213  *
1214  * Returns: #GST_RTSP_OK when a connection could be made.
1215  *
1216  * Since: 1.18
1217  */
1218 GstRTSPResult
1219 gst_rtsp_connection_connect_usec (GstRTSPConnection * conn, gint64 timeout)
1220 {
1221   GstRTSPResult result;
1222   GstRTSPMessage response;
1223
1224   memset (&response, 0, sizeof (response));
1225   gst_rtsp_message_init (&response);
1226
1227   result = gst_rtsp_connection_connect_with_response_usec (conn, timeout,
1228       &response);
1229
1230   gst_rtsp_message_unset (&response);
1231
1232   return result;
1233 }
1234
1235 static void
1236 gen_date_string (gchar * date_string, guint len)
1237 {
1238   static const char wkdays[7][4] =
1239       { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1240   static const char months[12][4] =
1241       { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1242     "Nov", "Dec"
1243   };
1244   struct tm tm;
1245   time_t t;
1246
1247   time (&t);
1248
1249 #ifdef HAVE_GMTIME_R
1250   gmtime_r (&t, &tm);
1251 #else
1252   tm = *gmtime (&t);
1253 #endif
1254
1255   g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1256       wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1257       tm.tm_hour, tm.tm_min, tm.tm_sec);
1258 }
1259
1260 static GstRTSPResult
1261 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1262     guint size, gboolean block, GCancellable * cancellable)
1263 {
1264   guint left;
1265   gssize r;
1266   GstRTSPResult res;
1267   GError *err = NULL;
1268
1269   if (G_UNLIKELY (*idx > size))
1270     return GST_RTSP_ERROR;
1271
1272   left = size - *idx;
1273
1274   while (left) {
1275     if (block)
1276       r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1277           cancellable, &err);
1278     else
1279       r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1280           (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1281     if (G_UNLIKELY (r < 0))
1282       goto error;
1283
1284     left -= r;
1285     *idx += r;
1286   }
1287   return GST_RTSP_OK;
1288
1289   /* ERRORS */
1290 error:
1291   {
1292     if (G_UNLIKELY (r == 0))
1293       return GST_RTSP_EEOF;
1294
1295     if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1296       GST_WARNING ("%s", err->message);
1297     else
1298       GST_DEBUG ("%s", err->message);
1299
1300     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1301     g_clear_error (&err);
1302     return res;
1303   }
1304 }
1305
1306 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1307 static GstRTSPResult
1308 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1309     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1310 {
1311   gsize _bytes_written = 0;
1312   gsize written;
1313   GstRTSPResult ret;
1314   GError *err = NULL;
1315   GPollableReturn res = G_POLLABLE_RETURN_OK;
1316
1317   while (n_vectors > 0) {
1318     if (block) {
1319       if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1320                   &written, cancellable, &err))) {
1321         /* This will never return G_IO_ERROR_WOULD_BLOCK */
1322         res = G_POLLABLE_RETURN_FAILED;
1323         goto error;
1324       }
1325     } else {
1326       res =
1327           g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1328           (stream), vectors, n_vectors, &written, cancellable, &err);
1329
1330       if (res != G_POLLABLE_RETURN_OK) {
1331         g_assert (written == 0);
1332         goto error;
1333       }
1334     }
1335     _bytes_written += written;
1336
1337     /* skip vectors that have been written in full */
1338     while (written > 0 && written >= vectors[0].size) {
1339       written -= vectors[0].size;
1340       ++vectors;
1341       --n_vectors;
1342     }
1343
1344     /* skip partially written vector data */
1345     if (written > 0) {
1346       vectors[0].size -= written;
1347       vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1348     }
1349   }
1350
1351   *bytes_written = _bytes_written;
1352
1353   return GST_RTSP_OK;
1354
1355   /* ERRORS */
1356 error:
1357   {
1358     *bytes_written = _bytes_written;
1359
1360     if (err)
1361       GST_WARNING ("%s", err->message);
1362     if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1363       g_assert (!err);
1364       return GST_RTSP_EINTR;
1365     } else if (G_UNLIKELY (written == 0)) {
1366       g_clear_error (&err);
1367       return GST_RTSP_EEOF;
1368     }
1369
1370     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1371     g_clear_error (&err);
1372     return ret;
1373   }
1374 }
1375
1376 static gint
1377 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1378     gboolean block, GError ** err)
1379 {
1380   gint out = 0;
1381
1382   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1383     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1384
1385     out = MIN (left, size);
1386     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1387
1388     if (left == (gsize) out) {
1389       g_free (conn->initial_buffer);
1390       conn->initial_buffer = NULL;
1391       conn->initial_buffer_offset = 0;
1392     } else
1393       conn->initial_buffer_offset += out;
1394   }
1395
1396   if (G_LIKELY (size > (guint) out)) {
1397     gssize r;
1398     gsize count = size - out;
1399     if (block)
1400       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1401           count, conn->may_cancel ? conn->cancellable : NULL, err);
1402     else
1403       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1404           (conn->input_stream), (gchar *) & buffer[out], count,
1405           conn->may_cancel ? conn->cancellable : NULL, err);
1406
1407     if (G_UNLIKELY (r < 0)) {
1408       if (out == 0) {
1409         /* propagate the error */
1410         out = r;
1411       } else {
1412         /* we have some data ignore error */
1413         g_clear_error (err);
1414       }
1415     } else
1416       out += r;
1417   }
1418
1419   return out;
1420 }
1421
1422 static gint
1423 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1424     gboolean block, GError ** err)
1425 {
1426   DecodeCtx *ctx = conn->ctxp;
1427   gint out = 0;
1428
1429   if (ctx) {
1430     while (size > 0) {
1431       guint8 in[sizeof (ctx->out) * 4 / 3];
1432       gint r;
1433
1434       while (size > 0 && ctx->cout < ctx->coutl) {
1435         /* we have some leftover bytes */
1436         *buffer++ = ctx->out[ctx->cout++];
1437         size--;
1438         out++;
1439       }
1440
1441       /* got what we needed? */
1442       if (size == 0)
1443         break;
1444
1445       /* try to read more bytes */
1446       r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1447       if (r <= 0) {
1448         if (out == 0) {
1449           out = r;
1450         } else {
1451           /* we have some data ignore error */
1452           g_clear_error (err);
1453         }
1454         break;
1455       }
1456
1457       ctx->cout = 0;
1458       ctx->coutl =
1459           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1460           &ctx->save);
1461     }
1462   } else {
1463     out = fill_raw_bytes (conn, buffer, size, block, err);
1464   }
1465
1466   return out;
1467 }
1468
1469 static GstRTSPResult
1470 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1471     gboolean block)
1472 {
1473   guint left;
1474   gint r;
1475   GstRTSPResult res;
1476   GError *err = NULL;
1477
1478   if (G_UNLIKELY (*idx > size))
1479     return GST_RTSP_ERROR;
1480
1481   left = size - *idx;
1482
1483   while (left) {
1484     r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1485     if (G_UNLIKELY (r <= 0))
1486       goto error;
1487
1488     left -= r;
1489     *idx += r;
1490   }
1491   return GST_RTSP_OK;
1492
1493   /* ERRORS */
1494 error:
1495   {
1496     if (G_UNLIKELY (r == 0))
1497       return GST_RTSP_EEOF;
1498
1499     GST_DEBUG ("%s", err->message);
1500     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1501     g_clear_error (&err);
1502     return res;
1503   }
1504 }
1505
1506 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1507  * end of a line. It even does its best to handle clients which mix them (even
1508  * though this is a really stupid idea (tm).) It also handles Line White Space
1509  * (LWS), where a line end followed by whitespace is considered LWS. This is
1510  * the method used in RTSP (and HTTP) to break long lines.
1511  */
1512 static GstRTSPResult
1513 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1514     gboolean block)
1515 {
1516   GstRTSPResult res;
1517
1518   while (TRUE) {
1519     guint8 c;
1520     guint i;
1521
1522     if (conn->read_ahead == READ_AHEAD_EOH) {
1523       /* the last call to read_line() already determined that we have reached
1524        * the end of the headers, so convey that information now */
1525       conn->read_ahead = 0;
1526       break;
1527     } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1528       /* the last call to read_line() left off after having read \r\n */
1529       c = '\n';
1530     } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1531       /* the last call to read_line() left off after having read \r\n\r */
1532       c = '\r';
1533     } else if (conn->read_ahead != 0) {
1534       /* the last call to read_line() left us with a character to start with */
1535       c = (guint8) conn->read_ahead;
1536       conn->read_ahead = 0;
1537     } else {
1538       /* read the next character */
1539       i = 0;
1540       res = read_bytes (conn, &c, &i, 1, block);
1541       if (G_UNLIKELY (res != GST_RTSP_OK))
1542         return res;
1543     }
1544
1545     /* special treatment of line endings */
1546     if (c == '\r' || c == '\n') {
1547       guint8 read_ahead;
1548
1549     retry:
1550       /* need to read ahead one more character to know what to do... */
1551       i = 0;
1552       res = read_bytes (conn, &read_ahead, &i, 1, block);
1553       if (G_UNLIKELY (res != GST_RTSP_OK))
1554         return res;
1555
1556       if (read_ahead == ' ' || read_ahead == '\t') {
1557         if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1558           /* got \r\n\r followed by whitespace, treat it as a normal line
1559            * followed by one starting with LWS */
1560           conn->read_ahead = read_ahead;
1561           break;
1562         } else {
1563           /* got LWS, change the line ending to a space and continue */
1564           c = ' ';
1565           conn->read_ahead = read_ahead;
1566         }
1567       } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1568         if (read_ahead == '\r' || read_ahead == '\n') {
1569           /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1570           conn->read_ahead = READ_AHEAD_EOH;
1571           break;
1572         } else {
1573           /* got \r\n\r followed by something else, this is not really
1574            * supported since we have probably just eaten the first character
1575            * of the body or the next message, so just ignore the second \r
1576            * and live with it... */
1577           conn->read_ahead = read_ahead;
1578           break;
1579         }
1580       } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1581         if (read_ahead == '\r') {
1582           /* got \r\n\r so far, need one more character... */
1583           conn->read_ahead = READ_AHEAD_CRLFCR;
1584           goto retry;
1585         } else if (read_ahead == '\n') {
1586           /* got \r\n\n, treat it as the end of the headers */
1587           conn->read_ahead = READ_AHEAD_EOH;
1588           break;
1589         } else {
1590           /* found the end of a line, keep read_ahead for the next line */
1591           conn->read_ahead = read_ahead;
1592           break;
1593         }
1594       } else if (c == read_ahead) {
1595         /* got double \r or \n, treat it as the end of the headers */
1596         conn->read_ahead = READ_AHEAD_EOH;
1597         break;
1598       } else if (c == '\r' && read_ahead == '\n') {
1599         /* got \r\n so far, still need more to know what to do... */
1600         conn->read_ahead = READ_AHEAD_CRLF;
1601         goto retry;
1602       } else {
1603         /* found the end of a line, keep read_ahead for the next line */
1604         conn->read_ahead = read_ahead;
1605         break;
1606       }
1607     }
1608
1609     if (G_LIKELY (*idx < size - 1))
1610       buffer[(*idx)++] = c;
1611   }
1612   buffer[*idx] = '\0';
1613
1614   return GST_RTSP_OK;
1615 }
1616
1617 static void
1618 set_read_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
1619 {
1620   GstClockTime to_nsecs;
1621   guint to_secs;
1622
1623   g_mutex_lock (&conn->socket_use_mutex);
1624
1625   g_assert (!conn->read_socket_used);
1626   conn->read_socket_used = TRUE;
1627
1628   to_nsecs = timeout * 1000;
1629   to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
1630
1631   if (to_secs > g_socket_get_timeout (conn->read_socket)) {
1632     g_socket_set_timeout (conn->read_socket, to_secs);
1633   }
1634
1635   g_mutex_unlock (&conn->socket_use_mutex);
1636 }
1637
1638 static void
1639 set_write_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
1640 {
1641   GstClockTime to_nsecs;
1642   guint to_secs;
1643
1644   g_mutex_lock (&conn->socket_use_mutex);
1645
1646   g_assert (!conn->write_socket_used);
1647   conn->write_socket_used = TRUE;
1648
1649   to_nsecs = timeout * 1000;
1650   to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
1651
1652   if (to_secs > g_socket_get_timeout (conn->write_socket)) {
1653     g_socket_set_timeout (conn->write_socket, to_secs);
1654   }
1655
1656   g_mutex_unlock (&conn->socket_use_mutex);
1657 }
1658
1659 static void
1660 clear_read_socket_timeout (GstRTSPConnection * conn)
1661 {
1662   g_mutex_lock (&conn->socket_use_mutex);
1663
1664   conn->read_socket_used = FALSE;
1665   if (conn->read_socket != conn->write_socket || !conn->write_socket_used) {
1666     g_socket_set_timeout (conn->read_socket, 0);
1667   }
1668
1669   g_mutex_unlock (&conn->socket_use_mutex);
1670 }
1671
1672 static void
1673 clear_write_socket_timeout (GstRTSPConnection * conn)
1674 {
1675   g_mutex_lock (&conn->socket_use_mutex);
1676
1677   conn->write_socket_used = FALSE;
1678   if (conn->write_socket != conn->read_socket || !conn->read_socket_used) {
1679     g_socket_set_timeout (conn->write_socket, 0);
1680   }
1681
1682   g_mutex_unlock (&conn->socket_use_mutex);
1683 }
1684
1685 /**
1686  * gst_rtsp_connection_write_usec:
1687  * @conn: a #GstRTSPConnection
1688  * @data: the data to write
1689  * @size: the size of @data
1690  * @timeout: a timeout value or 0
1691  *
1692  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1693  * the specified @timeout. @timeout can be 0, in which case this function
1694  * might block forever.
1695  *
1696  * This function can be cancelled with gst_rtsp_connection_flush().
1697  *
1698  * Returns: #GST_RTSP_OK on success.
1699  *
1700  * Since: 1.18
1701  */
1702 /* FIXME 2.0: This should've been static! */
1703 GstRTSPResult
1704 gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
1705     guint size, gint64 timeout)
1706 {
1707   guint offset;
1708   GstRTSPResult res;
1709
1710   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1711   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1712   g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1713
1714   offset = 0;
1715
1716   set_write_socket_timeout (conn, timeout);
1717
1718   res =
1719       write_bytes (conn->output_stream, data, &offset, size, TRUE,
1720       conn->cancellable);
1721
1722   clear_write_socket_timeout (conn);
1723
1724   return res;
1725 }
1726
1727 static gboolean
1728 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1729     GstRTSPSerializedMessage * serialized_message)
1730 {
1731   GString *str = NULL;
1732
1733   memset (serialized_message, 0, sizeof (*serialized_message));
1734
1735   /* Initially we borrow the body_data / body_buffer fields from
1736    * the message */
1737   serialized_message->borrowed = TRUE;
1738
1739   switch (message->type) {
1740     case GST_RTSP_MESSAGE_REQUEST:
1741       str = g_string_new ("");
1742
1743       /* create request string, add CSeq */
1744       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1745           "CSeq: %d\r\n",
1746           gst_rtsp_method_as_text (message->type_data.request.method),
1747           message->type_data.request.uri,
1748           gst_rtsp_version_as_text (message->type_data.request.version),
1749           conn->cseq++);
1750       /* add session id if we have one */
1751       if (conn->session_id[0] != '\0') {
1752         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1753         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1754             conn->session_id);
1755       }
1756       /* add any authentication headers */
1757       add_auth_header (conn, message);
1758       break;
1759     case GST_RTSP_MESSAGE_RESPONSE:
1760       str = g_string_new ("");
1761
1762       /* create response string */
1763       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1764           gst_rtsp_version_as_text (message->type_data.response.version),
1765           message->type_data.response.code, message->type_data.response.reason);
1766       break;
1767     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1768       str = g_string_new ("");
1769
1770       /* create request string */
1771       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1772           gst_rtsp_method_as_text (message->type_data.request.method),
1773           message->type_data.request.uri,
1774           gst_rtsp_version_as_text (message->type_data.request.version));
1775       /* add any authentication headers */
1776       add_auth_header (conn, message);
1777       break;
1778     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1779       str = g_string_new ("");
1780
1781       /* create response string */
1782       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1783           gst_rtsp_version_as_text (message->type_data.request.version),
1784           message->type_data.response.code, message->type_data.response.reason);
1785       break;
1786     case GST_RTSP_MESSAGE_DATA:
1787     {
1788       guint8 *data_header = serialized_message->data_header;
1789
1790       /* prepare data header */
1791       data_header[0] = '$';
1792       data_header[1] = message->type_data.data.channel;
1793       data_header[2] = (message->body_size >> 8) & 0xff;
1794       data_header[3] = message->body_size & 0xff;
1795
1796       /* create serialized message with header and data */
1797       serialized_message->data_is_data_header = TRUE;
1798       serialized_message->data_size = 4;
1799
1800       if (message->body) {
1801         serialized_message->body_data = message->body;
1802         serialized_message->body_data_size = message->body_size;
1803       } else {
1804         g_assert (message->body_buffer != NULL);
1805         serialized_message->body_buffer = message->body_buffer;
1806       }
1807       break;
1808     }
1809     default:
1810       g_string_free (str, TRUE);
1811       g_return_val_if_reached (FALSE);
1812       break;
1813   }
1814
1815   /* append headers and body */
1816   if (message->type != GST_RTSP_MESSAGE_DATA) {
1817     gchar date_string[100];
1818
1819     g_assert (str != NULL);
1820
1821     gen_date_string (date_string, sizeof (date_string));
1822
1823     /* add date header */
1824     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1825     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1826
1827     /* append headers */
1828     gst_rtsp_message_append_headers (message, str);
1829
1830     /* append Content-Length and body if needed */
1831     if (message->body_size > 0) {
1832       gchar *len;
1833
1834       len = g_strdup_printf ("%d", message->body_size);
1835       g_string_append_printf (str, "%s: %s\r\n",
1836           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1837       g_free (len);
1838       /* header ends here */
1839       g_string_append (str, "\r\n");
1840
1841       if (message->body) {
1842         serialized_message->body_data = message->body;
1843         serialized_message->body_data_size = message->body_size;
1844       } else {
1845         g_assert (message->body_buffer != NULL);
1846         serialized_message->body_buffer = message->body_buffer;
1847       }
1848     } else {
1849       /* just end headers */
1850       g_string_append (str, "\r\n");
1851     }
1852
1853     serialized_message->data_size = str->len;
1854     serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1855   }
1856
1857   return TRUE;
1858 }
1859
1860 /**
1861  * gst_rtsp_connection_send_usec:
1862  * @conn: a #GstRTSPConnection
1863  * @message: the message to send
1864  * @timeout: a timeout value in microseconds
1865  *
1866  * Attempt to send @message to the connected @conn, blocking up to
1867  * the specified @timeout. @timeout can be 0, in which case this function
1868  * might block forever.
1869  *
1870  * This function can be cancelled with gst_rtsp_connection_flush().
1871  *
1872  * Returns: #GST_RTSP_OK on success.
1873  *
1874  * Since: 1.18
1875  */
1876 GstRTSPResult
1877 gst_rtsp_connection_send_usec (GstRTSPConnection * conn,
1878     GstRTSPMessage * message, gint64 timeout)
1879 {
1880   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1881   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1882
1883   return gst_rtsp_connection_send_messages_usec (conn, message, 1, timeout);
1884 }
1885
1886 /**
1887  * gst_rtsp_connection_send_messages_usec:
1888  * @conn: a #GstRTSPConnection
1889  * @messages: (array length=n_messages): the messages to send
1890  * @n_messages: the number of messages to send
1891  * @timeout: a timeout value in microseconds
1892  *
1893  * Attempt to send @messages to the connected @conn, blocking up to
1894  * the specified @timeout. @timeout can be 0, in which case this function
1895  * might block forever.
1896  *
1897  * This function can be cancelled with gst_rtsp_connection_flush().
1898  *
1899  * Returns: #GST_RTSP_OK on Since.
1900  *
1901  * Since: 1.18
1902  */
1903 GstRTSPResult
1904 gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
1905     GstRTSPMessage * messages, guint n_messages, gint64 timeout)
1906 {
1907   GstRTSPResult res;
1908   GstRTSPSerializedMessage *serialized_messages;
1909   GOutputVector *vectors;
1910   GstMapInfo *map_infos;
1911   guint n_vectors, n_memories;
1912   gint i, j, k;
1913   gsize bytes_to_write, bytes_written;
1914
1915   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1916   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1917
1918   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1919   memset (serialized_messages, 0,
1920       sizeof (GstRTSPSerializedMessage) * n_messages);
1921
1922   for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1923       i++) {
1924     if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1925                 &serialized_messages[i])))
1926       goto no_message;
1927
1928     if (conn->tunneled) {
1929       gint state = 0, save = 0;
1930       gchar *base64_buffer, *out_buffer;
1931       gsize written = 0;
1932       gsize in_length;
1933
1934       in_length = serialized_messages[i].data_size;
1935       if (serialized_messages[i].body_data)
1936         in_length += serialized_messages[i].body_data_size;
1937       else if (serialized_messages[i].body_buffer)
1938         in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1939
1940       in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1941       base64_buffer = out_buffer = g_malloc0 (in_length);
1942
1943       written =
1944           g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1945           serialized_messages[i].data_header : serialized_messages[i].data,
1946           serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1947       out_buffer += written;
1948
1949       if (serialized_messages[i].body_data) {
1950         written =
1951             g_base64_encode_step (serialized_messages[i].body_data,
1952             serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1953             &save);
1954         out_buffer += written;
1955       } else if (serialized_messages[i].body_buffer) {
1956         guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1957
1958         for (j = 0; j < n; j++) {
1959           GstMemory *mem =
1960               gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1961           GstMapInfo map;
1962
1963           gst_memory_map (mem, &map, GST_MAP_READ);
1964
1965           written = g_base64_encode_step (map.data, map.size,
1966               FALSE, out_buffer, &state, &save);
1967           out_buffer += written;
1968
1969           gst_memory_unmap (mem, &map);
1970         }
1971       }
1972
1973       written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
1974       out_buffer += written;
1975
1976       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1977       memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
1978
1979       serialized_messages[i].data = (guint8 *) base64_buffer;
1980       serialized_messages[i].data_size = (out_buffer - base64_buffer);
1981       n_vectors++;
1982     } else {
1983       n_vectors++;
1984       if (serialized_messages[i].body_data) {
1985         n_vectors++;
1986       } else if (serialized_messages[i].body_buffer) {
1987         n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1988         n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1989       }
1990     }
1991   }
1992
1993   vectors = g_newa (GOutputVector, n_vectors);
1994   map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
1995
1996   for (i = 0, j = 0, k = 0; i < n_messages; i++) {
1997     vectors[j].buffer = serialized_messages[i].data_is_data_header ?
1998         serialized_messages[i].data_header : serialized_messages[i].data;
1999     vectors[j].size = serialized_messages[i].data_size;
2000     bytes_to_write += vectors[j].size;
2001     j++;
2002
2003     if (serialized_messages[i].body_data) {
2004       vectors[j].buffer = serialized_messages[i].body_data;
2005       vectors[j].size = serialized_messages[i].body_data_size;
2006       bytes_to_write += vectors[j].size;
2007       j++;
2008     } else if (serialized_messages[i].body_buffer) {
2009       gint l, n;
2010
2011       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
2012       for (l = 0; l < n; l++) {
2013         GstMemory *mem =
2014             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
2015
2016         gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
2017         vectors[j].buffer = map_infos[k].data;
2018         vectors[j].size = map_infos[k].size;
2019         bytes_to_write += vectors[j].size;
2020
2021         k++;
2022         j++;
2023       }
2024     }
2025   }
2026
2027   /* write request: this is synchronous */
2028   set_write_socket_timeout (conn, timeout);
2029
2030   res =
2031       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
2032       TRUE, conn->cancellable);
2033
2034   clear_write_socket_timeout (conn);
2035
2036   g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
2037
2038   /* free everything */
2039   for (i = 0, k = 0; i < n_messages; i++) {
2040     if (serialized_messages[i].body_buffer) {
2041       gint l, n;
2042
2043       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
2044       for (l = 0; l < n; l++) {
2045         GstMemory *mem =
2046             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
2047
2048         gst_memory_unmap (mem, &map_infos[k]);
2049         k++;
2050       }
2051     }
2052
2053     g_free (serialized_messages[i].data);
2054   }
2055
2056   return res;
2057
2058 no_message:
2059   {
2060     for (i = 0; i < n_messages; i++) {
2061       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
2062     }
2063     g_warning ("Wrong message");
2064     return GST_RTSP_EINVAL;
2065   }
2066 }
2067
2068 static GstRTSPResult
2069 parse_string (gchar * dest, gint size, gchar ** src)
2070 {
2071   GstRTSPResult res = GST_RTSP_OK;
2072   gint idx;
2073
2074   idx = 0;
2075   /* skip spaces */
2076   while (g_ascii_isspace (**src))
2077     (*src)++;
2078
2079   while (!g_ascii_isspace (**src) && **src != '\0') {
2080     if (idx < size - 1)
2081       dest[idx++] = **src;
2082     else
2083       res = GST_RTSP_EPARSE;
2084     (*src)++;
2085   }
2086   if (size > 0)
2087     dest[idx] = '\0';
2088
2089   return res;
2090 }
2091
2092 static GstRTSPResult
2093 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2094     GstRTSPVersion * version)
2095 {
2096   GstRTSPVersion rversion;
2097   GstRTSPResult res = GST_RTSP_OK;
2098   gchar *ver;
2099
2100   if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2101     guint major;
2102     guint minor;
2103     gchar dummychar;
2104
2105     *ver++ = '\0';
2106
2107     /* the version number must be formatted as X.Y with nothing following */
2108     if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2109       res = GST_RTSP_EPARSE;
2110
2111     rversion = major * 0x10 + minor;
2112     if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2113
2114       if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2115         *version = GST_RTSP_VERSION_INVALID;
2116         res = GST_RTSP_ERROR;
2117       }
2118     } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2119       if (*type == GST_RTSP_MESSAGE_REQUEST)
2120         *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2121       else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2122         *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2123
2124       if (rversion != GST_RTSP_VERSION_1_0 &&
2125           rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2126         res = GST_RTSP_ERROR;
2127     } else
2128       res = GST_RTSP_EPARSE;
2129   } else
2130     res = GST_RTSP_EPARSE;
2131
2132   if (res == GST_RTSP_OK)
2133     *version = rversion;
2134
2135   return res;
2136 }
2137
2138 static GstRTSPResult
2139 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2140 {
2141   GstRTSPResult res = GST_RTSP_OK;
2142   GstRTSPResult res2;
2143   gchar versionstr[20];
2144   gchar codestr[4];
2145   gint code;
2146   gchar *bptr;
2147
2148   bptr = (gchar *) buffer;
2149
2150   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2151     res = GST_RTSP_EPARSE;
2152
2153   if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2154     res = GST_RTSP_EPARSE;
2155   code = atoi (codestr);
2156   if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2157     res = GST_RTSP_EPARSE;
2158
2159   while (g_ascii_isspace (*bptr))
2160     bptr++;
2161
2162   if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2163               NULL) != GST_RTSP_OK))
2164     res = GST_RTSP_EPARSE;
2165
2166   res2 = parse_protocol_version (versionstr, &msg->type,
2167       &msg->type_data.response.version);
2168   if (G_LIKELY (res == GST_RTSP_OK))
2169     res = res2;
2170
2171   return res;
2172 }
2173
2174 static GstRTSPResult
2175 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2176 {
2177   GstRTSPResult res = GST_RTSP_OK;
2178   GstRTSPResult res2;
2179   gchar versionstr[20];
2180   gchar methodstr[20];
2181   gchar urlstr[4096];
2182   gchar *bptr;
2183   GstRTSPMethod method;
2184
2185   bptr = (gchar *) buffer;
2186
2187   if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2188     res = GST_RTSP_EPARSE;
2189   method = gst_rtsp_find_method (methodstr);
2190
2191   if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2192     res = GST_RTSP_EPARSE;
2193   if (G_UNLIKELY (*urlstr == '\0'))
2194     res = GST_RTSP_EPARSE;
2195
2196   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2197     res = GST_RTSP_EPARSE;
2198
2199   if (G_UNLIKELY (*bptr != '\0'))
2200     res = GST_RTSP_EPARSE;
2201
2202   if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2203               urlstr) != GST_RTSP_OK))
2204     res = GST_RTSP_EPARSE;
2205
2206   res2 = parse_protocol_version (versionstr, &msg->type,
2207       &msg->type_data.request.version);
2208   if (G_LIKELY (res == GST_RTSP_OK))
2209     res = res2;
2210
2211   if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2212     /* GET and POST are not allowed as RTSP methods */
2213     if (msg->type_data.request.method == GST_RTSP_GET ||
2214         msg->type_data.request.method == GST_RTSP_POST) {
2215       msg->type_data.request.method = GST_RTSP_INVALID;
2216       if (res == GST_RTSP_OK)
2217         res = GST_RTSP_ERROR;
2218     }
2219   } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2220     /* only GET and POST are allowed as HTTP methods */
2221     if (msg->type_data.request.method != GST_RTSP_GET &&
2222         msg->type_data.request.method != GST_RTSP_POST) {
2223       msg->type_data.request.method = GST_RTSP_INVALID;
2224       if (res == GST_RTSP_OK)
2225         res = GST_RTSP_ERROR;
2226     }
2227   }
2228
2229   return res;
2230 }
2231
2232 /* parsing lines means reading a Key: Value pair */
2233 static GstRTSPResult
2234 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2235 {
2236   GstRTSPHeaderField field;
2237   gchar *line = (gchar *) buffer;
2238   gchar *field_name = NULL;
2239   gchar *value;
2240
2241   if ((value = strchr (line, ':')) == NULL || value == line)
2242     goto parse_error;
2243
2244   /* trim space before the colon */
2245   if (value[-1] == ' ')
2246     value[-1] = '\0';
2247
2248   /* replace the colon with a NUL */
2249   *value++ = '\0';
2250
2251   /* find the header */
2252   field = gst_rtsp_find_header_field (line);
2253   /* custom header not present in the list of pre-defined headers */
2254   if (field == GST_RTSP_HDR_INVALID)
2255     field_name = line;
2256
2257   /* split up the value in multiple key:value pairs if it contains comma(s) */
2258   while (*value != '\0') {
2259     gchar *next_value;
2260     gchar *comma = NULL;
2261     gboolean quoted = FALSE;
2262     guint comment = 0;
2263
2264     /* trim leading space */
2265     if (*value == ' ')
2266       value++;
2267
2268     /* for headers which may not appear multiple times, and thus may not
2269      * contain multiple values on the same line, we can short-circuit the loop
2270      * below and the entire value results in just one key:value pair*/
2271     if (!gst_rtsp_header_allow_multiple (field))
2272       next_value = value + strlen (value);
2273     else
2274       next_value = value;
2275
2276     /* find the next value, taking special care of quotes and comments */
2277     while (*next_value != '\0') {
2278       if ((quoted || comment != 0) && *next_value == '\\' &&
2279           next_value[1] != '\0')
2280         next_value++;
2281       else if (comment == 0 && *next_value == '"')
2282         quoted = !quoted;
2283       else if (!quoted && *next_value == '(')
2284         comment++;
2285       else if (comment != 0 && *next_value == ')')
2286         comment--;
2287       else if (!quoted && comment == 0) {
2288         /* To quote RFC 2068: "User agents MUST take special care in parsing
2289          * the WWW-Authenticate field value if it contains more than one
2290          * challenge, or if more than one WWW-Authenticate header field is
2291          * provided, since the contents of a challenge may itself contain a
2292          * comma-separated list of authentication parameters."
2293          *
2294          * What this means is that we cannot just look for an unquoted comma
2295          * when looking for multiple values in Proxy-Authenticate and
2296          * WWW-Authenticate headers. Instead we need to look for the sequence
2297          * "comma [space] token space token" before we can split after the
2298          * comma...
2299          */
2300         if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2301             field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2302           if (*next_value == ',') {
2303             if (next_value[1] == ' ') {
2304               /* skip any space following the comma so we do not mistake it for
2305                * separating between two tokens */
2306               next_value++;
2307             }
2308             comma = next_value;
2309           } else if (*next_value == ' ' && next_value[1] != ',' &&
2310               next_value[1] != '=' && comma != NULL) {
2311             next_value = comma;
2312             comma = NULL;
2313             break;
2314           }
2315         } else if (*next_value == ',')
2316           break;
2317       }
2318
2319       next_value++;
2320     }
2321
2322     if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2323       /* The timeout parameter is only allowed in a session response header
2324        * but some clients send it as part of the session request header.
2325        * Ignore everything from the semicolon to the end of the line. */
2326       next_value = value;
2327       while (*next_value != '\0') {
2328         if (*next_value == ';') {
2329           break;
2330         }
2331         next_value++;
2332       }
2333     }
2334
2335     /* trim space */
2336     if (value != next_value && next_value[-1] == ' ')
2337       next_value[-1] = '\0';
2338
2339     if (*next_value != '\0')
2340       *next_value++ = '\0';
2341
2342     /* add the key:value pair */
2343     if (*value != '\0') {
2344       if (field != GST_RTSP_HDR_INVALID)
2345         gst_rtsp_message_add_header (msg, field, value);
2346       else
2347         gst_rtsp_message_add_header_by_name (msg, field_name, value);
2348     }
2349
2350     value = next_value;
2351   }
2352
2353   return GST_RTSP_OK;
2354
2355   /* ERRORS */
2356 parse_error:
2357   {
2358     return GST_RTSP_EPARSE;
2359   }
2360 }
2361
2362 /* convert all consecutive whitespace to a single space */
2363 static void
2364 normalize_line (guint8 * buffer)
2365 {
2366   while (*buffer) {
2367     if (g_ascii_isspace (*buffer)) {
2368       guint8 *tmp;
2369
2370       *buffer++ = ' ';
2371       for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2372       }
2373       if (buffer != tmp)
2374         memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2375     } else {
2376       buffer++;
2377     }
2378   }
2379 }
2380
2381 static gboolean
2382 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2383 {
2384   gchar *cseq_header;
2385   gint64 cseq = 0;
2386   GstRTSPResult res;
2387
2388   if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2389       message->type == GST_RTSP_MESSAGE_REQUEST) {
2390     if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2391                 &cseq_header, 0)) != GST_RTSP_OK) {
2392       /* rfc2326 This field MUST be present in all RTSP req and resp */
2393       goto invalid_format;
2394     }
2395
2396     errno = 0;
2397     cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2398     if (errno != 0 || cseq < 0) {
2399       /* CSeq has no valid value */
2400       goto invalid_format;
2401     }
2402
2403     if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2404         (conn->cseq == 0 || conn->cseq < cseq)) {
2405       /* Response CSeq can't be higher than the number of outgoing requests
2406        * neither is a response valid if no request has been made */
2407       goto invalid_format;
2408     }
2409   }
2410   return GST_RTSP_OK;
2411
2412 invalid_format:
2413   {
2414     return GST_RTSP_EPARSE;
2415   }
2416 }
2417
2418 /* returns:
2419  *  GST_RTSP_OK when a complete message was read.
2420  *  GST_RTSP_EEOF: when the read socket is closed
2421  *  GST_RTSP_EINTR: when more data is needed.
2422  *  GST_RTSP_..: some other error occurred.
2423  */
2424 static GstRTSPResult
2425 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2426     GstRTSPConnection * conn, gboolean block)
2427 {
2428   GstRTSPResult res;
2429
2430   while (TRUE) {
2431     switch (builder->state) {
2432       case STATE_START:
2433       {
2434         guint8 c;
2435
2436         builder->offset = 0;
2437         res =
2438             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2439             block);
2440         if (res != GST_RTSP_OK)
2441           goto done;
2442
2443         c = builder->buffer[0];
2444
2445         /* we have 1 bytes now and we can see if this is a data message or
2446          * not */
2447         if (c == '$') {
2448           /* data message, prepare for the header */
2449           builder->state = STATE_DATA_HEADER;
2450           conn->may_cancel = FALSE;
2451         } else if (c == '\n' || c == '\r') {
2452           /* skip \n and \r */
2453           builder->offset = 0;
2454         } else {
2455           builder->line = 0;
2456           builder->state = STATE_READ_LINES;
2457           conn->may_cancel = FALSE;
2458         }
2459         break;
2460       }
2461       case STATE_DATA_HEADER:
2462       {
2463         res =
2464             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2465             block);
2466         if (res != GST_RTSP_OK)
2467           goto done;
2468
2469         gst_rtsp_message_init_data (message, builder->buffer[1]);
2470
2471         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2472         builder->body_data = g_malloc (builder->body_len + 1);
2473         builder->body_data[builder->body_len] = '\0';
2474         builder->offset = 0;
2475         builder->state = STATE_DATA_BODY;
2476         break;
2477       }
2478       case STATE_DATA_BODY:
2479       {
2480         res =
2481             read_bytes (conn, builder->body_data, &builder->offset,
2482             builder->body_len, block);
2483         if (res != GST_RTSP_OK)
2484           goto done;
2485
2486         /* we have the complete body now, store in the message adjusting the
2487          * length to include the trailing '\0' */
2488         gst_rtsp_message_take_body (message,
2489             (guint8 *) builder->body_data, builder->body_len + 1);
2490         builder->body_data = NULL;
2491         builder->body_len = 0;
2492
2493         builder->state = STATE_END;
2494         break;
2495       }
2496       case STATE_READ_LINES:
2497       {
2498         res = read_line (conn, builder->buffer, &builder->offset,
2499             sizeof (builder->buffer), block);
2500         if (res != GST_RTSP_OK)
2501           goto done;
2502
2503         /* we have a regular response */
2504         if (builder->buffer[0] == '\0') {
2505           gchar *hdrval;
2506           gint64 content_length_parsed = 0;
2507
2508           /* empty line, end of message header */
2509           /* see if there is a Content-Length header, but ignore it if this
2510            * is a POST request with an x-sessioncookie header */
2511           if (gst_rtsp_message_get_header (message,
2512                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2513               (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2514                   message->type_data.request.method != GST_RTSP_POST ||
2515                   gst_rtsp_message_get_header (message,
2516                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2517             /* there is, prepare to read the body */
2518             errno = 0;
2519             content_length_parsed = g_ascii_strtoll (hdrval, NULL, 10);
2520             if (errno != 0 || content_length_parsed < 0) {
2521               res = GST_RTSP_EPARSE;
2522               goto invalid_body_len;
2523             } else if (content_length_parsed > conn->content_length_limit) {
2524               res = GST_RTSP_ENOMEM;
2525               goto invalid_body_len;
2526             }
2527             builder->body_len = content_length_parsed;
2528             builder->body_data = g_try_malloc (builder->body_len + 1);
2529             /* we can't do much here, we need the length to know how many bytes
2530              * we need to read next and when allocation fails, we can't read the payload. */
2531             if (builder->body_data == NULL) {
2532               res = GST_RTSP_ENOMEM;
2533               goto invalid_body_len;
2534             }
2535
2536             builder->body_data[builder->body_len] = '\0';
2537             builder->offset = 0;
2538             builder->state = STATE_DATA_BODY;
2539           } else {
2540             builder->state = STATE_END;
2541           }
2542           break;
2543         }
2544
2545         /* we have a line */
2546         normalize_line (builder->buffer);
2547         if (builder->line == 0) {
2548           /* first line, check for response status */
2549           if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2550               memcmp (builder->buffer, "HTTP", 4) == 0) {
2551             builder->status = parse_response_status (builder->buffer, message);
2552           } else {
2553             builder->status = parse_request_line (builder->buffer, message);
2554           }
2555         } else {
2556           /* else just parse the line */
2557           res = parse_line (builder->buffer, message);
2558           if (res != GST_RTSP_OK)
2559             builder->status = res;
2560         }
2561         if (builder->status != GST_RTSP_OK) {
2562           res = builder->status;
2563           goto invalid_format;
2564         }
2565
2566         builder->line++;
2567         builder->offset = 0;
2568         break;
2569       }
2570       case STATE_END:
2571       {
2572         gchar *session_cookie;
2573         gchar *session_id;
2574
2575         conn->may_cancel = TRUE;
2576
2577         if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2578           /* message don't comply with rfc2326 regarding CSeq */
2579           goto invalid_format;
2580         }
2581
2582         if (message->type == GST_RTSP_MESSAGE_DATA) {
2583           /* data messages don't have headers */
2584           res = GST_RTSP_OK;
2585           goto done;
2586         }
2587
2588         /* save the tunnel session in the connection */
2589         if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2590             !conn->manual_http &&
2591             conn->tstate == TUNNEL_STATE_NONE &&
2592             gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2593                 &session_cookie, 0) == GST_RTSP_OK) {
2594           strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2595           conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2596           conn->tunneled = TRUE;
2597         }
2598
2599         /* save session id in the connection for further use */
2600         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2601             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2602                 &session_id, 0) == GST_RTSP_OK) {
2603           gint maxlen, i;
2604
2605           maxlen = sizeof (conn->session_id) - 1;
2606           /* the sessionid can have attributes marked with ;
2607            * Make sure we strip them */
2608           for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2609             if (session_id[i] == ';') {
2610               maxlen = i;
2611               /* parse timeout */
2612               do {
2613                 i++;
2614               } while (g_ascii_isspace (session_id[i]));
2615               if (g_str_has_prefix (&session_id[i], "timeout=")) {
2616                 gint to;
2617
2618                 /* if we parsed something valid, configure */
2619                 if ((to = atoi (&session_id[i + 8])) > 0)
2620                   conn->timeout = to;
2621               }
2622               break;
2623             }
2624           }
2625
2626           /* make sure to not overflow */
2627           if (conn->remember_session_id) {
2628             strncpy (conn->session_id, session_id, maxlen);
2629             conn->session_id[maxlen] = '\0';
2630           }
2631         }
2632         res = builder->status;
2633         goto done;
2634       }
2635       default:
2636         res = GST_RTSP_ERROR;
2637         goto done;
2638     }
2639   }
2640 done:
2641   conn->may_cancel = TRUE;
2642   return res;
2643
2644   /* ERRORS */
2645 invalid_body_len:
2646   {
2647     conn->may_cancel = TRUE;
2648     GST_DEBUG ("could not allocate body");
2649     return res;
2650   }
2651 invalid_format:
2652   {
2653     conn->may_cancel = TRUE;
2654     GST_DEBUG ("could not parse");
2655     return res;
2656   }
2657 }
2658
2659 /**
2660  * gst_rtsp_connection_read_usec:
2661  * @conn: a #GstRTSPConnection
2662  * @data: the data to read
2663  * @size: the size of @data
2664  * @timeout: a timeout value in microseconds
2665  *
2666  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2667  * the specified @timeout. @timeout can be 0, in which case this function
2668  * might block forever.
2669  *
2670  * This function can be cancelled with gst_rtsp_connection_flush().
2671  *
2672  * Returns: #GST_RTSP_OK on success.
2673  *
2674  * Since: 1.18
2675  */
2676 GstRTSPResult
2677 gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
2678     guint size, gint64 timeout)
2679 {
2680   guint offset;
2681   GstRTSPResult res;
2682
2683   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2684   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2685   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2686
2687   if (G_UNLIKELY (size == 0))
2688     return GST_RTSP_OK;
2689
2690   offset = 0;
2691
2692   /* configure timeout if any */
2693   set_read_socket_timeout (conn, timeout);
2694
2695   res = read_bytes (conn, data, &offset, size, TRUE);
2696
2697   clear_read_socket_timeout (conn);
2698
2699   return res;
2700 }
2701
2702 static GstRTSPMessage *
2703 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2704     const GstRTSPMessage * request)
2705 {
2706   GstRTSPMessage *msg;
2707   GstRTSPResult res;
2708
2709   if (gst_rtsp_status_as_text (code) == NULL)
2710     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2711
2712   GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2713       no_message);
2714
2715   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2716       "GStreamer RTSP Server");
2717   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2718   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2719   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2720
2721   if (code == GST_RTSP_STS_OK) {
2722     /* add the local ip address to the tunnel reply, this is where the client
2723      * should send the POST request to */
2724     if (conn->local_ip)
2725       gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2726           conn->local_ip);
2727     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2728         "application/x-rtsp-tunnelled");
2729   }
2730
2731   return msg;
2732
2733   /* ERRORS */
2734 no_message:
2735   {
2736     return NULL;
2737   }
2738 }
2739
2740 /**
2741  * gst_rtsp_connection_receive_usec:
2742  * @conn: a #GstRTSPConnection
2743  * @message: the message to read
2744  * @timeout: a timeout value or 0
2745  *
2746  * Attempt to read into @message from the connected @conn, blocking up to
2747  * the specified @timeout. @timeout can be 0, in which case this function
2748  * might block forever.
2749  *
2750  * This function can be cancelled with gst_rtsp_connection_flush().
2751  *
2752  * Returns: #GST_RTSP_OK on success.
2753  *
2754  * Since: 1.18
2755  */
2756 GstRTSPResult
2757 gst_rtsp_connection_receive_usec (GstRTSPConnection * conn,
2758     GstRTSPMessage * message, gint64 timeout)
2759 {
2760   GstRTSPResult res;
2761   GstRTSPBuilder builder;
2762
2763   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2764   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2765   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2766
2767   /* configure timeout if any */
2768   set_read_socket_timeout (conn, timeout);
2769
2770   memset (&builder, 0, sizeof (GstRTSPBuilder));
2771   res = build_next (&builder, message, conn, TRUE);
2772
2773   clear_read_socket_timeout (conn);
2774
2775   if (G_UNLIKELY (res != GST_RTSP_OK))
2776     goto read_error;
2777
2778   if (!conn->manual_http) {
2779     if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2780       if (conn->tstate == TUNNEL_STATE_NONE &&
2781           message->type_data.request.method == GST_RTSP_GET) {
2782         GstRTSPMessage *response;
2783
2784         conn->tstate = TUNNEL_STATE_GET;
2785
2786         /* tunnel GET request, we can reply now */
2787         response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2788         res = gst_rtsp_connection_send_usec (conn, response, timeout);
2789         gst_rtsp_message_free (response);
2790         if (res == GST_RTSP_OK)
2791           res = GST_RTSP_ETGET;
2792         goto cleanup;
2793       } else if (conn->tstate == TUNNEL_STATE_NONE &&
2794           message->type_data.request.method == GST_RTSP_POST) {
2795         conn->tstate = TUNNEL_STATE_POST;
2796
2797         /* tunnel POST request, the caller now has to link the two
2798          * connections. */
2799         res = GST_RTSP_ETPOST;
2800         goto cleanup;
2801       } else {
2802         res = GST_RTSP_EPARSE;
2803         goto cleanup;
2804       }
2805     } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2806       res = GST_RTSP_EPARSE;
2807       goto cleanup;
2808     }
2809   }
2810
2811   /* we have a message here */
2812   build_reset (&builder);
2813
2814   return GST_RTSP_OK;
2815
2816   /* ERRORS */
2817 read_error:
2818 cleanup:
2819   {
2820     build_reset (&builder);
2821     gst_rtsp_message_unset (message);
2822     return res;
2823   }
2824 }
2825
2826 /**
2827  * gst_rtsp_connection_close:
2828  * @conn: a #GstRTSPConnection
2829  *
2830  * Close the connected @conn. After this call, the connection is in the same
2831  * state as when it was first created.
2832  *
2833  * Returns: #GST_RTSP_OK on success.
2834  */
2835 GstRTSPResult
2836 gst_rtsp_connection_close (GstRTSPConnection * conn)
2837 {
2838   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2839
2840   /* last unref closes the connection we don't want to explicitly close here
2841    * because these sockets might have been provided at construction */
2842   if (conn->stream0) {
2843     g_object_unref (conn->stream0);
2844     conn->stream0 = NULL;
2845     conn->socket0 = NULL;
2846   }
2847   if (conn->stream1) {
2848     g_object_unref (conn->stream1);
2849     conn->stream1 = NULL;
2850     conn->socket1 = NULL;
2851   }
2852
2853   /* these were owned by the stream */
2854   conn->input_stream = NULL;
2855   conn->output_stream = NULL;
2856   conn->control_stream = NULL;
2857
2858   g_free (conn->remote_ip);
2859   conn->remote_ip = NULL;
2860   g_free (conn->local_ip);
2861   conn->local_ip = NULL;
2862
2863   conn->read_ahead = 0;
2864
2865   g_free (conn->initial_buffer);
2866   conn->initial_buffer = NULL;
2867   conn->initial_buffer_offset = 0;
2868
2869   conn->write_socket = NULL;
2870   conn->read_socket = NULL;
2871   conn->write_socket_used = FALSE;
2872   conn->read_socket_used = FALSE;
2873   conn->tunneled = FALSE;
2874   conn->tstate = TUNNEL_STATE_NONE;
2875   conn->ctxp = NULL;
2876   g_free (conn->username);
2877   conn->username = NULL;
2878   g_free (conn->passwd);
2879   conn->passwd = NULL;
2880   gst_rtsp_connection_clear_auth_params (conn);
2881   conn->timeout = 60;
2882   conn->cseq = 0;
2883   conn->session_id[0] = '\0';
2884
2885   return GST_RTSP_OK;
2886 }
2887
2888 /**
2889  * gst_rtsp_connection_free:
2890  * @conn: a #GstRTSPConnection
2891  *
2892  * Close and free @conn.
2893  *
2894  * Returns: #GST_RTSP_OK on success.
2895  */
2896 GstRTSPResult
2897 gst_rtsp_connection_free (GstRTSPConnection * conn)
2898 {
2899   GstRTSPResult res;
2900
2901   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2902
2903   res = gst_rtsp_connection_close (conn);
2904
2905   if (conn->cancellable)
2906     g_object_unref (conn->cancellable);
2907   if (conn->client)
2908     g_object_unref (conn->client);
2909   if (conn->tls_database)
2910     g_object_unref (conn->tls_database);
2911   if (conn->tls_interaction)
2912     g_object_unref (conn->tls_interaction);
2913   if (conn->accept_certificate_destroy_notify)
2914     conn->
2915         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2916
2917   g_timer_destroy (conn->timer);
2918   gst_rtsp_url_free (conn->url);
2919   g_free (conn->proxy_host);
2920   g_free (conn);
2921
2922   return res;
2923 }
2924
2925 /**
2926  * gst_rtsp_connection_poll_usec:
2927  * @conn: a #GstRTSPConnection
2928  * @events: a bitmask of #GstRTSPEvent flags to check
2929  * @revents: location for result flags
2930  * @timeout: a timeout in microseconds
2931  *
2932  * Wait up to the specified @timeout for the connection to become available for
2933  * at least one of the operations specified in @events. When the function returns
2934  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2935  * @conn.
2936  *
2937  * @timeout can be 0, in which case this function might block forever.
2938  *
2939  * This function can be cancelled with gst_rtsp_connection_flush().
2940  *
2941  * Returns: #GST_RTSP_OK on success.
2942  *
2943  * Since: 1.18
2944  */
2945 GstRTSPResult
2946 gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
2947     GstRTSPEvent * revents, gint64 timeout)
2948 {
2949   GMainContext *ctx;
2950   GSource *rs, *ws, *ts;
2951   GIOCondition condition;
2952
2953   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2954   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2955   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2956   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2957   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2958
2959   ctx = g_main_context_new ();
2960
2961   /* configure timeout if any */
2962   if (timeout) {
2963     ts = g_timeout_source_new (timeout / 1000);
2964     g_source_set_dummy_callback (ts);
2965     g_source_attach (ts, ctx);
2966     g_source_unref (ts);
2967   }
2968
2969   if (events & GST_RTSP_EV_READ) {
2970     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2971         conn->cancellable);
2972     g_source_set_dummy_callback (rs);
2973     g_source_attach (rs, ctx);
2974     g_source_unref (rs);
2975   }
2976
2977   if (events & GST_RTSP_EV_WRITE) {
2978     ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
2979         conn->cancellable);
2980     g_source_set_dummy_callback (ws);
2981     g_source_attach (ws, ctx);
2982     g_source_unref (ws);
2983   }
2984
2985   /* Returns after handling all pending events */
2986   while (!g_main_context_iteration (ctx, TRUE));
2987
2988   g_main_context_unref (ctx);
2989
2990   *revents = 0;
2991   if (events & GST_RTSP_EV_READ) {
2992     condition = g_socket_condition_check (conn->read_socket,
2993         G_IO_IN | G_IO_PRI);
2994     if ((condition & G_IO_IN) || (condition & G_IO_PRI))
2995       *revents |= GST_RTSP_EV_READ;
2996   }
2997   if (events & GST_RTSP_EV_WRITE) {
2998     condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
2999     if ((condition & G_IO_OUT))
3000       *revents |= GST_RTSP_EV_WRITE;
3001   }
3002
3003   if (*revents == 0)
3004     return GST_RTSP_ETIMEOUT;
3005
3006   return GST_RTSP_OK;
3007 }
3008
3009 /**
3010  * gst_rtsp_connection_next_timeout_usec:
3011  * @conn: a #GstRTSPConnection
3012  *
3013  * Calculate the next timeout for @conn
3014  *
3015  * Returns: #the next timeout in microseconds
3016  *
3017  * Since: 1.18
3018  */
3019 gint64
3020 gst_rtsp_connection_next_timeout_usec (GstRTSPConnection * conn)
3021 {
3022   gdouble elapsed;
3023   gulong usec;
3024   gint ctimeout;
3025   gint64 timeout = 0;
3026
3027   g_return_val_if_fail (conn != NULL, 1);
3028
3029   ctimeout = conn->timeout;
3030   if (ctimeout >= 20) {
3031     /* Because we should act before the timeout we timeout 5
3032      * seconds in advance. */
3033     ctimeout -= 5;
3034   } else if (ctimeout >= 5) {
3035     /* else timeout 20% earlier */
3036     ctimeout -= ctimeout / 5;
3037   } else if (ctimeout >= 1) {
3038     /* else timeout 1 second earlier */
3039     ctimeout -= 1;
3040   }
3041
3042   elapsed = g_timer_elapsed (conn->timer, &usec);
3043   if (elapsed >= ctimeout) {
3044     timeout = 0;
3045   } else {
3046     gint64 sec = ctimeout - elapsed;
3047     if (usec <= G_USEC_PER_SEC)
3048       usec = G_USEC_PER_SEC - usec;
3049     else
3050       usec = 0;
3051     timeout = usec + sec * G_USEC_PER_SEC;
3052   }
3053
3054   return timeout;
3055 }
3056
3057 /**
3058  * gst_rtsp_connection_reset_timeout:
3059  * @conn: a #GstRTSPConnection
3060  *
3061  * Reset the timeout of @conn.
3062  *
3063  * Returns: #GST_RTSP_OK.
3064  */
3065 GstRTSPResult
3066 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
3067 {
3068   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3069
3070   g_timer_start (conn->timer);
3071
3072   return GST_RTSP_OK;
3073 }
3074
3075 /**
3076  * gst_rtsp_connection_flush:
3077  * @conn: a #GstRTSPConnection
3078  * @flush: start or stop the flush
3079  *
3080  * Start or stop the flushing action on @conn. When flushing, all current
3081  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
3082  * is set to non-flushing mode again.
3083  *
3084  * Returns: #GST_RTSP_OK.
3085  */
3086 GstRTSPResult
3087 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
3088 {
3089   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3090
3091   if (flush) {
3092     g_cancellable_cancel (conn->cancellable);
3093   } else {
3094     g_object_unref (conn->cancellable);
3095     conn->cancellable = g_cancellable_new ();
3096   }
3097
3098   return GST_RTSP_OK;
3099 }
3100
3101 /**
3102  * gst_rtsp_connection_set_proxy:
3103  * @conn: a #GstRTSPConnection
3104  * @host: the proxy host
3105  * @port: the proxy port
3106  *
3107  * Set the proxy host and port.
3108  *
3109  * Returns: #GST_RTSP_OK.
3110  */
3111 GstRTSPResult
3112 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3113     const gchar * host, guint port)
3114 {
3115   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3116
3117   g_free (conn->proxy_host);
3118   conn->proxy_host = g_strdup (host);
3119   conn->proxy_port = port;
3120
3121   return GST_RTSP_OK;
3122 }
3123
3124 /**
3125  * gst_rtsp_connection_set_auth:
3126  * @conn: a #GstRTSPConnection
3127  * @method: authentication method
3128  * @user: the user
3129  * @pass: the password
3130  *
3131  * Configure @conn for authentication mode @method with @user and @pass as the
3132  * user and password respectively.
3133  *
3134  * Returns: #GST_RTSP_OK.
3135  */
3136 GstRTSPResult
3137 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3138     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3139 {
3140   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3141
3142   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3143           || g_strrstr (user, ":") != NULL))
3144     return GST_RTSP_EINVAL;
3145
3146   /* Make sure the username and passwd are being set for authentication */
3147   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3148     return GST_RTSP_EINVAL;
3149
3150   /* ":" chars are not allowed in usernames for basic auth */
3151   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3152     return GST_RTSP_EINVAL;
3153
3154   g_free (conn->username);
3155   g_free (conn->passwd);
3156
3157   conn->auth_method = method;
3158   conn->username = g_strdup (user);
3159   conn->passwd = g_strdup (pass);
3160
3161   return GST_RTSP_OK;
3162 }
3163
3164 /**
3165  * str_case_hash:
3166  * @key: ASCII string to hash
3167  *
3168  * Hashes @key in a case-insensitive manner.
3169  *
3170  * Returns: the hash code.
3171  **/
3172 static guint
3173 str_case_hash (gconstpointer key)
3174 {
3175   const char *p = key;
3176   guint h = g_ascii_toupper (*p);
3177
3178   if (h)
3179     for (p += 1; *p != '\0'; p++)
3180       h = (h << 5) - h + g_ascii_toupper (*p);
3181
3182   return h;
3183 }
3184
3185 /**
3186  * str_case_equal:
3187  * @v1: an ASCII string
3188  * @v2: another ASCII string
3189  *
3190  * Compares @v1 and @v2 in a case-insensitive manner
3191  *
3192  * Returns: %TRUE if they are equal (modulo case)
3193  **/
3194 static gboolean
3195 str_case_equal (gconstpointer v1, gconstpointer v2)
3196 {
3197   const char *string1 = v1;
3198   const char *string2 = v2;
3199
3200   return g_ascii_strcasecmp (string1, string2) == 0;
3201 }
3202
3203 /**
3204  * gst_rtsp_connection_set_auth_param:
3205  * @conn: a #GstRTSPConnection
3206  * @param: authentication directive
3207  * @value: value
3208  *
3209  * Setup @conn with authentication directives. This is not necessary for
3210  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3211  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3212  * in the WWW-Authenticate response header and can include realm, domain,
3213  * nonce, opaque, stale, algorithm, qop as per RFC2617.
3214  */
3215 void
3216 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3217     const gchar * param, const gchar * value)
3218 {
3219   g_return_if_fail (conn != NULL);
3220   g_return_if_fail (param != NULL);
3221
3222   if (conn->auth_params == NULL) {
3223     conn->auth_params =
3224         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3225   }
3226   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3227 }
3228
3229 /**
3230  * gst_rtsp_connection_clear_auth_params:
3231  * @conn: a #GstRTSPConnection
3232  *
3233  * Clear the list of authentication directives stored in @conn.
3234  */
3235 void
3236 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3237 {
3238   g_return_if_fail (conn != NULL);
3239
3240   if (conn->auth_params != NULL) {
3241     g_hash_table_destroy (conn->auth_params);
3242     conn->auth_params = NULL;
3243   }
3244 }
3245
3246 static GstRTSPResult
3247 set_qos_dscp (GSocket * socket, guint qos_dscp)
3248 {
3249 #ifndef IP_TOS
3250   GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3251   return GST_RTSP_OK;
3252 #else
3253   gint fd;
3254   union gst_sockaddr sa;
3255   socklen_t slen = sizeof (sa);
3256   gint af;
3257   gint tos;
3258
3259   if (!socket)
3260     return GST_RTSP_OK;
3261
3262   fd = g_socket_get_fd (socket);
3263   if (getsockname (fd, &sa.sa, &slen) < 0)
3264     goto no_getsockname;
3265
3266   af = sa.sa.sa_family;
3267
3268   /* if this is an IPv4-mapped address then do IPv4 QoS */
3269   if (af == AF_INET6) {
3270     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3271       af = AF_INET;
3272   }
3273
3274   /* extract and shift 6 bits of the DSCP */
3275   tos = (qos_dscp & 0x3f) << 2;
3276
3277 #ifdef G_OS_WIN32
3278 #  define SETSOCKOPT_ARG4_TYPE const char *
3279 #else
3280 #  define SETSOCKOPT_ARG4_TYPE const void *
3281 #endif
3282
3283   switch (af) {
3284     case AF_INET:
3285       if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3286               sizeof (tos)) < 0)
3287         goto no_setsockopt;
3288       break;
3289     case AF_INET6:
3290 #ifdef IPV6_TCLASS
3291       if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3292               (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3293         goto no_setsockopt;
3294       break;
3295 #endif
3296     default:
3297       goto wrong_family;
3298   }
3299
3300   return GST_RTSP_OK;
3301
3302   /* ERRORS */
3303 no_getsockname:
3304 no_setsockopt:
3305   {
3306     return GST_RTSP_ESYS;
3307   }
3308 wrong_family:
3309   {
3310     return GST_RTSP_ERROR;
3311   }
3312 #endif
3313 }
3314
3315 /**
3316  * gst_rtsp_connection_set_qos_dscp:
3317  * @conn: a #GstRTSPConnection
3318  * @qos_dscp: DSCP value
3319  *
3320  * Configure @conn to use the specified DSCP value.
3321  *
3322  * Returns: #GST_RTSP_OK on success.
3323  */
3324 GstRTSPResult
3325 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3326 {
3327   GstRTSPResult res;
3328
3329   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3330   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3331   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3332
3333   res = set_qos_dscp (conn->socket0, qos_dscp);
3334   if (res == GST_RTSP_OK)
3335     res = set_qos_dscp (conn->socket1, qos_dscp);
3336
3337   return res;
3338 }
3339
3340 /**
3341  * gst_rtsp_connection_set_content_length_limit:
3342  * @conn: a #GstRTSPConnection
3343  * @limit: Content-Length limit
3344  *
3345  * Configure @conn to use the specified Content-Length limit.
3346  * Both requests and responses are validated. If content-length is
3347  * exceeded, ENOMEM error will be returned.
3348  *
3349  * Since: 1.18
3350  */
3351 void
3352 gst_rtsp_connection_set_content_length_limit (GstRTSPConnection * conn,
3353     guint limit)
3354 {
3355   g_return_if_fail (conn != NULL);
3356
3357   conn->content_length_limit = limit;
3358 }
3359
3360 /**
3361  * gst_rtsp_connection_get_url:
3362  * @conn: a #GstRTSPConnection
3363  *
3364  * Retrieve the URL of the other end of @conn.
3365  *
3366  * Returns: The URL. This value remains valid until the
3367  * connection is freed.
3368  */
3369 GstRTSPUrl *
3370 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3371 {
3372   g_return_val_if_fail (conn != NULL, NULL);
3373
3374   return conn->url;
3375 }
3376
3377 /**
3378  * gst_rtsp_connection_get_ip:
3379  * @conn: a #GstRTSPConnection
3380  *
3381  * Retrieve the IP address of the other end of @conn.
3382  *
3383  * Returns: The IP address as a string. this value remains valid until the
3384  * connection is closed.
3385  */
3386 const gchar *
3387 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3388 {
3389   g_return_val_if_fail (conn != NULL, NULL);
3390
3391   return conn->remote_ip;
3392 }
3393
3394 /**
3395  * gst_rtsp_connection_set_ip:
3396  * @conn: a #GstRTSPConnection
3397  * @ip: an ip address
3398  *
3399  * Set the IP address of the server.
3400  */
3401 void
3402 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3403 {
3404   g_return_if_fail (conn != NULL);
3405
3406   g_free (conn->remote_ip);
3407   conn->remote_ip = g_strdup (ip);
3408 }
3409
3410 /**
3411  * gst_rtsp_connection_get_read_socket:
3412  * @conn: a #GstRTSPConnection
3413  *
3414  * Get the file descriptor for reading.
3415  *
3416  * Returns: (transfer none): the file descriptor used for reading or %NULL on
3417  * error. The file descriptor remains valid until the connection is closed.
3418  */
3419 GSocket *
3420 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3421 {
3422   g_return_val_if_fail (conn != NULL, NULL);
3423   g_return_val_if_fail (conn->read_socket != NULL, NULL);
3424
3425   return conn->read_socket;
3426 }
3427
3428 /**
3429  * gst_rtsp_connection_get_write_socket:
3430  * @conn: a #GstRTSPConnection
3431  *
3432  * Get the file descriptor for writing.
3433  *
3434  * Returns: (transfer none): the file descriptor used for writing or NULL on
3435  * error. The file descriptor remains valid until the connection is closed.
3436  */
3437 GSocket *
3438 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3439 {
3440   g_return_val_if_fail (conn != NULL, NULL);
3441   g_return_val_if_fail (conn->write_socket != NULL, NULL);
3442
3443   return conn->write_socket;
3444 }
3445
3446 /**
3447  * gst_rtsp_connection_set_http_mode:
3448  * @conn: a #GstRTSPConnection
3449  * @enable: %TRUE to enable manual HTTP mode
3450  *
3451  * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3452  * messages in addition to the RTSP messages. It will also disable the
3453  * automatic handling of setting up an HTTP tunnel.
3454  */
3455 void
3456 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3457 {
3458   g_return_if_fail (conn != NULL);
3459
3460   conn->manual_http = enable;
3461 }
3462
3463 /**
3464  * gst_rtsp_connection_set_tunneled:
3465  * @conn: a #GstRTSPConnection
3466  * @tunneled: the new state
3467  *
3468  * Set the HTTP tunneling state of the connection. This must be configured before
3469  * the @conn is connected.
3470  */
3471 void
3472 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3473 {
3474   g_return_if_fail (conn != NULL);
3475   g_return_if_fail (conn->read_socket == NULL);
3476   g_return_if_fail (conn->write_socket == NULL);
3477
3478   conn->tunneled = tunneled;
3479 }
3480
3481 /**
3482  * gst_rtsp_connection_is_tunneled:
3483  * @conn: a #GstRTSPConnection
3484  *
3485  * Get the tunneling state of the connection.
3486  *
3487  * Returns: if @conn is using HTTP tunneling.
3488  */
3489 gboolean
3490 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3491 {
3492   g_return_val_if_fail (conn != NULL, FALSE);
3493
3494   return conn->tunneled;
3495 }
3496
3497 /**
3498  * gst_rtsp_connection_get_tunnelid:
3499  * @conn: a #GstRTSPConnection
3500  *
3501  * Get the tunnel session id the connection.
3502  *
3503  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3504  */
3505 const gchar *
3506 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3507 {
3508   g_return_val_if_fail (conn != NULL, NULL);
3509
3510   if (!conn->tunneled)
3511     return NULL;
3512
3513   return conn->tunnelid;
3514 }
3515
3516 /**
3517  * gst_rtsp_connection_set_ignore_x_server_reply:
3518  * @conn: a #GstRTSPConnection
3519  * @ignore: %TRUE to ignore the x-server-ip-address header reply or %FALSE to
3520  *          comply with it (%FALSE is the default).
3521  *
3522  * Set whether to ignore the x-server-ip-address header reply or not. If the
3523  * header is ignored, the original address will be used instead.
3524  *
3525  * Since: 1.20
3526  */
3527 void
3528 gst_rtsp_connection_set_ignore_x_server_reply (GstRTSPConnection * conn,
3529     gboolean ignore)
3530 {
3531   g_return_if_fail (conn != NULL);
3532
3533   conn->ignore_x_server_reply = ignore;
3534 }
3535
3536 /**
3537  * gst_rtsp_connection_get_ignore_x_server_reply:
3538  * @conn: a #GstRTSPConnection
3539  *
3540  * Get the ignore_x_server_reply value.
3541  *
3542  * Returns: returns %TRUE if the x-server-ip-address header reply will be
3543  *          ignored, else returns %FALSE
3544  *
3545  * Since: 1.20
3546  */
3547 gboolean
3548 gst_rtsp_connection_get_ignore_x_server_reply (const GstRTSPConnection * conn)
3549 {
3550   g_return_val_if_fail (conn != NULL, FALSE);
3551
3552   return conn->ignore_x_server_reply;
3553 }
3554
3555 /**
3556  * gst_rtsp_connection_do_tunnel:
3557  * @conn: a #GstRTSPConnection
3558  * @conn2: a #GstRTSPConnection or %NULL
3559  *
3560  * If @conn received the first tunnel connection and @conn2 received
3561  * the second tunnel connection, link the two connections together so that
3562  * @conn manages the tunneled connection.
3563  *
3564  * After this call, @conn2 cannot be used anymore and must be freed with
3565  * gst_rtsp_connection_free().
3566  *
3567  * If @conn2 is %NULL then only the base64 decoding context will be setup for
3568  * @conn.
3569  *
3570  * Returns: return GST_RTSP_OK on success.
3571  */
3572 GstRTSPResult
3573 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3574     GstRTSPConnection * conn2)
3575 {
3576   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3577
3578   if (conn2 != NULL) {
3579     GstRTSPTunnelState ts1 = conn->tstate;
3580     GstRTSPTunnelState ts2 = conn2->tstate;
3581
3582     g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3583         || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3584         GST_RTSP_EINVAL);
3585     g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3586             TUNNELID_LEN), GST_RTSP_EINVAL);
3587
3588     /* both connections have socket0 as the read/write socket */
3589     if (ts1 == TUNNEL_STATE_GET) {
3590       /* conn2 is the HTTP POST channel. take its socket and set it as read
3591        * socket in conn */
3592       conn->socket1 = conn2->socket0;
3593       conn->stream1 = conn2->stream0;
3594       conn->input_stream = conn2->input_stream;
3595       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3596       conn2->output_stream = NULL;
3597     } else {
3598       /* conn2 is the HTTP GET channel. take its socket and set it as write
3599        * socket in conn */
3600       conn->socket1 = conn->socket0;
3601       conn->stream1 = conn->stream0;
3602       conn->socket0 = conn2->socket0;
3603       conn->stream0 = conn2->stream0;
3604       conn->output_stream = conn2->output_stream;
3605       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3606     }
3607
3608     /* clean up some of the state of conn2 */
3609     g_cancellable_cancel (conn2->cancellable);
3610     conn2->write_socket = conn2->read_socket = NULL;
3611     conn2->socket0 = NULL;
3612     conn2->stream0 = NULL;
3613     conn2->socket1 = NULL;
3614     conn2->stream1 = NULL;
3615     conn2->input_stream = NULL;
3616     conn2->control_stream = NULL;
3617     g_object_unref (conn2->cancellable);
3618     conn2->cancellable = NULL;
3619
3620     /* We make socket0 the write socket and socket1 the read socket. */
3621     conn->write_socket = conn->socket0;
3622     conn->read_socket = conn->socket1;
3623
3624     conn->tstate = TUNNEL_STATE_COMPLETE;
3625
3626     g_free (conn->initial_buffer);
3627     conn->initial_buffer = conn2->initial_buffer;
3628     conn2->initial_buffer = NULL;
3629     conn->initial_buffer_offset = conn2->initial_buffer_offset;
3630   }
3631
3632   /* we need base64 decoding for the readfd */
3633   conn->ctx.state = 0;
3634   conn->ctx.save = 0;
3635   conn->ctx.cout = 0;
3636   conn->ctx.coutl = 0;
3637   conn->ctxp = &conn->ctx;
3638
3639   return GST_RTSP_OK;
3640 }
3641
3642 /**
3643  * gst_rtsp_connection_set_remember_session_id:
3644  * @conn: a #GstRTSPConnection
3645  * @remember: %TRUE if the connection should remember the session id
3646  *
3647  * Sets if the #GstRTSPConnection should remember the session id from the last
3648  * response received and force it onto any further requests.
3649  *
3650  * The default value is %TRUE
3651  */
3652
3653 void
3654 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3655     gboolean remember)
3656 {
3657   conn->remember_session_id = remember;
3658   if (!remember)
3659     conn->session_id[0] = '\0';
3660 }
3661
3662 /**
3663  * gst_rtsp_connection_get_remember_session_id:
3664  * @conn: a #GstRTSPConnection
3665  *
3666  * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3667  * last response to set it on any further request.
3668  */
3669
3670 gboolean
3671 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3672 {
3673   return conn->remember_session_id;
3674 }
3675
3676
3677 #define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3678 #define READ_COND   (G_IO_IN | READ_ERR)
3679 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3680 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
3681
3682 /* async functions */
3683 struct _GstRTSPWatch
3684 {
3685   GSource source;
3686
3687   GstRTSPConnection *conn;
3688
3689   GstRTSPBuilder builder;
3690   GstRTSPMessage message;
3691
3692   GSource *readsrc;
3693   GSource *writesrc;
3694   GSource *controlsrc;
3695
3696   gboolean keep_running;
3697
3698   /* queued message for transmission */
3699   guint id;
3700   GMutex mutex;
3701   GstQueueArray *messages;
3702   gsize messages_bytes;
3703   guint messages_count;
3704
3705   gsize max_bytes;
3706   guint max_messages;
3707   GCond queue_not_full;
3708   gboolean flushing;
3709
3710   GstRTSPWatchFuncs funcs;
3711
3712   gpointer user_data;
3713   GDestroyNotify notify;
3714 };
3715
3716 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3717       ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3718
3719 static gboolean
3720 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3721 {
3722   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3723
3724   if (watch->conn->initial_buffer != NULL)
3725     return TRUE;
3726
3727   *timeout = (watch->conn->timeout * 1000);
3728
3729   return FALSE;
3730 }
3731
3732 static gboolean
3733 gst_rtsp_source_check (GSource * source)
3734 {
3735   return FALSE;
3736 }
3737
3738 static gboolean
3739 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3740     GstRTSPWatch * watch)
3741 {
3742   gssize count;
3743   guint8 buffer[1024];
3744   GError *error = NULL;
3745
3746   /* try to read in order to be able to detect errors, we read 1k in case some
3747    * client actually decides to send data on the GET channel */
3748   count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3749       &error);
3750   if (count == 0) {
3751     /* other end closed the socket */
3752     goto eof;
3753   }
3754
3755   if (count < 0) {
3756     GST_DEBUG ("%s", error->message);
3757     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3758         g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3759       g_clear_error (&error);
3760       goto done;
3761     }
3762     g_clear_error (&error);
3763     goto read_error;
3764   }
3765
3766   /* client sent data on the GET channel, ignore it */
3767
3768 done:
3769   return TRUE;
3770
3771   /* ERRORS */
3772 eof:
3773   {
3774     if (watch->funcs.closed)
3775       watch->funcs.closed (watch, watch->user_data);
3776
3777     /* the read connection was closed, stop the watch now */
3778     watch->keep_running = FALSE;
3779
3780     return FALSE;
3781   }
3782 read_error:
3783   {
3784     if (watch->funcs.error_full)
3785       watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3786           0, watch->user_data);
3787     else if (watch->funcs.error)
3788       watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3789
3790     goto eof;
3791   }
3792 }
3793
3794 static gboolean
3795 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3796     GstRTSPWatch * watch)
3797 {
3798   GstRTSPResult res = GST_RTSP_ERROR;
3799   GstRTSPConnection *conn = watch->conn;
3800
3801   /* if this connection was already closed, stop now */
3802   if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3803     goto eof;
3804
3805   res = build_next (&watch->builder, &watch->message, conn, FALSE);
3806   if (res == GST_RTSP_EINTR)
3807     goto done;
3808   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3809     g_mutex_lock (&watch->mutex);
3810     if (watch->readsrc) {
3811       if (!g_source_is_destroyed ((GSource *) watch))
3812         g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3813       g_source_unref (watch->readsrc);
3814       watch->readsrc = NULL;
3815     }
3816
3817     if (conn->stream1) {
3818       g_object_unref (conn->stream1);
3819       conn->stream1 = NULL;
3820       conn->socket1 = NULL;
3821       conn->input_stream = NULL;
3822     }
3823     g_mutex_unlock (&watch->mutex);
3824
3825     /* When we are in tunnelled mode, the read socket can be closed and we
3826      * should be prepared for a new POST method to reopen it */
3827     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3828       /* remove the read connection for the tunnel */
3829       /* we accept a new POST request */
3830       conn->tstate = TUNNEL_STATE_GET;
3831       /* and signal that we lost our tunnel */
3832       if (watch->funcs.tunnel_lost)
3833         res = watch->funcs.tunnel_lost (watch, watch->user_data);
3834       /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3835       g_mutex_lock (&watch->mutex);
3836       if (watch->conn->control_stream && !watch->controlsrc) {
3837         watch->controlsrc =
3838             g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3839             (watch->conn->control_stream), NULL);
3840         g_source_set_callback (watch->controlsrc,
3841             (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3842             NULL);
3843         g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3844       }
3845       g_mutex_unlock (&watch->mutex);
3846       goto read_done;
3847     } else
3848       goto eof;
3849   } else if (G_LIKELY (res == GST_RTSP_OK)) {
3850     if (!conn->manual_http &&
3851         watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3852       if (conn->tstate == TUNNEL_STATE_NONE &&
3853           watch->message.type_data.request.method == GST_RTSP_GET) {
3854         GstRTSPMessage *response;
3855         GstRTSPStatusCode code;
3856
3857         conn->tstate = TUNNEL_STATE_GET;
3858
3859         if (watch->funcs.tunnel_start)
3860           code = watch->funcs.tunnel_start (watch, watch->user_data);
3861         else
3862           code = GST_RTSP_STS_OK;
3863
3864         /* queue the response */
3865         response = gen_tunnel_reply (conn, code, &watch->message);
3866         if (watch->funcs.tunnel_http_response)
3867           watch->funcs.tunnel_http_response (watch, &watch->message, response,
3868               watch->user_data);
3869         gst_rtsp_watch_send_message (watch, response, NULL);
3870         gst_rtsp_message_free (response);
3871         goto read_done;
3872       } else if (conn->tstate == TUNNEL_STATE_NONE &&
3873           watch->message.type_data.request.method == GST_RTSP_POST) {
3874         conn->tstate = TUNNEL_STATE_POST;
3875
3876         /* in the callback the connection should be tunneled with the
3877          * GET connection */
3878         if (watch->funcs.tunnel_complete) {
3879           watch->funcs.tunnel_complete (watch, watch->user_data);
3880         }
3881         goto read_done;
3882       }
3883     }
3884   } else
3885     goto read_error;
3886
3887   if (!conn->manual_http) {
3888     /* if manual HTTP support is not enabled, then restore the message to
3889      * what it would have looked like without the support for parsing HTTP
3890      * messages being present */
3891     if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3892       watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3893       watch->message.type_data.request.method = GST_RTSP_INVALID;
3894       if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3895         watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3896       res = GST_RTSP_EPARSE;
3897     } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3898       watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3899       if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3900         watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3901       res = GST_RTSP_EPARSE;
3902     }
3903   }
3904   if (G_LIKELY (res != GST_RTSP_OK))
3905     goto read_error;
3906
3907   if (watch->funcs.message_received)
3908     watch->funcs.message_received (watch, &watch->message, watch->user_data);
3909
3910 read_done:
3911   gst_rtsp_message_unset (&watch->message);
3912   build_reset (&watch->builder);
3913
3914 done:
3915   return TRUE;
3916
3917   /* ERRORS */
3918 eof:
3919   {
3920     if (watch->funcs.closed)
3921       watch->funcs.closed (watch, watch->user_data);
3922
3923     /* we closed the read connection, stop the watch now */
3924     watch->keep_running = FALSE;
3925
3926     /* always stop when the input returns EOF in non-tunneled mode */
3927     return FALSE;
3928   }
3929 read_error:
3930   {
3931     if (watch->funcs.error_full)
3932       watch->funcs.error_full (watch, res, &watch->message,
3933           0, watch->user_data);
3934     else if (watch->funcs.error)
3935       watch->funcs.error (watch, res, watch->user_data);
3936
3937     goto eof;
3938   }
3939 }
3940
3941 static gboolean
3942 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3943     gpointer user_data G_GNUC_UNUSED)
3944 {
3945   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3946   GstRTSPConnection *conn = watch->conn;
3947
3948   if (conn->initial_buffer != NULL) {
3949     gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3950         watch);
3951   }
3952   return watch->keep_running;
3953 }
3954
3955 static gboolean
3956 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3957     GstRTSPWatch * watch)
3958 {
3959   GstRTSPResult res = GST_RTSP_ERROR;
3960   GstRTSPConnection *conn = watch->conn;
3961
3962   /* if this connection was already closed, stop now */
3963   if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3964       !watch->messages)
3965     goto eof;
3966
3967   g_mutex_lock (&watch->mutex);
3968   do {
3969     guint n_messages = gst_queue_array_get_length (watch->messages);
3970     GOutputVector *vectors;
3971     GstMapInfo *map_infos;
3972     guint *ids;
3973     gsize bytes_to_write, bytes_written;
3974     guint n_vectors, n_memories, n_ids, drop_messages;
3975     gint i, j, l, n_mmap;
3976     GstRTSPSerializedMessage *msg;
3977
3978     /* if this connection was already closed, stop now */
3979     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3980         !watch->messages) {
3981       g_mutex_unlock (&watch->mutex);
3982       goto eof;
3983     }
3984
3985     if (n_messages == 0) {
3986       if (watch->writesrc) {
3987         if (!g_source_is_destroyed ((GSource *) watch))
3988           g_source_remove_child_source ((GSource *) watch, watch->writesrc);
3989         g_source_unref (watch->writesrc);
3990         watch->writesrc = NULL;
3991         /* we create and add the write source again when we actually have
3992          * something to write */
3993
3994         /* since write source is now removed we add read source on the write
3995          * socket instead to be able to detect when client closes get channel
3996          * in tunneled mode */
3997         if (watch->conn->control_stream) {
3998           watch->controlsrc =
3999               g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4000               (watch->conn->control_stream), NULL);
4001           g_source_set_callback (watch->controlsrc,
4002               (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
4003               NULL);
4004           g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4005         } else {
4006           watch->controlsrc = NULL;
4007         }
4008       }
4009       break;
4010     }
4011
4012     for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
4013       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4014       if (msg->id != 0)
4015         n_ids++;
4016
4017       if (msg->data_offset < msg->data_size)
4018         n_vectors++;
4019
4020       if (msg->body_data && msg->body_offset < msg->body_data_size) {
4021         n_vectors++;
4022       } else if (msg->body_buffer) {
4023         guint m, n;
4024         guint offset = 0;
4025
4026         n = gst_buffer_n_memory (msg->body_buffer);
4027         for (m = 0; m < n; m++) {
4028           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
4029
4030           /* Skip all memories we already wrote */
4031           if (offset + mem->size <= msg->body_offset) {
4032             offset += mem->size;
4033             continue;
4034           }
4035           offset += mem->size;
4036
4037           n_memories++;
4038           n_vectors++;
4039         }
4040       }
4041     }
4042
4043     vectors = g_newa (GOutputVector, n_vectors);
4044     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4045     ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
4046     if (ids)
4047       memset (ids, 0, sizeof (guint) * (n_ids + 1));
4048
4049     for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
4050         i++) {
4051       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4052
4053       if (msg->data_offset < msg->data_size) {
4054         vectors[j].buffer = (msg->data_is_data_header ?
4055             msg->data_header : msg->data) + msg->data_offset;
4056         vectors[j].size = msg->data_size - msg->data_offset;
4057         bytes_to_write += vectors[j].size;
4058         j++;
4059       }
4060
4061       if (msg->body_data) {
4062         if (msg->body_offset < msg->body_data_size) {
4063           vectors[j].buffer = msg->body_data + msg->body_offset;
4064           vectors[j].size = msg->body_data_size - msg->body_offset;
4065           bytes_to_write += vectors[j].size;
4066           j++;
4067         }
4068       } else if (msg->body_buffer) {
4069         guint m, n;
4070         guint offset = 0;
4071         n = gst_buffer_n_memory (msg->body_buffer);
4072         for (m = 0; m < n; m++) {
4073           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
4074           guint off;
4075
4076           /* Skip all memories we already wrote */
4077           if (offset + mem->size <= msg->body_offset) {
4078             offset += mem->size;
4079             continue;
4080           }
4081
4082           if (offset < msg->body_offset)
4083             off = msg->body_offset - offset;
4084           else
4085             off = 0;
4086
4087           offset += mem->size;
4088
4089           g_assert (off < mem->size);
4090
4091           gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
4092           vectors[j].buffer = map_infos[n_mmap].data + off;
4093           vectors[j].size = map_infos[n_mmap].size - off;
4094           bytes_to_write += vectors[j].size;
4095
4096           n_mmap++;
4097           j++;
4098         }
4099       }
4100     }
4101
4102     res =
4103         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4104         &bytes_written, FALSE, watch->conn->cancellable);
4105     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4106
4107     /* First unmap all memories here, this simplifies the code below
4108      * as we don't have to skip all memories that were already written
4109      * before */
4110     for (i = 0; i < n_mmap; i++) {
4111       gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
4112     }
4113
4114     if (bytes_written == bytes_to_write) {
4115       /* fast path, just unmap all memories, free memory, drop all messages and notify them */
4116       l = 0;
4117       while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4118         if (msg->id) {
4119           ids[l] = msg->id;
4120           l++;
4121         }
4122
4123         gst_rtsp_serialized_message_clear (msg);
4124       }
4125
4126       g_assert (watch->messages_bytes >= bytes_written);
4127       watch->messages_bytes -= bytes_written;
4128     } else if (bytes_written > 0) {
4129       /* not done, let's skip all messages that were sent already and free them */
4130       for (i = 0, drop_messages = 0; i < n_messages; i++) {
4131         msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4132
4133         if (bytes_written >= msg->data_size - msg->data_offset) {
4134           guint body_size;
4135
4136           /* all data of this message is sent, check body and otherwise
4137            * skip the whole message for next time */
4138           bytes_written -= (msg->data_size - msg->data_offset);
4139           watch->messages_bytes -= (msg->data_size - msg->data_offset);
4140           msg->data_offset = msg->data_size;
4141
4142           if (msg->body_data) {
4143             body_size = msg->body_data_size;
4144           } else if (msg->body_buffer) {
4145             body_size = gst_buffer_get_size (msg->body_buffer);
4146           } else {
4147             body_size = 0;
4148           }
4149
4150           if (bytes_written + msg->body_offset >= body_size) {
4151             /* body written, drop this message */
4152             bytes_written -= body_size - msg->body_offset;
4153             watch->messages_bytes -= body_size - msg->body_offset;
4154             msg->body_offset = body_size;
4155             drop_messages++;
4156
4157             if (msg->id) {
4158               ids[l] = msg->id;
4159               l++;
4160             }
4161
4162             gst_rtsp_serialized_message_clear (msg);
4163           } else {
4164             msg->body_offset += bytes_written;
4165             watch->messages_bytes -= bytes_written;
4166             bytes_written = 0;
4167           }
4168         } else {
4169           /* Need to continue sending from the data of this message */
4170           msg->data_offset += bytes_written;
4171           watch->messages_bytes -= bytes_written;
4172           bytes_written = 0;
4173         }
4174       }
4175
4176       while (drop_messages > 0) {
4177         msg = gst_queue_array_pop_head_struct (watch->messages);
4178         g_assert (msg);
4179         drop_messages--;
4180       }
4181
4182       g_assert (watch->messages_bytes >= bytes_written);
4183       watch->messages_bytes -= bytes_written;
4184     }
4185
4186     if (!IS_BACKLOG_FULL (watch))
4187       g_cond_signal (&watch->queue_not_full);
4188     g_mutex_unlock (&watch->mutex);
4189
4190     /* notify all messages that were successfully written */
4191     if (ids) {
4192       while (*ids) {
4193         /* only decrease the counter for messages that have an id. Only
4194          * the last message of a messages chunk is counted */
4195         watch->messages_count--;
4196
4197         if (watch->funcs.message_sent)
4198           watch->funcs.message_sent (watch, *ids, watch->user_data);
4199         ids++;
4200       }
4201     }
4202
4203     if (res == GST_RTSP_EINTR) {
4204       goto write_blocked;
4205     } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4206       goto write_error;
4207     }
4208     g_mutex_lock (&watch->mutex);
4209   } while (TRUE);
4210   g_mutex_unlock (&watch->mutex);
4211
4212 write_blocked:
4213   return TRUE;
4214
4215   /* ERRORS */
4216 eof:
4217   {
4218     return FALSE;
4219   }
4220 write_error:
4221   {
4222     if (watch->funcs.error_full) {
4223       guint i, n_messages;
4224
4225       n_messages = gst_queue_array_get_length (watch->messages);
4226       for (i = 0; i < n_messages; i++) {
4227         GstRTSPSerializedMessage *msg =
4228             gst_queue_array_peek_nth_struct (watch->messages, i);
4229         if (msg->id)
4230           watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4231       }
4232     } else if (watch->funcs.error) {
4233       watch->funcs.error (watch, res, watch->user_data);
4234     }
4235
4236     return FALSE;
4237   }
4238 }
4239
4240 static void
4241 gst_rtsp_source_finalize (GSource * source)
4242 {
4243   GstRTSPWatch *watch = (GstRTSPWatch *) source;
4244   GstRTSPSerializedMessage *msg;
4245
4246   if (watch->notify)
4247     watch->notify (watch->user_data);
4248
4249   build_reset (&watch->builder);
4250   gst_rtsp_message_unset (&watch->message);
4251
4252   while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4253     gst_rtsp_serialized_message_clear (msg);
4254   }
4255   gst_queue_array_free (watch->messages);
4256   watch->messages = NULL;
4257   watch->messages_bytes = 0;
4258   watch->messages_count = 0;
4259
4260   g_cond_clear (&watch->queue_not_full);
4261
4262   if (watch->readsrc)
4263     g_source_unref (watch->readsrc);
4264   if (watch->writesrc)
4265     g_source_unref (watch->writesrc);
4266   if (watch->controlsrc)
4267     g_source_unref (watch->controlsrc);
4268
4269   g_mutex_clear (&watch->mutex);
4270 }
4271
4272 static GSourceFuncs gst_rtsp_source_funcs = {
4273   gst_rtsp_source_prepare,
4274   gst_rtsp_source_check,
4275   gst_rtsp_source_dispatch,
4276   gst_rtsp_source_finalize,
4277   NULL,
4278   NULL
4279 };
4280
4281 /**
4282  * gst_rtsp_watch_new: (skip)
4283  * @conn: a #GstRTSPConnection
4284  * @funcs: watch functions
4285  * @user_data: user data to pass to @funcs
4286  * @notify: notify when @user_data is not referenced anymore
4287  *
4288  * Create a watch object for @conn. The functions provided in @funcs will be
4289  * called with @user_data when activity happened on the watch.
4290  *
4291  * The new watch is usually created so that it can be attached to a
4292  * maincontext with gst_rtsp_watch_attach().
4293  *
4294  * @conn must exist for the entire lifetime of the watch.
4295  *
4296  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4297  * communication. Free with gst_rtsp_watch_unref () after usage.
4298  */
4299 GstRTSPWatch *
4300 gst_rtsp_watch_new (GstRTSPConnection * conn,
4301     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4302 {
4303   GstRTSPWatch *result;
4304
4305   g_return_val_if_fail (conn != NULL, NULL);
4306   g_return_val_if_fail (funcs != NULL, NULL);
4307   g_return_val_if_fail (conn->read_socket != NULL, NULL);
4308   g_return_val_if_fail (conn->write_socket != NULL, NULL);
4309
4310   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4311       sizeof (GstRTSPWatch));
4312
4313   result->conn = conn;
4314   result->builder.state = STATE_START;
4315
4316   g_mutex_init (&result->mutex);
4317   result->messages =
4318       gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4319   g_cond_init (&result->queue_not_full);
4320
4321   gst_rtsp_watch_reset (result);
4322   result->keep_running = TRUE;
4323   result->flushing = FALSE;
4324
4325   result->funcs = *funcs;
4326   result->user_data = user_data;
4327   result->notify = notify;
4328
4329   return result;
4330 }
4331
4332 /**
4333  * gst_rtsp_watch_reset:
4334  * @watch: a #GstRTSPWatch
4335  *
4336  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4337  * when the file descriptors of the connection might have changed.
4338  */
4339 void
4340 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4341 {
4342   g_mutex_lock (&watch->mutex);
4343   if (watch->readsrc) {
4344     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4345     g_source_unref (watch->readsrc);
4346   }
4347   if (watch->writesrc) {
4348     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4349     g_source_unref (watch->writesrc);
4350     watch->writesrc = NULL;
4351   }
4352   if (watch->controlsrc) {
4353     g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4354     g_source_unref (watch->controlsrc);
4355     watch->controlsrc = NULL;
4356   }
4357
4358   if (watch->conn->input_stream) {
4359     watch->readsrc =
4360         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4361         (watch->conn->input_stream), NULL);
4362     g_source_set_callback (watch->readsrc,
4363         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4364     g_source_add_child_source ((GSource *) watch, watch->readsrc);
4365   } else {
4366     watch->readsrc = NULL;
4367   }
4368
4369   /* we create and add the write source when we actually have something to
4370    * write */
4371
4372   /* when write source is not added we add read source on the write socket
4373    * instead to be able to detect when client closes get channel in tunneled
4374    * mode */
4375   if (watch->conn->control_stream) {
4376     watch->controlsrc =
4377         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4378         (watch->conn->control_stream), NULL);
4379     g_source_set_callback (watch->controlsrc,
4380         (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4381     g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4382   } else {
4383     watch->controlsrc = NULL;
4384   }
4385   g_mutex_unlock (&watch->mutex);
4386 }
4387
4388 /**
4389  * gst_rtsp_watch_attach:
4390  * @watch: a #GstRTSPWatch
4391  * @context: a GMainContext (if NULL, the default context will be used)
4392  *
4393  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4394  *
4395  * Returns: the ID (greater than 0) for the watch within the GMainContext.
4396  */
4397 guint
4398 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4399 {
4400   g_return_val_if_fail (watch != NULL, 0);
4401
4402   return g_source_attach ((GSource *) watch, context);
4403 }
4404
4405 /**
4406  * gst_rtsp_watch_unref:
4407  * @watch: a #GstRTSPWatch
4408  *
4409  * Decreases the reference count of @watch by one. If the resulting reference
4410  * count is zero the watch and associated memory will be destroyed.
4411  */
4412 void
4413 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4414 {
4415   g_return_if_fail (watch != NULL);
4416
4417   g_source_unref ((GSource *) watch);
4418 }
4419
4420 /**
4421  * gst_rtsp_watch_set_send_backlog:
4422  * @watch: a #GstRTSPWatch
4423  * @bytes: maximum bytes
4424  * @messages: maximum messages
4425  *
4426  * Set the maximum amount of bytes and messages that will be queued in @watch.
4427  * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4428  * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4429  *
4430  * A value of 0 for @bytes or @messages means no limits.
4431  *
4432  * Since: 1.2
4433  */
4434 void
4435 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4436     gsize bytes, guint messages)
4437 {
4438   g_return_if_fail (watch != NULL);
4439
4440   g_mutex_lock (&watch->mutex);
4441   watch->max_bytes = bytes;
4442   watch->max_messages = messages;
4443   if (!IS_BACKLOG_FULL (watch))
4444     g_cond_signal (&watch->queue_not_full);
4445   g_mutex_unlock (&watch->mutex);
4446
4447   GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4448       bytes, messages);
4449 }
4450
4451 /**
4452  * gst_rtsp_watch_get_send_backlog:
4453  * @watch: a #GstRTSPWatch
4454  * @bytes: (out) (allow-none): maximum bytes
4455  * @messages: (out) (allow-none): maximum messages
4456  *
4457  * Get the maximum amount of bytes and messages that will be queued in @watch.
4458  * See gst_rtsp_watch_set_send_backlog().
4459  *
4460  * Since: 1.2
4461  */
4462 void
4463 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4464     gsize * bytes, guint * messages)
4465 {
4466   g_return_if_fail (watch != NULL);
4467
4468   g_mutex_lock (&watch->mutex);
4469   if (bytes)
4470     *bytes = watch->max_bytes;
4471   if (messages)
4472     *messages = watch->max_messages;
4473   g_mutex_unlock (&watch->mutex);
4474 }
4475
4476 static GstRTSPResult
4477 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4478     GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4479 {
4480   GstRTSPResult res;
4481   GMainContext *context = NULL;
4482   gint i;
4483
4484   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4485   g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4486
4487   g_mutex_lock (&watch->mutex);
4488   if (watch->flushing)
4489     goto flushing;
4490
4491   /* try to send the message synchronously first */
4492   if (gst_queue_array_get_length (watch->messages) == 0) {
4493     gint j, k;
4494     GOutputVector *vectors;
4495     GstMapInfo *map_infos;
4496     gsize bytes_to_write, bytes_written;
4497     guint n_vectors, n_memories, drop_messages;
4498
4499     for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4500       n_vectors++;
4501       if (messages[i].body_data) {
4502         n_vectors++;
4503       } else if (messages[i].body_buffer) {
4504         n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4505         n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4506       }
4507     }
4508
4509     vectors = g_newa (GOutputVector, n_vectors);
4510     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4511
4512     for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4513       vectors[j].buffer = messages[i].data_is_data_header ?
4514           messages[i].data_header : messages[i].data;
4515       vectors[j].size = messages[i].data_size;
4516       bytes_to_write += vectors[j].size;
4517       j++;
4518
4519       if (messages[i].body_data) {
4520         vectors[j].buffer = messages[i].body_data;
4521         vectors[j].size = messages[i].body_data_size;
4522         bytes_to_write += vectors[j].size;
4523         j++;
4524       } else if (messages[i].body_buffer) {
4525         gint l, n;
4526
4527         n = gst_buffer_n_memory (messages[i].body_buffer);
4528         for (l = 0; l < n; l++) {
4529           GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4530
4531           gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4532           vectors[j].buffer = map_infos[k].data;
4533           vectors[j].size = map_infos[k].size;
4534           bytes_to_write += vectors[j].size;
4535
4536           k++;
4537           j++;
4538         }
4539       }
4540     }
4541
4542     res =
4543         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4544         &bytes_written, FALSE, watch->conn->cancellable);
4545     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4546
4547     /* At this point we sent everything we could without blocking or
4548      * error and updated the offsets inside the message accordingly */
4549
4550     /* First of all unmap all memories. This simplifies the code below */
4551     for (k = 0; k < n_memories; k++) {
4552       gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4553     }
4554
4555     if (res != GST_RTSP_EINTR) {
4556       /* actual error or done completely */
4557       if (id != NULL)
4558         *id = 0;
4559
4560       /* free everything */
4561       for (i = 0, k = 0; i < n_messages; i++) {
4562         gst_rtsp_serialized_message_clear (&messages[i]);
4563       }
4564
4565       goto done;
4566     }
4567
4568     /* not done, let's skip all messages that were sent already and free them */
4569     for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4570       if (bytes_written >= messages[i].data_size) {
4571         guint body_size;
4572
4573         /* all data of this message is sent, check body and otherwise
4574          * skip the whole message for next time */
4575         messages[i].data_offset = messages[i].data_size;
4576         bytes_written -= messages[i].data_size;
4577
4578         if (messages[i].body_data) {
4579           body_size = messages[i].body_data_size;
4580
4581         } else if (messages[i].body_buffer) {
4582           body_size = gst_buffer_get_size (messages[i].body_buffer);
4583         } else {
4584           body_size = 0;
4585         }
4586
4587         if (bytes_written >= body_size) {
4588           /* body written, drop this message */
4589           messages[i].body_offset = body_size;
4590           bytes_written -= body_size;
4591           drop_messages++;
4592
4593           gst_rtsp_serialized_message_clear (&messages[i]);
4594         } else {
4595           messages[i].body_offset = bytes_written;
4596           bytes_written = 0;
4597         }
4598       } else {
4599         /* Need to continue sending from the data of this message */
4600         messages[i].data_offset = bytes_written;
4601         bytes_written = 0;
4602       }
4603     }
4604
4605     g_assert (n_messages > drop_messages);
4606
4607     messages += drop_messages;
4608     n_messages -= drop_messages;
4609   }
4610
4611   /* check limits */
4612   if (IS_BACKLOG_FULL (watch))
4613     goto too_much_backlog;
4614
4615   for (i = 0; i < n_messages; i++) {
4616     GstRTSPSerializedMessage local_message;
4617
4618     /* make a record with the data and id for sending async */
4619     local_message = messages[i];
4620
4621     /* copy the body data or take an additional reference to the body buffer
4622      * we don't own them here */
4623     if (local_message.body_data) {
4624       local_message.body_data =
4625           g_memdup2 (local_message.body_data, local_message.body_data_size);
4626     } else if (local_message.body_buffer) {
4627       gst_buffer_ref (local_message.body_buffer);
4628     }
4629     local_message.borrowed = FALSE;
4630
4631     /* set an id for the very last message */
4632     if (i == n_messages - 1) {
4633       do {
4634         /* make sure rec->id is never 0 */
4635         local_message.id = ++watch->id;
4636       } while (G_UNLIKELY (local_message.id == 0));
4637
4638       if (id != NULL)
4639         *id = local_message.id;
4640     } else {
4641       local_message.id = 0;
4642     }
4643
4644     /* add the record to a queue. */
4645     gst_queue_array_push_tail_struct (watch->messages, &local_message);
4646     watch->messages_bytes +=
4647         (local_message.data_size - local_message.data_offset);
4648     if (local_message.body_data)
4649       watch->messages_bytes +=
4650           (local_message.body_data_size - local_message.body_offset);
4651     else if (local_message.body_buffer)
4652       watch->messages_bytes +=
4653           (gst_buffer_get_size (local_message.body_buffer) -
4654           local_message.body_offset);
4655   }
4656   /* each message chunks is one unit */
4657   watch->messages_count++;
4658
4659   /* make sure the main context will now also check for writability on the
4660    * socket */
4661   context = ((GSource *) watch)->context;
4662   if (!watch->writesrc) {
4663     /* remove the read source on the write socket, we will be able to detect
4664      * errors while writing */
4665     if (watch->controlsrc) {
4666       g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4667       g_source_unref (watch->controlsrc);
4668       watch->controlsrc = NULL;
4669     }
4670
4671     watch->writesrc =
4672         g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4673         (watch->conn->output_stream), NULL);
4674     g_source_set_callback (watch->writesrc,
4675         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4676     g_source_add_child_source ((GSource *) watch, watch->writesrc);
4677   }
4678   res = GST_RTSP_OK;
4679
4680 done:
4681   g_mutex_unlock (&watch->mutex);
4682
4683   if (context)
4684     g_main_context_wakeup (context);
4685
4686   return res;
4687
4688   /* ERRORS */
4689 flushing:
4690   {
4691     GST_DEBUG ("we are flushing");
4692     g_mutex_unlock (&watch->mutex);
4693     for (i = 0; i < n_messages; i++) {
4694       gst_rtsp_serialized_message_clear (&messages[i]);
4695     }
4696     return GST_RTSP_EINTR;
4697   }
4698 too_much_backlog:
4699   {
4700     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4701         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4702         watch->messages_bytes, watch->max_messages, watch->messages_count);
4703     g_mutex_unlock (&watch->mutex);
4704     for (i = 0; i < n_messages; i++) {
4705       gst_rtsp_serialized_message_clear (&messages[i]);
4706     }
4707     return GST_RTSP_ENOMEM;
4708   }
4709
4710   return GST_RTSP_OK;
4711 }
4712
4713 /**
4714  * gst_rtsp_watch_write_data:
4715  * @watch: a #GstRTSPWatch
4716  * @data: (array length=size) (transfer full): the data to queue
4717  * @size: the size of @data
4718  * @id: (out) (allow-none): location for a message ID or %NULL
4719  *
4720  * Write @data using the connection of the @watch. If it cannot be sent
4721  * immediately, it will be queued for transmission in @watch. The contents of
4722  * @message will then be serialized and transmitted when the connection of the
4723  * @watch becomes writable. In case the @message is queued, the ID returned in
4724  * @id will be non-zero and used as the ID argument in the message_sent
4725  * callback.
4726  *
4727  * This function will take ownership of @data and g_free() it after use.
4728  *
4729  * If the amount of queued data exceeds the limits set with
4730  * gst_rtsp_watch_set_send_backlog(), this function will return
4731  * #GST_RTSP_ENOMEM.
4732  *
4733  * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4734  * are reached. #GST_RTSP_EINTR when @watch was flushing.
4735  */
4736 /* FIXME 2.0: This should've been static! */
4737 GstRTSPResult
4738 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4739     guint size, guint * id)
4740 {
4741   GstRTSPSerializedMessage serialized_message;
4742
4743   memset (&serialized_message, 0, sizeof (serialized_message));
4744   serialized_message.data = (guint8 *) data;
4745   serialized_message.data_size = size;
4746
4747   return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4748       1, id);
4749 }
4750
4751 /**
4752  * gst_rtsp_watch_send_message:
4753  * @watch: a #GstRTSPWatch
4754  * @message: a #GstRTSPMessage
4755  * @id: (out) (allow-none): location for a message ID or %NULL
4756  *
4757  * Send a @message using the connection of the @watch. If it cannot be sent
4758  * immediately, it will be queued for transmission in @watch. The contents of
4759  * @message will then be serialized and transmitted when the connection of the
4760  * @watch becomes writable. In case the @message is queued, the ID returned in
4761  * @id will be non-zero and used as the ID argument in the message_sent
4762  * callback.
4763  *
4764  * Returns: #GST_RTSP_OK on success.
4765  */
4766 GstRTSPResult
4767 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4768     guint * id)
4769 {
4770   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4771   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4772
4773   return gst_rtsp_watch_send_messages (watch, message, 1, id);
4774 }
4775
4776 /**
4777  * gst_rtsp_watch_send_messages:
4778  * @watch: a #GstRTSPWatch
4779  * @messages: (array length=n_messages): the messages to send
4780  * @n_messages: the number of messages to send
4781  * @id: (out) (allow-none): location for a message ID or %NULL
4782  *
4783  * Sends @messages using the connection of the @watch. If they cannot be sent
4784  * immediately, they will be queued for transmission in @watch. The contents of
4785  * @messages will then be serialized and transmitted when the connection of the
4786  * @watch becomes writable. In case the @messages are queued, the ID returned in
4787  * @id will be non-zero and used as the ID argument in the message_sent
4788  * callback once the last message is sent. The callback will only be called
4789  * once for the last message.
4790  *
4791  * Returns: #GST_RTSP_OK on success.
4792  *
4793  * Since: 1.16
4794  */
4795 GstRTSPResult
4796 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4797     guint n_messages, guint * id)
4798 {
4799   GstRTSPSerializedMessage *serialized_messages;
4800   gint i;
4801
4802   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4803   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4804
4805   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4806   memset (serialized_messages, 0,
4807       sizeof (GstRTSPSerializedMessage) * n_messages);
4808
4809   for (i = 0; i < n_messages; i++) {
4810     if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4811       goto error;
4812   }
4813
4814   return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4815       n_messages, id);
4816
4817 error:
4818   for (i = 0; i < n_messages; i++) {
4819     gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4820   }
4821
4822   return GST_RTSP_EINVAL;
4823 }
4824
4825 /**
4826  * gst_rtsp_watch_wait_backlog_usec:
4827  * @watch: a #GstRTSPWatch
4828  * @timeout: a timeout in microseconds
4829  *
4830  * Wait until there is place in the backlog queue, @timeout is reached
4831  * or @watch is set to flushing.
4832  *
4833  * If @timeout is 0 this function can block forever. If @timeout
4834  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4835  * after the timeout expired.
4836  *
4837  * The typically use of this function is when gst_rtsp_watch_write_data
4838  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4839  * free space in the backlog queue and try again.
4840  *
4841  * Returns: %GST_RTSP_OK when if there is room in queue.
4842  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
4843  *          %GST_RTSP_EINTR when @watch is flushing
4844  *          %GST_RTSP_EINVAL when called with invalid parameters.
4845  *
4846  * Since: 1.18
4847  */
4848 GstRTSPResult
4849 gst_rtsp_watch_wait_backlog_usec (GstRTSPWatch * watch, gint64 timeout)
4850 {
4851   gint64 end_time;
4852
4853   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4854
4855   end_time = g_get_monotonic_time () + timeout;
4856
4857   g_mutex_lock (&watch->mutex);
4858   if (watch->flushing)
4859     goto flushing;
4860
4861   while (IS_BACKLOG_FULL (watch)) {
4862     gboolean res;
4863
4864     res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4865     if (watch->flushing)
4866       goto flushing;
4867
4868     if (!res)
4869       goto timeout;
4870   }
4871   g_mutex_unlock (&watch->mutex);
4872
4873   return GST_RTSP_OK;
4874
4875   /* ERRORS */
4876 flushing:
4877   {
4878     GST_DEBUG ("we are flushing");
4879     g_mutex_unlock (&watch->mutex);
4880     return GST_RTSP_EINTR;
4881   }
4882 timeout:
4883   {
4884     GST_DEBUG ("we timed out");
4885     g_mutex_unlock (&watch->mutex);
4886     return GST_RTSP_ETIMEOUT;
4887   }
4888 }
4889
4890 /**
4891  * gst_rtsp_watch_set_flushing:
4892  * @watch: a #GstRTSPWatch
4893  * @flushing: new flushing state
4894  *
4895  * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4896  * and make sure gst_rtsp_watch_write_data() returns immediately with
4897  * #GST_RTSP_EINTR. And empty the queue.
4898  *
4899  * Since: 1.4
4900  */
4901 void
4902 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4903 {
4904   g_return_if_fail (watch != NULL);
4905
4906   g_mutex_lock (&watch->mutex);
4907   watch->flushing = flushing;
4908   g_cond_signal (&watch->queue_not_full);
4909   if (flushing) {
4910     GstRTSPSerializedMessage *msg;
4911
4912     while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4913       gst_rtsp_serialized_message_clear (msg);
4914     }
4915   }
4916   g_mutex_unlock (&watch->mutex);
4917 }
4918
4919
4920 #ifndef GST_DISABLE_DEPRECATED
4921 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
4922 /* Deprecated */
4923 #define TV_TO_USEC(tv) ((tv) ? ((tv)->tv_sec * G_USEC_PER_SEC + (tv)->tv_usec) : 0)
4924 /**
4925  * gst_rtsp_connection_connect:
4926  * @conn: a #GstRTSPConnection
4927  * @timeout: a GTimeVal timeout
4928  *
4929  * Attempt to connect to the url of @conn made with
4930  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4931  * forever. If @timeout contains a valid timeout, this function will return
4932  * #GST_RTSP_ETIMEOUT after the timeout expired.
4933  *
4934  * This function can be cancelled with gst_rtsp_connection_flush().
4935  *
4936  * Returns: #GST_RTSP_OK when a connection could be made.
4937  *
4938  * Deprecated: 1.18
4939  */
4940     GstRTSPResult
4941 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
4942 {
4943   return gst_rtsp_connection_connect_usec (conn, TV_TO_USEC (timeout));
4944 }
4945
4946 /**
4947  * gst_rtsp_connection_connect_with_response:
4948  * @conn: a #GstRTSPConnection
4949  * @timeout: a GTimeVal timeout
4950  * @response: a #GstRTSPMessage
4951  *
4952  * Attempt to connect to the url of @conn made with
4953  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4954  * forever. If @timeout contains a valid timeout, this function will return
4955  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
4956  * @response will contain a response to the tunneling request messages.
4957  *
4958  * This function can be cancelled with gst_rtsp_connection_flush().
4959  *
4960  * Returns: #GST_RTSP_OK when a connection could be made.
4961  *
4962  * Since: 1.8
4963  * Deprecated: 1.18
4964  */
4965 GstRTSPResult
4966 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
4967     GTimeVal * timeout, GstRTSPMessage * response)
4968 {
4969   return gst_rtsp_connection_connect_with_response_usec (conn,
4970       TV_TO_USEC (timeout), response);
4971 }
4972
4973 /**
4974  * gst_rtsp_connection_read:
4975  * @conn: a #GstRTSPConnection
4976  * @data: the data to read
4977  * @size: the size of @data
4978  * @timeout: a timeout value or %NULL
4979  *
4980  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
4981  * the specified @timeout. @timeout can be %NULL, in which case this function
4982  * might block forever.
4983  *
4984  * This function can be cancelled with gst_rtsp_connection_flush().
4985  *
4986  * Returns: #GST_RTSP_OK on success.
4987  *
4988  * Deprecated: 1.18
4989  */
4990 GstRTSPResult
4991 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
4992     GTimeVal * timeout)
4993 {
4994   return gst_rtsp_connection_read_usec (conn, data, size, TV_TO_USEC (timeout));
4995 }
4996
4997 /**
4998  * gst_rtsp_connection_write:
4999  * @conn: a #GstRTSPConnection
5000  * @data: the data to write
5001  * @size: the size of @data
5002  * @timeout: a timeout value or %NULL
5003  *
5004  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
5005  * the specified @timeout. @timeout can be %NULL, in which case this function
5006  * might block forever.
5007  *
5008  * This function can be cancelled with gst_rtsp_connection_flush().
5009  *
5010  * Returns: #GST_RTSP_OK on success.
5011  *
5012  * Deprecated: 1.18
5013  */
5014 GstRTSPResult
5015 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
5016     guint size, GTimeVal * timeout)
5017 {
5018   return gst_rtsp_connection_write_usec (conn, data, size,
5019       TV_TO_USEC (timeout));
5020 }
5021
5022 /**
5023  * gst_rtsp_connection_send:
5024  * @conn: a #GstRTSPConnection
5025  * @message: the message to send
5026  * @timeout: a timeout value or %NULL
5027  *
5028  * Attempt to send @message to the connected @conn, blocking up to
5029  * the specified @timeout. @timeout can be %NULL, in which case this function
5030  * might block forever.
5031  *
5032  * This function can be cancelled with gst_rtsp_connection_flush().
5033  *
5034  * Returns: #GST_RTSP_OK on success.
5035  *
5036  * Deprecated: 1.18
5037  */
5038 GstRTSPResult
5039 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
5040     GTimeVal * timeout)
5041 {
5042   return gst_rtsp_connection_send_usec (conn, message, TV_TO_USEC (timeout));
5043 }
5044
5045 /**
5046  * gst_rtsp_connection_send_messages:
5047  * @conn: a #GstRTSPConnection
5048  * @messages: (array length=n_messages): the messages to send
5049  * @n_messages: the number of messages to send
5050  * @timeout: a timeout value or %NULL
5051  *
5052  * Attempt to send @messages to the connected @conn, blocking up to
5053  * the specified @timeout. @timeout can be %NULL, in which case this function
5054  * might block forever.
5055  *
5056  * This function can be cancelled with gst_rtsp_connection_flush().
5057  *
5058  * Returns: #GST_RTSP_OK on success.
5059  *
5060  * Since: 1.16
5061  * Deprecated: 1.18
5062  */
5063 GstRTSPResult
5064 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
5065     GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
5066 {
5067   return gst_rtsp_connection_send_messages_usec (conn, messages, n_messages,
5068       TV_TO_USEC (timeout));
5069 }
5070
5071 /**
5072  * gst_rtsp_connection_receive:
5073  * @conn: a #GstRTSPConnection
5074  * @message: the message to read
5075  * @timeout: a timeout value or %NULL
5076  *
5077  * Attempt to read into @message from the connected @conn, blocking up to
5078  * the specified @timeout. @timeout can be %NULL, in which case this function
5079  * might block forever.
5080  *
5081  * This function can be cancelled with gst_rtsp_connection_flush().
5082  *
5083  * Returns: #GST_RTSP_OK on success.
5084  *
5085  * Deprecated: 1.18
5086  */
5087 GstRTSPResult
5088 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
5089     GTimeVal * timeout)
5090 {
5091   return gst_rtsp_connection_receive_usec (conn, message, TV_TO_USEC (timeout));
5092 }
5093
5094 /**
5095  * gst_rtsp_connection_poll:
5096  * @conn: a #GstRTSPConnection
5097  * @events: a bitmask of #GstRTSPEvent flags to check
5098  * @revents: location for result flags
5099  * @timeout: a timeout
5100  *
5101  * Wait up to the specified @timeout for the connection to become available for
5102  * at least one of the operations specified in @events. When the function returns
5103  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
5104  * @conn.
5105  *
5106  * @timeout can be %NULL, in which case this function might block forever.
5107  *
5108  * This function can be cancelled with gst_rtsp_connection_flush().
5109  *
5110  * Returns: #GST_RTSP_OK on success.
5111  *
5112  * Deprecated: 1.18
5113  */
5114 GstRTSPResult
5115 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
5116     GstRTSPEvent * revents, GTimeVal * timeout)
5117 {
5118   return gst_rtsp_connection_poll_usec (conn, events, revents,
5119       TV_TO_USEC (timeout));
5120 }
5121
5122 /**
5123  * gst_rtsp_connection_next_timeout:
5124  * @conn: a #GstRTSPConnection
5125  * @timeout: a timeout
5126  *
5127  * Calculate the next timeout for @conn, storing the result in @timeout.
5128  *
5129  * Returns: #GST_RTSP_OK.
5130  *
5131  * Deprecated: 1.18
5132  */
5133 GstRTSPResult
5134 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
5135 {
5136   gint64 tmptimeout = 0;
5137
5138   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
5139
5140   tmptimeout = gst_rtsp_connection_next_timeout_usec (conn);
5141
5142   timeout->tv_sec = tmptimeout / G_USEC_PER_SEC;
5143   timeout->tv_usec = tmptimeout % G_USEC_PER_SEC;
5144
5145   return GST_RTSP_OK;
5146 }
5147
5148
5149 /**
5150  * gst_rtsp_watch_wait_backlog:
5151  * @watch: a #GstRTSPWatch
5152  * @timeout: a GTimeVal timeout
5153  *
5154  * Wait until there is place in the backlog queue, @timeout is reached
5155  * or @watch is set to flushing.
5156  *
5157  * If @timeout is %NULL this function can block forever. If @timeout
5158  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
5159  * after the timeout expired.
5160  *
5161  * The typically use of this function is when gst_rtsp_watch_write_data
5162  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
5163  * free space in the backlog queue and try again.
5164  *
5165  * Returns: %GST_RTSP_OK when if there is room in queue.
5166  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
5167  *          %GST_RTSP_EINTR when @watch is flushing
5168  *          %GST_RTSP_EINVAL when called with invalid parameters.
5169  *
5170  * Since: 1.4
5171  * Deprecated: 1.18
5172  */
5173 GstRTSPResult
5174 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
5175 {
5176   return gst_rtsp_watch_wait_backlog_usec (watch, TV_TO_USEC (timeout));
5177 }
5178
5179 G_GNUC_END_IGNORE_DEPRECATIONS
5180 #endif /* GST_DISABLE_DEPRECATED */