a96a627d911c7a50199eb5ae5c086462e055d08a
[platform/upstream/gstreamer.git] / gst / rtsp-sink / gstrtspclientsink.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim at fluendo dot com>
3  *               <2006> Lutz Mueller <lutz at topfrose dot de>
4  *               <2015> Jan Schmidt <jan at centricular dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /*
22  * Unless otherwise indicated, Source Code is licensed under MIT license.
23  * See further explanation attached in License Statement (distributed in the file
24  * LICENSE).
25  *
26  * Permission is hereby granted, free of charge, to any person obtaining a copy of
27  * this software and associated documentation files (the "Software"), to deal in
28  * the Software without restriction, including without limitation the rights to
29  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
30  * of the Software, and to permit persons to whom the Software is furnished to do
31  * so, subject to the following conditions:
32  *
33  * The above copyright notice and this permission notice shall be included in all
34  * copies or substantial portions of the Software.
35  *
36  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
37  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
38  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
39  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
40  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
41  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
42  * SOFTWARE.
43  */
44 /**
45  * SECTION:element-rtspclientsink
46  *
47  * Makes a connection to an RTSP server and send data via RTSP RECORD.
48  * rtspclientsink strictly follows RFC 2326
49  *
50  * RTSP supports transport over TCP or UDP in unicast or multicast mode. By
51  * default rtspclientsink will negotiate a connection in the following order:
52  * UDP unicast/UDP multicast/TCP. The order cannot be changed but the allowed
53  * protocols can be controlled with the #GstRTSPClientSink:protocols property.
54  *
55  * rtspclientsink will internally instantiate an RTP session manager element
56  * that will handle the RTCP messages to and from the server, jitter removal,
57  * and packet reordering.
58  * This feature is implemented using the gstrtpbin element.
59  *
60  * rtspclientsink accepts any stream for which there is an installed payloader,
61  * creates the payloader and manages payload-types, as well as RTX setup.
62  * The new-payloader signal is fired when a payloader is created, in case
63  * an app wants to do custom configuration (such as for MTU).
64  *
65  * <refsect2>
66  * <title>Example launch line</title>
67  * |[
68  * gst-launch-1.0 videotestsrc ! jpegenc ! rtspclientsink location=rtsp://some.server/url
69  * ]| Establish a connection to an RTSP server and send JPEG encoded video packets
70  * </refsect2>
71  */
72
73 /* FIXMEs
74  * - Handle EOS properly and shutdown. The problem with EOS is we don't know
75  *   when the server has received all data, so we don't know when to do teardown.
76  *   At the moment, we forward EOS to the app as soon as we stop sending. Is there
77  *   a way to know from the receiver that it's got all data? Some session timeout?
78  * - Implement extension support for Real / WMS if they support RECORD?
79  * - Add support for network clock synchronised streaming?
80  * - Fix crypto key nego so SAVP/SAVPF profiles work.
81  * - Test (&fix?) HTTP tunnel support
82  * - Add an address pool object for GstRTSPStreams to use for multicast
83  * - Test multicast UDP transport
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #include "config.h"
88 #endif
89
90 #ifdef HAVE_UNISTD_H
91 #include <unistd.h>
92 #endif /* HAVE_UNISTD_H */
93 #include <stdlib.h>
94 #include <string.h>
95 #include <stdio.h>
96 #include <stdarg.h>
97
98 #include <gst/net/gstnet.h>
99 #include <gst/sdp/gstsdpmessage.h>
100 #include <gst/sdp/gstmikey.h>
101 #include <gst/rtp/rtp.h>
102
103 #include "gstrtspclientsink.h"
104
105 typedef struct _GstRtspClientSinkPad GstRtspClientSinkPad;
106 typedef GstGhostPadClass GstRtspClientSinkPadClass;
107
108 struct _GstRtspClientSinkPad
109 {
110   GstGhostPad parent;
111   GstElement *custom_payloader;
112 };
113
114 enum
115 {
116   PROP_PAD_0,
117   PROP_PAD_PAYLOADER
118 };
119
120 static GType gst_rtsp_client_sink_pad_get_type (void);
121 G_DEFINE_TYPE (GstRtspClientSinkPad, gst_rtsp_client_sink_pad,
122     GST_TYPE_GHOST_PAD);
123 #define GST_TYPE_RTSP_CLIENT_SINK_PAD (gst_rtsp_client_sink_pad_get_type ())
124 #define GST_RTSP_CLIENT_SINK_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTSP_CLIENT_SINK_PAD,GstRtspClientSinkPad))
125
126 static void
127 gst_rtsp_client_sink_pad_set_property (GObject * object, guint prop_id,
128     const GValue * value, GParamSpec * pspec)
129 {
130   GstRtspClientSinkPad *pad;
131
132   pad = GST_RTSP_CLIENT_SINK_PAD (object);
133
134   switch (prop_id) {
135     case PROP_PAD_PAYLOADER:
136       GST_OBJECT_LOCK (pad);
137       if (pad->custom_payloader)
138         gst_object_unref (pad->custom_payloader);
139       pad->custom_payloader = g_value_get_object (value);
140       gst_object_ref_sink (pad->custom_payloader);
141       GST_OBJECT_UNLOCK (pad);
142       break;
143     default:
144       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
145       break;
146   }
147 }
148
149 static void
150 gst_rtsp_client_sink_pad_get_property (GObject * object, guint prop_id,
151     GValue * value, GParamSpec * pspec)
152 {
153   GstRtspClientSinkPad *pad;
154
155   pad = GST_RTSP_CLIENT_SINK_PAD (object);
156
157   switch (prop_id) {
158     case PROP_PAD_PAYLOADER:
159       GST_OBJECT_LOCK (pad);
160       g_value_set_object (value, pad->custom_payloader);
161       GST_OBJECT_UNLOCK (pad);
162       break;
163     default:
164       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
165       break;
166   }
167 }
168
169 static void
170 gst_rtsp_client_sink_pad_dispose (GObject * object)
171 {
172   GstRtspClientSinkPad *pad = GST_RTSP_CLIENT_SINK_PAD (object);
173
174   if (pad->custom_payloader)
175     gst_object_unref (pad->custom_payloader);
176
177   G_OBJECT_CLASS (gst_rtsp_client_sink_pad_parent_class)->dispose (object);
178 }
179
180 static void
181 gst_rtsp_client_sink_pad_class_init (GstRtspClientSinkPadClass * klass)
182 {
183   GObjectClass *gobject_klass;
184
185   gobject_klass = (GObjectClass *) klass;
186
187   gobject_klass->set_property = gst_rtsp_client_sink_pad_set_property;
188   gobject_klass->get_property = gst_rtsp_client_sink_pad_get_property;
189   gobject_klass->dispose = gst_rtsp_client_sink_pad_dispose;
190
191   g_object_class_install_property (gobject_klass, PROP_PAD_PAYLOADER,
192       g_param_spec_object ("payloader", "Payloader",
193           "The payloader element to use (NULL = default automatically selected)",
194           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
195 }
196
197 static void
198 gst_rtsp_client_sink_pad_init (GstRtspClientSinkPad * pad)
199 {
200 }
201
202 static GstPad *
203 gst_rtsp_client_sink_pad_new (const GstPadTemplate * pad_tmpl,
204     const gchar * name)
205 {
206   GstRtspClientSinkPad *ret;
207
208   ret =
209       g_object_new (GST_TYPE_RTSP_CLIENT_SINK_PAD, "direction", GST_PAD_SINK,
210       "template", pad_tmpl, "name", name, NULL);
211   gst_ghost_pad_construct (GST_GHOST_PAD_CAST (ret));
212
213   return GST_PAD (ret);
214 }
215
216 GST_DEBUG_CATEGORY_STATIC (rtsp_client_sink_debug);
217 #define GST_CAT_DEFAULT (rtsp_client_sink_debug)
218
219 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
220     GST_PAD_SINK,
221     GST_PAD_REQUEST,
222     GST_STATIC_CAPS_ANY);       /* Actual caps come from available set of payloaders */
223
224 enum
225 {
226   SIGNAL_HANDLE_REQUEST,
227   SIGNAL_NEW_MANAGER,
228   SIGNAL_NEW_PAYLOADER,
229   SIGNAL_REQUEST_RTCP_KEY,
230   SIGNAL_ACCEPT_CERTIFICATE,
231   LAST_SIGNAL
232 };
233
234 enum _GstRTSPClientSinkNtpTimeSource
235 {
236   NTP_TIME_SOURCE_NTP,
237   NTP_TIME_SOURCE_UNIX,
238   NTP_TIME_SOURCE_RUNNING_TIME,
239   NTP_TIME_SOURCE_CLOCK_TIME
240 };
241
242 #define GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE (gst_rtsp_client_sink_ntp_time_source_get_type())
243 static GType
244 gst_rtsp_client_sink_ntp_time_source_get_type (void)
245 {
246   static GType ntp_time_source_type = 0;
247   static const GEnumValue ntp_time_source_values[] = {
248     {NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
249     {NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
250     {NTP_TIME_SOURCE_RUNNING_TIME,
251           "Running time based on pipeline clock",
252         "running-time"},
253     {NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
254     {0, NULL, NULL},
255   };
256
257   if (!ntp_time_source_type) {
258     ntp_time_source_type =
259         g_enum_register_static ("GstRTSPClientSinkNtpTimeSource",
260         ntp_time_source_values);
261   }
262   return ntp_time_source_type;
263 }
264
265 #define DEFAULT_LOCATION         NULL
266 #define DEFAULT_PROTOCOLS        GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP
267 #define DEFAULT_DEBUG            FALSE
268 #define DEFAULT_RETRY            20
269 #define DEFAULT_TIMEOUT          5000000
270 #define DEFAULT_UDP_BUFFER_SIZE  0x80000
271 #define DEFAULT_TCP_TIMEOUT      20000000
272 #define DEFAULT_LATENCY_MS       2000
273 #define DEFAULT_DO_RTSP_KEEP_ALIVE       TRUE
274 #define DEFAULT_PROXY            NULL
275 #define DEFAULT_RTP_BLOCKSIZE    0
276 #define DEFAULT_USER_ID          NULL
277 #define DEFAULT_USER_PW          NULL
278 #define DEFAULT_PORT_RANGE       NULL
279 #define DEFAULT_UDP_RECONNECT    TRUE
280 #define DEFAULT_MULTICAST_IFACE  NULL
281 #define DEFAULT_TLS_VALIDATION_FLAGS     G_TLS_CERTIFICATE_VALIDATE_ALL
282 #define DEFAULT_TLS_DATABASE     NULL
283 #define DEFAULT_TLS_INTERACTION     NULL
284 #define DEFAULT_NTP_TIME_SOURCE  NTP_TIME_SOURCE_NTP
285 #define DEFAULT_USER_AGENT       "GStreamer/" PACKAGE_VERSION
286 #define DEFAULT_PROFILES         GST_RTSP_PROFILE_AVP
287 #define DEFAULT_RTX_TIME_MS      500
288
289 enum
290 {
291   PROP_0,
292   PROP_LOCATION,
293   PROP_PROTOCOLS,
294   PROP_DEBUG,
295   PROP_RETRY,
296   PROP_TIMEOUT,
297   PROP_TCP_TIMEOUT,
298   PROP_LATENCY,
299   PROP_RTX_TIME,
300   PROP_DO_RTSP_KEEP_ALIVE,
301   PROP_PROXY,
302   PROP_PROXY_ID,
303   PROP_PROXY_PW,
304   PROP_RTP_BLOCKSIZE,
305   PROP_USER_ID,
306   PROP_USER_PW,
307   PROP_PORT_RANGE,
308   PROP_UDP_BUFFER_SIZE,
309   PROP_UDP_RECONNECT,
310   PROP_MULTICAST_IFACE,
311   PROP_SDES,
312   PROP_TLS_VALIDATION_FLAGS,
313   PROP_TLS_DATABASE,
314   PROP_TLS_INTERACTION,
315   PROP_NTP_TIME_SOURCE,
316   PROP_USER_AGENT,
317   PROP_PROFILES
318 };
319
320 static void gst_rtsp_client_sink_finalize (GObject * object);
321
322 static void gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
323     const GValue * value, GParamSpec * pspec);
324 static void gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
325     GValue * value, GParamSpec * pspec);
326
327 static GstClock *gst_rtsp_client_sink_provide_clock (GstElement * element);
328
329 static void gst_rtsp_client_sink_uri_handler_init (gpointer g_iface,
330     gpointer iface_data);
331
332 static gboolean gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp,
333     const gchar * proxy);
334 static void gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink *
335     rtsp_client_sink, guint64 timeout);
336
337 static GstStateChangeReturn gst_rtsp_client_sink_change_state (GstElement *
338     element, GstStateChange transition);
339 static void gst_rtsp_client_sink_handle_message (GstBin * bin,
340     GstMessage * message);
341
342 static gboolean gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
343     GstRTSPMessage * response);
344
345 static gboolean gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink,
346     gint cmd, gint mask);
347
348 static GstRTSPResult gst_rtsp_client_sink_open (GstRTSPClientSink * sink,
349     gboolean async);
350 static GstRTSPResult gst_rtsp_client_sink_record (GstRTSPClientSink * sink,
351     gboolean async);
352 static GstRTSPResult gst_rtsp_client_sink_pause (GstRTSPClientSink * sink,
353     gboolean async);
354 static GstRTSPResult gst_rtsp_client_sink_close (GstRTSPClientSink * sink,
355     gboolean async, gboolean only_close);
356 static gboolean gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink);
357
358 static gboolean gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler,
359     const gchar * uri, GError ** error);
360 static gchar *gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler);
361
362 static gboolean gst_rtsp_client_sink_loop (GstRTSPClientSink * sink);
363 static void gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink,
364     gboolean flush);
365
366 static GstPad *gst_rtsp_client_sink_request_new_pad (GstElement * element,
367     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
368 static void gst_rtsp_client_sink_release_pad (GstElement * element,
369     GstPad * pad);
370
371 /* commands we send to out loop to notify it of events */
372 #define CMD_OPEN        (1 << 0)
373 #define CMD_RECORD      (1 << 1)
374 #define CMD_PAUSE       (1 << 2)
375 #define CMD_CLOSE       (1 << 3)
376 #define CMD_WAIT        (1 << 4)
377 #define CMD_RECONNECT   (1 << 5)
378 #define CMD_LOOP        (1 << 6)
379
380 /* mask for all commands */
381 #define CMD_ALL         ((CMD_LOOP << 1) - 1)
382
383 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
384 G_STMT_START {                                          \
385   gchar *__txt = _gst_element_error_printf text;        \
386   gst_element_post_message (GST_ELEMENT_CAST (el),      \
387       gst_message_new_progress (GST_OBJECT_CAST (el),   \
388           GST_PROGRESS_TYPE_ ##type, code, __txt));     \
389   g_free (__txt);                                       \
390 } G_STMT_END
391
392 static guint gst_rtsp_client_sink_signals[LAST_SIGNAL] = { 0 };
393
394 /*********************************
395  * GstChildProxy implementation  *
396  *********************************/
397 static GObject *
398 gst_rtsp_client_sink_child_proxy_get_child_by_index (GstChildProxy *
399     child_proxy, guint index)
400 {
401   GObject *obj;
402   GstRTSPClientSink *cs = GST_RTSP_CLIENT_SINK (child_proxy);
403
404   GST_OBJECT_LOCK (cs);
405   if ((obj = g_list_nth_data (GST_ELEMENT (cs)->sinkpads, index)))
406     g_object_ref (obj);
407   GST_OBJECT_UNLOCK (cs);
408
409   return obj;
410 }
411
412 static guint
413 gst_rtsp_client_sink_child_proxy_get_children_count (GstChildProxy *
414     child_proxy)
415 {
416   guint count = 0;
417
418   GST_OBJECT_LOCK (child_proxy);
419   count = GST_ELEMENT (child_proxy)->numsinkpads;
420   GST_OBJECT_UNLOCK (child_proxy);
421
422   GST_INFO_OBJECT (child_proxy, "Children Count: %d", count);
423
424   return count;
425 }
426
427 static void
428 gst_rtsp_client_sink_child_proxy_init (gpointer g_iface, gpointer iface_data)
429 {
430   GstChildProxyInterface *iface = g_iface;
431
432   GST_INFO ("intializing child proxy interface");
433   iface->get_child_by_index =
434       gst_rtsp_client_sink_child_proxy_get_child_by_index;
435   iface->get_children_count =
436       gst_rtsp_client_sink_child_proxy_get_children_count;
437 }
438
439 #define gst_rtsp_client_sink_parent_class parent_class
440 G_DEFINE_TYPE_WITH_CODE (GstRTSPClientSink, gst_rtsp_client_sink, GST_TYPE_BIN,
441     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
442         gst_rtsp_client_sink_uri_handler_init);
443     G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY,
444         gst_rtsp_client_sink_child_proxy_init);
445     );
446
447 #ifndef GST_DISABLE_GST_DEBUG
448 static inline const gchar *
449 cmd_to_string (guint cmd)
450 {
451   switch (cmd) {
452     case CMD_OPEN:
453       return "OPEN";
454     case CMD_RECORD:
455       return "RECORD";
456     case CMD_PAUSE:
457       return "PAUSE";
458     case CMD_CLOSE:
459       return "CLOSE";
460     case CMD_WAIT:
461       return "WAIT";
462     case CMD_RECONNECT:
463       return "RECONNECT";
464     case CMD_LOOP:
465       return "LOOP";
466   }
467
468   return "unknown";
469 }
470 #endif
471
472 static void
473 gst_rtsp_client_sink_class_init (GstRTSPClientSinkClass * klass)
474 {
475   GObjectClass *gobject_class;
476   GstElementClass *gstelement_class;
477   GstBinClass *gstbin_class;
478
479   gobject_class = (GObjectClass *) klass;
480   gstelement_class = (GstElementClass *) klass;
481   gstbin_class = (GstBinClass *) klass;
482
483   GST_DEBUG_CATEGORY_INIT (rtsp_client_sink_debug, "rtspclientsink", 0,
484       "RTSP sink element");
485
486   gobject_class->set_property = gst_rtsp_client_sink_set_property;
487   gobject_class->get_property = gst_rtsp_client_sink_get_property;
488
489   gobject_class->finalize = gst_rtsp_client_sink_finalize;
490
491   g_object_class_install_property (gobject_class, PROP_LOCATION,
492       g_param_spec_string ("location", "RTSP Location",
493           "Location of the RTSP url to read",
494           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
495
496   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
497       g_param_spec_flags ("protocols", "Protocols",
498           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
499           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500
501   g_object_class_install_property (gobject_class, PROP_PROFILES,
502       g_param_spec_flags ("profiles", "Profiles",
503           "Allowed RTSP profiles", GST_TYPE_RTSP_PROFILE,
504           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505
506   g_object_class_install_property (gobject_class, PROP_DEBUG,
507       g_param_spec_boolean ("debug", "Debug",
508           "Dump request and response messages to stdout",
509           DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
510
511   g_object_class_install_property (gobject_class, PROP_RETRY,
512       g_param_spec_uint ("retry", "Retry",
513           "Max number of retries when allocating RTP ports.",
514           0, G_MAXUINT16, DEFAULT_RETRY,
515           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
516
517   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
518       g_param_spec_uint64 ("timeout", "Timeout",
519           "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
520           0, G_MAXUINT64, DEFAULT_TIMEOUT,
521           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
522
523   g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
524       g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
525           "Fail after timeout microseconds on TCP connections (0 = disabled)",
526           0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528
529   g_object_class_install_property (gobject_class, PROP_LATENCY,
530       g_param_spec_uint ("latency", "Buffer latency in ms",
531           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
532           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
533
534   g_object_class_install_property (gobject_class, PROP_RTX_TIME,
535       g_param_spec_uint ("rtx-time", "Retransmission buffer in ms",
536           "Amount of ms to buffer for retransmission. 0 disables retransmission",
537           0, G_MAXUINT, DEFAULT_RTX_TIME_MS,
538           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
539
540   /**
541    * GstRTSPClientSink:do-rtsp-keep-alive:
542    *
543    * Enable RTSP keep alive support. Some old server don't like RTSP
544    * keep alive and then this property needs to be set to FALSE.
545    */
546   g_object_class_install_property (gobject_class, PROP_DO_RTSP_KEEP_ALIVE,
547       g_param_spec_boolean ("do-rtsp-keep-alive", "Do RTSP Keep Alive",
548           "Send RTSP keep alive packets, disable for old incompatible server.",
549           DEFAULT_DO_RTSP_KEEP_ALIVE,
550           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   /**
553    * GstRTSPClientSink:proxy:
554    *
555    * Set the proxy parameters. This has to be a string of the format
556    * [http://][user:passwd@]host[:port].
557    */
558   g_object_class_install_property (gobject_class, PROP_PROXY,
559       g_param_spec_string ("proxy", "Proxy",
560           "Proxy settings for HTTP tunneling. Format: [http://][user:passwd@]host[:port]",
561           DEFAULT_PROXY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562   /**
563    * GstRTSPClientSink:proxy-id:
564    *
565    * Sets the proxy URI user id for authentication. If the URI set via the
566    * "proxy" property contains a user-id already, that will take precedence.
567    *
568    */
569   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
570       g_param_spec_string ("proxy-id", "proxy-id",
571           "HTTP proxy URI user id for authentication", "",
572           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
573   /**
574    * GstRTSPClientSink:proxy-pw:
575    *
576    * Sets the proxy URI password for authentication. If the URI set via the
577    * "proxy" property contains a password already, that will take precedence.
578    *
579    */
580   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
581       g_param_spec_string ("proxy-pw", "proxy-pw",
582           "HTTP proxy URI user password for authentication", "",
583           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
584
585   /**
586    * GstRTSPClientSink:rtp-blocksize:
587    *
588    * RTP package size to suggest to server.
589    */
590   g_object_class_install_property (gobject_class, PROP_RTP_BLOCKSIZE,
591       g_param_spec_uint ("rtp-blocksize", "RTP Blocksize",
592           "RTP package size to suggest to server (0 = disabled)",
593           0, 65536, DEFAULT_RTP_BLOCKSIZE,
594           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
595
596   g_object_class_install_property (gobject_class,
597       PROP_USER_ID,
598       g_param_spec_string ("user-id", "user-id",
599           "RTSP location URI user id for authentication", DEFAULT_USER_ID,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601   g_object_class_install_property (gobject_class, PROP_USER_PW,
602       g_param_spec_string ("user-pw", "user-pw",
603           "RTSP location URI user password for authentication", DEFAULT_USER_PW,
604           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
605
606   /**
607    * GstRTSPClientSink:port-range:
608    *
609    * Configure the client port numbers that can be used to receive
610    * RTCP.
611    */
612   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
613       g_param_spec_string ("port-range", "Port range",
614           "Client port range that can be used to receive RTCP data, "
615           "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
616           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
617
618   /**
619    * GstRTSPClientSink:udp-buffer-size:
620    *
621    * Size of the kernel UDP receive buffer in bytes.
622    */
623   g_object_class_install_property (gobject_class, PROP_UDP_BUFFER_SIZE,
624       g_param_spec_int ("udp-buffer-size", "UDP Buffer Size",
625           "Size of the kernel UDP receive buffer in bytes, 0=default",
626           0, G_MAXINT, DEFAULT_UDP_BUFFER_SIZE,
627           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
628
629   g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT,
630       g_param_spec_boolean ("udp-reconnect", "Reconnect to the server",
631           "Reconnect to the server if RTSP connection is closed when doing UDP",
632           DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
633
634   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
635       g_param_spec_string ("multicast-iface", "Multicast Interface",
636           "The network interface on which to join the multicast group",
637           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
638
639   g_object_class_install_property (gobject_class, PROP_SDES,
640       g_param_spec_boxed ("sdes", "SDES",
641           "The SDES items of this session",
642           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
643
644   /**
645    * GstRTSPClientSink::tls-validation-flags:
646    *
647    * TLS certificate validation flags used to validate server
648    * certificate.
649    *
650    */
651   g_object_class_install_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
652       g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
653           "TLS certificate validation flags used to validate the server certificate",
654           G_TYPE_TLS_CERTIFICATE_FLAGS, DEFAULT_TLS_VALIDATION_FLAGS,
655           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
656
657   /**
658    * GstRTSPClientSink::tls-database:
659    *
660    * TLS database with anchor certificate authorities used to validate
661    * the server certificate.
662    *
663    */
664   g_object_class_install_property (gobject_class, PROP_TLS_DATABASE,
665       g_param_spec_object ("tls-database", "TLS database",
666           "TLS database with anchor certificate authorities used to validate the server certificate",
667           G_TYPE_TLS_DATABASE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
668
669   /**
670    * GstRTSPClientSink::tls-interaction:
671    *
672    * A #GTlsInteraction object to be used when the connection or certificate
673    * database need to interact with the user. This will be used to prompt the
674    * user for passwords where necessary.
675    *
676    */
677   g_object_class_install_property (gobject_class, PROP_TLS_INTERACTION,
678       g_param_spec_object ("tls-interaction", "TLS interaction",
679           "A GTlsInteraction object to prompt the user for password or certificate",
680           G_TYPE_TLS_INTERACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
681
682   /**
683    * GstRTSPClientSink::ntp-time-source:
684    *
685    * allows to select the time source that should be used
686    * for the NTP time in outgoing packets
687    *
688    */
689   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
690       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
691           "NTP time source for RTCP packets",
692           GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
693           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
694
695   /**
696    * GstRTSPClientSink::user-agent:
697    *
698    * The string to set in the User-Agent header.
699    *
700    */
701   g_object_class_install_property (gobject_class, PROP_USER_AGENT,
702       g_param_spec_string ("user-agent", "User Agent",
703           "The User-Agent string to send to the server",
704           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
705
706   /**
707    * GstRTSPClientSink::handle-request:
708    * @rtsp_client_sink: a #GstRTSPClientSink
709    * @request: a #GstRTSPMessage
710    * @response: a #GstRTSPMessage
711    *
712    * Handle a server request in @request and prepare @response.
713    *
714    * This signal is called from the streaming thread, you should therefore not
715    * do any state changes on @rtsp_client_sink because this might deadlock. If you want
716    * to modify the state as a result of this signal, post a
717    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
718    * in some other way.
719    *
720    */
721   gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST] =
722       g_signal_new ("handle-request", G_TYPE_FROM_CLASS (klass), 0,
723       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
724       G_TYPE_POINTER, G_TYPE_POINTER);
725
726   /**
727    * GstRTSPClientSink::new-manager:
728    * @rtsp_client_sink: a #GstRTSPClientSink
729    * @manager: a #GstElement
730    *
731    * Emitted after a new manager (like rtpbin) was created and the default
732    * properties were configured.
733    *
734    */
735   gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER] =
736       g_signal_new_class_handler ("new-manager", G_TYPE_FROM_CLASS (klass),
737       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
738       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
739
740   /**
741    * GstRTSPClientSink::new-payloader:
742    * @rtsp_client_sink: a #GstRTSPClientSink
743    * @payloader: a #GstElement
744    *
745    * Emitted after a new RTP payloader was created and the default
746    * properties were configured.
747    *
748    */
749   gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER] =
750       g_signal_new_class_handler ("new-payloader", G_TYPE_FROM_CLASS (klass),
751       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
752       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
753
754   /**
755    * GstRTSPClientSink::request-rtcp-key:
756    * @rtsp_client_sink: a #GstRTSPClientSink
757    * @num: the stream number
758    *
759    * Signal emitted to get the crypto parameters relevant to the RTCP
760    * stream. User should provide the key and the RTCP encryption ciphers
761    * and authentication, and return them wrapped in a GstCaps.
762    *
763    */
764   gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY] =
765       g_signal_new ("request-rtcp-key", G_TYPE_FROM_CLASS (klass),
766       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
767
768   /**
769    * GstRTSPClientSink::accept-certificate:
770    * @rtsp_client_sink: a #GstRTSPClientSink
771    * @peer_cert: the peer's #GTlsCertificate
772    * @errors: the problems with @peer_cert
773    * @user_data: user data set when the signal handler was connected.
774    *
775    * This will directly map to #GTlsConnection 's "accept-certificate"
776    * signal and be performed after the default checks of #GstRTSPConnection
777    * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
778    * have failed. If no #GTlsDatabase is set on this connection, only this
779    * signal will be emitted.
780    *
781    * Since: 1.14
782    */
783   gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE] =
784       g_signal_new ("accept-certificate", G_TYPE_FROM_CLASS (klass),
785       G_SIGNAL_RUN_LAST, 0, g_signal_accumulator_true_handled, NULL, NULL,
786       G_TYPE_BOOLEAN, 3, G_TYPE_TLS_CONNECTION, G_TYPE_TLS_CERTIFICATE,
787       G_TYPE_TLS_CERTIFICATE_FLAGS);
788
789   gstelement_class->provide_clock = gst_rtsp_client_sink_provide_clock;
790   gstelement_class->change_state = gst_rtsp_client_sink_change_state;
791   gstelement_class->request_new_pad =
792       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_request_new_pad);
793   gstelement_class->release_pad =
794       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_release_pad);
795
796   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
797       &rtptemplate, GST_TYPE_RTSP_CLIENT_SINK_PAD);
798
799   gst_element_class_set_static_metadata (gstelement_class,
800       "RTSP RECORD client", "Sink/Network",
801       "Send data over the network via RTSP RECORD(RFC 2326)",
802       "Jan Schmidt <jan@centricular.com>");
803
804   gstbin_class->handle_message = gst_rtsp_client_sink_handle_message;
805 }
806
807 static void
808 gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
809 {
810   sink->conninfo.location = g_strdup (DEFAULT_LOCATION);
811   sink->protocols = DEFAULT_PROTOCOLS;
812   sink->debug = DEFAULT_DEBUG;
813   sink->retry = DEFAULT_RETRY;
814   sink->udp_timeout = DEFAULT_TIMEOUT;
815   gst_rtsp_client_sink_set_tcp_timeout (sink, DEFAULT_TCP_TIMEOUT);
816   sink->latency = DEFAULT_LATENCY_MS;
817   sink->rtx_time = DEFAULT_RTX_TIME_MS;
818   sink->do_rtsp_keep_alive = DEFAULT_DO_RTSP_KEEP_ALIVE;
819   gst_rtsp_client_sink_set_proxy (sink, DEFAULT_PROXY);
820   sink->rtp_blocksize = DEFAULT_RTP_BLOCKSIZE;
821   sink->user_id = g_strdup (DEFAULT_USER_ID);
822   sink->user_pw = g_strdup (DEFAULT_USER_PW);
823   sink->client_port_range.min = 0;
824   sink->client_port_range.max = 0;
825   sink->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
826   sink->udp_reconnect = DEFAULT_UDP_RECONNECT;
827   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
828   sink->sdes = NULL;
829   sink->tls_validation_flags = DEFAULT_TLS_VALIDATION_FLAGS;
830   sink->tls_database = DEFAULT_TLS_DATABASE;
831   sink->tls_interaction = DEFAULT_TLS_INTERACTION;
832   sink->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
833   sink->user_agent = g_strdup (DEFAULT_USER_AGENT);
834
835   sink->profiles = DEFAULT_PROFILES;
836
837   /* protects the streaming thread in interleaved mode or the polling
838    * thread in UDP mode. */
839   g_rec_mutex_init (&sink->stream_rec_lock);
840
841   /* protects our state changes from multiple invocations */
842   g_rec_mutex_init (&sink->state_rec_lock);
843
844   g_mutex_init (&sink->send_lock);
845
846   g_mutex_init (&sink->preroll_lock);
847   g_cond_init (&sink->preroll_cond);
848
849   sink->state = GST_RTSP_STATE_INVALID;
850
851   g_mutex_init (&sink->conninfo.send_lock);
852   g_mutex_init (&sink->conninfo.recv_lock);
853
854   g_mutex_init (&sink->block_streams_lock);
855   g_cond_init (&sink->block_streams_cond);
856
857   g_mutex_init (&sink->open_conn_lock);
858   g_cond_init (&sink->open_conn_cond);
859
860   sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
861   gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
862   gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
863
864   sink->next_dyn_pt = 96;
865
866   gst_sdp_message_init (&sink->cursdp);
867
868   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
869 }
870
871 static void
872 gst_rtsp_client_sink_finalize (GObject * object)
873 {
874   GstRTSPClientSink *rtsp_client_sink;
875
876   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
877
878   gst_sdp_message_uninit (&rtsp_client_sink->cursdp);
879
880   g_free (rtsp_client_sink->conninfo.location);
881   gst_rtsp_url_free (rtsp_client_sink->conninfo.url);
882   g_free (rtsp_client_sink->conninfo.url_str);
883   g_free (rtsp_client_sink->user_id);
884   g_free (rtsp_client_sink->user_pw);
885   g_free (rtsp_client_sink->multi_iface);
886   g_free (rtsp_client_sink->user_agent);
887
888   if (rtsp_client_sink->uri_sdp) {
889     gst_sdp_message_free (rtsp_client_sink->uri_sdp);
890     rtsp_client_sink->uri_sdp = NULL;
891   }
892   if (rtsp_client_sink->provided_clock)
893     gst_object_unref (rtsp_client_sink->provided_clock);
894
895   if (rtsp_client_sink->sdes)
896     gst_structure_free (rtsp_client_sink->sdes);
897
898   if (rtsp_client_sink->tls_database)
899     g_object_unref (rtsp_client_sink->tls_database);
900
901   if (rtsp_client_sink->tls_interaction)
902     g_object_unref (rtsp_client_sink->tls_interaction);
903
904   /* free locks */
905   g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
906   g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
907
908   g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
909   g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
910
911   g_mutex_clear (&rtsp_client_sink->send_lock);
912
913   g_mutex_clear (&rtsp_client_sink->preroll_lock);
914   g_cond_clear (&rtsp_client_sink->preroll_cond);
915
916   g_mutex_clear (&rtsp_client_sink->block_streams_lock);
917   g_cond_clear (&rtsp_client_sink->block_streams_cond);
918
919   g_mutex_clear (&rtsp_client_sink->open_conn_lock);
920   g_cond_clear (&rtsp_client_sink->open_conn_cond);
921
922   G_OBJECT_CLASS (parent_class)->finalize (object);
923 }
924
925 static gboolean
926 gst_rtp_payloader_filter_func (GstPluginFeature * feature, gpointer user_data)
927 {
928   GstElementFactory *factory = NULL;
929   const gchar *klass;
930
931   if (!GST_IS_ELEMENT_FACTORY (feature))
932     return FALSE;
933
934   factory = GST_ELEMENT_FACTORY (feature);
935
936   if (gst_plugin_feature_get_rank (feature) == GST_RANK_NONE)
937     return FALSE;
938
939   if (!gst_element_factory_list_is_type (factory,
940           GST_ELEMENT_FACTORY_TYPE_PAYLOADER))
941     return FALSE;
942
943   klass =
944       gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
945   if (strstr (klass, "Codec") == NULL)
946     return FALSE;
947   if (strstr (klass, "RTP") == NULL)
948     return FALSE;
949
950   return TRUE;
951 }
952
953 static gint
954 compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2)
955 {
956   gint diff;
957   const gchar *rname1, *rname2;
958   GstRank rank1, rank2;
959
960   rname1 = gst_plugin_feature_get_name (f1);
961   rname2 = gst_plugin_feature_get_name (f2);
962
963   rank1 = gst_plugin_feature_get_rank (f1);
964   rank2 = gst_plugin_feature_get_rank (f2);
965
966   /* HACK: Prefer rtpmp4apay over rtpmp4gpay */
967   if (g_str_equal (rname1, "rtpmp4apay"))
968     rank1 = GST_RANK_SECONDARY + 1;
969   if (g_str_equal (rname2, "rtpmp4apay"))
970     rank2 = GST_RANK_SECONDARY + 1;
971
972   diff = rank2 - rank1;
973   if (diff != 0)
974     return diff;
975
976   diff = strcmp (rname2, rname1);
977
978   return diff;
979 }
980
981 static GList *
982 gst_rtsp_client_sink_get_factories (void)
983 {
984   static GList *payloader_factories = NULL;
985
986   if (g_once_init_enter (&payloader_factories)) {
987     GList *all_factories;
988
989     all_factories =
990         gst_registry_feature_filter (gst_registry_get (),
991         gst_rtp_payloader_filter_func, FALSE, NULL);
992
993     all_factories = g_list_sort (all_factories, (GCompareFunc) compare_ranks);
994
995     g_once_init_leave (&payloader_factories, all_factories);
996   }
997
998   return payloader_factories;
999 }
1000
1001 static GstCaps *
1002 gst_rtsp_client_sink_get_payloader_caps (GstElementFactory * factory)
1003 {
1004   const GList *tmp;
1005   GstCaps *caps = gst_caps_new_empty ();
1006
1007   for (tmp = gst_element_factory_get_static_pad_templates (factory);
1008       tmp; tmp = g_list_next (tmp)) {
1009     GstStaticPadTemplate *template = tmp->data;
1010
1011     if (template->direction == GST_PAD_SINK) {
1012       GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1013
1014       GST_LOG ("Found pad template %s on factory %s",
1015           template->name_template, gst_plugin_feature_get_name (factory));
1016
1017       if (static_caps)
1018         caps = gst_caps_merge (caps, static_caps);
1019
1020       /* Early out, any is absorbing */
1021       if (gst_caps_is_any (caps))
1022         goto out;
1023     }
1024   }
1025
1026 out:
1027   return caps;
1028 }
1029
1030 static GstCaps *
1031 gst_rtsp_client_sink_get_all_payloaders_caps (void)
1032 {
1033   /* Cached caps result */
1034   static GstCaps *ret;
1035
1036   if (g_once_init_enter (&ret)) {
1037     GList *factories, *cur;
1038     GstCaps *caps = gst_caps_new_empty ();
1039
1040     factories = gst_rtsp_client_sink_get_factories ();
1041     for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1042       GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1043       GstCaps *payloader_caps =
1044           gst_rtsp_client_sink_get_payloader_caps (factory);
1045
1046       caps = gst_caps_merge (caps, payloader_caps);
1047
1048       /* Early out, any is absorbing */
1049       if (gst_caps_is_any (caps))
1050         goto out;
1051     }
1052
1053   out:
1054     g_once_init_leave (&ret, caps);
1055   }
1056
1057   /* Return cached result */
1058   return gst_caps_ref (ret);
1059 }
1060
1061 static GstElement *
1062 gst_rtsp_client_sink_make_payloader (GstCaps * caps)
1063 {
1064   GList *factories, *cur;
1065
1066   factories = gst_rtsp_client_sink_get_factories ();
1067   for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1068     GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1069     const GList *tmp;
1070
1071     for (tmp = gst_element_factory_get_static_pad_templates (factory);
1072         tmp; tmp = g_list_next (tmp)) {
1073       GstStaticPadTemplate *template = tmp->data;
1074
1075       if (template->direction == GST_PAD_SINK) {
1076         GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1077         GstElement *payloader = NULL;
1078
1079         if (gst_caps_can_intersect (static_caps, caps)) {
1080           GST_DEBUG ("caps %" GST_PTR_FORMAT " intersects with template %"
1081               GST_PTR_FORMAT " for payloader %s", caps, static_caps,
1082               gst_plugin_feature_get_name (factory));
1083           payloader = gst_element_factory_create (factory, NULL);
1084         }
1085
1086         gst_caps_unref (static_caps);
1087
1088         if (payloader)
1089           return payloader;
1090       }
1091     }
1092   }
1093
1094   return NULL;
1095 }
1096
1097 static GstRTSPStream *
1098 gst_rtsp_client_sink_create_stream (GstRTSPClientSink * sink,
1099     GstRTSPStreamContext * context, GstElement * payloader, GstPad * pad)
1100 {
1101   GstRTSPStream *stream = NULL;
1102   guint pt, aux_pt;
1103
1104   GST_OBJECT_LOCK (sink);
1105
1106   g_object_get (G_OBJECT (payloader), "pt", &pt, NULL);
1107   if (pt >= 96 && pt <= sink->next_dyn_pt) {
1108     /* Payloader has a dynamic PT, but one that's already used */
1109     /* FIXME: Create a caps->ptmap instead? */
1110     pt = sink->next_dyn_pt;
1111
1112     if (pt > 127)
1113       goto no_free_pt;
1114
1115     GST_DEBUG_OBJECT (sink, "Assigning pt %u to stream %d", pt, context->index);
1116
1117     sink->next_dyn_pt++;
1118   } else {
1119     GST_DEBUG_OBJECT (sink, "Keeping existing pt %u for stream %d",
1120         pt, context->index);
1121   }
1122
1123   aux_pt = sink->next_dyn_pt;
1124   if (aux_pt > 127)
1125     goto no_free_pt;
1126   sink->next_dyn_pt++;
1127
1128   GST_OBJECT_UNLOCK (sink);
1129
1130
1131   g_object_set (G_OBJECT (payloader), "pt", pt, NULL);
1132
1133   stream = gst_rtsp_stream_new (context->index, payloader, pad);
1134
1135   gst_rtsp_stream_set_client_side (stream, TRUE);
1136   gst_rtsp_stream_set_retransmission_time (stream,
1137       (GstClockTime) (sink->rtx_time) * GST_MSECOND);
1138   gst_rtsp_stream_set_protocols (stream, sink->protocols);
1139   gst_rtsp_stream_set_profiles (stream, sink->profiles);
1140   gst_rtsp_stream_set_retransmission_pt (stream, aux_pt);
1141   gst_rtsp_stream_set_buffer_size (stream, sink->udp_buffer_size);
1142   if (sink->rtp_blocksize > 0)
1143     gst_rtsp_stream_set_mtu (stream, sink->rtp_blocksize);
1144   gst_rtsp_stream_set_multicast_iface (stream, sink->multi_iface);
1145
1146 #if 0
1147   if (priv->pool)
1148     gst_rtsp_stream_set_address_pool (stream, priv->pool);
1149 #endif
1150
1151   return stream;
1152 no_free_pt:
1153   GST_OBJECT_UNLOCK (sink);
1154
1155   GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL),
1156       ("Ran out of dynamic payload types."));
1157
1158   return NULL;
1159 }
1160
1161 static GstPadProbeReturn
1162 handle_payloader_block (GstPad * pad, GstPadProbeInfo * info,
1163     GstRTSPStreamContext * context)
1164 {
1165   GstRTSPClientSink *sink = context->parent;
1166
1167   GST_INFO_OBJECT (sink, "Block on pad %" GST_PTR_FORMAT, pad);
1168
1169   g_mutex_lock (&sink->preroll_lock);
1170   context->prerolled = TRUE;
1171   g_cond_broadcast (&sink->preroll_cond);
1172   g_mutex_unlock (&sink->preroll_lock);
1173
1174   GST_INFO_OBJECT (sink, "Announced preroll on pad %" GST_PTR_FORMAT, pad);
1175
1176   return GST_PAD_PROBE_OK;
1177 }
1178
1179 static gboolean
1180 gst_rtsp_client_sink_setup_payloader (GstRTSPClientSink * sink, GstPad * pad,
1181     GstCaps * caps)
1182 {
1183   GstRTSPStreamContext *context;
1184   GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1185
1186   GstElement *payloader;
1187   GstPad *sinkpad, *srcpad, *ghostsink;
1188
1189   context = gst_pad_get_element_private (pad);
1190
1191   if (cspad->custom_payloader) {
1192     payloader = cspad->custom_payloader;
1193   } else {
1194     /* Find the payloader. */
1195     payloader = gst_rtsp_client_sink_make_payloader (caps);
1196   }
1197
1198   if (payloader == NULL)
1199     return FALSE;
1200
1201   GST_DEBUG_OBJECT (sink, "Configuring payloader %" GST_PTR_FORMAT
1202       " for pad %" GST_PTR_FORMAT, payloader, pad);
1203
1204   sinkpad = gst_element_get_static_pad (payloader, "sink");
1205   if (sinkpad == NULL)
1206     goto no_sinkpad;
1207
1208   srcpad = gst_element_get_static_pad (payloader, "src");
1209   if (srcpad == NULL)
1210     goto no_srcpad;
1211
1212   gst_bin_add (GST_BIN (sink->internal_bin), payloader);
1213   ghostsink = gst_ghost_pad_new (NULL, sinkpad);
1214   gst_pad_set_active (ghostsink, TRUE);
1215   gst_element_add_pad (GST_ELEMENT (sink->internal_bin), ghostsink);
1216
1217   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER], 0,
1218       payloader);
1219
1220   GST_RTSP_STATE_LOCK (sink);
1221   context->payloader_block_id =
1222       gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1223       (GstPadProbeCallback) handle_payloader_block, context, NULL);
1224   context->payloader = payloader;
1225
1226   payloader = gst_object_ref (payloader);
1227
1228   gst_ghost_pad_set_target (GST_GHOST_PAD (pad), ghostsink);
1229   gst_object_unref (GST_OBJECT (sinkpad));
1230   GST_RTSP_STATE_UNLOCK (sink);
1231
1232   gst_element_sync_state_with_parent (payloader);
1233
1234   gst_object_unref (payloader);
1235   gst_object_unref (GST_OBJECT (srcpad));
1236
1237   return TRUE;
1238
1239 no_sinkpad:
1240   GST_ERROR_OBJECT (sink,
1241       "Could not find sink pad on payloader %" GST_PTR_FORMAT, payloader);
1242   if (!cspad->custom_payloader)
1243     gst_object_unref (payloader);
1244   return FALSE;
1245
1246 no_srcpad:
1247   GST_ERROR_OBJECT (sink,
1248       "Could not find src pad on payloader %" GST_PTR_FORMAT, payloader);
1249   gst_object_unref (GST_OBJECT (sinkpad));
1250   gst_object_unref (payloader);
1251   return TRUE;
1252 }
1253
1254 static gboolean
1255 gst_rtsp_client_sink_sinkpad_event (GstPad * pad, GstObject * parent,
1256     GstEvent * event)
1257 {
1258   if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
1259     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1260     if (target == NULL) {
1261       GstCaps *caps;
1262
1263       /* No target yet - choose a payloader and configure it */
1264       gst_event_parse_caps (event, &caps);
1265
1266       GST_DEBUG_OBJECT (parent,
1267           "Have set caps event on pad %" GST_PTR_FORMAT
1268           " caps %" GST_PTR_FORMAT, pad, caps);
1269
1270       if (!gst_rtsp_client_sink_setup_payloader (GST_RTSP_CLIENT_SINK (parent),
1271               pad, caps)) {
1272         GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1273         GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION,
1274             ("Could not create payloader"),
1275             ("Custom payloader: %p, caps: %" GST_PTR_FORMAT,
1276                 cspad->custom_payloader, caps));
1277         gst_event_unref (event);
1278         return FALSE;
1279       }
1280     } else {
1281       gst_object_unref (target);
1282     }
1283   }
1284
1285   return gst_pad_event_default (pad, parent, event);
1286 }
1287
1288 static gboolean
1289 gst_rtsp_client_sink_sinkpad_query (GstPad * pad, GstObject * parent,
1290     GstQuery * query)
1291 {
1292   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1293     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1294     if (target == NULL) {
1295       GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1296       GstCaps *caps;
1297
1298       if (cspad->custom_payloader) {
1299         GstPad *sinkpad =
1300             gst_element_get_static_pad (cspad->custom_payloader, "sink");
1301
1302         if (sinkpad) {
1303           caps = gst_pad_query_caps (sinkpad, NULL);
1304           gst_object_unref (sinkpad);
1305         } else {
1306           GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION, (NULL),
1307               ("Custom payloaders are expected to expose a sink pad named 'sink'"));
1308           return FALSE;
1309         }
1310       } else {
1311         /* No target yet - return the union of all payloader caps */
1312         caps = gst_rtsp_client_sink_get_all_payloaders_caps ();
1313       }
1314
1315       GST_TRACE_OBJECT (parent, "Returning payloader caps %" GST_PTR_FORMAT,
1316           caps);
1317
1318       gst_query_set_caps_result (query, caps);
1319       gst_caps_unref (caps);
1320
1321       return TRUE;
1322     }
1323     gst_object_unref (target);
1324   }
1325
1326   return gst_pad_query_default (pad, parent, query);
1327 }
1328
1329 static GstPad *
1330 gst_rtsp_client_sink_request_new_pad (GstElement * element,
1331     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1332 {
1333   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1334   GstPad *pad;
1335   GstRTSPStreamContext *context;
1336   guint idx = (guint) - 1;
1337   gchar *tmpname;
1338
1339   g_mutex_lock (&sink->preroll_lock);
1340   if (sink->streams_collected) {
1341     GST_WARNING_OBJECT (element, "Can't add streams to a running session");
1342     g_mutex_unlock (&sink->preroll_lock);
1343     return NULL;
1344   }
1345   g_mutex_unlock (&sink->preroll_lock);
1346
1347   GST_OBJECT_LOCK (sink);
1348   if (name) {
1349     if (!sscanf (name, "sink_%u", &idx)) {
1350       GST_OBJECT_UNLOCK (sink);
1351       GST_ERROR_OBJECT (element, "Invalid sink pad name %s", name);
1352       return NULL;
1353     }
1354
1355     if (idx >= sink->next_pad_id)
1356       sink->next_pad_id = idx + 1;
1357   }
1358   if (idx == (guint) - 1) {
1359     idx = sink->next_pad_id;
1360     sink->next_pad_id++;
1361   }
1362   GST_OBJECT_UNLOCK (sink);
1363
1364   tmpname = g_strdup_printf ("sink_%u", idx);
1365   pad = gst_rtsp_client_sink_pad_new (templ, tmpname);
1366   g_free (tmpname);
1367
1368   GST_DEBUG_OBJECT (element, "Creating request pad %" GST_PTR_FORMAT, pad);
1369
1370   gst_pad_set_event_function (pad,
1371       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_event));
1372   gst_pad_set_query_function (pad,
1373       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_query));
1374
1375   context = g_new0 (GstRTSPStreamContext, 1);
1376   context->parent = sink;
1377   context->index = idx;
1378
1379   gst_pad_set_element_private (pad, context);
1380
1381   /* The rest of the context is configured on a caps set */
1382   gst_pad_set_active (pad, TRUE);
1383   gst_element_add_pad (element, pad);
1384   gst_child_proxy_child_added (GST_CHILD_PROXY (element), G_OBJECT (pad),
1385       GST_PAD_NAME (pad));
1386
1387   (void) gst_rtsp_client_sink_get_factories ();
1388
1389   g_mutex_init (&context->conninfo.send_lock);
1390   g_mutex_init (&context->conninfo.recv_lock);
1391
1392   GST_RTSP_STATE_LOCK (sink);
1393   sink->contexts = g_list_prepend (sink->contexts, context);
1394   GST_RTSP_STATE_UNLOCK (sink);
1395
1396   return pad;
1397 }
1398
1399 static void
1400 gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
1401 {
1402   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1403   GstRTSPStreamContext *context;
1404
1405   context = gst_pad_get_element_private (pad);
1406
1407   GST_RTSP_STATE_LOCK (sink);
1408   sink->contexts = g_list_remove (sink->contexts, context);
1409   GST_RTSP_STATE_UNLOCK (sink);
1410
1411   /* FIXME: Shut down and clean up streaming on this pad,
1412    * do teardown if needed */
1413   GST_LOG_OBJECT (sink,
1414       "Cleaning up payloader and stream for released pad %" GST_PTR_FORMAT,
1415       pad);
1416
1417   if (context->stream_transport) {
1418     gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1419     gst_object_unref (context->stream_transport);
1420     context->stream_transport = NULL;
1421   }
1422   if (context->stream) {
1423     if (context->joined) {
1424       gst_rtsp_stream_leave_bin (context->stream,
1425           GST_BIN (sink->internal_bin), sink->rtpbin);
1426       context->joined = FALSE;
1427     }
1428     gst_object_unref (context->stream);
1429     context->stream = NULL;
1430   }
1431   if (context->srtcpparams)
1432     gst_caps_unref (context->srtcpparams);
1433
1434   g_free (context->conninfo.location);
1435   context->conninfo.location = NULL;
1436
1437   g_mutex_clear (&context->conninfo.send_lock);
1438   g_mutex_clear (&context->conninfo.recv_lock);
1439
1440   g_free (context);
1441
1442   gst_element_remove_pad (element, pad);
1443 }
1444
1445 static GstClock *
1446 gst_rtsp_client_sink_provide_clock (GstElement * element)
1447 {
1448   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1449   GstClock *clock;
1450
1451   if ((clock = sink->provided_clock) != NULL)
1452     gst_object_ref (clock);
1453
1454   return clock;
1455 }
1456
1457 /* a proxy string of the format [user:passwd@]host[:port] */
1458 static gboolean
1459 gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp, const gchar * proxy)
1460 {
1461   gchar *p, *at, *col;
1462
1463   g_free (rtsp->proxy_user);
1464   rtsp->proxy_user = NULL;
1465   g_free (rtsp->proxy_passwd);
1466   rtsp->proxy_passwd = NULL;
1467   g_free (rtsp->proxy_host);
1468   rtsp->proxy_host = NULL;
1469   rtsp->proxy_port = 0;
1470
1471   p = (gchar *) proxy;
1472
1473   if (p == NULL)
1474     return TRUE;
1475
1476   /* we allow http:// in front but ignore it */
1477   if (g_str_has_prefix (p, "http://"))
1478     p += 7;
1479
1480   at = strchr (p, '@');
1481   if (at) {
1482     /* look for user:passwd */
1483     col = strchr (proxy, ':');
1484     if (col == NULL || col > at)
1485       return FALSE;
1486
1487     rtsp->proxy_user = g_strndup (p, col - p);
1488     col++;
1489     rtsp->proxy_passwd = g_strndup (col, at - col);
1490
1491     /* move to host */
1492     p = at + 1;
1493   } else {
1494     if (rtsp->prop_proxy_id != NULL && *rtsp->prop_proxy_id != '\0')
1495       rtsp->proxy_user = g_strdup (rtsp->prop_proxy_id);
1496     if (rtsp->prop_proxy_pw != NULL && *rtsp->prop_proxy_pw != '\0')
1497       rtsp->proxy_passwd = g_strdup (rtsp->prop_proxy_pw);
1498     if (rtsp->proxy_user != NULL || rtsp->proxy_passwd != NULL) {
1499       GST_LOG_OBJECT (rtsp, "set proxy user/pw from properties: %s:%s",
1500           GST_STR_NULL (rtsp->proxy_user), GST_STR_NULL (rtsp->proxy_passwd));
1501     }
1502   }
1503   col = strchr (p, ':');
1504
1505   if (col) {
1506     /* everything before the colon is the hostname */
1507     rtsp->proxy_host = g_strndup (p, col - p);
1508     p = col + 1;
1509     rtsp->proxy_port = strtoul (p, (char **) &p, 10);
1510   } else {
1511     rtsp->proxy_host = g_strdup (p);
1512     rtsp->proxy_port = 8080;
1513   }
1514   return TRUE;
1515 }
1516
1517 static void
1518 gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink * rtsp_client_sink,
1519     guint64 timeout)
1520 {
1521   rtsp_client_sink->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC;
1522   rtsp_client_sink->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC;
1523
1524   if (timeout != 0)
1525     rtsp_client_sink->ptcp_timeout = &rtsp_client_sink->tcp_timeout;
1526   else
1527     rtsp_client_sink->ptcp_timeout = NULL;
1528 }
1529
1530 static void
1531 gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
1532     const GValue * value, GParamSpec * pspec)
1533 {
1534   GstRTSPClientSink *rtsp_client_sink;
1535
1536   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1537
1538   switch (prop_id) {
1539     case PROP_LOCATION:
1540       gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (rtsp_client_sink),
1541           g_value_get_string (value), NULL);
1542       break;
1543     case PROP_PROTOCOLS:
1544       rtsp_client_sink->protocols = g_value_get_flags (value);
1545       break;
1546     case PROP_PROFILES:
1547       rtsp_client_sink->profiles = g_value_get_flags (value);
1548       break;
1549     case PROP_DEBUG:
1550       rtsp_client_sink->debug = g_value_get_boolean (value);
1551       break;
1552     case PROP_RETRY:
1553       rtsp_client_sink->retry = g_value_get_uint (value);
1554       break;
1555     case PROP_TIMEOUT:
1556       rtsp_client_sink->udp_timeout = g_value_get_uint64 (value);
1557       break;
1558     case PROP_TCP_TIMEOUT:
1559       gst_rtsp_client_sink_set_tcp_timeout (rtsp_client_sink,
1560           g_value_get_uint64 (value));
1561       break;
1562     case PROP_LATENCY:
1563       rtsp_client_sink->latency = g_value_get_uint (value);
1564       break;
1565     case PROP_RTX_TIME:
1566       rtsp_client_sink->rtx_time = g_value_get_uint (value);
1567       break;
1568     case PROP_DO_RTSP_KEEP_ALIVE:
1569       rtsp_client_sink->do_rtsp_keep_alive = g_value_get_boolean (value);
1570       break;
1571     case PROP_PROXY:
1572       gst_rtsp_client_sink_set_proxy (rtsp_client_sink,
1573           g_value_get_string (value));
1574       break;
1575     case PROP_PROXY_ID:
1576       if (rtsp_client_sink->prop_proxy_id)
1577         g_free (rtsp_client_sink->prop_proxy_id);
1578       rtsp_client_sink->prop_proxy_id = g_value_dup_string (value);
1579       break;
1580     case PROP_PROXY_PW:
1581       if (rtsp_client_sink->prop_proxy_pw)
1582         g_free (rtsp_client_sink->prop_proxy_pw);
1583       rtsp_client_sink->prop_proxy_pw = g_value_dup_string (value);
1584       break;
1585     case PROP_RTP_BLOCKSIZE:
1586       rtsp_client_sink->rtp_blocksize = g_value_get_uint (value);
1587       break;
1588     case PROP_USER_ID:
1589       if (rtsp_client_sink->user_id)
1590         g_free (rtsp_client_sink->user_id);
1591       rtsp_client_sink->user_id = g_value_dup_string (value);
1592       break;
1593     case PROP_USER_PW:
1594       if (rtsp_client_sink->user_pw)
1595         g_free (rtsp_client_sink->user_pw);
1596       rtsp_client_sink->user_pw = g_value_dup_string (value);
1597       break;
1598     case PROP_PORT_RANGE:
1599     {
1600       const gchar *str;
1601
1602       str = g_value_get_string (value);
1603       if (!str || !sscanf (str, "%u-%u",
1604               &rtsp_client_sink->client_port_range.min,
1605               &rtsp_client_sink->client_port_range.max)) {
1606         rtsp_client_sink->client_port_range.min = 0;
1607         rtsp_client_sink->client_port_range.max = 0;
1608       }
1609       break;
1610     }
1611     case PROP_UDP_BUFFER_SIZE:
1612       rtsp_client_sink->udp_buffer_size = g_value_get_int (value);
1613       break;
1614     case PROP_UDP_RECONNECT:
1615       rtsp_client_sink->udp_reconnect = g_value_get_boolean (value);
1616       break;
1617     case PROP_MULTICAST_IFACE:
1618       g_free (rtsp_client_sink->multi_iface);
1619
1620       if (g_value_get_string (value) == NULL)
1621         rtsp_client_sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1622       else
1623         rtsp_client_sink->multi_iface = g_value_dup_string (value);
1624       break;
1625     case PROP_SDES:
1626       rtsp_client_sink->sdes = g_value_dup_boxed (value);
1627       break;
1628     case PROP_TLS_VALIDATION_FLAGS:
1629       rtsp_client_sink->tls_validation_flags = g_value_get_flags (value);
1630       break;
1631     case PROP_TLS_DATABASE:
1632       g_clear_object (&rtsp_client_sink->tls_database);
1633       rtsp_client_sink->tls_database = g_value_dup_object (value);
1634       break;
1635     case PROP_TLS_INTERACTION:
1636       g_clear_object (&rtsp_client_sink->tls_interaction);
1637       rtsp_client_sink->tls_interaction = g_value_dup_object (value);
1638       break;
1639     case PROP_NTP_TIME_SOURCE:
1640       rtsp_client_sink->ntp_time_source = g_value_get_enum (value);
1641       break;
1642     case PROP_USER_AGENT:
1643       g_free (rtsp_client_sink->user_agent);
1644       rtsp_client_sink->user_agent = g_value_dup_string (value);
1645       break;
1646     default:
1647       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1648       break;
1649   }
1650 }
1651
1652 static void
1653 gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
1654     GValue * value, GParamSpec * pspec)
1655 {
1656   GstRTSPClientSink *rtsp_client_sink;
1657
1658   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1659
1660   switch (prop_id) {
1661     case PROP_LOCATION:
1662       g_value_set_string (value, rtsp_client_sink->conninfo.location);
1663       break;
1664     case PROP_PROTOCOLS:
1665       g_value_set_flags (value, rtsp_client_sink->protocols);
1666       break;
1667     case PROP_PROFILES:
1668       g_value_set_flags (value, rtsp_client_sink->profiles);
1669       break;
1670     case PROP_DEBUG:
1671       g_value_set_boolean (value, rtsp_client_sink->debug);
1672       break;
1673     case PROP_RETRY:
1674       g_value_set_uint (value, rtsp_client_sink->retry);
1675       break;
1676     case PROP_TIMEOUT:
1677       g_value_set_uint64 (value, rtsp_client_sink->udp_timeout);
1678       break;
1679     case PROP_TCP_TIMEOUT:
1680     {
1681       guint64 timeout;
1682
1683       timeout = rtsp_client_sink->tcp_timeout.tv_sec * G_USEC_PER_SEC +
1684           rtsp_client_sink->tcp_timeout.tv_usec;
1685       g_value_set_uint64 (value, timeout);
1686       break;
1687     }
1688     case PROP_LATENCY:
1689       g_value_set_uint (value, rtsp_client_sink->latency);
1690       break;
1691     case PROP_RTX_TIME:
1692       g_value_set_uint (value, rtsp_client_sink->rtx_time);
1693       break;
1694     case PROP_DO_RTSP_KEEP_ALIVE:
1695       g_value_set_boolean (value, rtsp_client_sink->do_rtsp_keep_alive);
1696       break;
1697     case PROP_PROXY:
1698     {
1699       gchar *str;
1700
1701       if (rtsp_client_sink->proxy_host) {
1702         str =
1703             g_strdup_printf ("%s:%d", rtsp_client_sink->proxy_host,
1704             rtsp_client_sink->proxy_port);
1705       } else {
1706         str = NULL;
1707       }
1708       g_value_take_string (value, str);
1709       break;
1710     }
1711     case PROP_PROXY_ID:
1712       g_value_set_string (value, rtsp_client_sink->prop_proxy_id);
1713       break;
1714     case PROP_PROXY_PW:
1715       g_value_set_string (value, rtsp_client_sink->prop_proxy_pw);
1716       break;
1717     case PROP_RTP_BLOCKSIZE:
1718       g_value_set_uint (value, rtsp_client_sink->rtp_blocksize);
1719       break;
1720     case PROP_USER_ID:
1721       g_value_set_string (value, rtsp_client_sink->user_id);
1722       break;
1723     case PROP_USER_PW:
1724       g_value_set_string (value, rtsp_client_sink->user_pw);
1725       break;
1726     case PROP_PORT_RANGE:
1727     {
1728       gchar *str;
1729
1730       if (rtsp_client_sink->client_port_range.min != 0) {
1731         str = g_strdup_printf ("%u-%u", rtsp_client_sink->client_port_range.min,
1732             rtsp_client_sink->client_port_range.max);
1733       } else {
1734         str = NULL;
1735       }
1736       g_value_take_string (value, str);
1737       break;
1738     }
1739     case PROP_UDP_BUFFER_SIZE:
1740       g_value_set_int (value, rtsp_client_sink->udp_buffer_size);
1741       break;
1742     case PROP_UDP_RECONNECT:
1743       g_value_set_boolean (value, rtsp_client_sink->udp_reconnect);
1744       break;
1745     case PROP_MULTICAST_IFACE:
1746       g_value_set_string (value, rtsp_client_sink->multi_iface);
1747       break;
1748     case PROP_SDES:
1749       g_value_set_boxed (value, rtsp_client_sink->sdes);
1750       break;
1751     case PROP_TLS_VALIDATION_FLAGS:
1752       g_value_set_flags (value, rtsp_client_sink->tls_validation_flags);
1753       break;
1754     case PROP_TLS_DATABASE:
1755       g_value_set_object (value, rtsp_client_sink->tls_database);
1756       break;
1757     case PROP_TLS_INTERACTION:
1758       g_value_set_object (value, rtsp_client_sink->tls_interaction);
1759       break;
1760     case PROP_NTP_TIME_SOURCE:
1761       g_value_set_enum (value, rtsp_client_sink->ntp_time_source);
1762       break;
1763     case PROP_USER_AGENT:
1764       g_value_set_string (value, rtsp_client_sink->user_agent);
1765       break;
1766     default:
1767       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1768       break;
1769   }
1770 }
1771
1772 static const gchar *
1773 get_aggregate_control (GstRTSPClientSink * sink)
1774 {
1775   const gchar *base;
1776
1777   if (sink->control)
1778     base = sink->control;
1779   else if (sink->content_base)
1780     base = sink->content_base;
1781   else if (sink->conninfo.url_str)
1782     base = sink->conninfo.url_str;
1783   else
1784     base = "/";
1785
1786   return base;
1787 }
1788
1789 static void
1790 gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
1791 {
1792   GList *walk;
1793
1794   GST_DEBUG_OBJECT (sink, "cleanup");
1795
1796   gst_element_set_state (GST_ELEMENT (sink->internal_bin), GST_STATE_NULL);
1797
1798   /* Clean up any left over stream objects */
1799   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
1800     GstRTSPStreamContext *context = (GstRTSPStreamContext *) (walk->data);
1801     if (context->stream_transport) {
1802       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1803       gst_object_unref (context->stream_transport);
1804       context->stream_transport = NULL;
1805     }
1806
1807     if (context->stream) {
1808       if (context->joined) {
1809         gst_rtsp_stream_leave_bin (context->stream,
1810             GST_BIN (sink->internal_bin), sink->rtpbin);
1811         context->joined = FALSE;
1812       }
1813       gst_object_unref (context->stream);
1814       context->stream = NULL;
1815     }
1816
1817     if (context->srtcpparams) {
1818       gst_caps_unref (context->srtcpparams);
1819       context->srtcpparams = NULL;
1820     }
1821     g_free (context->conninfo.location);
1822     context->conninfo.location = NULL;
1823   }
1824
1825   if (sink->rtpbin) {
1826     gst_element_set_state (sink->rtpbin, GST_STATE_NULL);
1827     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), sink->rtpbin);
1828     sink->rtpbin = NULL;
1829   }
1830
1831   g_free (sink->content_base);
1832   sink->content_base = NULL;
1833
1834   g_free (sink->control);
1835   sink->control = NULL;
1836
1837   if (sink->range)
1838     gst_rtsp_range_free (sink->range);
1839   sink->range = NULL;
1840
1841   /* don't clear the SDP when it was used in the url */
1842   if (sink->uri_sdp && !sink->from_sdp) {
1843     gst_sdp_message_free (sink->uri_sdp);
1844     sink->uri_sdp = NULL;
1845   }
1846
1847   if (sink->provided_clock) {
1848     gst_object_unref (sink->provided_clock);
1849     sink->provided_clock = NULL;
1850   }
1851
1852   g_free (sink->server_ip);
1853   sink->server_ip = NULL;
1854
1855   sink->next_pad_id = 0;
1856   sink->next_dyn_pt = 96;
1857 }
1858
1859 static GstRTSPResult
1860 gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
1861     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1862 {
1863   GstRTSPResult ret;
1864
1865   if (conninfo->connection) {
1866     g_mutex_lock (&conninfo->send_lock);
1867     ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
1868     g_mutex_unlock (&conninfo->send_lock);
1869   } else {
1870     ret = GST_RTSP_ERROR;
1871   }
1872
1873   return ret;
1874 }
1875
1876 static GstRTSPResult
1877 gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
1878     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1879 {
1880   GstRTSPResult ret;
1881
1882   if (conninfo->connection) {
1883     g_mutex_lock (&conninfo->recv_lock);
1884     ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
1885     g_mutex_unlock (&conninfo->recv_lock);
1886   } else {
1887     ret = GST_RTSP_ERROR;
1888   }
1889
1890   return ret;
1891 }
1892
1893 static gboolean
1894 accept_certificate_cb (GTlsConnection * conn, GTlsCertificate * peer_cert,
1895     GTlsCertificateFlags errors, gpointer user_data)
1896 {
1897   GstRTSPClientSink *sink = user_data;
1898   gboolean accept = FALSE;
1899
1900   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE],
1901       0, conn, peer_cert, errors, &accept);
1902
1903   return accept;
1904 }
1905
1906 static GstRTSPResult
1907 gst_rtsp_conninfo_connect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1908     gboolean async)
1909 {
1910   GstRTSPResult res;
1911
1912   if (info->connection == NULL) {
1913     if (info->url == NULL) {
1914       GST_DEBUG_OBJECT (sink, "parsing uri (%s)...", info->location);
1915       if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0)
1916         goto parse_error;
1917     }
1918
1919     /* create connection */
1920     GST_DEBUG_OBJECT (sink, "creating connection (%s)...", info->location);
1921     if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0)
1922       goto could_not_create;
1923
1924     if (info->url_str)
1925       g_free (info->url_str);
1926     info->url_str = gst_rtsp_url_get_request_uri (info->url);
1927
1928     GST_DEBUG_OBJECT (sink, "sanitized uri %s", info->url_str);
1929
1930     if (info->url->transports & GST_RTSP_LOWER_TRANS_TLS) {
1931       if (!gst_rtsp_connection_set_tls_validation_flags (info->connection,
1932               sink->tls_validation_flags))
1933         GST_WARNING_OBJECT (sink, "Unable to set TLS validation flags");
1934
1935       if (sink->tls_database)
1936         gst_rtsp_connection_set_tls_database (info->connection,
1937             sink->tls_database);
1938
1939       if (sink->tls_interaction)
1940         gst_rtsp_connection_set_tls_interaction (info->connection,
1941             sink->tls_interaction);
1942
1943       gst_rtsp_connection_set_accept_certificate_func (info->connection,
1944           accept_certificate_cb, sink, NULL);
1945     }
1946
1947     if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP)
1948       gst_rtsp_connection_set_tunneled (info->connection, TRUE);
1949
1950     if (sink->proxy_host) {
1951       GST_DEBUG_OBJECT (sink, "setting proxy %s:%d", sink->proxy_host,
1952           sink->proxy_port);
1953       gst_rtsp_connection_set_proxy (info->connection, sink->proxy_host,
1954           sink->proxy_port);
1955     }
1956   }
1957
1958   if (!info->connected) {
1959     /* connect */
1960     if (async)
1961       GST_ELEMENT_PROGRESS (sink, CONTINUE, "connect",
1962           ("Connecting to %s", info->location));
1963     GST_DEBUG_OBJECT (sink, "connecting (%s)...", info->location);
1964     if ((res =
1965             gst_rtsp_connection_connect (info->connection,
1966                 sink->ptcp_timeout)) < 0)
1967       goto could_not_connect;
1968
1969     info->connected = TRUE;
1970   }
1971   return GST_RTSP_OK;
1972
1973   /* ERRORS */
1974 parse_error:
1975   {
1976     GST_ERROR_OBJECT (sink, "No valid RTSP URL was provided");
1977     return res;
1978   }
1979 could_not_create:
1980   {
1981     gchar *str = gst_rtsp_strresult (res);
1982     GST_ERROR_OBJECT (sink, "Could not create connection. (%s)", str);
1983     g_free (str);
1984     return res;
1985   }
1986 could_not_connect:
1987   {
1988     gchar *str = gst_rtsp_strresult (res);
1989     GST_ERROR_OBJECT (sink, "Could not connect to server. (%s)", str);
1990     g_free (str);
1991     return res;
1992   }
1993 }
1994
1995 static GstRTSPResult
1996 gst_rtsp_conninfo_close (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1997     gboolean free)
1998 {
1999   GST_RTSP_STATE_LOCK (sink);
2000   if (info->connected) {
2001     GST_DEBUG_OBJECT (sink, "closing connection...");
2002     gst_rtsp_connection_close (info->connection);
2003     info->connected = FALSE;
2004   }
2005   if (free && info->connection) {
2006     /* free connection */
2007     GST_DEBUG_OBJECT (sink, "freeing connection...");
2008     gst_rtsp_connection_free (info->connection);
2009     info->connection = NULL;
2010   }
2011   GST_RTSP_STATE_UNLOCK (sink);
2012   return GST_RTSP_OK;
2013 }
2014
2015 static GstRTSPResult
2016 gst_rtsp_conninfo_reconnect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
2017     gboolean async)
2018 {
2019   GstRTSPResult res;
2020
2021   GST_DEBUG_OBJECT (sink, "reconnecting connection...");
2022   gst_rtsp_conninfo_close (sink, info, FALSE);
2023   res = gst_rtsp_conninfo_connect (sink, info, async);
2024
2025   return res;
2026 }
2027
2028 static void
2029 gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink, gboolean flush)
2030 {
2031   GList *walk;
2032
2033   GST_DEBUG_OBJECT (sink, "set flushing %d", flush);
2034   g_mutex_lock (&sink->preroll_lock);
2035   if (sink->conninfo.connection && sink->conninfo.flushing != flush) {
2036     GST_DEBUG_OBJECT (sink, "connection flush");
2037     gst_rtsp_connection_flush (sink->conninfo.connection, flush);
2038     sink->conninfo.flushing = flush;
2039   }
2040   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
2041     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
2042     if (stream->conninfo.connection && stream->conninfo.flushing != flush) {
2043       GST_DEBUG_OBJECT (sink, "stream %p flush", stream);
2044       gst_rtsp_connection_flush (stream->conninfo.connection, flush);
2045       stream->conninfo.flushing = flush;
2046     }
2047   }
2048   g_cond_broadcast (&sink->preroll_cond);
2049   g_mutex_unlock (&sink->preroll_lock);
2050 }
2051
2052 static GstRTSPResult
2053 gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
2054     GstRTSPMessage * msg, GstRTSPMethod method, const gchar * uri)
2055 {
2056   GstRTSPResult res;
2057
2058   res = gst_rtsp_message_init_request (msg, method, uri);
2059   if (res < 0)
2060     return res;
2061
2062   /* set user-agent */
2063   if (sink->user_agent)
2064     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_USER_AGENT,
2065         sink->user_agent);
2066
2067   return res;
2068 }
2069
2070 /* FIXME, handle server request, reply with OK, for now */
2071 static GstRTSPResult
2072 gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
2073     GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
2074 {
2075   GstRTSPMessage response = { 0 };
2076   GstRTSPResult res;
2077
2078   GST_DEBUG_OBJECT (sink, "got server request message");
2079
2080   if (sink->debug)
2081     gst_rtsp_message_dump (request);
2082
2083   /* default implementation, send OK */
2084   GST_DEBUG_OBJECT (sink, "prepare OK reply");
2085   res =
2086       gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, "OK",
2087       request);
2088   if (res < 0)
2089     goto send_error;
2090
2091   /* let app parse and reply */
2092   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST],
2093       0, request, &response);
2094
2095   if (sink->debug)
2096     gst_rtsp_message_dump (&response);
2097
2098   res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
2099   if (res < 0)
2100     goto send_error;
2101
2102   gst_rtsp_message_unset (&response);
2103
2104   return GST_RTSP_OK;
2105
2106   /* ERRORS */
2107 send_error:
2108   {
2109     gst_rtsp_message_unset (&response);
2110     return res;
2111   }
2112 }
2113
2114 /* send server keep-alive */
2115 static GstRTSPResult
2116 gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
2117 {
2118   GstRTSPMessage request = { 0 };
2119   GstRTSPResult res;
2120   GstRTSPMethod method;
2121   const gchar *control;
2122
2123   if (sink->do_rtsp_keep_alive == FALSE) {
2124     GST_DEBUG_OBJECT (sink, "do-rtsp-keep-alive is FALSE, not sending.");
2125     gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2126     return GST_RTSP_OK;
2127   }
2128
2129   GST_DEBUG_OBJECT (sink, "creating server keep-alive");
2130
2131   /* find a method to use for keep-alive */
2132   if (sink->methods & GST_RTSP_GET_PARAMETER)
2133     method = GST_RTSP_GET_PARAMETER;
2134   else
2135     method = GST_RTSP_OPTIONS;
2136
2137   control = get_aggregate_control (sink);
2138   if (control == NULL)
2139     goto no_control;
2140
2141   res = gst_rtsp_client_sink_init_request (sink, &request, method, control);
2142   if (res < 0)
2143     goto send_error;
2144
2145   if (sink->debug)
2146     gst_rtsp_message_dump (&request);
2147
2148   res =
2149       gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
2150       &request, NULL);
2151   if (res < 0)
2152     goto send_error;
2153
2154   gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2155   gst_rtsp_message_unset (&request);
2156
2157   return GST_RTSP_OK;
2158
2159   /* ERRORS */
2160 no_control:
2161   {
2162     GST_WARNING_OBJECT (sink, "no control url to send keepalive");
2163     return GST_RTSP_OK;
2164   }
2165 send_error:
2166   {
2167     gchar *str = gst_rtsp_strresult (res);
2168
2169     gst_rtsp_message_unset (&request);
2170     GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, (NULL),
2171         ("Could not send keep-alive. (%s)", str));
2172     g_free (str);
2173     return res;
2174   }
2175 }
2176
2177 static GstFlowReturn
2178 gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
2179 {
2180   GstRTSPResult res;
2181   GstRTSPMessage message = { 0 };
2182   gint retry = 0;
2183
2184   while (TRUE) {
2185     GTimeVal tv_timeout;
2186
2187     /* get the next timeout interval */
2188     gst_rtsp_connection_next_timeout (sink->conninfo.connection, &tv_timeout);
2189
2190     GST_DEBUG_OBJECT (sink, "doing receive with timeout %d seconds",
2191         (gint) tv_timeout.tv_sec);
2192
2193     gst_rtsp_message_unset (&message);
2194
2195     /* we should continue reading the TCP socket because the server might
2196      * send us requests. When the session timeout expires, we need to send a
2197      * keep-alive request to keep the session open. */
2198     res =
2199         gst_rtsp_client_sink_connection_receive (sink,
2200         &sink->conninfo, &message, &tv_timeout);
2201
2202     switch (res) {
2203       case GST_RTSP_OK:
2204         GST_DEBUG_OBJECT (sink, "we received a server message");
2205         break;
2206       case GST_RTSP_EINTR:
2207         /* we got interrupted, see what we have to do */
2208         goto interrupt;
2209       case GST_RTSP_ETIMEOUT:
2210         /* send keep-alive, ignore the result, a warning will be posted. */
2211         GST_DEBUG_OBJECT (sink, "timeout, sending keep-alive");
2212         if ((res =
2213                 gst_rtsp_client_sink_send_keep_alive (sink)) == GST_RTSP_EINTR)
2214           goto interrupt;
2215         continue;
2216       case GST_RTSP_EEOF:
2217         /* server closed the connection. not very fatal for UDP, reconnect and
2218          * see what happens. */
2219         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2220             ("The server closed the connection."));
2221         if (sink->udp_reconnect) {
2222           if ((res =
2223                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2224                       FALSE)) < 0)
2225             goto connect_error;
2226         } else {
2227           goto server_eof;
2228         }
2229         continue;
2230         break;
2231       case GST_RTSP_ENET:
2232         GST_DEBUG_OBJECT (sink, "An ethernet problem occured.");
2233       default:
2234         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2235             ("Unhandled return value %d.", res));
2236         goto receive_error;
2237     }
2238
2239     switch (message.type) {
2240       case GST_RTSP_MESSAGE_REQUEST:
2241         /* server sends us a request message, handle it */
2242         res =
2243             gst_rtsp_client_sink_handle_request (sink,
2244             &sink->conninfo, &message);
2245         if (res == GST_RTSP_EEOF)
2246           goto server_eof;
2247         else if (res < 0)
2248           goto handle_request_failed;
2249         break;
2250       case GST_RTSP_MESSAGE_RESPONSE:
2251         /* we ignore response and data messages */
2252         GST_DEBUG_OBJECT (sink, "ignoring response message");
2253         if (sink->debug)
2254           gst_rtsp_message_dump (&message);
2255         if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) {
2256           GST_DEBUG_OBJECT (sink, "but is Unauthorized response ...");
2257           if (gst_rtsp_client_sink_setup_auth (sink, &message) && !(retry++)) {
2258             GST_DEBUG_OBJECT (sink, "so retrying keep-alive");
2259             if ((res =
2260                     gst_rtsp_client_sink_send_keep_alive (sink)) ==
2261                 GST_RTSP_EINTR)
2262               goto interrupt;
2263           }
2264         } else {
2265           retry = 0;
2266         }
2267         break;
2268       case GST_RTSP_MESSAGE_DATA:
2269         /* we ignore response and data messages */
2270         GST_DEBUG_OBJECT (sink, "ignoring data message");
2271         break;
2272       default:
2273         GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2274             message.type);
2275         break;
2276     }
2277   }
2278   g_assert_not_reached ();
2279
2280   /* we get here when the connection got interrupted */
2281 interrupt:
2282   {
2283     gst_rtsp_message_unset (&message);
2284     GST_DEBUG_OBJECT (sink, "got interrupted");
2285     return GST_FLOW_FLUSHING;
2286   }
2287 connect_error:
2288   {
2289     gchar *str = gst_rtsp_strresult (res);
2290     GstFlowReturn ret;
2291
2292     sink->conninfo.connected = FALSE;
2293     if (res != GST_RTSP_EINTR) {
2294       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
2295           ("Could not connect to server. (%s)", str));
2296       g_free (str);
2297       ret = GST_FLOW_ERROR;
2298     } else {
2299       ret = GST_FLOW_FLUSHING;
2300     }
2301     return ret;
2302   }
2303 receive_error:
2304   {
2305     gchar *str = gst_rtsp_strresult (res);
2306
2307     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2308         ("Could not receive message. (%s)", str));
2309     g_free (str);
2310     return GST_FLOW_ERROR;
2311   }
2312 handle_request_failed:
2313   {
2314     gchar *str = gst_rtsp_strresult (res);
2315     GstFlowReturn ret;
2316
2317     gst_rtsp_message_unset (&message);
2318     if (res != GST_RTSP_EINTR) {
2319       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2320           ("Could not handle server message. (%s)", str));
2321       g_free (str);
2322       ret = GST_FLOW_ERROR;
2323     } else {
2324       ret = GST_FLOW_FLUSHING;
2325     }
2326     return ret;
2327   }
2328 server_eof:
2329   {
2330     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2331     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2332         ("The server closed the connection."));
2333     sink->conninfo.connected = FALSE;
2334     gst_rtsp_message_unset (&message);
2335     return GST_FLOW_EOS;
2336   }
2337 }
2338
2339 static GstRTSPResult
2340 gst_rtsp_client_sink_reconnect (GstRTSPClientSink * sink, gboolean async)
2341 {
2342   GstRTSPResult res = GST_RTSP_OK;
2343   gboolean restart = FALSE;
2344
2345   GST_DEBUG_OBJECT (sink, "doing reconnect");
2346
2347   GST_FIXME_OBJECT (sink, "Reconnection is not yet implemented");
2348
2349   /* no need to restart, we're done */
2350   if (!restart)
2351     goto done;
2352
2353   /* we can try only TCP now */
2354   sink->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
2355
2356   /* close and cleanup our state */
2357   if ((res = gst_rtsp_client_sink_close (sink, async, FALSE)) < 0)
2358     goto done;
2359
2360   /* see if we have TCP left to try. Also don't try TCP when we were configured
2361    * with an SDP. */
2362   if (!(sink->protocols & GST_RTSP_LOWER_TRANS_TCP) || sink->from_sdp)
2363     goto no_protocols;
2364
2365   /* We post a warning message now to inform the user
2366    * that nothing happened. It's most likely a firewall thing. */
2367   GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2368       ("Could not receive any UDP packets for %.4f seconds, maybe your "
2369           "firewall is blocking it. Retrying using a TCP connection.",
2370           gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2371
2372   /* open new connection using tcp */
2373   if (gst_rtsp_client_sink_open (sink, async) < 0)
2374     goto open_failed;
2375
2376   /* start recording */
2377   if (gst_rtsp_client_sink_record (sink, async) < 0)
2378     goto play_failed;
2379
2380 done:
2381   return res;
2382
2383   /* ERRORS */
2384 no_protocols:
2385   {
2386     sink->cur_protocols = 0;
2387     /* no transport possible, post an error and stop */
2388     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2389         ("Could not receive any UDP packets for %.4f seconds, maybe your "
2390             "firewall is blocking it. No other protocols to try.",
2391             gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2392     return GST_RTSP_ERROR;
2393   }
2394 open_failed:
2395   {
2396     GST_DEBUG_OBJECT (sink, "open failed");
2397     return GST_RTSP_OK;
2398   }
2399 play_failed:
2400   {
2401     GST_DEBUG_OBJECT (sink, "play failed");
2402     return GST_RTSP_OK;
2403   }
2404 }
2405
2406 static void
2407 gst_rtsp_client_sink_loop_start_cmd (GstRTSPClientSink * sink, gint cmd)
2408 {
2409   switch (cmd) {
2410     case CMD_OPEN:
2411       GST_ELEMENT_PROGRESS (sink, START, "open", ("Opening Stream"));
2412       break;
2413     case CMD_RECORD:
2414       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending RECORD request"));
2415       break;
2416     case CMD_PAUSE:
2417       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending PAUSE request"));
2418       break;
2419     case CMD_CLOSE:
2420       GST_ELEMENT_PROGRESS (sink, START, "close", ("Closing Stream"));
2421       break;
2422     default:
2423       break;
2424   }
2425 }
2426
2427 static void
2428 gst_rtsp_client_sink_loop_complete_cmd (GstRTSPClientSink * sink, gint cmd)
2429 {
2430   switch (cmd) {
2431     case CMD_OPEN:
2432       GST_ELEMENT_PROGRESS (sink, COMPLETE, "open", ("Opened Stream"));
2433       break;
2434     case CMD_RECORD:
2435       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent RECORD request"));
2436       break;
2437     case CMD_PAUSE:
2438       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent PAUSE request"));
2439       break;
2440     case CMD_CLOSE:
2441       GST_ELEMENT_PROGRESS (sink, COMPLETE, "close", ("Closed Stream"));
2442       break;
2443     default:
2444       break;
2445   }
2446 }
2447
2448 static void
2449 gst_rtsp_client_sink_loop_cancel_cmd (GstRTSPClientSink * sink, gint cmd)
2450 {
2451   switch (cmd) {
2452     case CMD_OPEN:
2453       GST_ELEMENT_PROGRESS (sink, CANCELED, "open", ("Open canceled"));
2454       break;
2455     case CMD_RECORD:
2456       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("RECORD canceled"));
2457       break;
2458     case CMD_PAUSE:
2459       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("PAUSE canceled"));
2460       break;
2461     case CMD_CLOSE:
2462       GST_ELEMENT_PROGRESS (sink, CANCELED, "close", ("Close canceled"));
2463       break;
2464     default:
2465       break;
2466   }
2467 }
2468
2469 static void
2470 gst_rtsp_client_sink_loop_error_cmd (GstRTSPClientSink * sink, gint cmd)
2471 {
2472   switch (cmd) {
2473     case CMD_OPEN:
2474       GST_ELEMENT_PROGRESS (sink, ERROR, "open", ("Open failed"));
2475       break;
2476     case CMD_RECORD:
2477       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("RECORD failed"));
2478       break;
2479     case CMD_PAUSE:
2480       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("PAUSE failed"));
2481       break;
2482     case CMD_CLOSE:
2483       GST_ELEMENT_PROGRESS (sink, ERROR, "close", ("Close failed"));
2484       break;
2485     default:
2486       break;
2487   }
2488 }
2489
2490 static void
2491 gst_rtsp_client_sink_loop_end_cmd (GstRTSPClientSink * sink, gint cmd,
2492     GstRTSPResult ret)
2493 {
2494   if (ret == GST_RTSP_OK)
2495     gst_rtsp_client_sink_loop_complete_cmd (sink, cmd);
2496   else if (ret == GST_RTSP_EINTR)
2497     gst_rtsp_client_sink_loop_cancel_cmd (sink, cmd);
2498   else
2499     gst_rtsp_client_sink_loop_error_cmd (sink, cmd);
2500 }
2501
2502 static gboolean
2503 gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink, gint cmd,
2504     gint mask)
2505 {
2506   gint old;
2507   gboolean flushed = FALSE;
2508
2509   /* start new request */
2510   gst_rtsp_client_sink_loop_start_cmd (sink, cmd);
2511
2512   GST_DEBUG_OBJECT (sink, "sending cmd %s", cmd_to_string (cmd));
2513
2514   GST_OBJECT_LOCK (sink);
2515   old = sink->pending_cmd;
2516   if (old == CMD_RECONNECT) {
2517     GST_DEBUG_OBJECT (sink, "ignore, we were reconnecting");
2518     cmd = CMD_RECONNECT;
2519   }
2520   if (old != CMD_WAIT) {
2521     sink->pending_cmd = CMD_WAIT;
2522     GST_OBJECT_UNLOCK (sink);
2523     /* cancel previous request */
2524     GST_DEBUG_OBJECT (sink, "cancel previous request %s", cmd_to_string (old));
2525     gst_rtsp_client_sink_loop_cancel_cmd (sink, old);
2526     GST_OBJECT_LOCK (sink);
2527   }
2528   sink->pending_cmd = cmd;
2529   /* interrupt if allowed */
2530   if (sink->busy_cmd & mask) {
2531     GST_DEBUG_OBJECT (sink, "connection flush busy %s",
2532         cmd_to_string (sink->busy_cmd));
2533     gst_rtsp_client_sink_connection_flush (sink, TRUE);
2534     flushed = TRUE;
2535   } else {
2536     GST_DEBUG_OBJECT (sink, "not interrupting busy cmd %s",
2537         cmd_to_string (sink->busy_cmd));
2538   }
2539   if (sink->task)
2540     gst_task_start (sink->task);
2541   GST_OBJECT_UNLOCK (sink);
2542
2543   return flushed;
2544 }
2545
2546 static gboolean
2547 gst_rtsp_client_sink_loop (GstRTSPClientSink * sink)
2548 {
2549   GstFlowReturn ret;
2550
2551   if (!sink->conninfo.connection || !sink->conninfo.connected)
2552     goto no_connection;
2553
2554   ret = gst_rtsp_client_sink_loop_rx (sink);
2555   if (ret != GST_FLOW_OK)
2556     goto pause;
2557
2558   return TRUE;
2559
2560   /* ERRORS */
2561 no_connection:
2562   {
2563     GST_WARNING_OBJECT (sink, "we are not connected");
2564     ret = GST_FLOW_FLUSHING;
2565     goto pause;
2566   }
2567 pause:
2568   {
2569     const gchar *reason = gst_flow_get_name (ret);
2570
2571     GST_DEBUG_OBJECT (sink, "pausing task, reason %s", reason);
2572     gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_LOOP);
2573     return FALSE;
2574   }
2575 }
2576
2577 #ifndef GST_DISABLE_GST_DEBUG
2578 static const gchar *
2579 gst_rtsp_auth_method_to_string (GstRTSPAuthMethod method)
2580 {
2581   gint index = 0;
2582
2583   while (method != 0) {
2584     index++;
2585     method >>= 1;
2586   }
2587   switch (index) {
2588     case 0:
2589       return "None";
2590     case 1:
2591       return "Basic";
2592     case 2:
2593       return "Digest";
2594   }
2595
2596   return "Unknown";
2597 }
2598 #endif
2599
2600 /* Parse a WWW-Authenticate Response header and determine the
2601  * available authentication methods
2602  *
2603  * This code should also cope with the fact that each WWW-Authenticate
2604  * header can contain multiple challenge methods + tokens
2605  *
2606  * At the moment, for Basic auth, we just do a minimal check and don't
2607  * even parse out the realm */
2608 static void
2609 gst_rtsp_client_sink_parse_auth_hdr (GstRTSPMessage * response,
2610     GstRTSPAuthMethod * methods, GstRTSPConnection * conn, gboolean * stale)
2611 {
2612   GstRTSPAuthCredential **credentials, **credential;
2613
2614   g_return_if_fail (response != NULL);
2615   g_return_if_fail (methods != NULL);
2616   g_return_if_fail (stale != NULL);
2617
2618   credentials =
2619       gst_rtsp_message_parse_auth_credentials (response,
2620       GST_RTSP_HDR_WWW_AUTHENTICATE);
2621   if (!credentials)
2622     return;
2623
2624   credential = credentials;
2625   while (*credential) {
2626     if ((*credential)->scheme == GST_RTSP_AUTH_BASIC) {
2627       *methods |= GST_RTSP_AUTH_BASIC;
2628     } else if ((*credential)->scheme == GST_RTSP_AUTH_DIGEST) {
2629       GstRTSPAuthParam **param = (*credential)->params;
2630
2631       *methods |= GST_RTSP_AUTH_DIGEST;
2632
2633       gst_rtsp_connection_clear_auth_params (conn);
2634       *stale = FALSE;
2635
2636       while (*param) {
2637         if (strcmp ((*param)->name, "stale") == 0
2638             && g_ascii_strcasecmp ((*param)->value, "TRUE") == 0)
2639           *stale = TRUE;
2640         gst_rtsp_connection_set_auth_param (conn, (*param)->name,
2641             (*param)->value);
2642         param++;
2643       }
2644     }
2645
2646     credential++;
2647   }
2648
2649   gst_rtsp_auth_credentials_free (credentials);
2650 }
2651
2652 /**
2653  * gst_rtsp_client_sink_setup_auth:
2654  * @src: the rtsp source
2655  *
2656  * Configure a username and password and auth method on the
2657  * connection object based on a response we received from the
2658  * peer.
2659  *
2660  * Currently, this requires that a username and password were supplied
2661  * in the uri. In the future, they may be requested on demand by sending
2662  * a message up the bus.
2663  *
2664  * Returns: TRUE if authentication information could be set up correctly.
2665  */
2666 static gboolean
2667 gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
2668     GstRTSPMessage * response)
2669 {
2670   gchar *user = NULL;
2671   gchar *pass = NULL;
2672   GstRTSPAuthMethod avail_methods = GST_RTSP_AUTH_NONE;
2673   GstRTSPAuthMethod method;
2674   GstRTSPResult auth_result;
2675   GstRTSPUrl *url;
2676   GstRTSPConnection *conn;
2677   gboolean stale = FALSE;
2678
2679   conn = sink->conninfo.connection;
2680
2681   /* Identify the available auth methods and see if any are supported */
2682   gst_rtsp_client_sink_parse_auth_hdr (response, &avail_methods, conn, &stale);
2683
2684   if (avail_methods == GST_RTSP_AUTH_NONE)
2685     goto no_auth_available;
2686
2687   /* For digest auth, if the response indicates that the session
2688    * data are stale, we just update them in the connection object and
2689    * return TRUE to retry the request */
2690   if (stale)
2691     sink->tried_url_auth = FALSE;
2692
2693   url = gst_rtsp_connection_get_url (conn);
2694
2695   /* Do we have username and password available? */
2696   if (url != NULL && !sink->tried_url_auth && url->user != NULL
2697       && url->passwd != NULL) {
2698     user = url->user;
2699     pass = url->passwd;
2700     sink->tried_url_auth = TRUE;
2701     GST_DEBUG_OBJECT (sink,
2702         "Attempting authentication using credentials from the URL");
2703   } else {
2704     user = sink->user_id;
2705     pass = sink->user_pw;
2706     GST_DEBUG_OBJECT (sink,
2707         "Attempting authentication using credentials from the properties");
2708   }
2709
2710   /* FIXME: If the url didn't contain username and password or we tried them
2711    * already, request a username and passwd from the application via some kind
2712    * of credentials request message */
2713
2714   /* If we don't have a username and passwd at this point, bail out. */
2715   if (user == NULL || pass == NULL)
2716     goto no_user_pass;
2717
2718   /* Try to configure for each available authentication method, strongest to
2719    * weakest */
2720   for (method = GST_RTSP_AUTH_MAX; method != GST_RTSP_AUTH_NONE; method >>= 1) {
2721     /* Check if this method is available on the server */
2722     if ((method & avail_methods) == 0)
2723       continue;
2724
2725     /* Pass the credentials to the connection to try on the next request */
2726     auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass);
2727     /* INVAL indicates an invalid username/passwd were supplied, so we'll just
2728      * ignore it and end up retrying later */
2729     if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) {
2730       GST_DEBUG_OBJECT (sink, "Attempting %s authentication",
2731           gst_rtsp_auth_method_to_string (method));
2732       break;
2733     }
2734   }
2735
2736   if (method == GST_RTSP_AUTH_NONE)
2737     goto no_auth_available;
2738
2739   return TRUE;
2740
2741 no_auth_available:
2742   {
2743     /* Output an error indicating that we couldn't connect because there were
2744      * no supported authentication protocols */
2745     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
2746         ("No supported authentication protocol was found"));
2747     return FALSE;
2748   }
2749 no_user_pass:
2750   {
2751     /* We don't fire an error message, we just return FALSE and let the
2752      * normal NOT_AUTHORIZED error be propagated */
2753     return FALSE;
2754   }
2755 }
2756
2757 static GstRTSPResult
2758 gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
2759     GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
2760     GstRTSPMessage * response, GstRTSPStatusCode * code)
2761 {
2762   GstRTSPResult res;
2763   GstRTSPStatusCode thecode;
2764   gchar *content_base = NULL;
2765   gint try = 0;
2766
2767 again:
2768   GST_DEBUG_OBJECT (sink, "sending message");
2769
2770   if (sink->debug)
2771     gst_rtsp_message_dump (request);
2772
2773   g_mutex_lock (&sink->send_lock);
2774
2775   res =
2776       gst_rtsp_client_sink_connection_send (sink, conninfo, request,
2777       sink->ptcp_timeout);
2778   if (res < 0) {
2779     g_mutex_unlock (&sink->send_lock);
2780     goto send_error;
2781   }
2782
2783   gst_rtsp_connection_reset_timeout (conninfo->connection);
2784
2785   /* See if we should handle the response */
2786   if (response == NULL) {
2787     g_mutex_unlock (&sink->send_lock);
2788     return GST_RTSP_OK;
2789   }
2790 next:
2791   res =
2792       gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
2793       sink->ptcp_timeout);
2794
2795   g_mutex_unlock (&sink->send_lock);
2796
2797   if (res < 0)
2798     goto receive_error;
2799
2800   if (sink->debug)
2801     gst_rtsp_message_dump (response);
2802
2803
2804   switch (response->type) {
2805     case GST_RTSP_MESSAGE_REQUEST:
2806       res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
2807       if (res == GST_RTSP_EEOF)
2808         goto server_eof;
2809       else if (res < 0)
2810         goto handle_request_failed;
2811       g_mutex_lock (&sink->send_lock);
2812       goto next;
2813     case GST_RTSP_MESSAGE_RESPONSE:
2814       /* ok, a response is good */
2815       GST_DEBUG_OBJECT (sink, "received response message");
2816       break;
2817     case GST_RTSP_MESSAGE_DATA:
2818       /* we ignore data messages */
2819       GST_DEBUG_OBJECT (sink, "ignoring data message");
2820       g_mutex_lock (&sink->send_lock);
2821       goto next;
2822     default:
2823       GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2824           response->type);
2825       g_mutex_lock (&sink->send_lock);
2826       goto next;
2827   }
2828
2829   thecode = response->type_data.response.code;
2830
2831   GST_DEBUG_OBJECT (sink, "got response message %d", thecode);
2832
2833   /* if the caller wanted the result code, we store it. */
2834   if (code)
2835     *code = thecode;
2836
2837   /* If the request didn't succeed, bail out before doing any more */
2838   if (thecode != GST_RTSP_STS_OK)
2839     return GST_RTSP_OK;
2840
2841   /* store new content base if any */
2842   gst_rtsp_message_get_header (response, GST_RTSP_HDR_CONTENT_BASE,
2843       &content_base, 0);
2844   if (content_base) {
2845     g_free (sink->content_base);
2846     sink->content_base = g_strdup (content_base);
2847   }
2848
2849   return GST_RTSP_OK;
2850
2851   /* ERRORS */
2852 send_error:
2853   {
2854     gchar *str = gst_rtsp_strresult (res);
2855
2856     if (res != GST_RTSP_EINTR) {
2857       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2858           ("Could not send message. (%s)", str));
2859     } else {
2860       GST_WARNING_OBJECT (sink, "send interrupted");
2861     }
2862     g_free (str);
2863     return res;
2864   }
2865 receive_error:
2866   {
2867     switch (res) {
2868       case GST_RTSP_EEOF:
2869         GST_WARNING_OBJECT (sink, "server closed connection");
2870         if ((try == 0) && !sink->interleaved && sink->udp_reconnect) {
2871           try++;
2872           /* if reconnect succeeds, try again */
2873           if ((res =
2874                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2875                       FALSE)) == 0)
2876             goto again;
2877         }
2878         /* only try once after reconnect, then fallthrough and error out */
2879       default:
2880       {
2881         gchar *str = gst_rtsp_strresult (res);
2882
2883         if (res != GST_RTSP_EINTR) {
2884           GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2885               ("Could not receive message. (%s)", str));
2886         } else {
2887           GST_WARNING_OBJECT (sink, "receive interrupted");
2888         }
2889         g_free (str);
2890         break;
2891       }
2892     }
2893     return res;
2894   }
2895 handle_request_failed:
2896   {
2897     /* ERROR was posted */
2898     gst_rtsp_message_unset (response);
2899     return res;
2900   }
2901 server_eof:
2902   {
2903     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2904     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2905         ("The server closed the connection."));
2906     gst_rtsp_message_unset (response);
2907     return res;
2908   }
2909 }
2910
2911 static void
2912 gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
2913 {
2914   GST_DEBUG_OBJECT (sink, "Setting internal state to %s",
2915       gst_element_state_get_name (state));
2916   gst_element_set_state (GST_ELEMENT (sink->internal_bin), state);
2917 }
2918
2919 /**
2920  * gst_rtsp_client_sink_send:
2921  * @src: the rtsp source
2922  * @conn: the connection to send on
2923  * @request: must point to a valid request
2924  * @response: must point to an empty #GstRTSPMessage
2925  * @code: an optional code result
2926  *
2927  * send @request and retrieve the response in @response. optionally @code can be
2928  * non-NULL in which case it will contain the status code of the response.
2929  *
2930  * If This function returns #GST_RTSP_OK, @response will contain a valid response
2931  * message that should be cleaned with gst_rtsp_message_unset() after usage.
2932  *
2933  * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid
2934  * @response message) if the response code was not 200 (OK).
2935  *
2936  * If the attempt results in an authentication failure, then this will attempt
2937  * to retrieve authentication credentials via gst_rtsp_client_sink_setup_auth and retry
2938  * the request.
2939  *
2940  * Returns: #GST_RTSP_OK if the processing was successful.
2941  */
2942 static GstRTSPResult
2943 gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
2944     GstRTSPMessage * request, GstRTSPMessage * response,
2945     GstRTSPStatusCode * code)
2946 {
2947   GstRTSPStatusCode int_code = GST_RTSP_STS_OK;
2948   GstRTSPResult res = GST_RTSP_ERROR;
2949   gint count;
2950   gboolean retry;
2951   GstRTSPMethod method = GST_RTSP_INVALID;
2952
2953   count = 0;
2954   do {
2955     retry = FALSE;
2956
2957     /* make sure we don't loop forever */
2958     if (count++ > 8)
2959       break;
2960
2961     /* save method so we can disable it when the server complains */
2962     method = request->type_data.request.method;
2963
2964     if ((res =
2965             gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
2966                 &int_code)) < 0)
2967       goto error;
2968
2969     switch (int_code) {
2970       case GST_RTSP_STS_UNAUTHORIZED:
2971         if (gst_rtsp_client_sink_setup_auth (sink, response)) {
2972           /* Try the request/response again after configuring the auth info
2973            * and loop again */
2974           retry = TRUE;
2975         }
2976         break;
2977       default:
2978         break;
2979     }
2980   } while (retry == TRUE);
2981
2982   /* If the user requested the code, let them handle errors, otherwise
2983    * post an error below */
2984   if (code != NULL)
2985     *code = int_code;
2986   else if (int_code != GST_RTSP_STS_OK)
2987     goto error_response;
2988
2989   return res;
2990
2991   /* ERRORS */
2992 error:
2993   {
2994     GST_DEBUG_OBJECT (sink, "got error %d", res);
2995     return res;
2996   }
2997 error_response:
2998   {
2999     res = GST_RTSP_ERROR;
3000
3001     switch (response->type_data.response.code) {
3002       case GST_RTSP_STS_NOT_FOUND:
3003         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL), ("%s",
3004                 response->type_data.response.reason));
3005         break;
3006       case GST_RTSP_STS_UNAUTHORIZED:
3007         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_AUTHORIZED, (NULL), ("%s",
3008                 response->type_data.response.reason));
3009         break;
3010       case GST_RTSP_STS_MOVED_PERMANENTLY:
3011       case GST_RTSP_STS_MOVE_TEMPORARILY:
3012       {
3013         gchar *new_location;
3014         GstRTSPLowerTrans transports;
3015
3016         GST_DEBUG_OBJECT (sink, "got redirection");
3017         /* if we don't have a Location Header, we must error */
3018         if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_LOCATION,
3019                 &new_location, 0) < 0)
3020           break;
3021
3022         /* When we receive a redirect result, we go back to the INIT state after
3023          * parsing the new URI. The caller should do the needed steps to issue
3024          * a new setup when it detects this state change. */
3025         GST_DEBUG_OBJECT (sink, "redirection to %s", new_location);
3026
3027         /* save current transports */
3028         if (sink->conninfo.url)
3029           transports = sink->conninfo.url->transports;
3030         else
3031           transports = GST_RTSP_LOWER_TRANS_UNKNOWN;
3032
3033         gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (sink), new_location,
3034             NULL);
3035
3036         /* set old transports */
3037         if (sink->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN)
3038           sink->conninfo.url->transports = transports;
3039
3040         sink->need_redirect = TRUE;
3041         sink->state = GST_RTSP_STATE_INIT;
3042         res = GST_RTSP_OK;
3043         break;
3044       }
3045       case GST_RTSP_STS_NOT_ACCEPTABLE:
3046       case GST_RTSP_STS_NOT_IMPLEMENTED:
3047       case GST_RTSP_STS_METHOD_NOT_ALLOWED:
3048         GST_WARNING_OBJECT (sink, "got NOT IMPLEMENTED, disable method %s",
3049             gst_rtsp_method_as_text (method));
3050         sink->methods &= ~method;
3051         res = GST_RTSP_OK;
3052         break;
3053       default:
3054         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3055             ("Got error response: %d (%s).", response->type_data.response.code,
3056                 response->type_data.response.reason));
3057         break;
3058     }
3059     /* if we return ERROR we should unset the response ourselves */
3060     if (res == GST_RTSP_ERROR)
3061       gst_rtsp_message_unset (response);
3062
3063     return res;
3064   }
3065 }
3066
3067 /* parse the response and collect all the supported methods. We need this
3068  * information so that we don't try to send an unsupported request to the
3069  * server.
3070  */
3071 static gboolean
3072 gst_rtsp_client_sink_parse_methods (GstRTSPClientSink * sink,
3073     GstRTSPMessage * response)
3074 {
3075   GstRTSPHeaderField field;
3076   gchar *respoptions;
3077   gint indx = 0;
3078
3079   /* reset supported methods */
3080   sink->methods = 0;
3081
3082   /* Try Allow Header first */
3083   field = GST_RTSP_HDR_ALLOW;
3084   while (TRUE) {
3085     respoptions = NULL;
3086     gst_rtsp_message_get_header (response, field, &respoptions, indx);
3087     if (indx == 0 && !respoptions) {
3088       /* if no Allow header was found then try the Public header... */
3089       field = GST_RTSP_HDR_PUBLIC;
3090       gst_rtsp_message_get_header (response, field, &respoptions, indx);
3091     }
3092     if (!respoptions)
3093       break;
3094
3095     sink->methods |= gst_rtsp_options_from_text (respoptions);
3096
3097     indx++;
3098   }
3099
3100   if (sink->methods == 0) {
3101     /* neither Allow nor Public are required, assume the server supports
3102      * at least SETUP. */
3103     GST_DEBUG_OBJECT (sink, "could not get OPTIONS");
3104     sink->methods = GST_RTSP_SETUP;
3105   }
3106
3107   /* Even if the server replied, and didn't say it supports
3108    * RECORD|ANNOUNCE, try anyway by assuming it does */
3109   sink->methods |= GST_RTSP_ANNOUNCE | GST_RTSP_RECORD;
3110
3111   if (!(sink->methods & GST_RTSP_SETUP))
3112     goto no_setup;
3113
3114   return TRUE;
3115
3116   /* ERRORS */
3117 no_setup:
3118   {
3119     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
3120         ("Server does not support SETUP."));
3121     return FALSE;
3122   }
3123 }
3124
3125 static GstRTSPResult
3126 gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
3127     gboolean async)
3128 {
3129   GstRTSPResult res;
3130   GstRTSPMessage request = { 0 };
3131   GstRTSPMessage response = { 0 };
3132   GSocket *conn_socket;
3133   GSocketAddress *sa;
3134   GInetAddress *ia;
3135
3136   sink->need_redirect = FALSE;
3137
3138   /* can't continue without a valid url */
3139   if (G_UNLIKELY (sink->conninfo.url == NULL)) {
3140     res = GST_RTSP_EINVAL;
3141     goto no_url;
3142   }
3143   sink->tried_url_auth = FALSE;
3144
3145   if ((res = gst_rtsp_conninfo_connect (sink, &sink->conninfo, async)) < 0)
3146     goto connect_failed;
3147
3148   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
3149   sa = g_socket_get_remote_address (conn_socket, NULL);
3150   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
3151
3152   sink->server_ip = g_inet_address_to_string (ia);
3153
3154   g_object_unref (sa);
3155
3156   /* create OPTIONS */
3157   GST_DEBUG_OBJECT (sink, "create options...");
3158   res =
3159       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_OPTIONS,
3160       sink->conninfo.url_str);
3161   if (res < 0)
3162     goto create_request_failed;
3163
3164   /* send OPTIONS */
3165   GST_DEBUG_OBJECT (sink, "send options...");
3166
3167   if (async)
3168     GST_ELEMENT_PROGRESS (sink, CONTINUE, "open",
3169         ("Retrieving server options"));
3170
3171   if ((res =
3172           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
3173               &response, NULL)) < 0)
3174     goto send_error;
3175
3176   /* parse OPTIONS */
3177   if (!gst_rtsp_client_sink_parse_methods (sink, &response))
3178     goto methods_error;
3179
3180   /* FIXME: Do we need to handle REDIRECT responses for OPTIONS? */
3181
3182   /* clean up any messages */
3183   gst_rtsp_message_unset (&request);
3184   gst_rtsp_message_unset (&response);
3185
3186   return res;
3187
3188   /* ERRORS */
3189 no_url:
3190   {
3191     GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
3192         ("No valid RTSP URL was provided"));
3193     goto cleanup_error;
3194   }
3195 connect_failed:
3196   {
3197     gchar *str = gst_rtsp_strresult (res);
3198
3199     if (res != GST_RTSP_EINTR) {
3200       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
3201           ("Failed to connect. (%s)", str));
3202     } else {
3203       GST_WARNING_OBJECT (sink, "connect interrupted");
3204     }
3205     g_free (str);
3206     goto cleanup_error;
3207   }
3208 create_request_failed:
3209   {
3210     gchar *str = gst_rtsp_strresult (res);
3211
3212     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3213         ("Could not create request. (%s)", str));
3214     g_free (str);
3215     goto cleanup_error;
3216   }
3217 send_error:
3218   {
3219     /* Don't post a message - the rtsp_send method will have
3220      * taken care of it because we passed NULL for the response code */
3221     goto cleanup_error;
3222   }
3223 methods_error:
3224   {
3225     /* error was posted */
3226     res = GST_RTSP_ERROR;
3227     goto cleanup_error;
3228   }
3229 cleanup_error:
3230   {
3231     if (sink->conninfo.connection) {
3232       GST_DEBUG_OBJECT (sink, "free connection");
3233       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3234     }
3235     gst_rtsp_message_unset (&request);
3236     gst_rtsp_message_unset (&response);
3237     return res;
3238   }
3239 }
3240
3241 static GstRTSPResult
3242 gst_rtsp_client_sink_open (GstRTSPClientSink * sink, gboolean async)
3243 {
3244   GstRTSPResult ret;
3245
3246   sink->methods =
3247       GST_RTSP_SETUP | GST_RTSP_RECORD | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN;
3248
3249   g_mutex_lock (&sink->open_conn_lock);
3250   sink->open_conn_start = TRUE;
3251   g_cond_broadcast (&sink->open_conn_cond);
3252   GST_DEBUG_OBJECT (sink, "connection to server started");
3253   g_mutex_unlock (&sink->open_conn_lock);
3254
3255   if ((ret = gst_rtsp_client_sink_connect_to_server (sink, async)) < 0)
3256     goto open_failed;
3257
3258   if (async)
3259     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3260
3261   return ret;
3262
3263   /* ERRORS */
3264 open_failed:
3265   {
3266     GST_WARNING_OBJECT (sink, "Failed to connect to server");
3267     sink->open_error = TRUE;
3268     if (async)
3269       gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3270     return ret;
3271   }
3272 }
3273
3274 static GstRTSPResult
3275 gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
3276     gboolean only_close)
3277 {
3278   GstRTSPMessage request = { 0 };
3279   GstRTSPMessage response = { 0 };
3280   GstRTSPResult res = GST_RTSP_OK;
3281   GList *walk;
3282   const gchar *control;
3283
3284   GST_DEBUG_OBJECT (sink, "TEARDOWN...");
3285
3286   gst_rtsp_client_sink_set_state (sink, GST_STATE_NULL);
3287
3288   if (sink->state < GST_RTSP_STATE_READY) {
3289     GST_DEBUG_OBJECT (sink, "not ready, doing cleanup");
3290     goto close;
3291   }
3292
3293   if (only_close)
3294     goto close;
3295
3296   /* construct a control url */
3297   control = get_aggregate_control (sink);
3298
3299   if (!(sink->methods & (GST_RTSP_RECORD | GST_RTSP_TEARDOWN)))
3300     goto not_supported;
3301
3302   /* stop streaming */
3303   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3304     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3305
3306     if (context->stream_transport)
3307       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
3308
3309     if (context->joined) {
3310       gst_rtsp_stream_leave_bin (context->stream, GST_BIN (sink->internal_bin),
3311           sink->rtpbin);
3312       context->joined = FALSE;
3313     }
3314   }
3315
3316   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3317     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3318     const gchar *setup_url;
3319     GstRTSPConnInfo *info;
3320
3321     GST_DEBUG_OBJECT (sink, "Looking at stream %p for teardown",
3322         context->stream);
3323
3324     /* try aggregate control first but do non-aggregate control otherwise */
3325     if (control)
3326       setup_url = control;
3327     else if ((setup_url = context->conninfo.location) == NULL) {
3328       GST_DEBUG_OBJECT (sink, "Skipping TEARDOWN stream %p - no setup URL",
3329           context->stream);
3330       continue;
3331     }
3332
3333     if (sink->conninfo.connection) {
3334       info = &sink->conninfo;
3335     } else if (context->conninfo.connection) {
3336       info = &context->conninfo;
3337     } else {
3338       continue;
3339     }
3340     if (!info->connected)
3341       goto next;
3342
3343     /* do TEARDOWN */
3344     GST_DEBUG_OBJECT (sink, "Sending teardown for stream %p at URL %s",
3345         context->stream, setup_url);
3346     res =
3347         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_TEARDOWN,
3348         setup_url);
3349     if (res < 0)
3350       goto create_request_failed;
3351
3352     if (async)
3353       GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
3354
3355     if ((res =
3356             gst_rtsp_client_sink_send (sink, info, &request,
3357                 &response, NULL)) < 0)
3358       goto send_error;
3359
3360     /* FIXME, parse result? */
3361     gst_rtsp_message_unset (&request);
3362     gst_rtsp_message_unset (&response);
3363
3364   next:
3365     /* early exit when we did aggregate control */
3366     if (control)
3367       break;
3368   }
3369
3370 close:
3371   /* close connections */
3372   GST_DEBUG_OBJECT (sink, "closing connection...");
3373   gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3374   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3375     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
3376     gst_rtsp_conninfo_close (sink, &stream->conninfo, TRUE);
3377   }
3378
3379   /* cleanup */
3380   gst_rtsp_client_sink_cleanup (sink);
3381
3382   sink->state = GST_RTSP_STATE_INVALID;
3383
3384   if (async)
3385     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_CLOSE, res);
3386
3387   return res;
3388
3389   /* ERRORS */
3390 create_request_failed:
3391   {
3392     gchar *str = gst_rtsp_strresult (res);
3393
3394     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3395         ("Could not create request. (%s)", str));
3396     g_free (str);
3397     goto close;
3398   }
3399 send_error:
3400   {
3401     gchar *str = gst_rtsp_strresult (res);
3402
3403     gst_rtsp_message_unset (&request);
3404     if (res != GST_RTSP_EINTR) {
3405       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3406           ("Could not send message. (%s)", str));
3407     } else {
3408       GST_WARNING_OBJECT (sink, "TEARDOWN interrupted");
3409     }
3410     g_free (str);
3411     goto close;
3412   }
3413 not_supported:
3414   {
3415     GST_DEBUG_OBJECT (sink,
3416         "TEARDOWN and PLAY not supported, can't do TEARDOWN");
3417     goto close;
3418   }
3419 }
3420
3421 static gboolean
3422 gst_rtsp_client_sink_configure_manager (GstRTSPClientSink * sink)
3423 {
3424   GstElement *rtpbin;
3425   GstStateChangeReturn ret;
3426
3427   rtpbin = sink->rtpbin;
3428
3429   if (rtpbin == NULL) {
3430     GObjectClass *klass;
3431
3432     rtpbin = gst_element_factory_make ("rtpbin", NULL);
3433     if (rtpbin == NULL)
3434       goto no_rtpbin;
3435
3436     gst_bin_add (GST_BIN_CAST (sink->internal_bin), rtpbin);
3437
3438     sink->rtpbin = rtpbin;
3439
3440     /* Any more settings we should configure on rtpbin here? */
3441     g_object_set (sink->rtpbin, "latency", sink->latency, NULL);
3442
3443     klass = G_OBJECT_GET_CLASS (G_OBJECT (rtpbin));
3444
3445     if (g_object_class_find_property (klass, "ntp-time-source")) {
3446       g_object_set (sink->rtpbin, "ntp-time-source", sink->ntp_time_source,
3447           NULL);
3448     }
3449
3450     if (sink->sdes && g_object_class_find_property (klass, "sdes")) {
3451       g_object_set (sink->rtpbin, "sdes", sink->sdes, NULL);
3452     }
3453
3454     g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER], 0,
3455         sink->rtpbin);
3456   }
3457
3458   ret = gst_element_set_state (rtpbin, GST_STATE_PAUSED);
3459   if (ret == GST_STATE_CHANGE_FAILURE)
3460     goto start_manager_failure;
3461
3462   return TRUE;
3463
3464 no_rtpbin:
3465   {
3466     GST_WARNING ("no rtpbin element");
3467     g_warning ("failed to create element 'rtpbin', check your installation");
3468     return FALSE;
3469   }
3470 start_manager_failure:
3471   {
3472     GST_DEBUG_OBJECT (sink, "could not start session manager");
3473     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), rtpbin);
3474     return FALSE;
3475   }
3476 }
3477
3478 static GstElement *
3479 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPClientSink * sink)
3480 {
3481   GstRTSPStream *stream = NULL;
3482   GstElement *ret = NULL;
3483   GList *walk;
3484
3485   GST_RTSP_STATE_LOCK (sink);
3486   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3487     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3488
3489     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3490       stream = context->stream;
3491       break;
3492     }
3493   }
3494
3495   if (stream != NULL) {
3496     GST_DEBUG_OBJECT (sink, "Creating aux sender for stream %u", sessid);
3497     ret = gst_rtsp_stream_request_aux_sender (stream, sessid);
3498   }
3499
3500   GST_RTSP_STATE_UNLOCK (sink);
3501
3502   return ret;
3503 }
3504
3505 static gboolean
3506 gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink)
3507 {
3508   GstRTSPStreamContext *context;
3509   GList *walk;
3510   const gchar *base;
3511   gboolean has_slash;
3512
3513   GST_DEBUG_OBJECT (sink, "Collecting stream information");
3514
3515   if (!gst_rtsp_client_sink_configure_manager (sink))
3516     return FALSE;
3517
3518   base = get_aggregate_control (sink);
3519   /* check if the base ends with / */
3520   has_slash = g_str_has_suffix (base, "/");
3521
3522   g_mutex_lock (&sink->preroll_lock);
3523   while (sink->contexts == NULL && !sink->conninfo.flushing) {
3524     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3525   }
3526   g_mutex_unlock (&sink->preroll_lock);
3527
3528   /* FIXME: Need different locking - need to protect against pad releases
3529    * and potential state changes ruining things here */
3530   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3531     GstPad *srcpad;
3532
3533     context = (GstRTSPStreamContext *) walk->data;
3534     if (context->stream)
3535       continue;
3536
3537     g_mutex_lock (&sink->preroll_lock);
3538     while (!context->prerolled && !sink->conninfo.flushing) {
3539       GST_DEBUG_OBJECT (sink, "Waiting for caps on stream %d", context->index);
3540       g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3541     }
3542     if (sink->conninfo.flushing) {
3543       g_mutex_unlock (&sink->preroll_lock);
3544       break;
3545     }
3546     g_mutex_unlock (&sink->preroll_lock);
3547
3548     if (context->payloader == NULL)
3549       continue;
3550
3551     srcpad = gst_element_get_static_pad (context->payloader, "src");
3552
3553     GST_DEBUG_OBJECT (sink, "Creating stream object for stream %d",
3554         context->index);
3555     context->stream =
3556         gst_rtsp_client_sink_create_stream (sink, context, context->payloader,
3557         srcpad);
3558
3559     /* concatenate the two strings, insert / when not present */
3560     g_free (context->conninfo.location);
3561     context->conninfo.location =
3562         g_strdup_printf ("%s%sstream=%d", base, has_slash ? "" : "/",
3563         context->index);
3564
3565     if (sink->rtx_time > 0) {
3566       /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
3567       g_signal_connect (sink->rtpbin, "request-aux-sender",
3568           (GCallback) request_aux_sender, sink);
3569     }
3570
3571     if (!gst_rtsp_stream_join_bin (context->stream,
3572             GST_BIN (sink->internal_bin), sink->rtpbin, GST_STATE_PAUSED)) {
3573       goto join_bin_failed;
3574     }
3575     context->joined = TRUE;
3576
3577     /* Block the stream, as it does not have any transport parts yet */
3578     gst_rtsp_stream_set_blocked (context->stream, TRUE);
3579
3580     /* Let the stream object receive data */
3581     gst_pad_remove_probe (srcpad, context->payloader_block_id);
3582
3583     gst_object_unref (srcpad);
3584   }
3585
3586   /* Now wait for the preroll of the rtp bin */
3587   g_mutex_lock (&sink->preroll_lock);
3588   while (!sink->prerolled && !sink->conninfo.flushing) {
3589     GST_LOG_OBJECT (sink, "Waiting for preroll before continuing");
3590     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3591   }
3592   GST_LOG_OBJECT (sink, "Marking streams as collected");
3593   sink->streams_collected = TRUE;
3594   g_mutex_unlock (&sink->preroll_lock);
3595
3596   return TRUE;
3597
3598 join_bin_failed:
3599
3600   GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3601       ("Could not start stream %d", context->index));
3602   return FALSE;
3603 }
3604
3605 static GstRTSPResult
3606 gst_rtsp_client_sink_create_transports_string (GstRTSPClientSink * sink,
3607     GstRTSPStreamContext * context, GSocketFamily family,
3608     GstRTSPLowerTrans protocols, GstRTSPProfile profiles, gchar ** transports)
3609 {
3610   GString *result;
3611   GstRTSPStream *stream = context->stream;
3612   gboolean first = TRUE;
3613
3614   /* the default RTSP transports */
3615   result = g_string_new ("RTP");
3616
3617   while (profiles != 0) {
3618     if (!first)
3619       g_string_append (result, ",RTP");
3620
3621     if (profiles & GST_RTSP_PROFILE_SAVPF) {
3622       g_string_append (result, "/SAVPF");
3623       profiles &= ~GST_RTSP_PROFILE_SAVPF;
3624     } else if (profiles & GST_RTSP_PROFILE_SAVP) {
3625       g_string_append (result, "/SAVP");
3626       profiles &= ~GST_RTSP_PROFILE_SAVP;
3627     } else if (profiles & GST_RTSP_PROFILE_AVPF) {
3628       g_string_append (result, "/AVPF");
3629       profiles &= ~GST_RTSP_PROFILE_AVPF;
3630     } else if (profiles & GST_RTSP_PROFILE_AVP) {
3631       g_string_append (result, "/AVP");
3632       profiles &= ~GST_RTSP_PROFILE_AVP;
3633     } else {
3634       GST_WARNING_OBJECT (sink, "Unimplemented profile(s) 0x%x", profiles);
3635       break;
3636     }
3637
3638     if (protocols & GST_RTSP_LOWER_TRANS_UDP) {
3639       GstRTSPRange ports;
3640
3641       GST_DEBUG_OBJECT (sink, "adding UDP unicast");
3642       gst_rtsp_stream_get_server_port (stream, &ports, family);
3643
3644       g_string_append_printf (result, "/UDP;unicast;client_port=%d-%d",
3645           ports.min, ports.max);
3646     } else if (protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3647       GstRTSPAddress *addr =
3648           gst_rtsp_stream_get_multicast_address (stream, family);
3649       if (addr) {
3650         GST_DEBUG_OBJECT (sink, "adding UDP multicast");
3651         g_string_append_printf (result, "/UDP;multicast;client_port=%d-%d",
3652             addr->port, addr->port + addr->n_ports - 1);
3653         gst_rtsp_address_free (addr);
3654       }
3655     } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) {
3656       GST_DEBUG_OBJECT (sink, "adding TCP");
3657       g_string_append_printf (result, "/TCP;unicast;interleaved=%d-%d",
3658           sink->free_channel, sink->free_channel + 1);
3659     }
3660
3661     g_string_append (result, ";mode=RECORD");
3662     /* FIXME: Support appending too:
3663        if (sink->append)
3664        g_string_append (result, ";append");
3665      */
3666
3667     first = FALSE;
3668   }
3669
3670   if (first) {
3671     /* No valid transport could be constructed */
3672     GST_ERROR_OBJECT (sink, "No supported profiles configured");
3673     goto fail;
3674   }
3675
3676   *transports = g_string_free (result, FALSE);
3677
3678   GST_DEBUG_OBJECT (sink, "prepared transports %s", GST_STR_NULL (*transports));
3679
3680   return GST_RTSP_OK;
3681 fail:
3682   g_string_free (result, TRUE);
3683   return GST_RTSP_ERROR;
3684 }
3685
3686 static GstCaps *
3687 signal_get_srtcp_params (GstRTSPClientSink * sink,
3688     GstRTSPStreamContext * context)
3689 {
3690   GstCaps *caps = NULL;
3691
3692   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY], 0,
3693       context->index, &caps);
3694
3695   if (caps != NULL)
3696     GST_DEBUG_OBJECT (sink, "SRTP parameters received");
3697
3698   return caps;
3699 }
3700
3701 static gchar *
3702 gst_rtsp_client_sink_stream_make_keymgmt (GstRTSPClientSink * sink,
3703     GstRTSPStreamContext * context)
3704 {
3705   gchar *base64, *result = NULL;
3706   GstMIKEYMessage *mikey_msg;
3707
3708   context->srtcpparams = signal_get_srtcp_params (sink, context);
3709   if (context->srtcpparams == NULL)
3710     context->srtcpparams = gst_rtsp_stream_get_caps (context->stream);
3711
3712   mikey_msg = gst_mikey_message_new_from_caps (context->srtcpparams);
3713   if (mikey_msg) {
3714     guint send_ssrc, send_rtx_ssrc;
3715     const GstStructure *s = gst_caps_get_structure (context->srtcpparams, 0);
3716
3717     /* add policy '0' for our SSRC */
3718     gst_rtsp_stream_get_ssrc (context->stream, &send_ssrc);
3719     GST_LOG_OBJECT (sink, "Stream %p ssrc %x", context->stream, send_ssrc);
3720     gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_ssrc, 0);
3721
3722     if (gst_structure_get_uint (s, "rtx-ssrc", &send_rtx_ssrc))
3723       gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_rtx_ssrc, 0);
3724
3725     base64 = gst_mikey_message_base64_encode (mikey_msg);
3726     gst_mikey_message_unref (mikey_msg);
3727
3728     if (base64) {
3729       result = gst_sdp_make_keymgmt (context->conninfo.location, base64);
3730       g_free (base64);
3731     }
3732   }
3733
3734   return result;
3735 }
3736
3737 /* masks to be kept in sync with the hardcoded protocol order of preference
3738  * in code below */
3739 static const guint protocol_masks[] = {
3740   GST_RTSP_LOWER_TRANS_UDP,
3741   GST_RTSP_LOWER_TRANS_UDP_MCAST,
3742   GST_RTSP_LOWER_TRANS_TCP,
3743   0
3744 };
3745
3746 /* Same for profile_masks */
3747 static const guint profile_masks[] = {
3748   GST_RTSP_PROFILE_SAVPF,
3749   GST_RTSP_PROFILE_SAVP,
3750   GST_RTSP_PROFILE_AVPF,
3751   GST_RTSP_PROFILE_AVP,
3752   0
3753 };
3754
3755 static gboolean
3756 do_send_data (GstBuffer * buffer, guint8 channel,
3757     GstRTSPStreamContext * context)
3758 {
3759   GstRTSPClientSink *sink = context->parent;
3760   GstRTSPMessage message = { 0 };
3761   GstRTSPResult res = GST_RTSP_OK;
3762   GstMapInfo map_info;
3763   guint8 *data;
3764   guint usize;
3765
3766   gst_rtsp_message_init_data (&message, channel);
3767
3768   /* FIXME, need some sort of iovec RTSPMessage here */
3769   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
3770     return FALSE;
3771
3772   gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
3773
3774   res =
3775       gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
3776       NULL, NULL);
3777
3778   gst_rtsp_message_steal_body (&message, &data, &usize);
3779   gst_buffer_unmap (buffer, &map_info);
3780
3781   gst_rtsp_message_unset (&message);
3782
3783   return res == GST_RTSP_OK;
3784 }
3785
3786 static GstRTSPResult
3787 gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
3788 {
3789   GstRTSPResult res = GST_RTSP_ERROR;
3790   GstRTSPMessage request = { 0 };
3791   GstRTSPMessage response = { 0 };
3792   GstRTSPLowerTrans protocols;
3793   GstRTSPStatusCode code;
3794   GSocketFamily family;
3795   GSocketAddress *sa;
3796   GSocket *conn_socket;
3797   GstRTSPUrl *url;
3798   GList *walk;
3799   gchar *hval;
3800
3801   if (sink->conninfo.connection) {
3802     url = gst_rtsp_connection_get_url (sink->conninfo.connection);
3803     /* we initially allow all configured lower transports. based on the URL
3804      * transports and the replies from the server we narrow them down. */
3805     protocols = url->transports & sink->cur_protocols;
3806   } else {
3807     url = NULL;
3808     protocols = sink->cur_protocols;
3809   }
3810
3811   if (protocols == 0)
3812     goto no_protocols;
3813
3814   GST_RTSP_STATE_LOCK (sink);
3815
3816   if (G_UNLIKELY (sink->contexts == NULL))
3817     goto no_streams;
3818
3819   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3820     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3821     GstRTSPStream *stream;
3822
3823     GstRTSPConnInfo *info;
3824     GstRTSPProfile profiles;
3825     GstRTSPProfile cur_profile;
3826     gchar *transports;
3827     gint retry = 0;
3828     guint profile_mask = 0;
3829     guint mask = 0;
3830     GstCaps *caps;
3831     const GstSDPMedia *media;
3832
3833     stream = context->stream;
3834     profiles = gst_rtsp_stream_get_profiles (stream);
3835
3836     caps = gst_rtsp_stream_get_caps (stream);
3837     if (caps == NULL) {
3838       GST_DEBUG_OBJECT (sink, "skipping stream %p, no caps", stream);
3839       continue;
3840     }
3841     gst_caps_unref (caps);
3842     media = gst_sdp_message_get_media (&sink->cursdp, context->sdp_index);
3843     if (media == NULL) {
3844       GST_DEBUG_OBJECT (sink, "skipping stream %p, no SDP info", stream);
3845       continue;
3846     }
3847
3848     /* skip setup if we have no URL for it */
3849     if (context->conninfo.location == NULL) {
3850       GST_DEBUG_OBJECT (sink, "skipping stream %p, no setup", stream);
3851       continue;
3852     }
3853
3854     if (sink->conninfo.connection == NULL) {
3855       if (!gst_rtsp_conninfo_connect (sink, &context->conninfo, async)) {
3856         GST_DEBUG_OBJECT (sink, "skipping stream %p, failed to connect",
3857             stream);
3858         continue;
3859       }
3860       info = &context->conninfo;
3861     } else {
3862       info = &sink->conninfo;
3863     }
3864     GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
3865         context->conninfo.location);
3866
3867     conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
3868     sa = g_socket_get_local_address (conn_socket, NULL);
3869     family = g_socket_address_get_family (sa);
3870     g_object_unref (sa);
3871
3872   next_protocol:
3873     /* first selectable profile */
3874     while (profile_masks[profile_mask]
3875         && !(profiles & profile_masks[profile_mask]))
3876       profile_mask++;
3877     if (!profile_masks[profile_mask])
3878       goto no_profiles;
3879
3880     /* first selectable protocol */
3881     while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
3882       mask++;
3883     if (!protocol_masks[mask])
3884       goto no_protocols;
3885
3886   retry:
3887     GST_DEBUG_OBJECT (sink, "protocols = 0x%x, protocol mask = 0x%x", protocols,
3888         protocol_masks[mask]);
3889     /* create a string with first transport in line */
3890     transports = NULL;
3891     cur_profile = profiles & profile_masks[profile_mask];
3892     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
3893         protocols & protocol_masks[mask], cur_profile, &transports);
3894     if (res < 0 || transports == NULL)
3895       goto setup_transport_failed;
3896
3897     if (strlen (transports) == 0) {
3898       g_free (transports);
3899       GST_DEBUG_OBJECT (sink, "no transports found");
3900       mask++;
3901       profile_mask = 0;
3902       goto next_protocol;
3903     }
3904
3905     GST_DEBUG_OBJECT (sink, "transport is %s", GST_STR_NULL (transports));
3906
3907     /* create SETUP request */
3908     res =
3909         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_SETUP,
3910         context->conninfo.location);
3911     if (res < 0) {
3912       g_free (transports);
3913       goto create_request_failed;
3914     }
3915
3916     /* set up keys */
3917     if (cur_profile == GST_RTSP_PROFILE_SAVP ||
3918         cur_profile == GST_RTSP_PROFILE_SAVPF) {
3919       hval = gst_rtsp_client_sink_stream_make_keymgmt (sink, context);
3920       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_KEYMGMT, hval);
3921     }
3922
3923     /* if the user wants a non default RTP packet size we add the blocksize
3924      * parameter */
3925     if (sink->rtp_blocksize > 0) {
3926       hval = g_strdup_printf ("%d", sink->rtp_blocksize);
3927       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_BLOCKSIZE, hval);
3928     }
3929
3930     if (async)
3931       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request", ("SETUP stream %d",
3932               context->index));
3933
3934     {
3935       GstRTSPTransport *transport;
3936
3937       gst_rtsp_transport_new (&transport);
3938       if (gst_rtsp_transport_parse (transports, transport) != GST_RTSP_OK)
3939         goto parse_transport_failed;
3940       if (transport->lower_transport != GST_RTSP_LOWER_TRANS_TCP) {
3941         if (!gst_rtsp_stream_allocate_udp_sockets (stream, family, transport,
3942                 FALSE)) {
3943           gst_rtsp_transport_free (transport);
3944           goto allocate_udp_ports_failed;
3945         }
3946       }
3947       if (!gst_rtsp_stream_complete_stream (stream, transport)) {
3948         gst_rtsp_transport_free (transport);
3949         goto complete_stream_failed;
3950       }
3951
3952       gst_rtsp_transport_free (transport);
3953       gst_rtsp_stream_set_blocked (stream, FALSE);
3954     }
3955
3956     /* FIXME:
3957      * the creation of the transports string depends on
3958      * calling stream_get_server_port, which only starts returning
3959      * something meaningful after a call to stream_allocate_udp_sockets
3960      * has been made, this function expects a transport that we parse
3961      * from the transport string ...
3962      *
3963      * Significant refactoring is in order, but does not look entirely
3964      * trivial, for now we put a band aid on and create a second transport
3965      * string after the stream has been completed, to pass it in
3966      * the request headers instead of the previous, incomplete one.
3967      */
3968     g_free (transports);
3969     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
3970         protocols & protocol_masks[mask], cur_profile, &transports);
3971
3972     /* select transport */
3973     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_TRANSPORT, transports);
3974
3975     /* handle the code ourselves */
3976     res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
3977     if (res < 0)
3978       goto send_error;
3979
3980     switch (code) {
3981       case GST_RTSP_STS_OK:
3982         break;
3983       case GST_RTSP_STS_UNSUPPORTED_TRANSPORT:
3984         gst_rtsp_message_unset (&request);
3985         gst_rtsp_message_unset (&response);
3986
3987         /* Try another profile. If no more, move to the next protocol */
3988         profile_mask++;
3989         while (profile_masks[profile_mask]
3990             && !(profiles & profile_masks[profile_mask]))
3991           profile_mask++;
3992         if (profile_masks[profile_mask])
3993           goto retry;
3994
3995         /* select next available protocol, give up on this stream if none */
3996         /* Reset profiles to try: */
3997         profile_mask = 0;
3998
3999         mask++;
4000         while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
4001           mask++;
4002         if (!protocol_masks[mask])
4003           continue;
4004         else
4005           goto retry;
4006       default:
4007         goto response_error;
4008     }
4009
4010     /* parse response transport */
4011     {
4012       gchar *resptrans = NULL;
4013       GstRTSPTransport *transport;
4014
4015       gst_rtsp_message_get_header (&response, GST_RTSP_HDR_TRANSPORT,
4016           &resptrans, 0);
4017       if (!resptrans) {
4018         goto no_transport;
4019       }
4020
4021       gst_rtsp_transport_new (&transport);
4022
4023       /* parse transport, go to next stream on parse error */
4024       if (gst_rtsp_transport_parse (resptrans, transport) != GST_RTSP_OK) {
4025         GST_WARNING_OBJECT (sink, "failed to parse transport %s", resptrans);
4026         goto next;
4027       }
4028
4029       /* update allowed transports for other streams. once the transport of
4030        * one stream has been determined, we make sure that all other streams
4031        * are configured in the same way */
4032       switch (transport->lower_transport) {
4033         case GST_RTSP_LOWER_TRANS_TCP:
4034           GST_DEBUG_OBJECT (sink, "stream %p as TCP interleaved", stream);
4035           protocols = GST_RTSP_LOWER_TRANS_TCP;
4036           sink->interleaved = TRUE;
4037           /* update free channels */
4038           sink->free_channel =
4039               MAX (transport->interleaved.min, sink->free_channel);
4040           sink->free_channel =
4041               MAX (transport->interleaved.max, sink->free_channel);
4042           sink->free_channel++;
4043           break;
4044         case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4045           /* only allow multicast for other streams */
4046           GST_DEBUG_OBJECT (sink, "stream %p as UDP multicast", stream);
4047           protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST;
4048           break;
4049         case GST_RTSP_LOWER_TRANS_UDP:
4050           /* only allow unicast for other streams */
4051           GST_DEBUG_OBJECT (sink, "stream %p as UDP unicast", stream);
4052           protocols = GST_RTSP_LOWER_TRANS_UDP;
4053           /* Update transport with server destination if not provided by the server */
4054           if (transport->destination == NULL) {
4055             transport->destination = g_strdup (sink->server_ip);
4056           }
4057           break;
4058         default:
4059           GST_DEBUG_OBJECT (sink, "stream %p unknown transport %d", stream,
4060               transport->lower_transport);
4061           break;
4062       }
4063
4064       if (!retry) {
4065         GST_DEBUG ("Configuring the stream transport for stream %d",
4066             context->index);
4067         if (context->stream_transport == NULL)
4068           context->stream_transport =
4069               gst_rtsp_stream_transport_new (stream, transport);
4070         else
4071           gst_rtsp_stream_transport_set_transport (context->stream_transport,
4072               transport);
4073
4074         if (transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
4075           /* our callbacks to send data on this TCP connection */
4076           gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
4077               (GstRTSPSendFunc) do_send_data,
4078               (GstRTSPSendFunc) do_send_data, context, NULL);
4079         }
4080
4081         /* The stream_transport now owns the transport */
4082         transport = NULL;
4083
4084         gst_rtsp_stream_transport_set_active (context->stream_transport, TRUE);
4085       }
4086     next:
4087       if (transport)
4088         gst_rtsp_transport_free (transport);
4089       /* clean up used RTSP messages */
4090       gst_rtsp_message_unset (&request);
4091       gst_rtsp_message_unset (&response);
4092     }
4093   }
4094   GST_RTSP_STATE_UNLOCK (sink);
4095
4096   /* store the transport protocol that was configured */
4097   sink->cur_protocols = protocols;
4098
4099   return res;
4100
4101 no_streams:
4102   {
4103     GST_RTSP_STATE_UNLOCK (sink);
4104     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4105         ("SDP contains no streams"));
4106     return GST_RTSP_ERROR;
4107   }
4108 setup_transport_failed:
4109   {
4110     GST_RTSP_STATE_UNLOCK (sink);
4111     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4112         ("Could not setup transport."));
4113     res = GST_RTSP_ERROR;
4114     goto cleanup_error;
4115   }
4116 no_profiles:
4117   {
4118     GST_RTSP_STATE_UNLOCK (sink);
4119     /* no transport possible, post an error and stop */
4120     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4121         ("Could not connect to server, no profiles left"));
4122     return GST_RTSP_ERROR;
4123   }
4124 no_protocols:
4125   {
4126     GST_RTSP_STATE_UNLOCK (sink);
4127     /* no transport possible, post an error and stop */
4128     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4129         ("Could not connect to server, no protocols left"));
4130     return GST_RTSP_ERROR;
4131   }
4132 no_transport:
4133   {
4134     GST_RTSP_STATE_UNLOCK (sink);
4135     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4136         ("Server did not select transport."));
4137     res = GST_RTSP_ERROR;
4138     goto cleanup_error;
4139   }
4140 create_request_failed:
4141   {
4142     gchar *str = gst_rtsp_strresult (res);
4143
4144     GST_RTSP_STATE_UNLOCK (sink);
4145     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4146         ("Could not create request. (%s)", str));
4147     g_free (str);
4148     goto cleanup_error;
4149   }
4150 parse_transport_failed:
4151   {
4152     GST_RTSP_STATE_UNLOCK (sink);
4153     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4154         ("Could not parse transport."));
4155     res = GST_RTSP_ERROR;
4156     goto cleanup_error;
4157   }
4158 allocate_udp_ports_failed:
4159   {
4160     GST_RTSP_STATE_UNLOCK (sink);
4161     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4162         ("Could not parse transport."));
4163     res = GST_RTSP_ERROR;
4164     goto cleanup_error;
4165   }
4166 complete_stream_failed:
4167   {
4168     GST_RTSP_STATE_UNLOCK (sink);
4169     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4170         ("Could not parse transport."));
4171     res = GST_RTSP_ERROR;
4172     goto cleanup_error;
4173   }
4174 send_error:
4175   {
4176     gchar *str = gst_rtsp_strresult (res);
4177
4178     GST_RTSP_STATE_UNLOCK (sink);
4179     if (res != GST_RTSP_EINTR) {
4180       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4181           ("Could not send message. (%s)", str));
4182     } else {
4183       GST_WARNING_OBJECT (sink, "send interrupted");
4184     }
4185     g_free (str);
4186     goto cleanup_error;
4187   }
4188 response_error:
4189   {
4190     const gchar *str = gst_rtsp_status_as_text (code);
4191
4192     GST_RTSP_STATE_UNLOCK (sink);
4193     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4194         ("Error (%d): %s", code, GST_STR_NULL (str)));
4195     res = GST_RTSP_ERROR;
4196     goto cleanup_error;
4197   }
4198 cleanup_error:
4199   {
4200     gst_rtsp_message_unset (&request);
4201     gst_rtsp_message_unset (&response);
4202     return res;
4203   }
4204 }
4205
4206 static GstRTSPResult
4207 gst_rtsp_client_sink_ensure_open (GstRTSPClientSink * sink, gboolean async)
4208 {
4209   GstRTSPResult res = GST_RTSP_OK;
4210
4211   if (sink->state < GST_RTSP_STATE_READY) {
4212     res = GST_RTSP_ERROR;
4213     if (sink->open_error) {
4214       GST_DEBUG_OBJECT (sink, "the stream was in error");
4215       goto done;
4216     }
4217     if (async)
4218       gst_rtsp_client_sink_loop_start_cmd (sink, CMD_OPEN);
4219
4220     if ((res = gst_rtsp_client_sink_open (sink, async)) < 0) {
4221       GST_DEBUG_OBJECT (sink, "failed to open stream");
4222       goto done;
4223     }
4224   }
4225
4226 done:
4227   return res;
4228 }
4229
4230 static GstRTSPResult
4231 gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
4232 {
4233   GstRTSPMessage request = { 0 };
4234   GstRTSPMessage response = { 0 };
4235   GstRTSPResult res = GST_RTSP_OK;
4236   GstSDPMessage *sdp;
4237   guint sdp_index = 0;
4238   GstSDPInfo info = { 0, };
4239   gchar *keymgmt;
4240   guint i;
4241
4242   const gchar *proto;
4243   gchar *sess_id, *client_ip, *str;
4244   GSocketAddress *sa;
4245   GInetAddress *ia;
4246   GSocket *conn_socket;
4247   GList *walk;
4248
4249   g_mutex_lock (&sink->preroll_lock);
4250   if (sink->state == GST_RTSP_STATE_PLAYING) {
4251     /* Already recording, don't send another request */
4252     GST_LOG_OBJECT (sink, "Already in RECORD. Skipping duplicate request.");
4253     g_mutex_unlock (&sink->preroll_lock);
4254     goto done;
4255   }
4256   g_mutex_unlock (&sink->preroll_lock);
4257
4258   /* Collect all our input streams and create
4259    * stream objects before actually returning.
4260    * The streams are blocked at this point as we do not have any transport
4261    * parts yet. */
4262   gst_rtsp_client_sink_collect_streams (sink);
4263
4264   g_mutex_lock (&sink->block_streams_lock);
4265   /* Wait for streams to be blocked */
4266   while (!sink->streams_blocked) {
4267     GST_DEBUG_OBJECT (sink, "waiting for streams to be blocked");
4268     g_cond_wait (&sink->block_streams_cond, &sink->block_streams_lock);
4269   }
4270   g_mutex_unlock (&sink->block_streams_lock);
4271
4272   /* Send announce, then setup for all streams */
4273   gst_sdp_message_init (&sink->cursdp);
4274   sdp = &sink->cursdp;
4275
4276   /* some standard things first */
4277   gst_sdp_message_set_version (sdp, "0");
4278
4279   /* session ID doesn't have to be super-unique in this case */
4280   sess_id = g_strdup_printf ("%u", g_random_int ());
4281
4282   if (sink->conninfo.connection == NULL)
4283     return GST_RTSP_ERROR;
4284
4285   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
4286
4287   sa = g_socket_get_local_address (conn_socket, NULL);
4288   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
4289   client_ip = g_inet_address_to_string (ia);
4290   if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV6) {
4291     info.is_ipv6 = TRUE;
4292     proto = "IP6";
4293   } else if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV4)
4294     proto = "IP4";
4295   else
4296     g_assert_not_reached ();
4297   g_object_unref (sa);
4298
4299   /* FIXME: Should this actually be the server's IP or ours? */
4300   info.server_ip = sink->server_ip;
4301
4302   gst_sdp_message_set_origin (sdp, "-", sess_id, "1", "IN", proto, client_ip);
4303
4304   gst_sdp_message_set_session_name (sdp, "Session streamed with GStreamer");
4305   gst_sdp_message_set_information (sdp, "rtspclientsink");
4306   gst_sdp_message_add_time (sdp, "0", "0", NULL);
4307   gst_sdp_message_add_attribute (sdp, "tool", "GStreamer");
4308
4309   /* add stream */
4310   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4311     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4312
4313     gst_rtsp_sdp_from_stream (sdp, &info, context->stream);
4314     context->sdp_index = sdp_index++;
4315   }
4316
4317   g_free (sess_id);
4318   g_free (client_ip);
4319
4320   /* send ANNOUNCE request */
4321   GST_DEBUG_OBJECT (sink, "create ANNOUNCE request...");
4322   res =
4323       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_ANNOUNCE,
4324       sink->conninfo.url_str);
4325   if (res < 0)
4326     goto create_request_failed;
4327
4328   gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
4329       "application/sdp");
4330
4331   /* add SDP to the request body */
4332   str = gst_sdp_message_as_text (sdp);
4333   gst_rtsp_message_take_body (&request, (guint8 *) str, strlen (str));
4334
4335   /* send ANNOUNCE */
4336   GST_DEBUG_OBJECT (sink, "sending announce...");
4337
4338   if (async)
4339     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record",
4340         ("Sending server stream info"));
4341
4342   if ((res =
4343           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4344               &response, NULL)) < 0)
4345     goto send_error;
4346
4347   /* parse the keymgmt */
4348   i = 0;
4349   walk = sink->contexts;
4350   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_KEYMGMT,
4351           &keymgmt, i++) == GST_RTSP_OK) {
4352     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4353     walk = g_list_next (walk);
4354     if (!gst_rtsp_stream_handle_keymgmt (context->stream, keymgmt))
4355       goto keymgmt_error;
4356   }
4357
4358   /* send setup for all streams */
4359   if ((res = gst_rtsp_client_sink_setup_streams (sink, async)) < 0)
4360     goto setup_failed;
4361
4362   res = gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_RECORD,
4363       sink->conninfo.url_str);
4364
4365   if (res < 0)
4366     goto create_request_failed;
4367
4368 #if 0                           /* FIXME: Configure a range based on input segments? */
4369   if (src->need_range) {
4370     hval = gen_range_header (src, segment);
4371
4372     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_RANGE, hval);
4373   }
4374
4375   if (segment->rate != 1.0) {
4376     gchar hval[G_ASCII_DTOSTR_BUF_SIZE];
4377
4378     g_ascii_dtostr (hval, sizeof (hval), segment->rate);
4379     if (src->skip)
4380       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SCALE, hval);
4381     else
4382       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval);
4383   }
4384 #endif
4385
4386   if (async)
4387     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
4388   if ((res =
4389           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4390               &response, NULL)) < 0)
4391     goto send_error;
4392
4393 #if 0                           /* FIXME: Check if servers return these for record: */
4394   /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
4395    * for the RTP packets. If this is not present, we assume all starts from 0...
4396    * This is info for the RTP session manager that we pass to it in caps. */
4397   hval_idx = 0;
4398   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTP_INFO,
4399           &hval, hval_idx++) == GST_RTSP_OK)
4400     gst_rtspsrc_parse_rtpinfo (src, hval);
4401
4402   /* some servers indicate RTCP parameters in PLAY response,
4403    * rather than properly in SDP */
4404   if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTCP_INTERVAL,
4405           &hval, 0) == GST_RTSP_OK)
4406     gst_rtspsrc_handle_rtcp_interval (src, hval);
4407 #endif
4408
4409   gst_rtsp_client_sink_set_state (sink, GST_STATE_PLAYING);
4410   sink->state = GST_RTSP_STATE_PLAYING;
4411
4412   /* clean up any messages */
4413   gst_rtsp_message_unset (&request);
4414   gst_rtsp_message_unset (&response);
4415
4416 done:
4417   return res;
4418
4419 create_request_failed:
4420   {
4421     gchar *str = gst_rtsp_strresult (res);
4422
4423     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4424         ("Could not create request. (%s)", str));
4425     g_free (str);
4426     goto cleanup_error;
4427   }
4428 send_error:
4429   {
4430     /* Don't post a message - the rtsp_send method will have
4431      * taken care of it because we passed NULL for the response code */
4432     goto cleanup_error;
4433   }
4434 keymgmt_error:
4435   {
4436     GST_ELEMENT_ERROR (sink, STREAM, DECRYPT_NOKEY, (NULL),
4437         ("Could not handle KeyMgmt"));
4438   }
4439 setup_failed:
4440   {
4441     GST_ERROR_OBJECT (sink, "setup failed");
4442     goto cleanup_error;
4443   }
4444 cleanup_error:
4445   {
4446     if (sink->conninfo.connection) {
4447       GST_DEBUG_OBJECT (sink, "free connection");
4448       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
4449     }
4450     gst_rtsp_message_unset (&request);
4451     gst_rtsp_message_unset (&response);
4452     return res;
4453   }
4454 }
4455
4456 static GstRTSPResult
4457 gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
4458 {
4459   GstRTSPResult res = GST_RTSP_OK;
4460   GstRTSPMessage request = { 0 };
4461   GstRTSPMessage response = { 0 };
4462   GList *walk;
4463   const gchar *control;
4464
4465   GST_DEBUG_OBJECT (sink, "PAUSE...");
4466
4467   if ((res = gst_rtsp_client_sink_ensure_open (sink, async)) < 0)
4468     goto open_failed;
4469
4470   if (!(sink->methods & GST_RTSP_PAUSE))
4471     goto not_supported;
4472
4473   if (sink->state == GST_RTSP_STATE_READY)
4474     goto was_paused;
4475
4476   if (!sink->conninfo.connection || !sink->conninfo.connected)
4477     goto no_connection;
4478
4479   /* construct a control url */
4480   control = get_aggregate_control (sink);
4481
4482   /* loop over the streams. We might exit the loop early when we could do an
4483    * aggregate control */
4484   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4485     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
4486     GstRTSPConnInfo *info;
4487     const gchar *setup_url;
4488
4489     /* try aggregate control first but do non-aggregate control otherwise */
4490     if (control)
4491       setup_url = control;
4492     else if ((setup_url = stream->conninfo.location) == NULL)
4493       continue;
4494
4495     if (sink->conninfo.connection) {
4496       info = &sink->conninfo;
4497     } else if (stream->conninfo.connection) {
4498       info = &stream->conninfo;
4499     } else {
4500       continue;
4501     }
4502
4503     if (async)
4504       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request",
4505           ("Sending PAUSE request"));
4506
4507     if ((res =
4508             gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_PAUSE,
4509                 setup_url)) < 0)
4510       goto create_request_failed;
4511
4512     if ((res =
4513             gst_rtsp_client_sink_send (sink, info, &request, &response,
4514                 NULL)) < 0)
4515       goto send_error;
4516
4517     gst_rtsp_message_unset (&request);
4518     gst_rtsp_message_unset (&response);
4519
4520     /* exit early when we did agregate control */
4521     if (control)
4522       break;
4523   }
4524
4525   /* change element states now */
4526   gst_rtsp_client_sink_set_state (sink, GST_STATE_PAUSED);
4527
4528 no_connection:
4529   sink->state = GST_RTSP_STATE_READY;
4530
4531 done:
4532   if (async)
4533     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_PAUSE, res);
4534
4535   return res;
4536
4537   /* ERRORS */
4538 open_failed:
4539   {
4540     GST_DEBUG_OBJECT (sink, "failed to open stream");
4541     goto done;
4542   }
4543 not_supported:
4544   {
4545     GST_DEBUG_OBJECT (sink, "PAUSE is not supported");
4546     goto done;
4547   }
4548 was_paused:
4549   {
4550     GST_DEBUG_OBJECT (sink, "we were already PAUSED");
4551     goto done;
4552   }
4553 create_request_failed:
4554   {
4555     gchar *str = gst_rtsp_strresult (res);
4556
4557     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4558         ("Could not create request. (%s)", str));
4559     g_free (str);
4560     goto done;
4561   }
4562 send_error:
4563   {
4564     gchar *str = gst_rtsp_strresult (res);
4565
4566     gst_rtsp_message_unset (&request);
4567     if (res != GST_RTSP_EINTR) {
4568       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4569           ("Could not send message. (%s)", str));
4570     } else {
4571       GST_WARNING_OBJECT (sink, "PAUSE interrupted");
4572     }
4573     g_free (str);
4574     goto done;
4575   }
4576 }
4577
4578 static void
4579 gst_rtsp_client_sink_handle_message (GstBin * bin, GstMessage * message)
4580 {
4581   GstRTSPClientSink *rtsp_client_sink;
4582
4583   rtsp_client_sink = GST_RTSP_CLIENT_SINK (bin);
4584
4585   switch (GST_MESSAGE_TYPE (message)) {
4586     case GST_MESSAGE_ELEMENT:
4587     {
4588       const GstStructure *s = gst_message_get_structure (message);
4589
4590       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
4591         gboolean ignore_timeout;
4592
4593         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
4594
4595         GST_OBJECT_LOCK (rtsp_client_sink);
4596         ignore_timeout = rtsp_client_sink->ignore_timeout;
4597         rtsp_client_sink->ignore_timeout = TRUE;
4598         GST_OBJECT_UNLOCK (rtsp_client_sink);
4599
4600         /* we only act on the first udp timeout message, others are irrelevant
4601          * and can be ignored. */
4602         if (!ignore_timeout)
4603           gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECONNECT,
4604               CMD_LOOP);
4605         /* eat and free */
4606         gst_message_unref (message);
4607         return;
4608       } else if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
4609         /* An RTSPStream has prerolled */
4610         GST_DEBUG_OBJECT (rtsp_client_sink, "received GstRTSPStreamBlocking");
4611         g_mutex_lock (&rtsp_client_sink->block_streams_lock);
4612         rtsp_client_sink->streams_blocked = TRUE;
4613         g_cond_broadcast (&rtsp_client_sink->block_streams_cond);
4614         g_mutex_unlock (&rtsp_client_sink->block_streams_lock);
4615       }
4616       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4617       break;
4618     }
4619     case GST_MESSAGE_ASYNC_START:{
4620       GstObject *sender;
4621
4622       sender = GST_MESSAGE_SRC (message);
4623
4624       GST_LOG_OBJECT (rtsp_client_sink,
4625           "Have async-start from %" GST_PTR_FORMAT, sender);
4626       if (sender == GST_OBJECT (rtsp_client_sink->internal_bin)) {
4627         GST_LOG_OBJECT (rtsp_client_sink, "child bin is now ASYNC");
4628       }
4629       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4630       break;
4631     }
4632     case GST_MESSAGE_ASYNC_DONE:
4633     {
4634       GstObject *sender;
4635       gboolean need_async_done;
4636
4637       sender = GST_MESSAGE_SRC (message);
4638       GST_LOG_OBJECT (rtsp_client_sink, "Have async-done from %" GST_PTR_FORMAT,
4639           sender);
4640
4641       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4642       if (sender == GST_OBJECT_CAST (rtsp_client_sink->internal_bin)) {
4643         GST_LOG_OBJECT (rtsp_client_sink, "child bin is no longer ASYNC");
4644       }
4645       need_async_done = rtsp_client_sink->in_async;
4646       if (rtsp_client_sink->in_async) {
4647         rtsp_client_sink->in_async = FALSE;
4648         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4649       }
4650       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4651
4652       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4653
4654       if (need_async_done) {
4655         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-DONE");
4656         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4657             gst_message_new_async_done (GST_OBJECT_CAST (rtsp_client_sink),
4658                 GST_CLOCK_TIME_NONE));
4659       }
4660       break;
4661     }
4662     case GST_MESSAGE_ERROR:
4663     {
4664       GstObject *sender;
4665
4666       sender = GST_MESSAGE_SRC (message);
4667
4668       GST_DEBUG_OBJECT (rtsp_client_sink, "got error from %s",
4669           GST_ELEMENT_NAME (sender));
4670
4671       /* FIXME: Ignore errors on RTCP? */
4672       /* fatal but not our message, forward */
4673       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4674       break;
4675     }
4676     case GST_MESSAGE_STATE_CHANGED:
4677     {
4678       if (GST_MESSAGE_SRC (message) ==
4679           (GstObject *) rtsp_client_sink->internal_bin) {
4680         GstState newstate, pending;
4681         gst_message_parse_state_changed (message, NULL, &newstate, &pending);
4682         g_mutex_lock (&rtsp_client_sink->preroll_lock);
4683         rtsp_client_sink->prerolled = (newstate >= GST_STATE_PAUSED)
4684             && pending == GST_STATE_VOID_PENDING;
4685         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4686         g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4687         GST_DEBUG_OBJECT (bin,
4688             "Internal bin changed state to %s (pending %s). Prerolled now %d",
4689             gst_element_state_get_name (newstate),
4690             gst_element_state_get_name (pending), rtsp_client_sink->prerolled);
4691       }
4692       /* fallthrough */
4693     }
4694     default:
4695     {
4696       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4697       break;
4698     }
4699   }
4700 }
4701
4702 /* the thread where everything happens */
4703 static void
4704 gst_rtsp_client_sink_thread (GstRTSPClientSink * sink)
4705 {
4706   gint cmd;
4707
4708   GST_OBJECT_LOCK (sink);
4709   cmd = sink->pending_cmd;
4710   if (cmd == CMD_RECONNECT || cmd == CMD_RECORD || cmd == CMD_PAUSE
4711       || cmd == CMD_LOOP || cmd == CMD_OPEN)
4712     sink->pending_cmd = CMD_LOOP;
4713   else
4714     sink->pending_cmd = CMD_WAIT;
4715   GST_DEBUG_OBJECT (sink, "got command %s", cmd_to_string (cmd));
4716
4717   /* we got the message command, so ensure communication is possible again */
4718   gst_rtsp_client_sink_connection_flush (sink, FALSE);
4719
4720   sink->busy_cmd = cmd;
4721   GST_OBJECT_UNLOCK (sink);
4722
4723   switch (cmd) {
4724     case CMD_OPEN:
4725       if (gst_rtsp_client_sink_open (sink, TRUE) == GST_RTSP_ERROR)
4726         gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT,
4727             CMD_ALL & ~CMD_CLOSE);
4728       break;
4729     case CMD_RECORD:
4730       gst_rtsp_client_sink_record (sink, TRUE);
4731       break;
4732     case CMD_PAUSE:
4733       gst_rtsp_client_sink_pause (sink, TRUE);
4734       break;
4735     case CMD_CLOSE:
4736       gst_rtsp_client_sink_close (sink, TRUE, FALSE);
4737       break;
4738     case CMD_LOOP:
4739       gst_rtsp_client_sink_loop (sink);
4740       break;
4741     case CMD_RECONNECT:
4742       gst_rtsp_client_sink_reconnect (sink, FALSE);
4743       break;
4744     default:
4745       break;
4746   }
4747
4748   GST_OBJECT_LOCK (sink);
4749   /* and go back to sleep */
4750   if (sink->pending_cmd == CMD_WAIT) {
4751     if (sink->task)
4752       gst_task_pause (sink->task);
4753   }
4754   /* reset waiting */
4755   sink->busy_cmd = CMD_WAIT;
4756   GST_OBJECT_UNLOCK (sink);
4757 }
4758
4759 static gboolean
4760 gst_rtsp_client_sink_start (GstRTSPClientSink * sink)
4761 {
4762   GST_DEBUG_OBJECT (sink, "starting");
4763
4764   sink->streams_collected = FALSE;
4765   gst_element_set_locked_state (GST_ELEMENT (sink->internal_bin), TRUE);
4766
4767   gst_rtsp_client_sink_set_state (sink, GST_STATE_READY);
4768
4769   GST_OBJECT_LOCK (sink);
4770   sink->pending_cmd = CMD_WAIT;
4771
4772   if (sink->task == NULL) {
4773     sink->task =
4774         gst_task_new ((GstTaskFunction) gst_rtsp_client_sink_thread, sink,
4775         NULL);
4776     if (sink->task == NULL)
4777       goto task_error;
4778
4779     gst_task_set_lock (sink->task, GST_RTSP_STREAM_GET_LOCK (sink));
4780   }
4781   GST_OBJECT_UNLOCK (sink);
4782
4783   return TRUE;
4784
4785   /* ERRORS */
4786 task_error:
4787   {
4788     GST_OBJECT_UNLOCK (sink);
4789     GST_ERROR_OBJECT (sink, "failed to create task");
4790     return FALSE;
4791   }
4792 }
4793
4794 static gboolean
4795 gst_rtsp_client_sink_stop (GstRTSPClientSink * sink)
4796 {
4797   GstTask *task;
4798
4799   GST_DEBUG_OBJECT (sink, "stopping");
4800
4801   /* also cancels pending task */
4802   gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_ALL & ~CMD_CLOSE);
4803
4804   GST_OBJECT_LOCK (sink);
4805   if ((task = sink->task)) {
4806     sink->task = NULL;
4807     GST_OBJECT_UNLOCK (sink);
4808
4809     gst_task_stop (task);
4810
4811     /* make sure it is not running */
4812     GST_RTSP_STREAM_LOCK (sink);
4813     GST_RTSP_STREAM_UNLOCK (sink);
4814
4815     /* now wait for the task to finish */
4816     gst_task_join (task);
4817
4818     /* and free the task */
4819     gst_object_unref (GST_OBJECT (task));
4820
4821     GST_OBJECT_LOCK (sink);
4822   }
4823   GST_OBJECT_UNLOCK (sink);
4824
4825   /* ensure synchronously all is closed and clean */
4826   gst_rtsp_client_sink_close (sink, FALSE, TRUE);
4827
4828   return TRUE;
4829 }
4830
4831 static GstStateChangeReturn
4832 gst_rtsp_client_sink_change_state (GstElement * element,
4833     GstStateChange transition)
4834 {
4835   GstRTSPClientSink *rtsp_client_sink;
4836   GstStateChangeReturn ret;
4837
4838   rtsp_client_sink = GST_RTSP_CLIENT_SINK (element);
4839
4840   switch (transition) {
4841     case GST_STATE_CHANGE_NULL_TO_READY:
4842       if (!gst_rtsp_client_sink_start (rtsp_client_sink))
4843         goto start_failed;
4844       break;
4845     case GST_STATE_CHANGE_READY_TO_PAUSED:
4846       /* init some state */
4847       rtsp_client_sink->cur_protocols = rtsp_client_sink->protocols;
4848       /* first attempt, don't ignore timeouts */
4849       rtsp_client_sink->ignore_timeout = FALSE;
4850       rtsp_client_sink->open_error = FALSE;
4851
4852       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_PAUSED);
4853
4854       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4855       if (rtsp_client_sink->in_async) {
4856         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-START");
4857         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4858             gst_message_new_async_start (GST_OBJECT_CAST (rtsp_client_sink)));
4859       }
4860       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4861
4862       break;
4863     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4864       /* fall-through */
4865     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4866       /* unblock the tcp tasks and make the loop waiting */
4867       if (gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_WAIT,
4868               CMD_LOOP)) {
4869         /* make sure it is waiting before we send PLAY below */
4870         GST_RTSP_STREAM_LOCK (rtsp_client_sink);
4871         GST_RTSP_STREAM_UNLOCK (rtsp_client_sink);
4872       }
4873       break;
4874     case GST_STATE_CHANGE_PAUSED_TO_READY:
4875       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_READY);
4876       break;
4877     default:
4878       break;
4879   }
4880
4881   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4882   if (ret == GST_STATE_CHANGE_FAILURE)
4883     goto done;
4884
4885   switch (transition) {
4886     case GST_STATE_CHANGE_NULL_TO_READY:
4887       ret = GST_STATE_CHANGE_SUCCESS;
4888       break;
4889     case GST_STATE_CHANGE_READY_TO_PAUSED:
4890       /* Return ASYNC and preroll input streams */
4891       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4892       if (rtsp_client_sink->in_async)
4893         ret = GST_STATE_CHANGE_ASYNC;
4894       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4895       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_OPEN, 0);
4896
4897       /* CMD_OPEN has been scheduled. Wait until the sink thread starts
4898        * opening connection to the server */
4899       g_mutex_lock (&rtsp_client_sink->open_conn_lock);
4900       while (!rtsp_client_sink->open_conn_start) {
4901         GST_DEBUG_OBJECT (rtsp_client_sink,
4902             "wait for connection to be started");
4903         g_cond_wait (&rtsp_client_sink->open_conn_cond,
4904             &rtsp_client_sink->open_conn_lock);
4905       }
4906       rtsp_client_sink->open_conn_start = FALSE;
4907       g_mutex_unlock (&rtsp_client_sink->open_conn_lock);
4908       break;
4909     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{
4910       GST_DEBUG_OBJECT (rtsp_client_sink,
4911           "Switching to playing -sending RECORD");
4912       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECORD, 0);
4913       ret = GST_STATE_CHANGE_SUCCESS;
4914       break;
4915     }
4916     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4917       /* send pause request and keep the idle task around */
4918       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_PAUSE,
4919           CMD_LOOP);
4920       ret = GST_STATE_CHANGE_NO_PREROLL;
4921       break;
4922     case GST_STATE_CHANGE_PAUSED_TO_READY:
4923       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_CLOSE,
4924           CMD_PAUSE);
4925       ret = GST_STATE_CHANGE_SUCCESS;
4926       break;
4927     case GST_STATE_CHANGE_READY_TO_NULL:
4928       gst_rtsp_client_sink_stop (rtsp_client_sink);
4929       ret = GST_STATE_CHANGE_SUCCESS;
4930       break;
4931     default:
4932       break;
4933   }
4934
4935 done:
4936   return ret;
4937
4938 start_failed:
4939   {
4940     GST_DEBUG_OBJECT (rtsp_client_sink, "start failed");
4941     return GST_STATE_CHANGE_FAILURE;
4942   }
4943 }
4944
4945 /*** GSTURIHANDLER INTERFACE *************************************************/
4946
4947 static GstURIType
4948 gst_rtsp_client_sink_uri_get_type (GType type)
4949 {
4950   return GST_URI_SINK;
4951 }
4952
4953 static const gchar *const *
4954 gst_rtsp_client_sink_uri_get_protocols (GType type)
4955 {
4956   static const gchar *protocols[] =
4957       { "rtsp", "rtspu", "rtspt", "rtsph", "rtsp-sdp",
4958     "rtsps", "rtspsu", "rtspst", "rtspsh", NULL
4959   };
4960
4961   return protocols;
4962 }
4963
4964 static gchar *
4965 gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler)
4966 {
4967   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (handler);
4968
4969   /* FIXME: make thread-safe */
4970   return g_strdup (sink->conninfo.location);
4971 }
4972
4973 static gboolean
4974 gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
4975     GError ** error)
4976 {
4977   GstRTSPClientSink *sink;
4978   GstRTSPResult res;
4979   GstSDPResult sres;
4980   GstRTSPUrl *newurl = NULL;
4981   GstSDPMessage *sdp = NULL;
4982
4983   sink = GST_RTSP_CLIENT_SINK (handler);
4984
4985   /* same URI, we're fine */
4986   if (sink->conninfo.location && uri && !strcmp (uri, sink->conninfo.location))
4987     goto was_ok;
4988
4989   if (g_str_has_prefix (uri, "rtsp-sdp://")) {
4990     sres = gst_sdp_message_new (&sdp);
4991     if (sres < 0)
4992       goto sdp_failed;
4993
4994     GST_DEBUG_OBJECT (sink, "parsing SDP message");
4995     sres = gst_sdp_message_parse_uri (uri, sdp);
4996     if (sres < 0)
4997       goto invalid_sdp;
4998   } else {
4999     /* try to parse */
5000     GST_DEBUG_OBJECT (sink, "parsing URI");
5001     if ((res = gst_rtsp_url_parse (uri, &newurl)) < 0)
5002       goto parse_error;
5003   }
5004
5005   /* if worked, free previous and store new url object along with the original
5006    * location. */
5007   GST_DEBUG_OBJECT (sink, "configuring URI");
5008   g_free (sink->conninfo.location);
5009   sink->conninfo.location = g_strdup (uri);
5010   gst_rtsp_url_free (sink->conninfo.url);
5011   sink->conninfo.url = newurl;
5012   g_free (sink->conninfo.url_str);
5013   if (newurl)
5014     sink->conninfo.url_str = gst_rtsp_url_get_request_uri (sink->conninfo.url);
5015   else
5016     sink->conninfo.url_str = NULL;
5017
5018   if (sink->uri_sdp)
5019     gst_sdp_message_free (sink->uri_sdp);
5020   sink->uri_sdp = sdp;
5021   sink->from_sdp = sdp != NULL;
5022
5023   GST_DEBUG_OBJECT (sink, "set uri: %s", GST_STR_NULL (uri));
5024   GST_DEBUG_OBJECT (sink, "request uri is: %s",
5025       GST_STR_NULL (sink->conninfo.url_str));
5026
5027   return TRUE;
5028
5029   /* Special cases */
5030 was_ok:
5031   {
5032     GST_DEBUG_OBJECT (sink, "URI was ok: '%s'", GST_STR_NULL (uri));
5033     return TRUE;
5034   }
5035 sdp_failed:
5036   {
5037     GST_ERROR_OBJECT (sink, "Could not create new SDP (%d)", sres);
5038     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5039         "Could not create SDP");
5040     return FALSE;
5041   }
5042 invalid_sdp:
5043   {
5044     GST_ERROR_OBJECT (sink, "Not a valid SDP (%d) '%s'", sres,
5045         GST_STR_NULL (uri));
5046     gst_sdp_message_free (sdp);
5047     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5048         "Invalid SDP");
5049     return FALSE;
5050   }
5051 parse_error:
5052   {
5053     GST_ERROR_OBJECT (sink, "Not a valid RTSP url '%s' (%d)",
5054         GST_STR_NULL (uri), res);
5055     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5056         "Invalid RTSP URI");
5057     return FALSE;
5058   }
5059 }
5060
5061 static void
5062 gst_rtsp_client_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
5063 {
5064   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
5065
5066   iface->get_type = gst_rtsp_client_sink_uri_get_type;
5067   iface->get_protocols = gst_rtsp_client_sink_uri_get_protocols;
5068   iface->get_uri = gst_rtsp_client_sink_uri_get_uri;
5069   iface->set_uri = gst_rtsp_client_sink_uri_set_uri;
5070 }