gstrtspconnection: Add support to ignore x-server header reply
[platform/upstream/gst-plugins-base.git] / gst-libs / gst / rtsp / gstrtspconnection.c
1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42
43 /**
44  * SECTION:gstrtspconnection
45  * @title: GstRTSPConnection
46  * @short_description: manage RTSP connections
47  * @see_also: gstrtspurl
48  *
49  * This object manages the RTSP connection to the server. It provides function
50  * to receive and send bytes and messages.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #  include <config.h>
55 #endif
56
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70
71 #include "gstrtspconnection.h"
72
73 #ifdef IP_TOS
74 union gst_sockaddr
75 {
76   struct sockaddr sa;
77   struct sockaddr_in sa_in;
78   struct sockaddr_in6 sa_in6;
79   struct sockaddr_storage sa_stor;
80 };
81 #endif
82
83 typedef struct
84 {
85   gint state;
86   guint save;
87   guchar out[3];                /* the size must be evenly divisible by 3 */
88   guint cout;
89   guint coutl;
90 } DecodeCtx;
91
92 typedef struct
93 {
94   /* If %TRUE we only own data and none of the
95    * other fields
96    */
97   gboolean borrowed;
98
99   /* Header or full message */
100   guint8 *data;
101   guint data_size;
102   gboolean data_is_data_header;
103
104   /* Payload following data, if any */
105   guint8 *body_data;
106   guint body_data_size;
107   /* or */
108   GstBuffer *body_buffer;
109
110   /* DATA packet header statically allocated for above */
111   guint8 data_header[4];
112
113   /* all below only for async writing */
114
115   guint data_offset;            /* == data_size when done */
116   guint body_offset;            /* into body_data or the buffer */
117
118   /* ID of the message for notification */
119   guint id;
120 } GstRTSPSerializedMessage;
121
122 static void
123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125   if (!msg->borrowed) {
126     g_free (msg->body_data);
127     gst_buffer_replace (&msg->body_buffer, NULL);
128   }
129   g_free (msg->data);
130 }
131
132 #ifdef MSG_NOSIGNAL
133 #define SEND_FLAGS MSG_NOSIGNAL
134 #else
135 #define SEND_FLAGS 0
136 #endif
137
138 typedef enum
139 {
140   TUNNEL_STATE_NONE,
141   TUNNEL_STATE_GET,
142   TUNNEL_STATE_POST,
143   TUNNEL_STATE_COMPLETE
144 } GstRTSPTunnelState;
145
146 #define TUNNELID_LEN   24
147
148 struct _GstRTSPConnection
149 {
150   /*< private > */
151   /* URL for the remote connection */
152   GstRTSPUrl *url;
153   GstRTSPVersion version;
154
155   gboolean server;
156   GSocketClient *client;
157   GIOStream *stream0;
158   GIOStream *stream1;
159
160   GInputStream *input_stream;
161   GOutputStream *output_stream;
162   /* this is a read source we add on the write socket in tunneled mode to be
163    * able to detect when client disconnects the GET channel */
164   GInputStream *control_stream;
165
166   /* connection state */
167   GSocket *read_socket;
168   GSocket *write_socket;
169   GSocket *socket0, *socket1;
170   gboolean manual_http;
171   gboolean may_cancel;
172   GCancellable *cancellable;
173
174   gchar tunnelid[TUNNELID_LEN];
175   gboolean tunneled;
176   gboolean ignore_x_server_reply;
177   GstRTSPTunnelState tstate;
178
179   /* the remote and local ip */
180   gchar *remote_ip;
181   gchar *local_ip;
182
183   gint read_ahead;
184
185   gchar *initial_buffer;
186   gsize initial_buffer_offset;
187
188   gboolean remember_session_id; /* remember the session id or not */
189
190   /* Session state */
191   gint cseq;                    /* sequence number */
192   gchar session_id[512];        /* session id */
193   gint timeout;                 /* session timeout in seconds */
194   GTimer *timer;                /* timeout timer */
195
196   /* Authentication */
197   GstRTSPAuthMethod auth_method;
198   gchar *username;
199   gchar *passwd;
200   GHashTable *auth_params;
201
202   guint content_length_limit;
203
204   /* TLS */
205   GTlsDatabase *tls_database;
206   GTlsInteraction *tls_interaction;
207
208   GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
209   GDestroyNotify accept_certificate_destroy_notify;
210   gpointer accept_certificate_user_data;
211
212   DecodeCtx ctx;
213   DecodeCtx *ctxp;
214
215   gchar *proxy_host;
216   guint proxy_port;
217 };
218
219 enum
220 {
221   STATE_START = 0,
222   STATE_DATA_HEADER,
223   STATE_DATA_BODY,
224   STATE_READ_LINES,
225   STATE_END,
226   STATE_LAST
227 };
228
229 enum
230 {
231   READ_AHEAD_EOH = -1,          /* end of headers */
232   READ_AHEAD_CRLF = -2,
233   READ_AHEAD_CRLFCR = -3
234 };
235
236 /* a structure for constructing RTSPMessages */
237 typedef struct
238 {
239   gint state;
240   GstRTSPResult status;
241   guint8 buffer[4096];
242   guint offset;
243
244   guint line;
245   guint8 *body_data;
246   guint body_len;
247 } GstRTSPBuilder;
248
249 /* function prototypes */
250 static void add_auth_header (GstRTSPConnection * conn,
251     GstRTSPMessage * message);
252
253 static void
254 build_reset (GstRTSPBuilder * builder)
255 {
256   g_free (builder->body_data);
257   memset (builder, 0, sizeof (GstRTSPBuilder));
258 }
259
260 static GstRTSPResult
261 gst_rtsp_result_from_g_io_error (GError * error, GstRTSPResult default_res)
262 {
263   if (error == NULL)
264     return GST_RTSP_OK;
265
266   if (error->domain != G_IO_ERROR)
267     return default_res;
268
269   switch (error->code) {
270     case G_IO_ERROR_TIMED_OUT:
271       return GST_RTSP_ETIMEOUT;
272     case G_IO_ERROR_INVALID_ARGUMENT:
273       return GST_RTSP_EINVAL;
274     case G_IO_ERROR_CANCELLED:
275     case G_IO_ERROR_WOULD_BLOCK:
276       return GST_RTSP_EINTR;
277     default:
278       return default_res;
279   }
280 }
281
282 static gboolean
283 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
284     GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
285 {
286   GError *error = NULL;
287   gboolean accept = FALSE;
288
289   if (rtspconn->tls_database) {
290     GSocketConnectable *peer_identity;
291     GTlsCertificateFlags validation_flags;
292
293     GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
294
295     peer_identity =
296         g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
297         (conn));
298
299     errors =
300         g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
301         G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
302         g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
303         NULL, &error);
304
305     if (error)
306       goto verify_error;
307
308     validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
309
310     accept = ((errors & validation_flags) == 0);
311     if (accept)
312       GST_DEBUG ("Peer certificate accepted");
313     else
314       GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
315   }
316
317   if (!accept && rtspconn->accept_certificate_func) {
318     accept =
319         rtspconn->accept_certificate_func (conn, peer_cert, errors,
320         rtspconn->accept_certificate_user_data);
321     GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
322         accept ? "" : "not ");
323   }
324
325   return accept;
326
327 /* ERRORS */
328 verify_error:
329   {
330     GST_ERROR ("An error occurred while verifying the peer certificate: %s",
331         error->message);
332     g_clear_error (&error);
333     return FALSE;
334   }
335 }
336
337 static void
338 socket_client_event (GSocketClient * client, GSocketClientEvent event,
339     GSocketConnectable * connectable, GTlsConnection * connection,
340     GstRTSPConnection * rtspconn)
341 {
342   if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
343     GST_DEBUG ("TLS handshaking about to start...");
344
345     g_signal_connect (connection, "accept-certificate",
346         (GCallback) tls_accept_certificate, rtspconn);
347
348     g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
349   }
350 }
351
352 /**
353  * gst_rtsp_connection_create:
354  * @url: a #GstRTSPUrl
355  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
356  *
357  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
358  * The connection will not yet attempt to connect to @url, use
359  * gst_rtsp_connection_connect().
360  *
361  * A copy of @url will be made.
362  *
363  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
364  */
365 GstRTSPResult
366 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
367 {
368   GstRTSPConnection *newconn;
369
370   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
371   g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
372
373   newconn = g_new0 (GstRTSPConnection, 1);
374
375   newconn->may_cancel = TRUE;
376   newconn->cancellable = g_cancellable_new ();
377   newconn->client = g_socket_client_new ();
378
379   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
380     g_socket_client_set_tls (newconn->client, TRUE);
381
382   g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
383       newconn);
384
385   newconn->url = gst_rtsp_url_copy (url);
386   newconn->timer = g_timer_new ();
387   newconn->timeout = 60;
388   newconn->cseq = 1;            /* RFC 7826: "it is RECOMMENDED to start at 0.",
389                                    but some servers don't copy values <1 due to bugs. */
390
391   newconn->remember_session_id = TRUE;
392
393   newconn->auth_method = GST_RTSP_AUTH_NONE;
394   newconn->username = NULL;
395   newconn->passwd = NULL;
396   newconn->auth_params = NULL;
397   newconn->version = 0;
398
399   newconn->content_length_limit = G_MAXUINT;
400
401   *conn = newconn;
402
403   return GST_RTSP_OK;
404 }
405
406 static gboolean
407 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
408     gboolean remote, GError ** error)
409 {
410   GSocketAddress *addr;
411
412   if (remote)
413     addr = g_socket_get_remote_address (socket, error);
414   else
415     addr = g_socket_get_local_address (socket, error);
416   if (!addr)
417     return FALSE;
418
419   if (ip)
420     *ip = g_inet_address_to_string (g_inet_socket_address_get_address
421         (G_INET_SOCKET_ADDRESS (addr)));
422   if (port)
423     *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
424
425   g_object_unref (addr);
426
427   return TRUE;
428 }
429
430
431 /**
432  * gst_rtsp_connection_create_from_socket:
433  * @socket: a #GSocket
434  * @ip: the IP address of the other end
435  * @port: the port used by the other end
436  * @initial_buffer: data already read from @fd
437  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
438  *
439  * Create a new #GstRTSPConnection for handling communication on the existing
440  * socket @socket. The @initial_buffer contains zero terminated data already
441  * read from @socket which should be used before starting to read new data.
442  *
443  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
444  */
445 /* FIXME 2.0 We don't need the ip and port since they can be got from the
446  * GSocket */
447 GstRTSPResult
448 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
449     guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
450 {
451   GstRTSPConnection *newconn = NULL;
452   GstRTSPUrl *url;
453   GstRTSPResult res;
454   GError *err = NULL;
455   gchar *local_ip;
456   GIOStream *stream;
457
458   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
459   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
460   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
461
462   if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
463     goto getnameinfo_failed;
464
465   /* create a url for the client address */
466   url = g_new0 (GstRTSPUrl, 1);
467   url->host = g_strdup (ip);
468   url->port = port;
469
470   /* now create the connection object */
471   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
472   gst_rtsp_url_free (url);
473
474   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
475
476   /* both read and write initially */
477   newconn->server = TRUE;
478   newconn->socket0 = socket;
479   newconn->stream0 = stream;
480   newconn->write_socket = newconn->read_socket = newconn->socket0;
481   newconn->input_stream = g_io_stream_get_input_stream (stream);
482   newconn->output_stream = g_io_stream_get_output_stream (stream);
483   newconn->control_stream = NULL;
484   newconn->remote_ip = g_strdup (ip);
485   newconn->local_ip = local_ip;
486   newconn->initial_buffer = g_strdup (initial_buffer);
487
488   *conn = newconn;
489
490   return GST_RTSP_OK;
491
492   /* ERRORS */
493 getnameinfo_failed:
494   {
495     GST_ERROR ("failed to get local address: %s", err->message);
496     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
497     g_clear_error (&err);
498     return res;
499   }
500 newconn_failed:
501   {
502     GST_ERROR ("failed to make connection");
503     g_free (local_ip);
504     gst_rtsp_url_free (url);
505     return res;
506   }
507 }
508
509 /**
510  * gst_rtsp_connection_accept:
511  * @socket: a socket
512  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
513  * @cancellable: a #GCancellable to cancel the operation
514  *
515  * Accept a new connection on @socket and create a new #GstRTSPConnection for
516  * handling communication on new socket.
517  *
518  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
519  */
520 GstRTSPResult
521 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
522     GCancellable * cancellable)
523 {
524   GError *err = NULL;
525   gchar *ip;
526   guint16 port;
527   GSocket *client_sock;
528   GstRTSPResult ret;
529
530   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
531   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
532
533   client_sock = g_socket_accept (socket, cancellable, &err);
534   if (!client_sock)
535     goto accept_failed;
536
537   /* get the remote ip address and port */
538   if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
539     goto getnameinfo_failed;
540
541   ret =
542       gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
543       conn);
544   g_object_unref (client_sock);
545   g_free (ip);
546
547   return ret;
548
549   /* ERRORS */
550 accept_failed:
551   {
552     GST_DEBUG ("Accepting client failed: %s", err->message);
553     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
554     g_clear_error (&err);
555     return ret;
556   }
557 getnameinfo_failed:
558   {
559     GST_DEBUG ("getnameinfo failed: %s", err->message);
560     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ERROR);
561     g_clear_error (&err);
562     if (!g_socket_close (client_sock, &err)) {
563       GST_DEBUG ("Closing socket failed: %s", err->message);
564       g_clear_error (&err);
565     }
566     g_object_unref (client_sock);
567     return ret;
568   }
569 }
570
571 /**
572  * gst_rtsp_connection_get_tls:
573  * @conn: a #GstRTSPConnection
574  * @error: #GError for error reporting, or NULL to ignore.
575  *
576  * Get the TLS connection of @conn.
577  *
578  * For client side this will return the #GTlsClientConnection when connected
579  * over TLS.
580  *
581  * For server side connections, this function will create a GTlsServerConnection
582  * when called the first time and will return that same connection on subsequent
583  * calls. The server is then responsible for configuring the TLS connection.
584  *
585  * Returns: (transfer none): the TLS connection for @conn.
586  *
587  * Since: 1.2
588  */
589 GTlsConnection *
590 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
591 {
592   GTlsConnection *result;
593
594   if (G_IS_TLS_CONNECTION (conn->stream0)) {
595     /* we already had one, return it */
596     result = G_TLS_CONNECTION (conn->stream0);
597   } else if (conn->server) {
598     /* no TLS connection but we are server, make one */
599     result = (GTlsConnection *)
600         g_tls_server_connection_new (conn->stream0, NULL, error);
601     if (result) {
602       g_object_unref (conn->stream0);
603       conn->stream0 = G_IO_STREAM (result);
604       conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
605       conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
606     }
607   } else {
608     /* client */
609     result = NULL;
610     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
611         "client not connected with TLS");
612   }
613   return result;
614 }
615
616 /**
617  * gst_rtsp_connection_set_tls_validation_flags:
618  * @conn: a #GstRTSPConnection
619  * @flags: the validation flags.
620  *
621  * Sets the TLS validation flags to be used to verify the peer
622  * certificate when a TLS connection is established.
623  *
624  * Returns: TRUE if the validation flags are set correctly, or FALSE if
625  * @conn is NULL or is not a TLS connection.
626  *
627  * Since: 1.2.1
628  */
629 gboolean
630 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
631     GTlsCertificateFlags flags)
632 {
633   gboolean res = FALSE;
634
635   g_return_val_if_fail (conn != NULL, FALSE);
636
637   res = g_socket_client_get_tls (conn->client);
638   if (res)
639     g_socket_client_set_tls_validation_flags (conn->client, flags);
640
641   return res;
642 }
643
644 /**
645  * gst_rtsp_connection_get_tls_validation_flags:
646  * @conn: a #GstRTSPConnection
647  *
648  * Gets the TLS validation flags used to verify the peer certificate
649  * when a TLS connection is established.
650  *
651  * Returns: the validationg flags.
652  *
653  * Since: 1.2.1
654  */
655 GTlsCertificateFlags
656 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
657 {
658   g_return_val_if_fail (conn != NULL, 0);
659
660   return g_socket_client_get_tls_validation_flags (conn->client);
661 }
662
663 /**
664  * gst_rtsp_connection_set_tls_database:
665  * @conn: a #GstRTSPConnection
666  * @database: a #GTlsDatabase
667  *
668  * Sets the anchor certificate authorities database. This certificate
669  * database will be used to verify the server's certificate in case it
670  * can't be verified with the default certificate database first.
671  *
672  * Since: 1.4
673  */
674 void
675 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
676     GTlsDatabase * database)
677 {
678   GTlsDatabase *old_db;
679
680   g_return_if_fail (conn != NULL);
681
682   if (database)
683     g_object_ref (database);
684
685   old_db = conn->tls_database;
686   conn->tls_database = database;
687
688   if (old_db)
689     g_object_unref (old_db);
690 }
691
692 /**
693  * gst_rtsp_connection_get_tls_database:
694  * @conn: a #GstRTSPConnection
695  *
696  * Gets the anchor certificate authorities database that will be used
697  * after a server certificate can't be verified with the default
698  * certificate database.
699  *
700  * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
701  * database has been previously set. Use g_object_unref() to release the
702  * certificate database.
703  *
704  * Since: 1.4
705  */
706 GTlsDatabase *
707 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
708 {
709   GTlsDatabase *result;
710
711   g_return_val_if_fail (conn != NULL, NULL);
712
713   if ((result = conn->tls_database))
714     g_object_ref (result);
715
716   return result;
717 }
718
719 /**
720  * gst_rtsp_connection_set_tls_interaction:
721  * @conn: a #GstRTSPConnection
722  * @interaction: a #GTlsInteraction
723  *
724  * Sets a #GTlsInteraction object to be used when the connection or certificate
725  * database need to interact with the user. This will be used to prompt the
726  * user for passwords where necessary.
727  *
728  * Since: 1.6
729  */
730 void
731 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
732     GTlsInteraction * interaction)
733 {
734   GTlsInteraction *old_interaction;
735
736   g_return_if_fail (conn != NULL);
737
738   if (interaction)
739     g_object_ref (interaction);
740
741   old_interaction = conn->tls_interaction;
742   conn->tls_interaction = interaction;
743
744   if (old_interaction)
745     g_object_unref (old_interaction);
746 }
747
748 /**
749  * gst_rtsp_connection_get_tls_interaction:
750  * @conn: a #GstRTSPConnection
751  *
752  * Gets a #GTlsInteraction object to be used when the connection or certificate
753  * database need to interact with the user. This will be used to prompt the
754  * user for passwords where necessary.
755  *
756  * Returns: (transfer full): a reference on the #GTlsInteraction. Use
757  * g_object_unref() to release.
758  *
759  * Since: 1.6
760  */
761 GTlsInteraction *
762 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
763 {
764   GTlsInteraction *result;
765
766   g_return_val_if_fail (conn != NULL, NULL);
767
768   if ((result = conn->tls_interaction))
769     g_object_ref (result);
770
771   return result;
772 }
773
774 /**
775  * gst_rtsp_connection_set_accept_certificate_func:
776  * @conn: a #GstRTSPConnection
777  * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
778  * @destroy_notify: #GDestroyNotify for @user_data
779  * @user_data: User data passed to @func
780  *
781  * Sets a custom accept-certificate function for checking certificates for
782  * validity. This will directly map to #GTlsConnection 's "accept-certificate"
783  * signal and be performed after the default checks of #GstRTSPConnection
784  * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
785  * have failed. If no #GTlsDatabase is set on this connection, only @func will
786  * be called.
787  *
788  * Since: 1.14
789  */
790 void
791 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
792     GstRTSPConnectionAcceptCertificateFunc func,
793     gpointer user_data, GDestroyNotify destroy_notify)
794 {
795   if (conn->accept_certificate_destroy_notify)
796     conn->
797         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
798   conn->accept_certificate_func = func;
799   conn->accept_certificate_user_data = user_data;
800   conn->accept_certificate_destroy_notify = destroy_notify;
801 }
802
803 static gchar *
804 get_tunneled_connection_uri_strdup (GstRTSPUrl * url, guint16 port)
805 {
806   const gchar *pre_host = "";
807   const gchar *post_host = "";
808
809   if (url->family == GST_RTSP_FAM_INET6) {
810     pre_host = "[";
811     post_host = "]";
812   }
813
814   return g_strdup_printf ("http://%s%s%s:%d%s%s%s", pre_host, url->host,
815       post_host, port, url->abspath, url->query ? "?" : "",
816       url->query ? url->query : "");
817 }
818
819 static GstRTSPResult
820 setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
821     GstRTSPMessage * response)
822 {
823   gint i;
824   GstRTSPResult res;
825   gchar *value;
826   guint16 url_port;
827   GstRTSPMessage *msg;
828   gboolean old_http;
829   GstRTSPUrl *url;
830   GError *error = NULL;
831   GSocketConnection *connection;
832   GSocket *socket;
833   gchar *connection_uri = NULL;
834   gchar *request_uri = NULL;
835   gchar *host = NULL;
836
837   url = conn->url;
838
839   gst_rtsp_url_get_port (url, &url_port);
840   host = g_strdup_printf ("%s:%d", url->host, url_port);
841
842   /* create a random sessionid */
843   for (i = 0; i < TUNNELID_LEN; i++)
844     conn->tunnelid[i] = g_random_int_range ('a', 'z');
845   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
846
847   /* create the GET request for the read connection */
848   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
849       no_message);
850   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
851
852   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
853       conn->tunnelid);
854   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
855       "application/x-rtsp-tunnelled");
856   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
857   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
858   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
859
860   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
861    * request from being base64 encoded */
862   conn->tunneled = FALSE;
863   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
864       write_failed);
865   gst_rtsp_message_free (msg);
866   conn->tunneled = TRUE;
867
868   /* receive the response to the GET request */
869   /* we need to temporarily set manual_http to TRUE since
870    * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
871    * failure otherwise */
872   old_http = conn->manual_http;
873   conn->manual_http = TRUE;
874   GST_RTSP_CHECK (gst_rtsp_connection_receive_usec (conn, response, timeout),
875       read_failed);
876   conn->manual_http = old_http;
877
878   if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
879       response->type_data.response.code != GST_RTSP_STS_OK)
880     goto wrong_result;
881
882   if (!conn->ignore_x_server_reply &&
883       gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
884           &value, 0) == GST_RTSP_OK) {
885     g_free (url->host);
886     url->host = g_strdup (value);
887     g_free (conn->remote_ip);
888     conn->remote_ip = g_strdup (value);
889   }
890
891   connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
892
893   /* connect to the host/port */
894   if (conn->proxy_host) {
895     connection = g_socket_client_connect_to_host (conn->client,
896         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
897     request_uri = g_strdup (connection_uri);
898   } else {
899     connection = g_socket_client_connect_to_uri (conn->client,
900         connection_uri, 0, conn->cancellable, &error);
901     request_uri =
902         g_strdup_printf ("%s%s%s", url->abspath,
903         url->query ? "?" : "", url->query ? url->query : "");
904   }
905   if (connection == NULL)
906     goto connect_failed;
907
908   socket = g_socket_connection_get_socket (connection);
909
910   /* get remote address */
911   g_free (conn->remote_ip);
912   conn->remote_ip = NULL;
913
914   if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
915     goto remote_address_failed;
916
917   /* this is now our writing socket */
918   conn->stream1 = G_IO_STREAM (connection);
919   conn->socket1 = socket;
920   conn->write_socket = conn->socket1;
921   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
922   conn->control_stream = NULL;
923
924   /* create the POST request for the write connection */
925   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
926           request_uri), no_message);
927   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
928
929   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
930       conn->tunnelid);
931   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
932       "application/x-rtsp-tunnelled");
933   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
934       "application/x-rtsp-tunnelled");
935   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
936   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
937   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
938       "Sun, 9 Jan 1972 00:00:00 GMT");
939   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
940   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
941
942   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
943    * request from being base64 encoded */
944   conn->tunneled = FALSE;
945   GST_RTSP_CHECK (gst_rtsp_connection_send_usec (conn, msg, timeout),
946       write_failed);
947   gst_rtsp_message_free (msg);
948   conn->tunneled = TRUE;
949
950 exit:
951   g_free (connection_uri);
952   g_free (request_uri);
953   g_free (host);
954
955   return res;
956
957   /* ERRORS */
958 no_message:
959   {
960     GST_ERROR ("failed to create request (%d)", res);
961     goto exit;
962   }
963 write_failed:
964   {
965     GST_ERROR ("write failed (%d)", res);
966     gst_rtsp_message_free (msg);
967     conn->tunneled = TRUE;
968     goto exit;
969   }
970 read_failed:
971   {
972     GST_ERROR ("read failed (%d)", res);
973     conn->manual_http = FALSE;
974     goto exit;
975   }
976 wrong_result:
977   {
978     GST_ERROR ("got failure response %d %s",
979         response->type_data.response.code, response->type_data.response.reason);
980     res = GST_RTSP_ERROR;
981     goto exit;
982   }
983 connect_failed:
984   {
985     GST_ERROR ("failed to connect: %s", error->message);
986     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
987     g_clear_error (&error);
988     goto exit;
989   }
990 remote_address_failed:
991   {
992     GST_ERROR ("failed to resolve address: %s", error->message);
993     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
994     g_object_unref (connection);
995     g_clear_error (&error);
996     return res;
997   }
998 }
999
1000 /**
1001  * gst_rtsp_connection_connect_with_response_usec:
1002  * @conn: a #GstRTSPConnection
1003  * @timeout: a timeout in microseconds
1004  * @response: a #GstRTSPMessage
1005  *
1006  * Attempt to connect to the url of @conn made with
1007  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1008  * forever. If @timeout contains a valid timeout, this function will return
1009  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
1010  * @response will contain a response to the tunneling request messages.
1011  *
1012  * This function can be cancelled with gst_rtsp_connection_flush().
1013  *
1014  * Returns: #GST_RTSP_OK when a connection could be made.
1015  *
1016  * Since: 1.18
1017  */
1018 GstRTSPResult
1019 gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
1020     gint64 timeout, GstRTSPMessage * response)
1021 {
1022   GstRTSPResult res;
1023   GSocketConnection *connection;
1024   GSocket *socket;
1025   GError *error = NULL;
1026   gchar *connection_uri, *request_uri, *remote_ip;
1027   GstClockTime to;
1028   guint16 url_port;
1029   GstRTSPUrl *url;
1030
1031   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1032   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
1033   g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
1034
1035   to = timeout * 1000;
1036   g_socket_client_set_timeout (conn->client,
1037       (to + GST_SECOND - 1) / GST_SECOND);
1038
1039   url = conn->url;
1040
1041   gst_rtsp_url_get_port (url, &url_port);
1042
1043   if (conn->tunneled) {
1044     connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
1045   } else {
1046     connection_uri = gst_rtsp_url_get_request_uri (url);
1047   }
1048
1049   if (conn->proxy_host) {
1050     connection = g_socket_client_connect_to_host (conn->client,
1051         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1052     request_uri = g_strdup (connection_uri);
1053   } else {
1054     connection = g_socket_client_connect_to_uri (conn->client,
1055         connection_uri, url_port, conn->cancellable, &error);
1056
1057     /* use the relative component of the uri for non-proxy connections */
1058     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1059         url->query ? "?" : "", url->query ? url->query : "");
1060   }
1061   if (connection == NULL)
1062     goto connect_failed;
1063
1064   /* get remote address */
1065   socket = g_socket_connection_get_socket (connection);
1066
1067   if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1068     goto remote_address_failed;
1069
1070   g_free (conn->remote_ip);
1071   conn->remote_ip = remote_ip;
1072   conn->stream0 = G_IO_STREAM (connection);
1073   conn->socket0 = socket;
1074   /* this is our read socket */
1075   conn->read_socket = conn->socket0;
1076   conn->write_socket = conn->socket0;
1077   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1078   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1079   conn->control_stream = NULL;
1080
1081   if (conn->tunneled) {
1082     res = setup_tunneling (conn, timeout, request_uri, response);
1083     if (res != GST_RTSP_OK)
1084       goto tunneling_failed;
1085   }
1086   g_free (connection_uri);
1087   g_free (request_uri);
1088
1089   return GST_RTSP_OK;
1090
1091   /* ERRORS */
1092 connect_failed:
1093   {
1094     GST_ERROR ("failed to connect: %s", error->message);
1095     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1096     g_clear_error (&error);
1097     g_free (connection_uri);
1098     g_free (request_uri);
1099     return res;
1100   }
1101 remote_address_failed:
1102   {
1103     GST_ERROR ("failed to connect: %s", error->message);
1104     res = gst_rtsp_result_from_g_io_error (error, GST_RTSP_ERROR);
1105     g_object_unref (connection);
1106     g_clear_error (&error);
1107     g_free (connection_uri);
1108     g_free (request_uri);
1109     return res;
1110   }
1111 tunneling_failed:
1112   {
1113     GST_ERROR ("failed to setup tunneling");
1114     g_free (connection_uri);
1115     g_free (request_uri);
1116     return res;
1117   }
1118 }
1119
1120 static void
1121 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1122 {
1123   switch (conn->auth_method) {
1124     case GST_RTSP_AUTH_BASIC:{
1125       gchar *user_pass;
1126       gchar *user_pass64;
1127       gchar *auth_string;
1128
1129       if (conn->username == NULL || conn->passwd == NULL)
1130         break;
1131
1132       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1133       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1134       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1135
1136       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1137           auth_string);
1138
1139       g_free (user_pass);
1140       g_free (user_pass64);
1141       break;
1142     }
1143     case GST_RTSP_AUTH_DIGEST:{
1144       gchar *response;
1145       gchar *auth_string, *auth_string2;
1146       gchar *realm;
1147       gchar *nonce;
1148       gchar *opaque;
1149       const gchar *uri;
1150       const gchar *method;
1151
1152       /* we need to have some params set */
1153       if (conn->auth_params == NULL || conn->username == NULL ||
1154           conn->passwd == NULL)
1155         break;
1156
1157       /* we need the realm and nonce */
1158       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1159       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1160       if (realm == NULL || nonce == NULL)
1161         break;
1162
1163       method = gst_rtsp_method_as_text (message->type_data.request.method);
1164       uri = message->type_data.request.uri;
1165
1166       response =
1167           gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1168           conn->username, conn->passwd, uri, nonce);
1169       auth_string =
1170           g_strdup_printf ("Digest username=\"%s\", "
1171           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1172           conn->username, realm, nonce, uri, response);
1173       g_free (response);
1174
1175       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1176       if (opaque) {
1177         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1178             opaque);
1179         g_free (auth_string);
1180         auth_string = auth_string2;
1181       }
1182       /* Do not keep any old Authorization headers */
1183       gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1184       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1185           auth_string);
1186       break;
1187     }
1188     default:
1189       /* Nothing to do */
1190       break;
1191   }
1192 }
1193
1194 /**
1195  * gst_rtsp_connection_connect_usec:
1196  * @conn: a #GstRTSPConnection
1197  * @timeout: a timeout in microseconds
1198  *
1199  * Attempt to connect to the url of @conn made with
1200  * gst_rtsp_connection_create(). If @timeout is 0 this function can block
1201  * forever. If @timeout contains a valid timeout, this function will return
1202  * #GST_RTSP_ETIMEOUT after the timeout expired.
1203  *
1204  * This function can be cancelled with gst_rtsp_connection_flush().
1205  *
1206  * Returns: #GST_RTSP_OK when a connection could be made.
1207  *
1208  * Since: 1.18
1209  */
1210 GstRTSPResult
1211 gst_rtsp_connection_connect_usec (GstRTSPConnection * conn, gint64 timeout)
1212 {
1213   GstRTSPResult result;
1214   GstRTSPMessage response;
1215
1216   memset (&response, 0, sizeof (response));
1217   gst_rtsp_message_init (&response);
1218
1219   result = gst_rtsp_connection_connect_with_response_usec (conn, timeout,
1220       &response);
1221
1222   gst_rtsp_message_unset (&response);
1223
1224   return result;
1225 }
1226
1227 static void
1228 gen_date_string (gchar * date_string, guint len)
1229 {
1230   static const char wkdays[7][4] =
1231       { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1232   static const char months[12][4] =
1233       { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1234     "Nov", "Dec"
1235   };
1236   struct tm tm;
1237   time_t t;
1238
1239   time (&t);
1240
1241 #ifdef HAVE_GMTIME_R
1242   gmtime_r (&t, &tm);
1243 #else
1244   tm = *gmtime (&t);
1245 #endif
1246
1247   g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1248       wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1249       tm.tm_hour, tm.tm_min, tm.tm_sec);
1250 }
1251
1252 static GstRTSPResult
1253 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1254     guint size, gboolean block, GCancellable * cancellable)
1255 {
1256   guint left;
1257   gssize r;
1258   GstRTSPResult res;
1259   GError *err = NULL;
1260
1261   if (G_UNLIKELY (*idx > size))
1262     return GST_RTSP_ERROR;
1263
1264   left = size - *idx;
1265
1266   while (left) {
1267     if (block)
1268       r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1269           cancellable, &err);
1270     else
1271       r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1272           (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1273     if (G_UNLIKELY (r < 0))
1274       goto error;
1275
1276     left -= r;
1277     *idx += r;
1278   }
1279   return GST_RTSP_OK;
1280
1281   /* ERRORS */
1282 error:
1283   {
1284     if (G_UNLIKELY (r == 0))
1285       return GST_RTSP_EEOF;
1286
1287     if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1288       GST_WARNING ("%s", err->message);
1289     else
1290       GST_DEBUG ("%s", err->message);
1291
1292     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1293     g_clear_error (&err);
1294     return res;
1295   }
1296 }
1297
1298 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1299 #if GLIB_CHECK_VERSION(2, 59, 1)
1300 static GstRTSPResult
1301 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1302     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1303 {
1304   gsize _bytes_written = 0;
1305   gsize written;
1306   GstRTSPResult ret;
1307   GError *err = NULL;
1308   GPollableReturn res = G_POLLABLE_RETURN_OK;
1309
1310   while (n_vectors > 0) {
1311     if (block) {
1312       if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1313                   &written, cancellable, &err))) {
1314         /* This will never return G_IO_ERROR_WOULD_BLOCK */
1315         res = G_POLLABLE_RETURN_FAILED;
1316         goto error;
1317       }
1318     } else {
1319       res =
1320           g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1321           (stream), vectors, n_vectors, &written, cancellable, &err);
1322
1323       if (res != G_POLLABLE_RETURN_OK) {
1324         g_assert (written == 0);
1325         goto error;
1326       }
1327     }
1328     _bytes_written += written;
1329
1330     /* skip vectors that have been written in full */
1331     while (written > 0 && written >= vectors[0].size) {
1332       written -= vectors[0].size;
1333       ++vectors;
1334       --n_vectors;
1335     }
1336
1337     /* skip partially written vector data */
1338     if (written > 0) {
1339       vectors[0].size -= written;
1340       vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1341     }
1342   }
1343
1344   *bytes_written = _bytes_written;
1345
1346   return GST_RTSP_OK;
1347
1348   /* ERRORS */
1349 error:
1350   {
1351     *bytes_written = _bytes_written;
1352
1353     if (err)
1354       GST_WARNING ("%s", err->message);
1355     if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1356       g_assert (!err);
1357       return GST_RTSP_EINTR;
1358     } else if (G_UNLIKELY (written == 0)) {
1359       g_clear_error (&err);
1360       return GST_RTSP_EEOF;
1361     }
1362
1363     ret = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1364     g_clear_error (&err);
1365     return ret;
1366   }
1367 }
1368 #else
1369 static GstRTSPResult
1370 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1371     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1372 {
1373   gsize _bytes_written = 0;
1374   guint written;
1375   gint i;
1376   GstRTSPResult res = GST_RTSP_OK;
1377
1378   for (i = 0; i < n_vectors; i++) {
1379     written = 0;
1380     res =
1381         write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
1382         block, cancellable);
1383     _bytes_written += written;
1384     if (G_UNLIKELY (res != GST_RTSP_OK))
1385       break;
1386   }
1387
1388   *bytes_written = _bytes_written;
1389
1390   return res;
1391 }
1392 #endif
1393
1394 static gint
1395 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1396     gboolean block, GError ** err)
1397 {
1398   gint out = 0;
1399
1400   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1401     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1402
1403     out = MIN (left, size);
1404     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1405
1406     if (left == (gsize) out) {
1407       g_free (conn->initial_buffer);
1408       conn->initial_buffer = NULL;
1409       conn->initial_buffer_offset = 0;
1410     } else
1411       conn->initial_buffer_offset += out;
1412   }
1413
1414   if (G_LIKELY (size > (guint) out)) {
1415     gssize r;
1416     gsize count = size - out;
1417     if (block)
1418       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1419           count, conn->may_cancel ? conn->cancellable : NULL, err);
1420     else
1421       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1422           (conn->input_stream), (gchar *) & buffer[out], count,
1423           conn->may_cancel ? conn->cancellable : NULL, err);
1424
1425     if (G_UNLIKELY (r < 0)) {
1426       if (out == 0) {
1427         /* propagate the error */
1428         out = r;
1429       } else {
1430         /* we have some data ignore error */
1431         g_clear_error (err);
1432       }
1433     } else
1434       out += r;
1435   }
1436
1437   return out;
1438 }
1439
1440 static gint
1441 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1442     gboolean block, GError ** err)
1443 {
1444   DecodeCtx *ctx = conn->ctxp;
1445   gint out = 0;
1446
1447   if (ctx) {
1448     while (size > 0) {
1449       guint8 in[sizeof (ctx->out) * 4 / 3];
1450       gint r;
1451
1452       while (size > 0 && ctx->cout < ctx->coutl) {
1453         /* we have some leftover bytes */
1454         *buffer++ = ctx->out[ctx->cout++];
1455         size--;
1456         out++;
1457       }
1458
1459       /* got what we needed? */
1460       if (size == 0)
1461         break;
1462
1463       /* try to read more bytes */
1464       r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1465       if (r <= 0) {
1466         if (out == 0) {
1467           out = r;
1468         } else {
1469           /* we have some data ignore error */
1470           g_clear_error (err);
1471         }
1472         break;
1473       }
1474
1475       ctx->cout = 0;
1476       ctx->coutl =
1477           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1478           &ctx->save);
1479     }
1480   } else {
1481     out = fill_raw_bytes (conn, buffer, size, block, err);
1482   }
1483
1484   return out;
1485 }
1486
1487 static GstRTSPResult
1488 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1489     gboolean block)
1490 {
1491   guint left;
1492   gint r;
1493   GstRTSPResult res;
1494   GError *err = NULL;
1495
1496   if (G_UNLIKELY (*idx > size))
1497     return GST_RTSP_ERROR;
1498
1499   left = size - *idx;
1500
1501   while (left) {
1502     r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1503     if (G_UNLIKELY (r <= 0))
1504       goto error;
1505
1506     left -= r;
1507     *idx += r;
1508   }
1509   return GST_RTSP_OK;
1510
1511   /* ERRORS */
1512 error:
1513   {
1514     if (G_UNLIKELY (r == 0))
1515       return GST_RTSP_EEOF;
1516
1517     GST_DEBUG ("%s", err->message);
1518     res = gst_rtsp_result_from_g_io_error (err, GST_RTSP_ESYS);
1519     g_clear_error (&err);
1520     return res;
1521   }
1522 }
1523
1524 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1525  * end of a line. It even does its best to handle clients which mix them (even
1526  * though this is a really stupid idea (tm).) It also handles Line White Space
1527  * (LWS), where a line end followed by whitespace is considered LWS. This is
1528  * the method used in RTSP (and HTTP) to break long lines.
1529  */
1530 static GstRTSPResult
1531 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1532     gboolean block)
1533 {
1534   GstRTSPResult res;
1535
1536   while (TRUE) {
1537     guint8 c;
1538     guint i;
1539
1540     if (conn->read_ahead == READ_AHEAD_EOH) {
1541       /* the last call to read_line() already determined that we have reached
1542        * the end of the headers, so convey that information now */
1543       conn->read_ahead = 0;
1544       break;
1545     } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1546       /* the last call to read_line() left off after having read \r\n */
1547       c = '\n';
1548     } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1549       /* the last call to read_line() left off after having read \r\n\r */
1550       c = '\r';
1551     } else if (conn->read_ahead != 0) {
1552       /* the last call to read_line() left us with a character to start with */
1553       c = (guint8) conn->read_ahead;
1554       conn->read_ahead = 0;
1555     } else {
1556       /* read the next character */
1557       i = 0;
1558       res = read_bytes (conn, &c, &i, 1, block);
1559       if (G_UNLIKELY (res != GST_RTSP_OK))
1560         return res;
1561     }
1562
1563     /* special treatment of line endings */
1564     if (c == '\r' || c == '\n') {
1565       guint8 read_ahead;
1566
1567     retry:
1568       /* need to read ahead one more character to know what to do... */
1569       i = 0;
1570       res = read_bytes (conn, &read_ahead, &i, 1, block);
1571       if (G_UNLIKELY (res != GST_RTSP_OK))
1572         return res;
1573
1574       if (read_ahead == ' ' || read_ahead == '\t') {
1575         if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1576           /* got \r\n\r followed by whitespace, treat it as a normal line
1577            * followed by one starting with LWS */
1578           conn->read_ahead = read_ahead;
1579           break;
1580         } else {
1581           /* got LWS, change the line ending to a space and continue */
1582           c = ' ';
1583           conn->read_ahead = read_ahead;
1584         }
1585       } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1586         if (read_ahead == '\r' || read_ahead == '\n') {
1587           /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1588           conn->read_ahead = READ_AHEAD_EOH;
1589           break;
1590         } else {
1591           /* got \r\n\r followed by something else, this is not really
1592            * supported since we have probably just eaten the first character
1593            * of the body or the next message, so just ignore the second \r
1594            * and live with it... */
1595           conn->read_ahead = read_ahead;
1596           break;
1597         }
1598       } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1599         if (read_ahead == '\r') {
1600           /* got \r\n\r so far, need one more character... */
1601           conn->read_ahead = READ_AHEAD_CRLFCR;
1602           goto retry;
1603         } else if (read_ahead == '\n') {
1604           /* got \r\n\n, treat it as the end of the headers */
1605           conn->read_ahead = READ_AHEAD_EOH;
1606           break;
1607         } else {
1608           /* found the end of a line, keep read_ahead for the next line */
1609           conn->read_ahead = read_ahead;
1610           break;
1611         }
1612       } else if (c == read_ahead) {
1613         /* got double \r or \n, treat it as the end of the headers */
1614         conn->read_ahead = READ_AHEAD_EOH;
1615         break;
1616       } else if (c == '\r' && read_ahead == '\n') {
1617         /* got \r\n so far, still need more to know what to do... */
1618         conn->read_ahead = READ_AHEAD_CRLF;
1619         goto retry;
1620       } else {
1621         /* found the end of a line, keep read_ahead for the next line */
1622         conn->read_ahead = read_ahead;
1623         break;
1624       }
1625     }
1626
1627     if (G_LIKELY (*idx < size - 1))
1628       buffer[(*idx)++] = c;
1629   }
1630   buffer[*idx] = '\0';
1631
1632   return GST_RTSP_OK;
1633 }
1634
1635 /**
1636  * gst_rtsp_connection_write_usec:
1637  * @conn: a #GstRTSPConnection
1638  * @data: the data to write
1639  * @size: the size of @data
1640  * @timeout: a timeout value or 0
1641  *
1642  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1643  * the specified @timeout. @timeout can be 0, in which case this function
1644  * might block forever.
1645  *
1646  * This function can be cancelled with gst_rtsp_connection_flush().
1647  *
1648  * Returns: #GST_RTSP_OK on success.
1649  *
1650  * Since: 1.18
1651  */
1652 /* FIXME 2.0: This should've been static! */
1653 GstRTSPResult
1654 gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
1655     guint size, gint64 timeout)
1656 {
1657   guint offset;
1658   GstClockTime to;
1659   GstRTSPResult res;
1660
1661   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1662   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1663   g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1664
1665   offset = 0;
1666
1667   to = timeout * 1000;
1668
1669   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1670   res =
1671       write_bytes (conn->output_stream, data, &offset, size, TRUE,
1672       conn->cancellable);
1673   g_socket_set_timeout (conn->write_socket, 0);
1674
1675   return res;
1676 }
1677
1678 static gboolean
1679 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1680     GstRTSPSerializedMessage * serialized_message)
1681 {
1682   GString *str = NULL;
1683
1684   memset (serialized_message, 0, sizeof (*serialized_message));
1685
1686   /* Initially we borrow the body_data / body_buffer fields from
1687    * the message */
1688   serialized_message->borrowed = TRUE;
1689
1690   switch (message->type) {
1691     case GST_RTSP_MESSAGE_REQUEST:
1692       str = g_string_new ("");
1693
1694       /* create request string, add CSeq */
1695       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1696           "CSeq: %d\r\n",
1697           gst_rtsp_method_as_text (message->type_data.request.method),
1698           message->type_data.request.uri,
1699           gst_rtsp_version_as_text (message->type_data.request.version),
1700           conn->cseq++);
1701       /* add session id if we have one */
1702       if (conn->session_id[0] != '\0') {
1703         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1704         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1705             conn->session_id);
1706       }
1707       /* add any authentication headers */
1708       add_auth_header (conn, message);
1709       break;
1710     case GST_RTSP_MESSAGE_RESPONSE:
1711       str = g_string_new ("");
1712
1713       /* create response string */
1714       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1715           gst_rtsp_version_as_text (message->type_data.response.version),
1716           message->type_data.response.code, message->type_data.response.reason);
1717       break;
1718     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1719       str = g_string_new ("");
1720
1721       /* create request string */
1722       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1723           gst_rtsp_method_as_text (message->type_data.request.method),
1724           message->type_data.request.uri,
1725           gst_rtsp_version_as_text (message->type_data.request.version));
1726       /* add any authentication headers */
1727       add_auth_header (conn, message);
1728       break;
1729     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1730       str = g_string_new ("");
1731
1732       /* create response string */
1733       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1734           gst_rtsp_version_as_text (message->type_data.request.version),
1735           message->type_data.response.code, message->type_data.response.reason);
1736       break;
1737     case GST_RTSP_MESSAGE_DATA:
1738     {
1739       guint8 *data_header = serialized_message->data_header;
1740
1741       /* prepare data header */
1742       data_header[0] = '$';
1743       data_header[1] = message->type_data.data.channel;
1744       data_header[2] = (message->body_size >> 8) & 0xff;
1745       data_header[3] = message->body_size & 0xff;
1746
1747       /* create serialized message with header and data */
1748       serialized_message->data_is_data_header = TRUE;
1749       serialized_message->data_size = 4;
1750
1751       if (message->body) {
1752         serialized_message->body_data = message->body;
1753         serialized_message->body_data_size = message->body_size;
1754       } else {
1755         g_assert (message->body_buffer != NULL);
1756         serialized_message->body_buffer = message->body_buffer;
1757       }
1758       break;
1759     }
1760     default:
1761       g_string_free (str, TRUE);
1762       g_return_val_if_reached (FALSE);
1763       break;
1764   }
1765
1766   /* append headers and body */
1767   if (message->type != GST_RTSP_MESSAGE_DATA) {
1768     gchar date_string[100];
1769
1770     g_assert (str != NULL);
1771
1772     gen_date_string (date_string, sizeof (date_string));
1773
1774     /* add date header */
1775     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1776     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1777
1778     /* append headers */
1779     gst_rtsp_message_append_headers (message, str);
1780
1781     /* append Content-Length and body if needed */
1782     if (message->body_size > 0) {
1783       gchar *len;
1784
1785       len = g_strdup_printf ("%d", message->body_size);
1786       g_string_append_printf (str, "%s: %s\r\n",
1787           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1788       g_free (len);
1789       /* header ends here */
1790       g_string_append (str, "\r\n");
1791
1792       if (message->body) {
1793         serialized_message->body_data = message->body;
1794         serialized_message->body_data_size = message->body_size;
1795       } else {
1796         g_assert (message->body_buffer != NULL);
1797         serialized_message->body_buffer = message->body_buffer;
1798       }
1799     } else {
1800       /* just end headers */
1801       g_string_append (str, "\r\n");
1802     }
1803
1804     serialized_message->data_size = str->len;
1805     serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1806   }
1807
1808   return TRUE;
1809 }
1810
1811 /**
1812  * gst_rtsp_connection_send_usec:
1813  * @conn: a #GstRTSPConnection
1814  * @message: the message to send
1815  * @timeout: a timeout value in microseconds
1816  *
1817  * Attempt to send @message to the connected @conn, blocking up to
1818  * the specified @timeout. @timeout can be 0, in which case this function
1819  * might block forever.
1820  *
1821  * This function can be cancelled with gst_rtsp_connection_flush().
1822  *
1823  * Returns: #GST_RTSP_OK on success.
1824  *
1825  * Since: 1.18
1826  */
1827 GstRTSPResult
1828 gst_rtsp_connection_send_usec (GstRTSPConnection * conn,
1829     GstRTSPMessage * message, gint64 timeout)
1830 {
1831   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1832   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1833
1834   return gst_rtsp_connection_send_messages_usec (conn, message, 1, timeout);
1835 }
1836
1837 /**
1838  * gst_rtsp_connection_send_messages_usec:
1839  * @conn: a #GstRTSPConnection
1840  * @messages: (array length=n_messages): the messages to send
1841  * @n_messages: the number of messages to send
1842  * @timeout: a timeout value in microseconds
1843  *
1844  * Attempt to send @messages to the connected @conn, blocking up to
1845  * the specified @timeout. @timeout can be 0, in which case this function
1846  * might block forever.
1847  *
1848  * This function can be cancelled with gst_rtsp_connection_flush().
1849  *
1850  * Returns: #GST_RTSP_OK on Since.
1851  *
1852  * Since: 1.18
1853  */
1854 GstRTSPResult
1855 gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
1856     GstRTSPMessage * messages, guint n_messages, gint64 timeout)
1857 {
1858   GstClockTime to;
1859   GstRTSPResult res;
1860   GstRTSPSerializedMessage *serialized_messages;
1861   GOutputVector *vectors;
1862   GstMapInfo *map_infos;
1863   guint n_vectors, n_memories;
1864   gint i, j, k;
1865   gsize bytes_to_write, bytes_written;
1866
1867   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1868   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1869
1870   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1871   memset (serialized_messages, 0,
1872       sizeof (GstRTSPSerializedMessage) * n_messages);
1873
1874   for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1875       i++) {
1876     if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1877                 &serialized_messages[i])))
1878       goto no_message;
1879
1880     if (conn->tunneled) {
1881       gint state = 0, save = 0;
1882       gchar *base64_buffer, *out_buffer;
1883       gsize written = 0;
1884       gsize in_length;
1885
1886       in_length = serialized_messages[i].data_size;
1887       if (serialized_messages[i].body_data)
1888         in_length += serialized_messages[i].body_data_size;
1889       else if (serialized_messages[i].body_buffer)
1890         in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1891
1892       in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1893       base64_buffer = out_buffer = g_malloc0 (in_length);
1894
1895       written =
1896           g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1897           serialized_messages[i].data_header : serialized_messages[i].data,
1898           serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1899       out_buffer += written;
1900
1901       if (serialized_messages[i].body_data) {
1902         written =
1903             g_base64_encode_step (serialized_messages[i].body_data,
1904             serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1905             &save);
1906         out_buffer += written;
1907       } else if (serialized_messages[i].body_buffer) {
1908         guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1909
1910         for (j = 0; j < n; j++) {
1911           GstMemory *mem =
1912               gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1913           GstMapInfo map;
1914
1915           gst_memory_map (mem, &map, GST_MAP_READ);
1916
1917           written = g_base64_encode_step (map.data, map.size,
1918               FALSE, out_buffer, &state, &save);
1919           out_buffer += written;
1920
1921           gst_memory_unmap (mem, &map);
1922         }
1923       }
1924
1925       written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
1926       out_buffer += written;
1927
1928       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1929       memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
1930
1931       serialized_messages[i].data = (guint8 *) base64_buffer;
1932       serialized_messages[i].data_size = (out_buffer - base64_buffer);
1933       n_vectors++;
1934     } else {
1935       n_vectors++;
1936       if (serialized_messages[i].body_data) {
1937         n_vectors++;
1938       } else if (serialized_messages[i].body_buffer) {
1939         n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1940         n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1941       }
1942     }
1943   }
1944
1945   vectors = g_newa (GOutputVector, n_vectors);
1946   map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
1947
1948   for (i = 0, j = 0, k = 0; i < n_messages; i++) {
1949     vectors[j].buffer = serialized_messages[i].data_is_data_header ?
1950         serialized_messages[i].data_header : serialized_messages[i].data;
1951     vectors[j].size = serialized_messages[i].data_size;
1952     bytes_to_write += vectors[j].size;
1953     j++;
1954
1955     if (serialized_messages[i].body_data) {
1956       vectors[j].buffer = serialized_messages[i].body_data;
1957       vectors[j].size = serialized_messages[i].body_data_size;
1958       bytes_to_write += vectors[j].size;
1959       j++;
1960     } else if (serialized_messages[i].body_buffer) {
1961       gint l, n;
1962
1963       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1964       for (l = 0; l < n; l++) {
1965         GstMemory *mem =
1966             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1967
1968         gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
1969         vectors[j].buffer = map_infos[k].data;
1970         vectors[j].size = map_infos[k].size;
1971         bytes_to_write += vectors[j].size;
1972
1973         k++;
1974         j++;
1975       }
1976     }
1977   }
1978
1979   /* write request: this is synchronous */
1980   to = timeout * 1000;
1981
1982   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1983   res =
1984       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
1985       TRUE, conn->cancellable);
1986   g_socket_set_timeout (conn->write_socket, 0);
1987
1988   g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
1989
1990   /* free everything */
1991   for (i = 0, k = 0; i < n_messages; i++) {
1992     if (serialized_messages[i].body_buffer) {
1993       gint l, n;
1994
1995       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1996       for (l = 0; l < n; l++) {
1997         GstMemory *mem =
1998             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1999
2000         gst_memory_unmap (mem, &map_infos[k]);
2001         k++;
2002       }
2003     }
2004
2005     g_free (serialized_messages[i].data);
2006   }
2007
2008   return res;
2009
2010 no_message:
2011   {
2012     for (i = 0; i < n_messages; i++) {
2013       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
2014     }
2015     g_warning ("Wrong message");
2016     return GST_RTSP_EINVAL;
2017   }
2018 }
2019
2020 static GstRTSPResult
2021 parse_string (gchar * dest, gint size, gchar ** src)
2022 {
2023   GstRTSPResult res = GST_RTSP_OK;
2024   gint idx;
2025
2026   idx = 0;
2027   /* skip spaces */
2028   while (g_ascii_isspace (**src))
2029     (*src)++;
2030
2031   while (!g_ascii_isspace (**src) && **src != '\0') {
2032     if (idx < size - 1)
2033       dest[idx++] = **src;
2034     else
2035       res = GST_RTSP_EPARSE;
2036     (*src)++;
2037   }
2038   if (size > 0)
2039     dest[idx] = '\0';
2040
2041   return res;
2042 }
2043
2044 static GstRTSPResult
2045 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2046     GstRTSPVersion * version)
2047 {
2048   GstRTSPVersion rversion;
2049   GstRTSPResult res = GST_RTSP_OK;
2050   gchar *ver;
2051
2052   if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2053     guint major;
2054     guint minor;
2055     gchar dummychar;
2056
2057     *ver++ = '\0';
2058
2059     /* the version number must be formatted as X.Y with nothing following */
2060     if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2061       res = GST_RTSP_EPARSE;
2062
2063     rversion = major * 0x10 + minor;
2064     if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2065
2066       if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2067         *version = GST_RTSP_VERSION_INVALID;
2068         res = GST_RTSP_ERROR;
2069       }
2070     } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2071       if (*type == GST_RTSP_MESSAGE_REQUEST)
2072         *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2073       else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2074         *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2075
2076       if (rversion != GST_RTSP_VERSION_1_0 &&
2077           rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2078         res = GST_RTSP_ERROR;
2079     } else
2080       res = GST_RTSP_EPARSE;
2081   } else
2082     res = GST_RTSP_EPARSE;
2083
2084   if (res == GST_RTSP_OK)
2085     *version = rversion;
2086
2087   return res;
2088 }
2089
2090 static GstRTSPResult
2091 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2092 {
2093   GstRTSPResult res = GST_RTSP_OK;
2094   GstRTSPResult res2;
2095   gchar versionstr[20];
2096   gchar codestr[4];
2097   gint code;
2098   gchar *bptr;
2099
2100   bptr = (gchar *) buffer;
2101
2102   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2103     res = GST_RTSP_EPARSE;
2104
2105   if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2106     res = GST_RTSP_EPARSE;
2107   code = atoi (codestr);
2108   if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2109     res = GST_RTSP_EPARSE;
2110
2111   while (g_ascii_isspace (*bptr))
2112     bptr++;
2113
2114   if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2115               NULL) != GST_RTSP_OK))
2116     res = GST_RTSP_EPARSE;
2117
2118   res2 = parse_protocol_version (versionstr, &msg->type,
2119       &msg->type_data.response.version);
2120   if (G_LIKELY (res == GST_RTSP_OK))
2121     res = res2;
2122
2123   return res;
2124 }
2125
2126 static GstRTSPResult
2127 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2128 {
2129   GstRTSPResult res = GST_RTSP_OK;
2130   GstRTSPResult res2;
2131   gchar versionstr[20];
2132   gchar methodstr[20];
2133   gchar urlstr[4096];
2134   gchar *bptr;
2135   GstRTSPMethod method;
2136
2137   bptr = (gchar *) buffer;
2138
2139   if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2140     res = GST_RTSP_EPARSE;
2141   method = gst_rtsp_find_method (methodstr);
2142
2143   if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2144     res = GST_RTSP_EPARSE;
2145   if (G_UNLIKELY (*urlstr == '\0'))
2146     res = GST_RTSP_EPARSE;
2147
2148   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2149     res = GST_RTSP_EPARSE;
2150
2151   if (G_UNLIKELY (*bptr != '\0'))
2152     res = GST_RTSP_EPARSE;
2153
2154   if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2155               urlstr) != GST_RTSP_OK))
2156     res = GST_RTSP_EPARSE;
2157
2158   res2 = parse_protocol_version (versionstr, &msg->type,
2159       &msg->type_data.request.version);
2160   if (G_LIKELY (res == GST_RTSP_OK))
2161     res = res2;
2162
2163   if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2164     /* GET and POST are not allowed as RTSP methods */
2165     if (msg->type_data.request.method == GST_RTSP_GET ||
2166         msg->type_data.request.method == GST_RTSP_POST) {
2167       msg->type_data.request.method = GST_RTSP_INVALID;
2168       if (res == GST_RTSP_OK)
2169         res = GST_RTSP_ERROR;
2170     }
2171   } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2172     /* only GET and POST are allowed as HTTP methods */
2173     if (msg->type_data.request.method != GST_RTSP_GET &&
2174         msg->type_data.request.method != GST_RTSP_POST) {
2175       msg->type_data.request.method = GST_RTSP_INVALID;
2176       if (res == GST_RTSP_OK)
2177         res = GST_RTSP_ERROR;
2178     }
2179   }
2180
2181   return res;
2182 }
2183
2184 /* parsing lines means reading a Key: Value pair */
2185 static GstRTSPResult
2186 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2187 {
2188   GstRTSPHeaderField field;
2189   gchar *line = (gchar *) buffer;
2190   gchar *field_name = NULL;
2191   gchar *value;
2192
2193   if ((value = strchr (line, ':')) == NULL || value == line)
2194     goto parse_error;
2195
2196   /* trim space before the colon */
2197   if (value[-1] == ' ')
2198     value[-1] = '\0';
2199
2200   /* replace the colon with a NUL */
2201   *value++ = '\0';
2202
2203   /* find the header */
2204   field = gst_rtsp_find_header_field (line);
2205   /* custom header not present in the list of pre-defined headers */
2206   if (field == GST_RTSP_HDR_INVALID)
2207     field_name = line;
2208
2209   /* split up the value in multiple key:value pairs if it contains comma(s) */
2210   while (*value != '\0') {
2211     gchar *next_value;
2212     gchar *comma = NULL;
2213     gboolean quoted = FALSE;
2214     guint comment = 0;
2215
2216     /* trim leading space */
2217     if (*value == ' ')
2218       value++;
2219
2220     /* for headers which may not appear multiple times, and thus may not
2221      * contain multiple values on the same line, we can short-circuit the loop
2222      * below and the entire value results in just one key:value pair*/
2223     if (!gst_rtsp_header_allow_multiple (field))
2224       next_value = value + strlen (value);
2225     else
2226       next_value = value;
2227
2228     /* find the next value, taking special care of quotes and comments */
2229     while (*next_value != '\0') {
2230       if ((quoted || comment != 0) && *next_value == '\\' &&
2231           next_value[1] != '\0')
2232         next_value++;
2233       else if (comment == 0 && *next_value == '"')
2234         quoted = !quoted;
2235       else if (!quoted && *next_value == '(')
2236         comment++;
2237       else if (comment != 0 && *next_value == ')')
2238         comment--;
2239       else if (!quoted && comment == 0) {
2240         /* To quote RFC 2068: "User agents MUST take special care in parsing
2241          * the WWW-Authenticate field value if it contains more than one
2242          * challenge, or if more than one WWW-Authenticate header field is
2243          * provided, since the contents of a challenge may itself contain a
2244          * comma-separated list of authentication parameters."
2245          *
2246          * What this means is that we cannot just look for an unquoted comma
2247          * when looking for multiple values in Proxy-Authenticate and
2248          * WWW-Authenticate headers. Instead we need to look for the sequence
2249          * "comma [space] token space token" before we can split after the
2250          * comma...
2251          */
2252         if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2253             field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2254           if (*next_value == ',') {
2255             if (next_value[1] == ' ') {
2256               /* skip any space following the comma so we do not mistake it for
2257                * separating between two tokens */
2258               next_value++;
2259             }
2260             comma = next_value;
2261           } else if (*next_value == ' ' && next_value[1] != ',' &&
2262               next_value[1] != '=' && comma != NULL) {
2263             next_value = comma;
2264             comma = NULL;
2265             break;
2266           }
2267         } else if (*next_value == ',')
2268           break;
2269       }
2270
2271       next_value++;
2272     }
2273
2274     if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2275       /* The timeout parameter is only allowed in a session response header
2276        * but some clients send it as part of the session request header.
2277        * Ignore everything from the semicolon to the end of the line. */
2278       next_value = value;
2279       while (*next_value != '\0') {
2280         if (*next_value == ';') {
2281           break;
2282         }
2283         next_value++;
2284       }
2285     }
2286
2287     /* trim space */
2288     if (value != next_value && next_value[-1] == ' ')
2289       next_value[-1] = '\0';
2290
2291     if (*next_value != '\0')
2292       *next_value++ = '\0';
2293
2294     /* add the key:value pair */
2295     if (*value != '\0') {
2296       if (field != GST_RTSP_HDR_INVALID)
2297         gst_rtsp_message_add_header (msg, field, value);
2298       else
2299         gst_rtsp_message_add_header_by_name (msg, field_name, value);
2300     }
2301
2302     value = next_value;
2303   }
2304
2305   return GST_RTSP_OK;
2306
2307   /* ERRORS */
2308 parse_error:
2309   {
2310     return GST_RTSP_EPARSE;
2311   }
2312 }
2313
2314 /* convert all consecutive whitespace to a single space */
2315 static void
2316 normalize_line (guint8 * buffer)
2317 {
2318   while (*buffer) {
2319     if (g_ascii_isspace (*buffer)) {
2320       guint8 *tmp;
2321
2322       *buffer++ = ' ';
2323       for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2324       }
2325       if (buffer != tmp)
2326         memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2327     } else {
2328       buffer++;
2329     }
2330   }
2331 }
2332
2333 static gboolean
2334 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2335 {
2336   gchar *cseq_header;
2337   gint64 cseq = 0;
2338   GstRTSPResult res;
2339
2340   if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2341       message->type == GST_RTSP_MESSAGE_REQUEST) {
2342     if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2343                 &cseq_header, 0)) != GST_RTSP_OK) {
2344       /* rfc2326 This field MUST be present in all RTSP req and resp */
2345       goto invalid_format;
2346     }
2347
2348     errno = 0;
2349     cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2350     if (errno != 0 || cseq < 0) {
2351       /* CSeq has no valid value */
2352       goto invalid_format;
2353     }
2354
2355     if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2356         (conn->cseq == 0 || conn->cseq < cseq)) {
2357       /* Response CSeq can't be higher than the number of outgoing requests
2358        * neither is a response valid if no request has been made */
2359       goto invalid_format;
2360     }
2361   }
2362   return GST_RTSP_OK;
2363
2364 invalid_format:
2365   {
2366     return GST_RTSP_EPARSE;
2367   }
2368 }
2369
2370 /* returns:
2371  *  GST_RTSP_OK when a complete message was read.
2372  *  GST_RTSP_EEOF: when the read socket is closed
2373  *  GST_RTSP_EINTR: when more data is needed.
2374  *  GST_RTSP_..: some other error occurred.
2375  */
2376 static GstRTSPResult
2377 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2378     GstRTSPConnection * conn, gboolean block)
2379 {
2380   GstRTSPResult res;
2381
2382   while (TRUE) {
2383     switch (builder->state) {
2384       case STATE_START:
2385       {
2386         guint8 c;
2387
2388         builder->offset = 0;
2389         res =
2390             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2391             block);
2392         if (res != GST_RTSP_OK)
2393           goto done;
2394
2395         c = builder->buffer[0];
2396
2397         /* we have 1 bytes now and we can see if this is a data message or
2398          * not */
2399         if (c == '$') {
2400           /* data message, prepare for the header */
2401           builder->state = STATE_DATA_HEADER;
2402           conn->may_cancel = FALSE;
2403         } else if (c == '\n' || c == '\r') {
2404           /* skip \n and \r */
2405           builder->offset = 0;
2406         } else {
2407           builder->line = 0;
2408           builder->state = STATE_READ_LINES;
2409           conn->may_cancel = FALSE;
2410         }
2411         break;
2412       }
2413       case STATE_DATA_HEADER:
2414       {
2415         res =
2416             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2417             block);
2418         if (res != GST_RTSP_OK)
2419           goto done;
2420
2421         gst_rtsp_message_init_data (message, builder->buffer[1]);
2422
2423         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2424         builder->body_data = g_malloc (builder->body_len + 1);
2425         builder->body_data[builder->body_len] = '\0';
2426         builder->offset = 0;
2427         builder->state = STATE_DATA_BODY;
2428         break;
2429       }
2430       case STATE_DATA_BODY:
2431       {
2432         res =
2433             read_bytes (conn, builder->body_data, &builder->offset,
2434             builder->body_len, block);
2435         if (res != GST_RTSP_OK)
2436           goto done;
2437
2438         /* we have the complete body now, store in the message adjusting the
2439          * length to include the trailing '\0' */
2440         gst_rtsp_message_take_body (message,
2441             (guint8 *) builder->body_data, builder->body_len + 1);
2442         builder->body_data = NULL;
2443         builder->body_len = 0;
2444
2445         builder->state = STATE_END;
2446         break;
2447       }
2448       case STATE_READ_LINES:
2449       {
2450         res = read_line (conn, builder->buffer, &builder->offset,
2451             sizeof (builder->buffer), block);
2452         if (res != GST_RTSP_OK)
2453           goto done;
2454
2455         /* we have a regular response */
2456         if (builder->buffer[0] == '\0') {
2457           gchar *hdrval;
2458           gint64 content_length_parsed = 0;
2459
2460           /* empty line, end of message header */
2461           /* see if there is a Content-Length header, but ignore it if this
2462            * is a POST request with an x-sessioncookie header */
2463           if (gst_rtsp_message_get_header (message,
2464                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2465               (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2466                   message->type_data.request.method != GST_RTSP_POST ||
2467                   gst_rtsp_message_get_header (message,
2468                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2469             /* there is, prepare to read the body */
2470             errno = 0;
2471             content_length_parsed = g_ascii_strtoll (hdrval, NULL, 10);
2472             if (errno != 0 || content_length_parsed < 0) {
2473               res = GST_RTSP_EPARSE;
2474               goto invalid_body_len;
2475             } else if (content_length_parsed > conn->content_length_limit) {
2476               res = GST_RTSP_ENOMEM;
2477               goto invalid_body_len;
2478             }
2479             builder->body_len = content_length_parsed;
2480             builder->body_data = g_try_malloc (builder->body_len + 1);
2481             /* we can't do much here, we need the length to know how many bytes
2482              * we need to read next and when allocation fails, we can't read the payload. */
2483             if (builder->body_data == NULL) {
2484               res = GST_RTSP_ENOMEM;
2485               goto invalid_body_len;
2486             }
2487
2488             builder->body_data[builder->body_len] = '\0';
2489             builder->offset = 0;
2490             builder->state = STATE_DATA_BODY;
2491           } else {
2492             builder->state = STATE_END;
2493           }
2494           break;
2495         }
2496
2497         /* we have a line */
2498         normalize_line (builder->buffer);
2499         if (builder->line == 0) {
2500           /* first line, check for response status */
2501           if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2502               memcmp (builder->buffer, "HTTP", 4) == 0) {
2503             builder->status = parse_response_status (builder->buffer, message);
2504           } else {
2505             builder->status = parse_request_line (builder->buffer, message);
2506           }
2507         } else {
2508           /* else just parse the line */
2509           res = parse_line (builder->buffer, message);
2510           if (res != GST_RTSP_OK)
2511             builder->status = res;
2512         }
2513         if (builder->status != GST_RTSP_OK) {
2514           res = builder->status;
2515           goto invalid_format;
2516         }
2517
2518         builder->line++;
2519         builder->offset = 0;
2520         break;
2521       }
2522       case STATE_END:
2523       {
2524         gchar *session_cookie;
2525         gchar *session_id;
2526
2527         conn->may_cancel = TRUE;
2528
2529         if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2530           /* message don't comply with rfc2326 regarding CSeq */
2531           goto invalid_format;
2532         }
2533
2534         if (message->type == GST_RTSP_MESSAGE_DATA) {
2535           /* data messages don't have headers */
2536           res = GST_RTSP_OK;
2537           goto done;
2538         }
2539
2540         /* save the tunnel session in the connection */
2541         if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2542             !conn->manual_http &&
2543             conn->tstate == TUNNEL_STATE_NONE &&
2544             gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2545                 &session_cookie, 0) == GST_RTSP_OK) {
2546           strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2547           conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2548           conn->tunneled = TRUE;
2549         }
2550
2551         /* save session id in the connection for further use */
2552         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2553             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2554                 &session_id, 0) == GST_RTSP_OK) {
2555           gint maxlen, i;
2556
2557           maxlen = sizeof (conn->session_id) - 1;
2558           /* the sessionid can have attributes marked with ;
2559            * Make sure we strip them */
2560           for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2561             if (session_id[i] == ';') {
2562               maxlen = i;
2563               /* parse timeout */
2564               do {
2565                 i++;
2566               } while (g_ascii_isspace (session_id[i]));
2567               if (g_str_has_prefix (&session_id[i], "timeout=")) {
2568                 gint to;
2569
2570                 /* if we parsed something valid, configure */
2571                 if ((to = atoi (&session_id[i + 8])) > 0)
2572                   conn->timeout = to;
2573               }
2574               break;
2575             }
2576           }
2577
2578           /* make sure to not overflow */
2579           if (conn->remember_session_id) {
2580             strncpy (conn->session_id, session_id, maxlen);
2581             conn->session_id[maxlen] = '\0';
2582           }
2583         }
2584         res = builder->status;
2585         goto done;
2586       }
2587       default:
2588         res = GST_RTSP_ERROR;
2589         goto done;
2590     }
2591   }
2592 done:
2593   conn->may_cancel = TRUE;
2594   return res;
2595
2596   /* ERRORS */
2597 invalid_body_len:
2598   {
2599     conn->may_cancel = TRUE;
2600     GST_DEBUG ("could not allocate body");
2601     return res;
2602   }
2603 invalid_format:
2604   {
2605     conn->may_cancel = TRUE;
2606     GST_DEBUG ("could not parse");
2607     return res;
2608   }
2609 }
2610
2611 /**
2612  * gst_rtsp_connection_read_usec:
2613  * @conn: a #GstRTSPConnection
2614  * @data: the data to read
2615  * @size: the size of @data
2616  * @timeout: a timeout value in microseconds
2617  *
2618  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2619  * the specified @timeout. @timeout can be 0, in which case this function
2620  * might block forever.
2621  *
2622  * This function can be cancelled with gst_rtsp_connection_flush().
2623  *
2624  * Returns: #GST_RTSP_OK on success.
2625  *
2626  * Since: 1.18
2627  */
2628 GstRTSPResult
2629 gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
2630     guint size, gint64 timeout)
2631 {
2632   guint offset;
2633   GstClockTime to;
2634   GstRTSPResult res;
2635
2636   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2637   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2638   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2639
2640   if (G_UNLIKELY (size == 0))
2641     return GST_RTSP_OK;
2642
2643   offset = 0;
2644
2645   /* configure timeout if any */
2646   to = timeout * 1000;
2647
2648   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2649   res = read_bytes (conn, data, &offset, size, TRUE);
2650   g_socket_set_timeout (conn->read_socket, 0);
2651
2652   return res;
2653 }
2654
2655 static GstRTSPMessage *
2656 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2657     const GstRTSPMessage * request)
2658 {
2659   GstRTSPMessage *msg;
2660   GstRTSPResult res;
2661
2662   if (gst_rtsp_status_as_text (code) == NULL)
2663     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2664
2665   GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2666       no_message);
2667
2668   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2669       "GStreamer RTSP Server");
2670   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2671   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2672   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2673
2674   if (code == GST_RTSP_STS_OK) {
2675     /* add the local ip address to the tunnel reply, this is where the client
2676      * should send the POST request to */
2677     if (conn->local_ip)
2678       gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2679           conn->local_ip);
2680     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2681         "application/x-rtsp-tunnelled");
2682   }
2683
2684   return msg;
2685
2686   /* ERRORS */
2687 no_message:
2688   {
2689     return NULL;
2690   }
2691 }
2692
2693 /**
2694  * gst_rtsp_connection_receive_usec:
2695  * @conn: a #GstRTSPConnection
2696  * @message: the message to read
2697  * @timeout: a timeout value or 0
2698  *
2699  * Attempt to read into @message from the connected @conn, blocking up to
2700  * the specified @timeout. @timeout can be 0, in which case this function
2701  * might block forever.
2702  *
2703  * This function can be cancelled with gst_rtsp_connection_flush().
2704  *
2705  * Returns: #GST_RTSP_OK on success.
2706  *
2707  * Since: 1.18
2708  */
2709 GstRTSPResult
2710 gst_rtsp_connection_receive_usec (GstRTSPConnection * conn,
2711     GstRTSPMessage * message, gint64 timeout)
2712 {
2713   GstRTSPResult res;
2714   GstRTSPBuilder builder;
2715   GstClockTime to;
2716
2717   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2718   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2719   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2720
2721   /* configure timeout if any */
2722   to = timeout * 1000;
2723
2724   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2725   memset (&builder, 0, sizeof (GstRTSPBuilder));
2726   res = build_next (&builder, message, conn, TRUE);
2727   g_socket_set_timeout (conn->read_socket, 0);
2728
2729   if (G_UNLIKELY (res != GST_RTSP_OK))
2730     goto read_error;
2731
2732   if (!conn->manual_http) {
2733     if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2734       if (conn->tstate == TUNNEL_STATE_NONE &&
2735           message->type_data.request.method == GST_RTSP_GET) {
2736         GstRTSPMessage *response;
2737
2738         conn->tstate = TUNNEL_STATE_GET;
2739
2740         /* tunnel GET request, we can reply now */
2741         response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2742         res = gst_rtsp_connection_send_usec (conn, response, timeout);
2743         gst_rtsp_message_free (response);
2744         if (res == GST_RTSP_OK)
2745           res = GST_RTSP_ETGET;
2746         goto cleanup;
2747       } else if (conn->tstate == TUNNEL_STATE_NONE &&
2748           message->type_data.request.method == GST_RTSP_POST) {
2749         conn->tstate = TUNNEL_STATE_POST;
2750
2751         /* tunnel POST request, the caller now has to link the two
2752          * connections. */
2753         res = GST_RTSP_ETPOST;
2754         goto cleanup;
2755       } else {
2756         res = GST_RTSP_EPARSE;
2757         goto cleanup;
2758       }
2759     } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2760       res = GST_RTSP_EPARSE;
2761       goto cleanup;
2762     }
2763   }
2764
2765   /* we have a message here */
2766   build_reset (&builder);
2767
2768   return GST_RTSP_OK;
2769
2770   /* ERRORS */
2771 read_error:
2772 cleanup:
2773   {
2774     build_reset (&builder);
2775     gst_rtsp_message_unset (message);
2776     return res;
2777   }
2778 }
2779
2780 /**
2781  * gst_rtsp_connection_close:
2782  * @conn: a #GstRTSPConnection
2783  *
2784  * Close the connected @conn. After this call, the connection is in the same
2785  * state as when it was first created.
2786  *
2787  * Returns: #GST_RTSP_OK on success.
2788  */
2789 GstRTSPResult
2790 gst_rtsp_connection_close (GstRTSPConnection * conn)
2791 {
2792   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2793
2794   /* last unref closes the connection we don't want to explicitly close here
2795    * because these sockets might have been provided at construction */
2796   if (conn->stream0) {
2797     g_object_unref (conn->stream0);
2798     conn->stream0 = NULL;
2799     conn->socket0 = NULL;
2800   }
2801   if (conn->stream1) {
2802     g_object_unref (conn->stream1);
2803     conn->stream1 = NULL;
2804     conn->socket1 = NULL;
2805   }
2806
2807   /* these were owned by the stream */
2808   conn->input_stream = NULL;
2809   conn->output_stream = NULL;
2810   conn->control_stream = NULL;
2811
2812   g_free (conn->remote_ip);
2813   conn->remote_ip = NULL;
2814   g_free (conn->local_ip);
2815   conn->local_ip = NULL;
2816
2817   conn->read_ahead = 0;
2818
2819   g_free (conn->initial_buffer);
2820   conn->initial_buffer = NULL;
2821   conn->initial_buffer_offset = 0;
2822
2823   conn->write_socket = NULL;
2824   conn->read_socket = NULL;
2825   conn->tunneled = FALSE;
2826   conn->tstate = TUNNEL_STATE_NONE;
2827   conn->ctxp = NULL;
2828   g_free (conn->username);
2829   conn->username = NULL;
2830   g_free (conn->passwd);
2831   conn->passwd = NULL;
2832   gst_rtsp_connection_clear_auth_params (conn);
2833   conn->timeout = 60;
2834   conn->cseq = 0;
2835   conn->session_id[0] = '\0';
2836
2837   return GST_RTSP_OK;
2838 }
2839
2840 /**
2841  * gst_rtsp_connection_free:
2842  * @conn: a #GstRTSPConnection
2843  *
2844  * Close and free @conn.
2845  *
2846  * Returns: #GST_RTSP_OK on success.
2847  */
2848 GstRTSPResult
2849 gst_rtsp_connection_free (GstRTSPConnection * conn)
2850 {
2851   GstRTSPResult res;
2852
2853   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2854
2855   res = gst_rtsp_connection_close (conn);
2856
2857   if (conn->cancellable)
2858     g_object_unref (conn->cancellable);
2859   if (conn->client)
2860     g_object_unref (conn->client);
2861   if (conn->tls_database)
2862     g_object_unref (conn->tls_database);
2863   if (conn->tls_interaction)
2864     g_object_unref (conn->tls_interaction);
2865   if (conn->accept_certificate_destroy_notify)
2866     conn->
2867         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2868
2869   g_timer_destroy (conn->timer);
2870   gst_rtsp_url_free (conn->url);
2871   g_free (conn->proxy_host);
2872   g_free (conn);
2873
2874   return res;
2875 }
2876
2877 /**
2878  * gst_rtsp_connection_poll_usec:
2879  * @conn: a #GstRTSPConnection
2880  * @events: a bitmask of #GstRTSPEvent flags to check
2881  * @revents: location for result flags
2882  * @timeout: a timeout in microseconds
2883  *
2884  * Wait up to the specified @timeout for the connection to become available for
2885  * at least one of the operations specified in @events. When the function returns
2886  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2887  * @conn.
2888  *
2889  * @timeout can be 0, in which case this function might block forever.
2890  *
2891  * This function can be cancelled with gst_rtsp_connection_flush().
2892  *
2893  * Returns: #GST_RTSP_OK on success.
2894  *
2895  * Since: 1.18
2896  */
2897 GstRTSPResult
2898 gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
2899     GstRTSPEvent * revents, gint64 timeout)
2900 {
2901   GMainContext *ctx;
2902   GSource *rs, *ws, *ts;
2903   GIOCondition condition;
2904
2905   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2906   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2907   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2908   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2909   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2910
2911   ctx = g_main_context_new ();
2912
2913   /* configure timeout if any */
2914   if (timeout) {
2915     ts = g_timeout_source_new (timeout / 1000);
2916     g_source_set_dummy_callback (ts);
2917     g_source_attach (ts, ctx);
2918     g_source_unref (ts);
2919   }
2920
2921   if (events & GST_RTSP_EV_READ) {
2922     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2923         conn->cancellable);
2924     g_source_set_dummy_callback (rs);
2925     g_source_attach (rs, ctx);
2926     g_source_unref (rs);
2927   }
2928
2929   if (events & GST_RTSP_EV_WRITE) {
2930     ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
2931         conn->cancellable);
2932     g_source_set_dummy_callback (ws);
2933     g_source_attach (ws, ctx);
2934     g_source_unref (ws);
2935   }
2936
2937   /* Returns after handling all pending events */
2938   while (!g_main_context_iteration (ctx, TRUE));
2939
2940   g_main_context_unref (ctx);
2941
2942   *revents = 0;
2943   if (events & GST_RTSP_EV_READ) {
2944     condition = g_socket_condition_check (conn->read_socket,
2945         G_IO_IN | G_IO_PRI);
2946     if ((condition & G_IO_IN) || (condition & G_IO_PRI))
2947       *revents |= GST_RTSP_EV_READ;
2948   }
2949   if (events & GST_RTSP_EV_WRITE) {
2950     condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
2951     if ((condition & G_IO_OUT))
2952       *revents |= GST_RTSP_EV_WRITE;
2953   }
2954
2955   if (*revents == 0)
2956     return GST_RTSP_ETIMEOUT;
2957
2958   return GST_RTSP_OK;
2959 }
2960
2961 /**
2962  * gst_rtsp_connection_next_timeout_usec:
2963  * @conn: a #GstRTSPConnection
2964  *
2965  * Calculate the next timeout for @conn
2966  *
2967  * Returns: #the next timeout in microseconds
2968  *
2969  * Since: 1.18
2970  */
2971 gint64
2972 gst_rtsp_connection_next_timeout_usec (GstRTSPConnection * conn)
2973 {
2974   gdouble elapsed;
2975   gulong usec;
2976   gint ctimeout;
2977   gint64 timeout = 0;
2978
2979   g_return_val_if_fail (conn != NULL, 1);
2980
2981   ctimeout = conn->timeout;
2982   if (ctimeout >= 20) {
2983     /* Because we should act before the timeout we timeout 5
2984      * seconds in advance. */
2985     ctimeout -= 5;
2986   } else if (ctimeout >= 5) {
2987     /* else timeout 20% earlier */
2988     ctimeout -= ctimeout / 5;
2989   } else if (ctimeout >= 1) {
2990     /* else timeout 1 second earlier */
2991     ctimeout -= 1;
2992   }
2993
2994   elapsed = g_timer_elapsed (conn->timer, &usec);
2995   if (elapsed >= ctimeout) {
2996     timeout = 0;
2997   } else {
2998     gint64 sec = ctimeout - elapsed;
2999     if (usec <= G_USEC_PER_SEC)
3000       usec = G_USEC_PER_SEC - usec;
3001     else
3002       usec = 0;
3003     timeout = usec + sec * G_USEC_PER_SEC;
3004   }
3005
3006   return timeout;
3007 }
3008
3009 /**
3010  * gst_rtsp_connection_reset_timeout:
3011  * @conn: a #GstRTSPConnection
3012  *
3013  * Reset the timeout of @conn.
3014  *
3015  * Returns: #GST_RTSP_OK.
3016  */
3017 GstRTSPResult
3018 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
3019 {
3020   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3021
3022   g_timer_start (conn->timer);
3023
3024   return GST_RTSP_OK;
3025 }
3026
3027 /**
3028  * gst_rtsp_connection_flush:
3029  * @conn: a #GstRTSPConnection
3030  * @flush: start or stop the flush
3031  *
3032  * Start or stop the flushing action on @conn. When flushing, all current
3033  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
3034  * is set to non-flushing mode again.
3035  *
3036  * Returns: #GST_RTSP_OK.
3037  */
3038 GstRTSPResult
3039 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
3040 {
3041   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3042
3043   if (flush) {
3044     g_cancellable_cancel (conn->cancellable);
3045   } else {
3046     g_object_unref (conn->cancellable);
3047     conn->cancellable = g_cancellable_new ();
3048   }
3049
3050   return GST_RTSP_OK;
3051 }
3052
3053 /**
3054  * gst_rtsp_connection_set_proxy:
3055  * @conn: a #GstRTSPConnection
3056  * @host: the proxy host
3057  * @port: the proxy port
3058  *
3059  * Set the proxy host and port.
3060  *
3061  * Returns: #GST_RTSP_OK.
3062  */
3063 GstRTSPResult
3064 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3065     const gchar * host, guint port)
3066 {
3067   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3068
3069   g_free (conn->proxy_host);
3070   conn->proxy_host = g_strdup (host);
3071   conn->proxy_port = port;
3072
3073   return GST_RTSP_OK;
3074 }
3075
3076 /**
3077  * gst_rtsp_connection_set_auth:
3078  * @conn: a #GstRTSPConnection
3079  * @method: authentication method
3080  * @user: the user
3081  * @pass: the password
3082  *
3083  * Configure @conn for authentication mode @method with @user and @pass as the
3084  * user and password respectively.
3085  *
3086  * Returns: #GST_RTSP_OK.
3087  */
3088 GstRTSPResult
3089 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3090     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3091 {
3092   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3093
3094   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3095           || g_strrstr (user, ":") != NULL))
3096     return GST_RTSP_EINVAL;
3097
3098   /* Make sure the username and passwd are being set for authentication */
3099   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3100     return GST_RTSP_EINVAL;
3101
3102   /* ":" chars are not allowed in usernames for basic auth */
3103   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3104     return GST_RTSP_EINVAL;
3105
3106   g_free (conn->username);
3107   g_free (conn->passwd);
3108
3109   conn->auth_method = method;
3110   conn->username = g_strdup (user);
3111   conn->passwd = g_strdup (pass);
3112
3113   return GST_RTSP_OK;
3114 }
3115
3116 /**
3117  * str_case_hash:
3118  * @key: ASCII string to hash
3119  *
3120  * Hashes @key in a case-insensitive manner.
3121  *
3122  * Returns: the hash code.
3123  **/
3124 static guint
3125 str_case_hash (gconstpointer key)
3126 {
3127   const char *p = key;
3128   guint h = g_ascii_toupper (*p);
3129
3130   if (h)
3131     for (p += 1; *p != '\0'; p++)
3132       h = (h << 5) - h + g_ascii_toupper (*p);
3133
3134   return h;
3135 }
3136
3137 /**
3138  * str_case_equal:
3139  * @v1: an ASCII string
3140  * @v2: another ASCII string
3141  *
3142  * Compares @v1 and @v2 in a case-insensitive manner
3143  *
3144  * Returns: %TRUE if they are equal (modulo case)
3145  **/
3146 static gboolean
3147 str_case_equal (gconstpointer v1, gconstpointer v2)
3148 {
3149   const char *string1 = v1;
3150   const char *string2 = v2;
3151
3152   return g_ascii_strcasecmp (string1, string2) == 0;
3153 }
3154
3155 /**
3156  * gst_rtsp_connection_set_auth_param:
3157  * @conn: a #GstRTSPConnection
3158  * @param: authentication directive
3159  * @value: value
3160  *
3161  * Setup @conn with authentication directives. This is not necessary for
3162  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3163  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3164  * in the WWW-Authenticate response header and can include realm, domain,
3165  * nonce, opaque, stale, algorithm, qop as per RFC2617.
3166  */
3167 void
3168 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3169     const gchar * param, const gchar * value)
3170 {
3171   g_return_if_fail (conn != NULL);
3172   g_return_if_fail (param != NULL);
3173
3174   if (conn->auth_params == NULL) {
3175     conn->auth_params =
3176         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3177   }
3178   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3179 }
3180
3181 /**
3182  * gst_rtsp_connection_clear_auth_params:
3183  * @conn: a #GstRTSPConnection
3184  *
3185  * Clear the list of authentication directives stored in @conn.
3186  */
3187 void
3188 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3189 {
3190   g_return_if_fail (conn != NULL);
3191
3192   if (conn->auth_params != NULL) {
3193     g_hash_table_destroy (conn->auth_params);
3194     conn->auth_params = NULL;
3195   }
3196 }
3197
3198 static GstRTSPResult
3199 set_qos_dscp (GSocket * socket, guint qos_dscp)
3200 {
3201 #ifndef IP_TOS
3202   GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3203   return GST_RTSP_OK;
3204 #else
3205   gint fd;
3206   union gst_sockaddr sa;
3207   socklen_t slen = sizeof (sa);
3208   gint af;
3209   gint tos;
3210
3211   if (!socket)
3212     return GST_RTSP_OK;
3213
3214   fd = g_socket_get_fd (socket);
3215   if (getsockname (fd, &sa.sa, &slen) < 0)
3216     goto no_getsockname;
3217
3218   af = sa.sa.sa_family;
3219
3220   /* if this is an IPv4-mapped address then do IPv4 QoS */
3221   if (af == AF_INET6) {
3222     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3223       af = AF_INET;
3224   }
3225
3226   /* extract and shift 6 bits of the DSCP */
3227   tos = (qos_dscp & 0x3f) << 2;
3228
3229 #ifdef G_OS_WIN32
3230 #  define SETSOCKOPT_ARG4_TYPE const char *
3231 #else
3232 #  define SETSOCKOPT_ARG4_TYPE const void *
3233 #endif
3234
3235   switch (af) {
3236     case AF_INET:
3237       if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3238               sizeof (tos)) < 0)
3239         goto no_setsockopt;
3240       break;
3241     case AF_INET6:
3242 #ifdef IPV6_TCLASS
3243       if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3244               (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3245         goto no_setsockopt;
3246       break;
3247 #endif
3248     default:
3249       goto wrong_family;
3250   }
3251
3252   return GST_RTSP_OK;
3253
3254   /* ERRORS */
3255 no_getsockname:
3256 no_setsockopt:
3257   {
3258     return GST_RTSP_ESYS;
3259   }
3260 wrong_family:
3261   {
3262     return GST_RTSP_ERROR;
3263   }
3264 #endif
3265 }
3266
3267 /**
3268  * gst_rtsp_connection_set_qos_dscp:
3269  * @conn: a #GstRTSPConnection
3270  * @qos_dscp: DSCP value
3271  *
3272  * Configure @conn to use the specified DSCP value.
3273  *
3274  * Returns: #GST_RTSP_OK on success.
3275  */
3276 GstRTSPResult
3277 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3278 {
3279   GstRTSPResult res;
3280
3281   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3282   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3283   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3284
3285   res = set_qos_dscp (conn->socket0, qos_dscp);
3286   if (res == GST_RTSP_OK)
3287     res = set_qos_dscp (conn->socket1, qos_dscp);
3288
3289   return res;
3290 }
3291
3292 /**
3293  * gst_rtsp_connection_set_content_length_limit:
3294  * @conn: a #GstRTSPConnection
3295  * @limit: Content-Length limit
3296  *
3297  * Configure @conn to use the specified Content-Length limit.
3298  * Both requests and responses are validated. If content-length is
3299  * exceeded, ENOMEM error will be returned.
3300  *
3301  * Since: 1.18
3302  */
3303 void
3304 gst_rtsp_connection_set_content_length_limit (GstRTSPConnection * conn,
3305     guint limit)
3306 {
3307   g_return_if_fail (conn != NULL);
3308
3309   conn->content_length_limit = limit;
3310 }
3311
3312 /**
3313  * gst_rtsp_connection_get_url:
3314  * @conn: a #GstRTSPConnection
3315  *
3316  * Retrieve the URL of the other end of @conn.
3317  *
3318  * Returns: The URL. This value remains valid until the
3319  * connection is freed.
3320  */
3321 GstRTSPUrl *
3322 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3323 {
3324   g_return_val_if_fail (conn != NULL, NULL);
3325
3326   return conn->url;
3327 }
3328
3329 /**
3330  * gst_rtsp_connection_get_ip:
3331  * @conn: a #GstRTSPConnection
3332  *
3333  * Retrieve the IP address of the other end of @conn.
3334  *
3335  * Returns: The IP address as a string. this value remains valid until the
3336  * connection is closed.
3337  */
3338 const gchar *
3339 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3340 {
3341   g_return_val_if_fail (conn != NULL, NULL);
3342
3343   return conn->remote_ip;
3344 }
3345
3346 /**
3347  * gst_rtsp_connection_set_ip:
3348  * @conn: a #GstRTSPConnection
3349  * @ip: an ip address
3350  *
3351  * Set the IP address of the server.
3352  */
3353 void
3354 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3355 {
3356   g_return_if_fail (conn != NULL);
3357
3358   g_free (conn->remote_ip);
3359   conn->remote_ip = g_strdup (ip);
3360 }
3361
3362 /**
3363  * gst_rtsp_connection_get_read_socket:
3364  * @conn: a #GstRTSPConnection
3365  *
3366  * Get the file descriptor for reading.
3367  *
3368  * Returns: (transfer none): the file descriptor used for reading or %NULL on
3369  * error. The file descriptor remains valid until the connection is closed.
3370  */
3371 GSocket *
3372 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3373 {
3374   g_return_val_if_fail (conn != NULL, NULL);
3375   g_return_val_if_fail (conn->read_socket != NULL, NULL);
3376
3377   return conn->read_socket;
3378 }
3379
3380 /**
3381  * gst_rtsp_connection_get_write_socket:
3382  * @conn: a #GstRTSPConnection
3383  *
3384  * Get the file descriptor for writing.
3385  *
3386  * Returns: (transfer none): the file descriptor used for writing or NULL on
3387  * error. The file descriptor remains valid until the connection is closed.
3388  */
3389 GSocket *
3390 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3391 {
3392   g_return_val_if_fail (conn != NULL, NULL);
3393   g_return_val_if_fail (conn->write_socket != NULL, NULL);
3394
3395   return conn->write_socket;
3396 }
3397
3398 /**
3399  * gst_rtsp_connection_set_http_mode:
3400  * @conn: a #GstRTSPConnection
3401  * @enable: %TRUE to enable manual HTTP mode
3402  *
3403  * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3404  * messages in addition to the RTSP messages. It will also disable the
3405  * automatic handling of setting up an HTTP tunnel.
3406  */
3407 void
3408 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3409 {
3410   g_return_if_fail (conn != NULL);
3411
3412   conn->manual_http = enable;
3413 }
3414
3415 /**
3416  * gst_rtsp_connection_set_tunneled:
3417  * @conn: a #GstRTSPConnection
3418  * @tunneled: the new state
3419  *
3420  * Set the HTTP tunneling state of the connection. This must be configured before
3421  * the @conn is connected.
3422  */
3423 void
3424 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3425 {
3426   g_return_if_fail (conn != NULL);
3427   g_return_if_fail (conn->read_socket == NULL);
3428   g_return_if_fail (conn->write_socket == NULL);
3429
3430   conn->tunneled = tunneled;
3431 }
3432
3433 /**
3434  * gst_rtsp_connection_is_tunneled:
3435  * @conn: a #GstRTSPConnection
3436  *
3437  * Get the tunneling state of the connection.
3438  *
3439  * Returns: if @conn is using HTTP tunneling.
3440  */
3441 gboolean
3442 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3443 {
3444   g_return_val_if_fail (conn != NULL, FALSE);
3445
3446   return conn->tunneled;
3447 }
3448
3449 /**
3450  * gst_rtsp_connection_get_tunnelid:
3451  * @conn: a #GstRTSPConnection
3452  *
3453  * Get the tunnel session id the connection.
3454  *
3455  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3456  */
3457 const gchar *
3458 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3459 {
3460   g_return_val_if_fail (conn != NULL, NULL);
3461
3462   if (!conn->tunneled)
3463     return NULL;
3464
3465   return conn->tunnelid;
3466 }
3467
3468 /**
3469  * gst_rtsp_connection_set_ignore_x_server_reply:
3470  * @conn: a #GstRTSPConnection
3471  * @ignore: %TRUE to ignore the x-server-ip-address header reply or %FALSE to
3472  *          comply with it (%FALSE is the default).
3473  *
3474  * Set whether to ignore the x-server-ip-address header reply or not. If the
3475  * header is ignored, the original address will be used instead.
3476  *
3477  * Since: 1.20
3478  */
3479 void
3480 gst_rtsp_connection_set_ignore_x_server_reply (GstRTSPConnection * conn,
3481     gboolean ignore)
3482 {
3483   g_return_if_fail (conn != NULL);
3484
3485   conn->ignore_x_server_reply = ignore;
3486 }
3487
3488 /**
3489  * gst_rtsp_connection_get_ignore_x_server_reply:
3490  * @conn: a #GstRTSPConnection
3491  *
3492  * Get the ignore_x_server_reply value.
3493  *
3494  * Returns: returns %TRUE if the x-server-ip-address header reply will be
3495  *          ignored, else returns %FALSE
3496  *
3497  * Since: 1.20
3498  */
3499 gboolean
3500 gst_rtsp_connection_get_ignore_x_server_reply (const GstRTSPConnection * conn)
3501 {
3502   g_return_val_if_fail (conn != NULL, FALSE);
3503
3504   return conn->ignore_x_server_reply;
3505 }
3506
3507 /**
3508  * gst_rtsp_connection_do_tunnel:
3509  * @conn: a #GstRTSPConnection
3510  * @conn2: a #GstRTSPConnection or %NULL
3511  *
3512  * If @conn received the first tunnel connection and @conn2 received
3513  * the second tunnel connection, link the two connections together so that
3514  * @conn manages the tunneled connection.
3515  *
3516  * After this call, @conn2 cannot be used anymore and must be freed with
3517  * gst_rtsp_connection_free().
3518  *
3519  * If @conn2 is %NULL then only the base64 decoding context will be setup for
3520  * @conn.
3521  *
3522  * Returns: return GST_RTSP_OK on success.
3523  */
3524 GstRTSPResult
3525 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3526     GstRTSPConnection * conn2)
3527 {
3528   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3529
3530   if (conn2 != NULL) {
3531     GstRTSPTunnelState ts1 = conn->tstate;
3532     GstRTSPTunnelState ts2 = conn2->tstate;
3533
3534     g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3535         || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3536         GST_RTSP_EINVAL);
3537     g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3538             TUNNELID_LEN), GST_RTSP_EINVAL);
3539
3540     /* both connections have socket0 as the read/write socket */
3541     if (ts1 == TUNNEL_STATE_GET) {
3542       /* conn2 is the HTTP POST channel. take its socket and set it as read
3543        * socket in conn */
3544       conn->socket1 = conn2->socket0;
3545       conn->stream1 = conn2->stream0;
3546       conn->input_stream = conn2->input_stream;
3547       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3548       conn2->output_stream = NULL;
3549     } else {
3550       /* conn2 is the HTTP GET channel. take its socket and set it as write
3551        * socket in conn */
3552       conn->socket1 = conn->socket0;
3553       conn->stream1 = conn->stream0;
3554       conn->socket0 = conn2->socket0;
3555       conn->stream0 = conn2->stream0;
3556       conn->output_stream = conn2->output_stream;
3557       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3558     }
3559
3560     /* clean up some of the state of conn2 */
3561     g_cancellable_cancel (conn2->cancellable);
3562     conn2->write_socket = conn2->read_socket = NULL;
3563     conn2->socket0 = NULL;
3564     conn2->stream0 = NULL;
3565     conn2->socket1 = NULL;
3566     conn2->stream1 = NULL;
3567     conn2->input_stream = NULL;
3568     conn2->control_stream = NULL;
3569     g_object_unref (conn2->cancellable);
3570     conn2->cancellable = NULL;
3571
3572     /* We make socket0 the write socket and socket1 the read socket. */
3573     conn->write_socket = conn->socket0;
3574     conn->read_socket = conn->socket1;
3575
3576     conn->tstate = TUNNEL_STATE_COMPLETE;
3577
3578     g_free (conn->initial_buffer);
3579     conn->initial_buffer = conn2->initial_buffer;
3580     conn2->initial_buffer = NULL;
3581     conn->initial_buffer_offset = conn2->initial_buffer_offset;
3582   }
3583
3584   /* we need base64 decoding for the readfd */
3585   conn->ctx.state = 0;
3586   conn->ctx.save = 0;
3587   conn->ctx.cout = 0;
3588   conn->ctx.coutl = 0;
3589   conn->ctxp = &conn->ctx;
3590
3591   return GST_RTSP_OK;
3592 }
3593
3594 /**
3595  * gst_rtsp_connection_set_remember_session_id:
3596  * @conn: a #GstRTSPConnection
3597  * @remember: %TRUE if the connection should remember the session id
3598  *
3599  * Sets if the #GstRTSPConnection should remember the session id from the last
3600  * response received and force it onto any further requests.
3601  *
3602  * The default value is %TRUE
3603  */
3604
3605 void
3606 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3607     gboolean remember)
3608 {
3609   conn->remember_session_id = remember;
3610   if (!remember)
3611     conn->session_id[0] = '\0';
3612 }
3613
3614 /**
3615  * gst_rtsp_connection_get_remember_session_id:
3616  * @conn: a #GstRTSPConnection
3617  *
3618  * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3619  * last response to set it on any further request.
3620  */
3621
3622 gboolean
3623 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3624 {
3625   return conn->remember_session_id;
3626 }
3627
3628
3629 #define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3630 #define READ_COND   (G_IO_IN | READ_ERR)
3631 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3632 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
3633
3634 /* async functions */
3635 struct _GstRTSPWatch
3636 {
3637   GSource source;
3638
3639   GstRTSPConnection *conn;
3640
3641   GstRTSPBuilder builder;
3642   GstRTSPMessage message;
3643
3644   GSource *readsrc;
3645   GSource *writesrc;
3646   GSource *controlsrc;
3647
3648   gboolean keep_running;
3649
3650   /* queued message for transmission */
3651   guint id;
3652   GMutex mutex;
3653   GstQueueArray *messages;
3654   gsize messages_bytes;
3655   guint messages_count;
3656
3657   gsize max_bytes;
3658   guint max_messages;
3659   GCond queue_not_full;
3660   gboolean flushing;
3661
3662   GstRTSPWatchFuncs funcs;
3663
3664   gpointer user_data;
3665   GDestroyNotify notify;
3666 };
3667
3668 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3669       ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3670
3671 static gboolean
3672 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3673 {
3674   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3675
3676   if (watch->conn->initial_buffer != NULL)
3677     return TRUE;
3678
3679   *timeout = (watch->conn->timeout * 1000);
3680
3681   return FALSE;
3682 }
3683
3684 static gboolean
3685 gst_rtsp_source_check (GSource * source)
3686 {
3687   return FALSE;
3688 }
3689
3690 static gboolean
3691 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3692     GstRTSPWatch * watch)
3693 {
3694   gssize count;
3695   guint8 buffer[1024];
3696   GError *error = NULL;
3697
3698   /* try to read in order to be able to detect errors, we read 1k in case some
3699    * client actually decides to send data on the GET channel */
3700   count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3701       &error);
3702   if (count == 0) {
3703     /* other end closed the socket */
3704     goto eof;
3705   }
3706
3707   if (count < 0) {
3708     GST_DEBUG ("%s", error->message);
3709     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3710         g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3711       g_clear_error (&error);
3712       goto done;
3713     }
3714     g_clear_error (&error);
3715     goto read_error;
3716   }
3717
3718   /* client sent data on the GET channel, ignore it */
3719
3720 done:
3721   return TRUE;
3722
3723   /* ERRORS */
3724 eof:
3725   {
3726     if (watch->funcs.closed)
3727       watch->funcs.closed (watch, watch->user_data);
3728
3729     /* the read connection was closed, stop the watch now */
3730     watch->keep_running = FALSE;
3731
3732     return FALSE;
3733   }
3734 read_error:
3735   {
3736     if (watch->funcs.error_full)
3737       watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3738           0, watch->user_data);
3739     else if (watch->funcs.error)
3740       watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3741
3742     goto eof;
3743   }
3744 }
3745
3746 static gboolean
3747 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3748     GstRTSPWatch * watch)
3749 {
3750   GstRTSPResult res = GST_RTSP_ERROR;
3751   GstRTSPConnection *conn = watch->conn;
3752
3753   /* if this connection was already closed, stop now */
3754   if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3755     goto eof;
3756
3757   res = build_next (&watch->builder, &watch->message, conn, FALSE);
3758   if (res == GST_RTSP_EINTR)
3759     goto done;
3760   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3761     g_mutex_lock (&watch->mutex);
3762     if (watch->readsrc) {
3763       if (!g_source_is_destroyed ((GSource *) watch))
3764         g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3765       g_source_unref (watch->readsrc);
3766       watch->readsrc = NULL;
3767     }
3768
3769     if (conn->stream1) {
3770       g_object_unref (conn->stream1);
3771       conn->stream1 = NULL;
3772       conn->socket1 = NULL;
3773       conn->input_stream = NULL;
3774     }
3775     g_mutex_unlock (&watch->mutex);
3776
3777     /* When we are in tunnelled mode, the read socket can be closed and we
3778      * should be prepared for a new POST method to reopen it */
3779     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3780       /* remove the read connection for the tunnel */
3781       /* we accept a new POST request */
3782       conn->tstate = TUNNEL_STATE_GET;
3783       /* and signal that we lost our tunnel */
3784       if (watch->funcs.tunnel_lost)
3785         res = watch->funcs.tunnel_lost (watch, watch->user_data);
3786       /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3787       g_mutex_lock (&watch->mutex);
3788       if (watch->conn->control_stream && !watch->controlsrc) {
3789         watch->controlsrc =
3790             g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3791             (watch->conn->control_stream), NULL);
3792         g_source_set_callback (watch->controlsrc,
3793             (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3794             NULL);
3795         g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3796       }
3797       g_mutex_unlock (&watch->mutex);
3798       goto read_done;
3799     } else
3800       goto eof;
3801   } else if (G_LIKELY (res == GST_RTSP_OK)) {
3802     if (!conn->manual_http &&
3803         watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3804       if (conn->tstate == TUNNEL_STATE_NONE &&
3805           watch->message.type_data.request.method == GST_RTSP_GET) {
3806         GstRTSPMessage *response;
3807         GstRTSPStatusCode code;
3808
3809         conn->tstate = TUNNEL_STATE_GET;
3810
3811         if (watch->funcs.tunnel_start)
3812           code = watch->funcs.tunnel_start (watch, watch->user_data);
3813         else
3814           code = GST_RTSP_STS_OK;
3815
3816         /* queue the response */
3817         response = gen_tunnel_reply (conn, code, &watch->message);
3818         if (watch->funcs.tunnel_http_response)
3819           watch->funcs.tunnel_http_response (watch, &watch->message, response,
3820               watch->user_data);
3821         gst_rtsp_watch_send_message (watch, response, NULL);
3822         gst_rtsp_message_free (response);
3823         goto read_done;
3824       } else if (conn->tstate == TUNNEL_STATE_NONE &&
3825           watch->message.type_data.request.method == GST_RTSP_POST) {
3826         conn->tstate = TUNNEL_STATE_POST;
3827
3828         /* in the callback the connection should be tunneled with the
3829          * GET connection */
3830         if (watch->funcs.tunnel_complete) {
3831           watch->funcs.tunnel_complete (watch, watch->user_data);
3832         }
3833         goto read_done;
3834       }
3835     }
3836   } else
3837     goto read_error;
3838
3839   if (!conn->manual_http) {
3840     /* if manual HTTP support is not enabled, then restore the message to
3841      * what it would have looked like without the support for parsing HTTP
3842      * messages being present */
3843     if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3844       watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3845       watch->message.type_data.request.method = GST_RTSP_INVALID;
3846       if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3847         watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3848       res = GST_RTSP_EPARSE;
3849     } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3850       watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3851       if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3852         watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3853       res = GST_RTSP_EPARSE;
3854     }
3855   }
3856   if (G_LIKELY (res != GST_RTSP_OK))
3857     goto read_error;
3858
3859   if (watch->funcs.message_received)
3860     watch->funcs.message_received (watch, &watch->message, watch->user_data);
3861
3862 read_done:
3863   gst_rtsp_message_unset (&watch->message);
3864   build_reset (&watch->builder);
3865
3866 done:
3867   return TRUE;
3868
3869   /* ERRORS */
3870 eof:
3871   {
3872     if (watch->funcs.closed)
3873       watch->funcs.closed (watch, watch->user_data);
3874
3875     /* we closed the read connection, stop the watch now */
3876     watch->keep_running = FALSE;
3877
3878     /* always stop when the input returns EOF in non-tunneled mode */
3879     return FALSE;
3880   }
3881 read_error:
3882   {
3883     if (watch->funcs.error_full)
3884       watch->funcs.error_full (watch, res, &watch->message,
3885           0, watch->user_data);
3886     else if (watch->funcs.error)
3887       watch->funcs.error (watch, res, watch->user_data);
3888
3889     goto eof;
3890   }
3891 }
3892
3893 static gboolean
3894 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3895     gpointer user_data G_GNUC_UNUSED)
3896 {
3897   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3898   GstRTSPConnection *conn = watch->conn;
3899
3900   if (conn->initial_buffer != NULL) {
3901     gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3902         watch);
3903   }
3904   return watch->keep_running;
3905 }
3906
3907 static gboolean
3908 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3909     GstRTSPWatch * watch)
3910 {
3911   GstRTSPResult res = GST_RTSP_ERROR;
3912   GstRTSPConnection *conn = watch->conn;
3913
3914   /* if this connection was already closed, stop now */
3915   if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3916       !watch->messages)
3917     goto eof;
3918
3919   g_mutex_lock (&watch->mutex);
3920   do {
3921     guint n_messages = gst_queue_array_get_length (watch->messages);
3922     GOutputVector *vectors;
3923     GstMapInfo *map_infos;
3924     guint *ids;
3925     gsize bytes_to_write, bytes_written;
3926     guint n_vectors, n_memories, n_ids, drop_messages;
3927     gint i, j, l, n_mmap;
3928     GstRTSPSerializedMessage *msg;
3929
3930     /* if this connection was already closed, stop now */
3931     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3932         !watch->messages) {
3933       g_mutex_unlock (&watch->mutex);
3934       goto eof;
3935     }
3936
3937     if (n_messages == 0) {
3938       if (watch->writesrc) {
3939         if (!g_source_is_destroyed ((GSource *) watch))
3940           g_source_remove_child_source ((GSource *) watch, watch->writesrc);
3941         g_source_unref (watch->writesrc);
3942         watch->writesrc = NULL;
3943         /* we create and add the write source again when we actually have
3944          * something to write */
3945
3946         /* since write source is now removed we add read source on the write
3947          * socket instead to be able to detect when client closes get channel
3948          * in tunneled mode */
3949         if (watch->conn->control_stream) {
3950           watch->controlsrc =
3951               g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3952               (watch->conn->control_stream), NULL);
3953           g_source_set_callback (watch->controlsrc,
3954               (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3955               NULL);
3956           g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3957         } else {
3958           watch->controlsrc = NULL;
3959         }
3960       }
3961       break;
3962     }
3963
3964     for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
3965       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3966       if (msg->id != 0)
3967         n_ids++;
3968
3969       if (msg->data_offset < msg->data_size)
3970         n_vectors++;
3971
3972       if (msg->body_data && msg->body_offset < msg->body_data_size) {
3973         n_vectors++;
3974       } else if (msg->body_buffer) {
3975         guint m, n;
3976         guint offset = 0;
3977
3978         n = gst_buffer_n_memory (msg->body_buffer);
3979         for (m = 0; m < n; m++) {
3980           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3981
3982           /* Skip all memories we already wrote */
3983           if (offset + mem->size <= msg->body_offset) {
3984             offset += mem->size;
3985             continue;
3986           }
3987           offset += mem->size;
3988
3989           n_memories++;
3990           n_vectors++;
3991         }
3992       }
3993     }
3994
3995     vectors = g_newa (GOutputVector, n_vectors);
3996     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
3997     ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
3998     if (ids)
3999       memset (ids, 0, sizeof (guint) * (n_ids + 1));
4000
4001     for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
4002         i++) {
4003       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4004
4005       if (msg->data_offset < msg->data_size) {
4006         vectors[j].buffer = (msg->data_is_data_header ?
4007             msg->data_header : msg->data) + msg->data_offset;
4008         vectors[j].size = msg->data_size - msg->data_offset;
4009         bytes_to_write += vectors[j].size;
4010         j++;
4011       }
4012
4013       if (msg->body_data) {
4014         if (msg->body_offset < msg->body_data_size) {
4015           vectors[j].buffer = msg->body_data + msg->body_offset;
4016           vectors[j].size = msg->body_data_size - msg->body_offset;
4017           bytes_to_write += vectors[j].size;
4018           j++;
4019         }
4020       } else if (msg->body_buffer) {
4021         guint m, n;
4022         guint offset = 0;
4023         n = gst_buffer_n_memory (msg->body_buffer);
4024         for (m = 0; m < n; m++) {
4025           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
4026           guint off;
4027
4028           /* Skip all memories we already wrote */
4029           if (offset + mem->size <= msg->body_offset) {
4030             offset += mem->size;
4031             continue;
4032           }
4033
4034           if (offset < msg->body_offset)
4035             off = msg->body_offset - offset;
4036           else
4037             off = 0;
4038
4039           offset += mem->size;
4040
4041           g_assert (off < mem->size);
4042
4043           gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
4044           vectors[j].buffer = map_infos[n_mmap].data + off;
4045           vectors[j].size = map_infos[n_mmap].size - off;
4046           bytes_to_write += vectors[j].size;
4047
4048           n_mmap++;
4049           j++;
4050         }
4051       }
4052     }
4053
4054     res =
4055         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4056         &bytes_written, FALSE, watch->conn->cancellable);
4057     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4058
4059     /* First unmap all memories here, this simplifies the code below
4060      * as we don't have to skip all memories that were already written
4061      * before */
4062     for (i = 0; i < n_mmap; i++) {
4063       gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
4064     }
4065
4066     if (bytes_written == bytes_to_write) {
4067       /* fast path, just unmap all memories, free memory, drop all messages and notify them */
4068       l = 0;
4069       while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4070         if (msg->id) {
4071           ids[l] = msg->id;
4072           l++;
4073         }
4074
4075         gst_rtsp_serialized_message_clear (msg);
4076       }
4077
4078       g_assert (watch->messages_bytes >= bytes_written);
4079       watch->messages_bytes -= bytes_written;
4080     } else if (bytes_written > 0) {
4081       /* not done, let's skip all messages that were sent already and free them */
4082       for (i = 0, drop_messages = 0; i < n_messages; i++) {
4083         msg = gst_queue_array_peek_nth_struct (watch->messages, i);
4084
4085         if (bytes_written >= msg->data_size - msg->data_offset) {
4086           guint body_size;
4087
4088           /* all data of this message is sent, check body and otherwise
4089            * skip the whole message for next time */
4090           bytes_written -= (msg->data_size - msg->data_offset);
4091           watch->messages_bytes -= (msg->data_size - msg->data_offset);
4092           msg->data_offset = msg->data_size;
4093
4094           if (msg->body_data) {
4095             body_size = msg->body_data_size;
4096           } else if (msg->body_buffer) {
4097             body_size = gst_buffer_get_size (msg->body_buffer);
4098           } else {
4099             body_size = 0;
4100           }
4101
4102           if (bytes_written + msg->body_offset >= body_size) {
4103             /* body written, drop this message */
4104             bytes_written -= body_size - msg->body_offset;
4105             watch->messages_bytes -= body_size - msg->body_offset;
4106             msg->body_offset = body_size;
4107             drop_messages++;
4108
4109             if (msg->id) {
4110               ids[l] = msg->id;
4111               l++;
4112             }
4113
4114             gst_rtsp_serialized_message_clear (msg);
4115           } else {
4116             msg->body_offset += bytes_written;
4117             watch->messages_bytes -= bytes_written;
4118             bytes_written = 0;
4119           }
4120         } else {
4121           /* Need to continue sending from the data of this message */
4122           msg->data_offset += bytes_written;
4123           watch->messages_bytes -= bytes_written;
4124           bytes_written = 0;
4125         }
4126       }
4127
4128       while (drop_messages > 0) {
4129         msg = gst_queue_array_pop_head_struct (watch->messages);
4130         g_assert (msg);
4131         drop_messages--;
4132       }
4133
4134       g_assert (watch->messages_bytes >= bytes_written);
4135       watch->messages_bytes -= bytes_written;
4136     }
4137
4138     if (!IS_BACKLOG_FULL (watch))
4139       g_cond_signal (&watch->queue_not_full);
4140     g_mutex_unlock (&watch->mutex);
4141
4142     /* notify all messages that were successfully written */
4143     if (ids) {
4144       while (*ids) {
4145         /* only decrease the counter for messages that have an id. Only
4146          * the last message of a messages chunk is counted */
4147         watch->messages_count--;
4148
4149         if (watch->funcs.message_sent)
4150           watch->funcs.message_sent (watch, *ids, watch->user_data);
4151         ids++;
4152       }
4153     }
4154
4155     if (res == GST_RTSP_EINTR) {
4156       goto write_blocked;
4157     } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4158       goto write_error;
4159     }
4160     g_mutex_lock (&watch->mutex);
4161   } while (TRUE);
4162   g_mutex_unlock (&watch->mutex);
4163
4164 write_blocked:
4165   return TRUE;
4166
4167   /* ERRORS */
4168 eof:
4169   {
4170     return FALSE;
4171   }
4172 write_error:
4173   {
4174     if (watch->funcs.error_full) {
4175       guint i, n_messages;
4176
4177       n_messages = gst_queue_array_get_length (watch->messages);
4178       for (i = 0; i < n_messages; i++) {
4179         GstRTSPSerializedMessage *msg =
4180             gst_queue_array_peek_nth_struct (watch->messages, i);
4181         if (msg->id)
4182           watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4183       }
4184     } else if (watch->funcs.error) {
4185       watch->funcs.error (watch, res, watch->user_data);
4186     }
4187
4188     return FALSE;
4189   }
4190 }
4191
4192 static void
4193 gst_rtsp_source_finalize (GSource * source)
4194 {
4195   GstRTSPWatch *watch = (GstRTSPWatch *) source;
4196   GstRTSPSerializedMessage *msg;
4197
4198   if (watch->notify)
4199     watch->notify (watch->user_data);
4200
4201   build_reset (&watch->builder);
4202   gst_rtsp_message_unset (&watch->message);
4203
4204   while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4205     gst_rtsp_serialized_message_clear (msg);
4206   }
4207   gst_queue_array_free (watch->messages);
4208   watch->messages = NULL;
4209   watch->messages_bytes = 0;
4210   watch->messages_count = 0;
4211
4212   g_cond_clear (&watch->queue_not_full);
4213
4214   if (watch->readsrc)
4215     g_source_unref (watch->readsrc);
4216   if (watch->writesrc)
4217     g_source_unref (watch->writesrc);
4218   if (watch->controlsrc)
4219     g_source_unref (watch->controlsrc);
4220
4221   g_mutex_clear (&watch->mutex);
4222 }
4223
4224 static GSourceFuncs gst_rtsp_source_funcs = {
4225   gst_rtsp_source_prepare,
4226   gst_rtsp_source_check,
4227   gst_rtsp_source_dispatch,
4228   gst_rtsp_source_finalize,
4229   NULL,
4230   NULL
4231 };
4232
4233 /**
4234  * gst_rtsp_watch_new: (skip)
4235  * @conn: a #GstRTSPConnection
4236  * @funcs: watch functions
4237  * @user_data: user data to pass to @funcs
4238  * @notify: notify when @user_data is not referenced anymore
4239  *
4240  * Create a watch object for @conn. The functions provided in @funcs will be
4241  * called with @user_data when activity happened on the watch.
4242  *
4243  * The new watch is usually created so that it can be attached to a
4244  * maincontext with gst_rtsp_watch_attach().
4245  *
4246  * @conn must exist for the entire lifetime of the watch.
4247  *
4248  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4249  * communication. Free with gst_rtsp_watch_unref () after usage.
4250  */
4251 GstRTSPWatch *
4252 gst_rtsp_watch_new (GstRTSPConnection * conn,
4253     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4254 {
4255   GstRTSPWatch *result;
4256
4257   g_return_val_if_fail (conn != NULL, NULL);
4258   g_return_val_if_fail (funcs != NULL, NULL);
4259   g_return_val_if_fail (conn->read_socket != NULL, NULL);
4260   g_return_val_if_fail (conn->write_socket != NULL, NULL);
4261
4262   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4263       sizeof (GstRTSPWatch));
4264
4265   result->conn = conn;
4266   result->builder.state = STATE_START;
4267
4268   g_mutex_init (&result->mutex);
4269   result->messages =
4270       gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4271   g_cond_init (&result->queue_not_full);
4272
4273   gst_rtsp_watch_reset (result);
4274   result->keep_running = TRUE;
4275   result->flushing = FALSE;
4276
4277   result->funcs = *funcs;
4278   result->user_data = user_data;
4279   result->notify = notify;
4280
4281   return result;
4282 }
4283
4284 /**
4285  * gst_rtsp_watch_reset:
4286  * @watch: a #GstRTSPWatch
4287  *
4288  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4289  * when the file descriptors of the connection might have changed.
4290  */
4291 void
4292 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4293 {
4294   g_mutex_lock (&watch->mutex);
4295   if (watch->readsrc) {
4296     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4297     g_source_unref (watch->readsrc);
4298   }
4299   if (watch->writesrc) {
4300     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4301     g_source_unref (watch->writesrc);
4302     watch->writesrc = NULL;
4303   }
4304   if (watch->controlsrc) {
4305     g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4306     g_source_unref (watch->controlsrc);
4307     watch->controlsrc = NULL;
4308   }
4309
4310   if (watch->conn->input_stream) {
4311     watch->readsrc =
4312         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4313         (watch->conn->input_stream), NULL);
4314     g_source_set_callback (watch->readsrc,
4315         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4316     g_source_add_child_source ((GSource *) watch, watch->readsrc);
4317   } else {
4318     watch->readsrc = NULL;
4319   }
4320
4321   /* we create and add the write source when we actually have something to
4322    * write */
4323
4324   /* when write source is not added we add read source on the write socket
4325    * instead to be able to detect when client closes get channel in tunneled
4326    * mode */
4327   if (watch->conn->control_stream) {
4328     watch->controlsrc =
4329         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4330         (watch->conn->control_stream), NULL);
4331     g_source_set_callback (watch->controlsrc,
4332         (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4333     g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4334   } else {
4335     watch->controlsrc = NULL;
4336   }
4337   g_mutex_unlock (&watch->mutex);
4338 }
4339
4340 /**
4341  * gst_rtsp_watch_attach:
4342  * @watch: a #GstRTSPWatch
4343  * @context: a GMainContext (if NULL, the default context will be used)
4344  *
4345  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4346  *
4347  * Returns: the ID (greater than 0) for the watch within the GMainContext.
4348  */
4349 guint
4350 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4351 {
4352   g_return_val_if_fail (watch != NULL, 0);
4353
4354   return g_source_attach ((GSource *) watch, context);
4355 }
4356
4357 /**
4358  * gst_rtsp_watch_unref:
4359  * @watch: a #GstRTSPWatch
4360  *
4361  * Decreases the reference count of @watch by one. If the resulting reference
4362  * count is zero the watch and associated memory will be destroyed.
4363  */
4364 void
4365 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4366 {
4367   g_return_if_fail (watch != NULL);
4368
4369   g_source_unref ((GSource *) watch);
4370 }
4371
4372 /**
4373  * gst_rtsp_watch_set_send_backlog:
4374  * @watch: a #GstRTSPWatch
4375  * @bytes: maximum bytes
4376  * @messages: maximum messages
4377  *
4378  * Set the maximum amount of bytes and messages that will be queued in @watch.
4379  * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4380  * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4381  *
4382  * A value of 0 for @bytes or @messages means no limits.
4383  *
4384  * Since: 1.2
4385  */
4386 void
4387 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4388     gsize bytes, guint messages)
4389 {
4390   g_return_if_fail (watch != NULL);
4391
4392   g_mutex_lock (&watch->mutex);
4393   watch->max_bytes = bytes;
4394   watch->max_messages = messages;
4395   if (!IS_BACKLOG_FULL (watch))
4396     g_cond_signal (&watch->queue_not_full);
4397   g_mutex_unlock (&watch->mutex);
4398
4399   GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4400       bytes, messages);
4401 }
4402
4403 /**
4404  * gst_rtsp_watch_get_send_backlog:
4405  * @watch: a #GstRTSPWatch
4406  * @bytes: (out) (allow-none): maximum bytes
4407  * @messages: (out) (allow-none): maximum messages
4408  *
4409  * Get the maximum amount of bytes and messages that will be queued in @watch.
4410  * See gst_rtsp_watch_set_send_backlog().
4411  *
4412  * Since: 1.2
4413  */
4414 void
4415 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4416     gsize * bytes, guint * messages)
4417 {
4418   g_return_if_fail (watch != NULL);
4419
4420   g_mutex_lock (&watch->mutex);
4421   if (bytes)
4422     *bytes = watch->max_bytes;
4423   if (messages)
4424     *messages = watch->max_messages;
4425   g_mutex_unlock (&watch->mutex);
4426 }
4427
4428 static GstRTSPResult
4429 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4430     GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4431 {
4432   GstRTSPResult res;
4433   GMainContext *context = NULL;
4434   gint i;
4435
4436   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4437   g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4438
4439   g_mutex_lock (&watch->mutex);
4440   if (watch->flushing)
4441     goto flushing;
4442
4443   /* try to send the message synchronously first */
4444   if (gst_queue_array_get_length (watch->messages) == 0) {
4445     gint j, k;
4446     GOutputVector *vectors;
4447     GstMapInfo *map_infos;
4448     gsize bytes_to_write, bytes_written;
4449     guint n_vectors, n_memories, drop_messages;
4450
4451     for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4452       n_vectors++;
4453       if (messages[i].body_data) {
4454         n_vectors++;
4455       } else if (messages[i].body_buffer) {
4456         n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4457         n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4458       }
4459     }
4460
4461     vectors = g_newa (GOutputVector, n_vectors);
4462     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4463
4464     for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4465       vectors[j].buffer = messages[i].data_is_data_header ?
4466           messages[i].data_header : messages[i].data;
4467       vectors[j].size = messages[i].data_size;
4468       bytes_to_write += vectors[j].size;
4469       j++;
4470
4471       if (messages[i].body_data) {
4472         vectors[j].buffer = messages[i].body_data;
4473         vectors[j].size = messages[i].body_data_size;
4474         bytes_to_write += vectors[j].size;
4475         j++;
4476       } else if (messages[i].body_buffer) {
4477         gint l, n;
4478
4479         n = gst_buffer_n_memory (messages[i].body_buffer);
4480         for (l = 0; l < n; l++) {
4481           GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4482
4483           gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4484           vectors[j].buffer = map_infos[k].data;
4485           vectors[j].size = map_infos[k].size;
4486           bytes_to_write += vectors[j].size;
4487
4488           k++;
4489           j++;
4490         }
4491       }
4492     }
4493
4494     res =
4495         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4496         &bytes_written, FALSE, watch->conn->cancellable);
4497     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4498
4499     /* At this point we sent everything we could without blocking or
4500      * error and updated the offsets inside the message accordingly */
4501
4502     /* First of all unmap all memories. This simplifies the code below */
4503     for (k = 0; k < n_memories; k++) {
4504       gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4505     }
4506
4507     if (res != GST_RTSP_EINTR) {
4508       /* actual error or done completely */
4509       if (id != NULL)
4510         *id = 0;
4511
4512       /* free everything */
4513       for (i = 0, k = 0; i < n_messages; i++) {
4514         gst_rtsp_serialized_message_clear (&messages[i]);
4515       }
4516
4517       goto done;
4518     }
4519
4520     /* not done, let's skip all messages that were sent already and free them */
4521     for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4522       if (bytes_written >= messages[i].data_size) {
4523         guint body_size;
4524
4525         /* all data of this message is sent, check body and otherwise
4526          * skip the whole message for next time */
4527         messages[i].data_offset = messages[i].data_size;
4528         bytes_written -= messages[i].data_size;
4529
4530         if (messages[i].body_data) {
4531           body_size = messages[i].body_data_size;
4532
4533         } else if (messages[i].body_buffer) {
4534           body_size = gst_buffer_get_size (messages[i].body_buffer);
4535         } else {
4536           body_size = 0;
4537         }
4538
4539         if (bytes_written >= body_size) {
4540           /* body written, drop this message */
4541           messages[i].body_offset = body_size;
4542           bytes_written -= body_size;
4543           drop_messages++;
4544
4545           gst_rtsp_serialized_message_clear (&messages[i]);
4546         } else {
4547           messages[i].body_offset = bytes_written;
4548           bytes_written = 0;
4549         }
4550       } else {
4551         /* Need to continue sending from the data of this message */
4552         messages[i].data_offset = bytes_written;
4553         bytes_written = 0;
4554       }
4555     }
4556
4557     g_assert (n_messages > drop_messages);
4558
4559     messages += drop_messages;
4560     n_messages -= drop_messages;
4561   }
4562
4563   /* check limits */
4564   if (IS_BACKLOG_FULL (watch))
4565     goto too_much_backlog;
4566
4567   for (i = 0; i < n_messages; i++) {
4568     GstRTSPSerializedMessage local_message;
4569
4570     /* make a record with the data and id for sending async */
4571     local_message = messages[i];
4572
4573     /* copy the body data or take an additional reference to the body buffer
4574      * we don't own them here */
4575     if (local_message.body_data) {
4576       local_message.body_data =
4577           g_memdup2 (local_message.body_data, local_message.body_data_size);
4578     } else if (local_message.body_buffer) {
4579       gst_buffer_ref (local_message.body_buffer);
4580     }
4581     local_message.borrowed = FALSE;
4582
4583     /* set an id for the very last message */
4584     if (i == n_messages - 1) {
4585       do {
4586         /* make sure rec->id is never 0 */
4587         local_message.id = ++watch->id;
4588       } while (G_UNLIKELY (local_message.id == 0));
4589
4590       if (id != NULL)
4591         *id = local_message.id;
4592     } else {
4593       local_message.id = 0;
4594     }
4595
4596     /* add the record to a queue. */
4597     gst_queue_array_push_tail_struct (watch->messages, &local_message);
4598     watch->messages_bytes +=
4599         (local_message.data_size - local_message.data_offset);
4600     if (local_message.body_data)
4601       watch->messages_bytes +=
4602           (local_message.body_data_size - local_message.body_offset);
4603     else if (local_message.body_buffer)
4604       watch->messages_bytes +=
4605           (gst_buffer_get_size (local_message.body_buffer) -
4606           local_message.body_offset);
4607   }
4608   /* each message chunks is one unit */
4609   watch->messages_count++;
4610
4611   /* make sure the main context will now also check for writability on the
4612    * socket */
4613   context = ((GSource *) watch)->context;
4614   if (!watch->writesrc) {
4615     /* remove the read source on the write socket, we will be able to detect
4616      * errors while writing */
4617     if (watch->controlsrc) {
4618       g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4619       g_source_unref (watch->controlsrc);
4620       watch->controlsrc = NULL;
4621     }
4622
4623     watch->writesrc =
4624         g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4625         (watch->conn->output_stream), NULL);
4626     g_source_set_callback (watch->writesrc,
4627         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4628     g_source_add_child_source ((GSource *) watch, watch->writesrc);
4629   }
4630   res = GST_RTSP_OK;
4631
4632 done:
4633   g_mutex_unlock (&watch->mutex);
4634
4635   if (context)
4636     g_main_context_wakeup (context);
4637
4638   return res;
4639
4640   /* ERRORS */
4641 flushing:
4642   {
4643     GST_DEBUG ("we are flushing");
4644     g_mutex_unlock (&watch->mutex);
4645     for (i = 0; i < n_messages; i++) {
4646       gst_rtsp_serialized_message_clear (&messages[i]);
4647     }
4648     return GST_RTSP_EINTR;
4649   }
4650 too_much_backlog:
4651   {
4652     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4653         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4654         watch->messages_bytes, watch->max_messages, watch->messages_count);
4655     g_mutex_unlock (&watch->mutex);
4656     for (i = 0; i < n_messages; i++) {
4657       gst_rtsp_serialized_message_clear (&messages[i]);
4658     }
4659     return GST_RTSP_ENOMEM;
4660   }
4661
4662   return GST_RTSP_OK;
4663 }
4664
4665 /**
4666  * gst_rtsp_watch_write_data:
4667  * @watch: a #GstRTSPWatch
4668  * @data: (array length=size) (transfer full): the data to queue
4669  * @size: the size of @data
4670  * @id: (out) (allow-none): location for a message ID or %NULL
4671  *
4672  * Write @data using the connection of the @watch. If it cannot be sent
4673  * immediately, it will be queued for transmission in @watch. The contents of
4674  * @message will then be serialized and transmitted when the connection of the
4675  * @watch becomes writable. In case the @message is queued, the ID returned in
4676  * @id will be non-zero and used as the ID argument in the message_sent
4677  * callback.
4678  *
4679  * This function will take ownership of @data and g_free() it after use.
4680  *
4681  * If the amount of queued data exceeds the limits set with
4682  * gst_rtsp_watch_set_send_backlog(), this function will return
4683  * #GST_RTSP_ENOMEM.
4684  *
4685  * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4686  * are reached. #GST_RTSP_EINTR when @watch was flushing.
4687  */
4688 /* FIXME 2.0: This should've been static! */
4689 GstRTSPResult
4690 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4691     guint size, guint * id)
4692 {
4693   GstRTSPSerializedMessage serialized_message;
4694
4695   memset (&serialized_message, 0, sizeof (serialized_message));
4696   serialized_message.data = (guint8 *) data;
4697   serialized_message.data_size = size;
4698
4699   return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4700       1, id);
4701 }
4702
4703 /**
4704  * gst_rtsp_watch_send_message:
4705  * @watch: a #GstRTSPWatch
4706  * @message: a #GstRTSPMessage
4707  * @id: (out) (allow-none): location for a message ID or %NULL
4708  *
4709  * Send a @message using the connection of the @watch. If it cannot be sent
4710  * immediately, it will be queued for transmission in @watch. The contents of
4711  * @message will then be serialized and transmitted when the connection of the
4712  * @watch becomes writable. In case the @message is queued, the ID returned in
4713  * @id will be non-zero and used as the ID argument in the message_sent
4714  * callback.
4715  *
4716  * Returns: #GST_RTSP_OK on success.
4717  */
4718 GstRTSPResult
4719 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4720     guint * id)
4721 {
4722   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4723   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4724
4725   return gst_rtsp_watch_send_messages (watch, message, 1, id);
4726 }
4727
4728 /**
4729  * gst_rtsp_watch_send_messages:
4730  * @watch: a #GstRTSPWatch
4731  * @messages: (array length=n_messages): the messages to send
4732  * @n_messages: the number of messages to send
4733  * @id: (out) (allow-none): location for a message ID or %NULL
4734  *
4735  * Sends @messages using the connection of the @watch. If they cannot be sent
4736  * immediately, they will be queued for transmission in @watch. The contents of
4737  * @messages will then be serialized and transmitted when the connection of the
4738  * @watch becomes writable. In case the @messages are queued, the ID returned in
4739  * @id will be non-zero and used as the ID argument in the message_sent
4740  * callback once the last message is sent. The callback will only be called
4741  * once for the last message.
4742  *
4743  * Returns: #GST_RTSP_OK on success.
4744  *
4745  * Since: 1.16
4746  */
4747 GstRTSPResult
4748 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4749     guint n_messages, guint * id)
4750 {
4751   GstRTSPSerializedMessage *serialized_messages;
4752   gint i;
4753
4754   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4755   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4756
4757   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4758   memset (serialized_messages, 0,
4759       sizeof (GstRTSPSerializedMessage) * n_messages);
4760
4761   for (i = 0; i < n_messages; i++) {
4762     if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4763       goto error;
4764   }
4765
4766   return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4767       n_messages, id);
4768
4769 error:
4770   for (i = 0; i < n_messages; i++) {
4771     gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4772   }
4773
4774   return GST_RTSP_EINVAL;
4775 }
4776
4777 /**
4778  * gst_rtsp_watch_wait_backlog_usec:
4779  * @watch: a #GstRTSPWatch
4780  * @timeout: a timeout in microseconds
4781  *
4782  * Wait until there is place in the backlog queue, @timeout is reached
4783  * or @watch is set to flushing.
4784  *
4785  * If @timeout is 0 this function can block forever. If @timeout
4786  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4787  * after the timeout expired.
4788  *
4789  * The typically use of this function is when gst_rtsp_watch_write_data
4790  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4791  * free space in the backlog queue and try again.
4792  *
4793  * Returns: %GST_RTSP_OK when if there is room in queue.
4794  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
4795  *          %GST_RTSP_EINTR when @watch is flushing
4796  *          %GST_RTSP_EINVAL when called with invalid parameters.
4797  *
4798  * Since: 1.18
4799  */
4800 GstRTSPResult
4801 gst_rtsp_watch_wait_backlog_usec (GstRTSPWatch * watch, gint64 timeout)
4802 {
4803   gint64 end_time;
4804
4805   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4806
4807   end_time = g_get_monotonic_time () + timeout;
4808
4809   g_mutex_lock (&watch->mutex);
4810   if (watch->flushing)
4811     goto flushing;
4812
4813   while (IS_BACKLOG_FULL (watch)) {
4814     gboolean res;
4815
4816     res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4817     if (watch->flushing)
4818       goto flushing;
4819
4820     if (!res)
4821       goto timeout;
4822   }
4823   g_mutex_unlock (&watch->mutex);
4824
4825   return GST_RTSP_OK;
4826
4827   /* ERRORS */
4828 flushing:
4829   {
4830     GST_DEBUG ("we are flushing");
4831     g_mutex_unlock (&watch->mutex);
4832     return GST_RTSP_EINTR;
4833   }
4834 timeout:
4835   {
4836     GST_DEBUG ("we timed out");
4837     g_mutex_unlock (&watch->mutex);
4838     return GST_RTSP_ETIMEOUT;
4839   }
4840 }
4841
4842 /**
4843  * gst_rtsp_watch_set_flushing:
4844  * @watch: a #GstRTSPWatch
4845  * @flushing: new flushing state
4846  *
4847  * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4848  * and make sure gst_rtsp_watch_write_data() returns immediately with
4849  * #GST_RTSP_EINTR. And empty the queue.
4850  *
4851  * Since: 1.4
4852  */
4853 void
4854 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4855 {
4856   g_return_if_fail (watch != NULL);
4857
4858   g_mutex_lock (&watch->mutex);
4859   watch->flushing = flushing;
4860   g_cond_signal (&watch->queue_not_full);
4861   if (flushing) {
4862     GstRTSPSerializedMessage *msg;
4863
4864     while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4865       gst_rtsp_serialized_message_clear (msg);
4866     }
4867   }
4868   g_mutex_unlock (&watch->mutex);
4869 }
4870
4871
4872 #ifndef GST_DISABLE_DEPRECATED
4873 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
4874 /* Deprecated */
4875 #define TV_TO_USEC(tv) ((tv) ? ((tv)->tv_sec * G_USEC_PER_SEC + (tv)->tv_usec) : 0)
4876 /**
4877  * gst_rtsp_connection_connect:
4878  * @conn: a #GstRTSPConnection
4879  * @timeout: a GTimeVal timeout
4880  *
4881  * Attempt to connect to the url of @conn made with
4882  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4883  * forever. If @timeout contains a valid timeout, this function will return
4884  * #GST_RTSP_ETIMEOUT after the timeout expired.
4885  *
4886  * This function can be cancelled with gst_rtsp_connection_flush().
4887  *
4888  * Returns: #GST_RTSP_OK when a connection could be made.
4889  *
4890  * Deprecated: 1.18
4891  */
4892     GstRTSPResult
4893 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
4894 {
4895   return gst_rtsp_connection_connect_usec (conn, TV_TO_USEC (timeout));
4896 }
4897
4898 /**
4899  * gst_rtsp_connection_connect_with_response:
4900  * @conn: a #GstRTSPConnection
4901  * @timeout: a GTimeVal timeout
4902  * @response: a #GstRTSPMessage
4903  *
4904  * Attempt to connect to the url of @conn made with
4905  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
4906  * forever. If @timeout contains a valid timeout, this function will return
4907  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
4908  * @response will contain a response to the tunneling request messages.
4909  *
4910  * This function can be cancelled with gst_rtsp_connection_flush().
4911  *
4912  * Returns: #GST_RTSP_OK when a connection could be made.
4913  *
4914  * Since: 1.8
4915  * Deprecated: 1.18
4916  */
4917 GstRTSPResult
4918 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
4919     GTimeVal * timeout, GstRTSPMessage * response)
4920 {
4921   return gst_rtsp_connection_connect_with_response_usec (conn,
4922       TV_TO_USEC (timeout), response);
4923 }
4924
4925 /**
4926  * gst_rtsp_connection_read:
4927  * @conn: a #GstRTSPConnection
4928  * @data: the data to read
4929  * @size: the size of @data
4930  * @timeout: a timeout value or %NULL
4931  *
4932  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
4933  * the specified @timeout. @timeout can be %NULL, in which case this function
4934  * might block forever.
4935  *
4936  * This function can be cancelled with gst_rtsp_connection_flush().
4937  *
4938  * Returns: #GST_RTSP_OK on success.
4939  *
4940  * Deprecated: 1.18
4941  */
4942 GstRTSPResult
4943 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
4944     GTimeVal * timeout)
4945 {
4946   return gst_rtsp_connection_read_usec (conn, data, size, TV_TO_USEC (timeout));
4947 }
4948
4949 /**
4950  * gst_rtsp_connection_write:
4951  * @conn: a #GstRTSPConnection
4952  * @data: the data to write
4953  * @size: the size of @data
4954  * @timeout: a timeout value or %NULL
4955  *
4956  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
4957  * the specified @timeout. @timeout can be %NULL, in which case this function
4958  * might block forever.
4959  *
4960  * This function can be cancelled with gst_rtsp_connection_flush().
4961  *
4962  * Returns: #GST_RTSP_OK on success.
4963  *
4964  * Deprecated: 1.18
4965  */
4966 GstRTSPResult
4967 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
4968     guint size, GTimeVal * timeout)
4969 {
4970   return gst_rtsp_connection_write_usec (conn, data, size,
4971       TV_TO_USEC (timeout));
4972 }
4973
4974 /**
4975  * gst_rtsp_connection_send:
4976  * @conn: a #GstRTSPConnection
4977  * @message: the message to send
4978  * @timeout: a timeout value or %NULL
4979  *
4980  * Attempt to send @message to the connected @conn, blocking up to
4981  * the specified @timeout. @timeout can be %NULL, in which case this function
4982  * might block forever.
4983  *
4984  * This function can be cancelled with gst_rtsp_connection_flush().
4985  *
4986  * Returns: #GST_RTSP_OK on success.
4987  *
4988  * Deprecated: 1.18
4989  */
4990 GstRTSPResult
4991 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
4992     GTimeVal * timeout)
4993 {
4994   return gst_rtsp_connection_send_usec (conn, message, TV_TO_USEC (timeout));
4995 }
4996
4997 /**
4998  * gst_rtsp_connection_send_messages:
4999  * @conn: a #GstRTSPConnection
5000  * @messages: (array length=n_messages): the messages to send
5001  * @n_messages: the number of messages to send
5002  * @timeout: a timeout value or %NULL
5003  *
5004  * Attempt to send @messages to the connected @conn, blocking up to
5005  * the specified @timeout. @timeout can be %NULL, in which case this function
5006  * might block forever.
5007  *
5008  * This function can be cancelled with gst_rtsp_connection_flush().
5009  *
5010  * Returns: #GST_RTSP_OK on success.
5011  *
5012  * Since: 1.16
5013  * Deprecated: 1.18
5014  */
5015 GstRTSPResult
5016 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
5017     GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
5018 {
5019   return gst_rtsp_connection_send_messages_usec (conn, messages, n_messages,
5020       TV_TO_USEC (timeout));
5021 }
5022
5023 /**
5024  * gst_rtsp_connection_receive:
5025  * @conn: a #GstRTSPConnection
5026  * @message: the message to read
5027  * @timeout: a timeout value or %NULL
5028  *
5029  * Attempt to read into @message from the connected @conn, blocking up to
5030  * the specified @timeout. @timeout can be %NULL, in which case this function
5031  * might block forever.
5032  *
5033  * This function can be cancelled with gst_rtsp_connection_flush().
5034  *
5035  * Returns: #GST_RTSP_OK on success.
5036  *
5037  * Deprecated: 1.18
5038  */
5039 GstRTSPResult
5040 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
5041     GTimeVal * timeout)
5042 {
5043   return gst_rtsp_connection_receive_usec (conn, message, TV_TO_USEC (timeout));
5044 }
5045
5046 /**
5047  * gst_rtsp_connection_poll:
5048  * @conn: a #GstRTSPConnection
5049  * @events: a bitmask of #GstRTSPEvent flags to check
5050  * @revents: location for result flags
5051  * @timeout: a timeout
5052  *
5053  * Wait up to the specified @timeout for the connection to become available for
5054  * at least one of the operations specified in @events. When the function returns
5055  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
5056  * @conn.
5057  *
5058  * @timeout can be %NULL, in which case this function might block forever.
5059  *
5060  * This function can be cancelled with gst_rtsp_connection_flush().
5061  *
5062  * Returns: #GST_RTSP_OK on success.
5063  *
5064  * Deprecated: 1.18
5065  */
5066 GstRTSPResult
5067 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
5068     GstRTSPEvent * revents, GTimeVal * timeout)
5069 {
5070   return gst_rtsp_connection_poll_usec (conn, events, revents,
5071       TV_TO_USEC (timeout));
5072 }
5073
5074 /**
5075  * gst_rtsp_connection_next_timeout:
5076  * @conn: a #GstRTSPConnection
5077  * @timeout: a timeout
5078  *
5079  * Calculate the next timeout for @conn, storing the result in @timeout.
5080  *
5081  * Returns: #GST_RTSP_OK.
5082  *
5083  * Deprecated: 1.18
5084  */
5085 GstRTSPResult
5086 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
5087 {
5088   gint64 tmptimeout = 0;
5089
5090   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
5091
5092   tmptimeout = gst_rtsp_connection_next_timeout_usec (conn);
5093
5094   timeout->tv_sec = tmptimeout / G_USEC_PER_SEC;
5095   timeout->tv_usec = tmptimeout % G_USEC_PER_SEC;
5096
5097   return GST_RTSP_OK;
5098 }
5099
5100
5101 /**
5102  * gst_rtsp_watch_wait_backlog:
5103  * @watch: a #GstRTSPWatch
5104  * @timeout: a GTimeVal timeout
5105  *
5106  * Wait until there is place in the backlog queue, @timeout is reached
5107  * or @watch is set to flushing.
5108  *
5109  * If @timeout is %NULL this function can block forever. If @timeout
5110  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
5111  * after the timeout expired.
5112  *
5113  * The typically use of this function is when gst_rtsp_watch_write_data
5114  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
5115  * free space in the backlog queue and try again.
5116  *
5117  * Returns: %GST_RTSP_OK when if there is room in queue.
5118  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
5119  *          %GST_RTSP_EINTR when @watch is flushing
5120  *          %GST_RTSP_EINVAL when called with invalid parameters.
5121  *
5122  * Since: 1.4
5123  * Deprecated: 1.18
5124  */
5125 GstRTSPResult
5126 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
5127 {
5128   return gst_rtsp_watch_wait_backlog_usec (watch, TV_TO_USEC (timeout));
5129 }
5130
5131 G_GNUC_END_IGNORE_DEPRECATIONS
5132 #endif /* GST_DISABLE_DEPRECATED */