rtspclientsink: Add "update-sdp" signal that allows updating the SDP before sending...
[platform/upstream/gstreamer.git] / gst / rtsp-sink / gstrtspclientsink.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim at fluendo dot com>
3  *               <2006> Lutz Mueller <lutz at topfrose dot de>
4  *               <2015> Jan Schmidt <jan at centricular dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /*
22  * Unless otherwise indicated, Source Code is licensed under MIT license.
23  * See further explanation attached in License Statement (distributed in the file
24  * LICENSE).
25  *
26  * Permission is hereby granted, free of charge, to any person obtaining a copy of
27  * this software and associated documentation files (the "Software"), to deal in
28  * the Software without restriction, including without limitation the rights to
29  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
30  * of the Software, and to permit persons to whom the Software is furnished to do
31  * so, subject to the following conditions:
32  *
33  * The above copyright notice and this permission notice shall be included in all
34  * copies or substantial portions of the Software.
35  *
36  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
37  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
38  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
39  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
40  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
41  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
42  * SOFTWARE.
43  */
44 /**
45  * SECTION:element-rtspclientsink
46  *
47  * Makes a connection to an RTSP server and send data via RTSP RECORD.
48  * rtspclientsink strictly follows RFC 2326
49  *
50  * RTSP supports transport over TCP or UDP in unicast or multicast mode. By
51  * default rtspclientsink will negotiate a connection in the following order:
52  * UDP unicast/UDP multicast/TCP. The order cannot be changed but the allowed
53  * protocols can be controlled with the #GstRTSPClientSink:protocols property.
54  *
55  * rtspclientsink will internally instantiate an RTP session manager element
56  * that will handle the RTCP messages to and from the server, jitter removal,
57  * and packet reordering.
58  * This feature is implemented using the gstrtpbin element.
59  *
60  * rtspclientsink accepts any stream for which there is an installed payloader,
61  * creates the payloader and manages payload-types, as well as RTX setup.
62  * The new-payloader signal is fired when a payloader is created, in case
63  * an app wants to do custom configuration (such as for MTU).
64  *
65  * ## Example launch line
66  *
67  * |[
68  * gst-launch-1.0 videotestsrc ! jpegenc ! rtspclientsink location=rtsp://some.server/url
69  * ]| Establish a connection to an RTSP server and send JPEG encoded video packets
70  */
71
72 /* FIXMEs
73  * - Handle EOS properly and shutdown. The problem with EOS is we don't know
74  *   when the server has received all data, so we don't know when to do teardown.
75  *   At the moment, we forward EOS to the app as soon as we stop sending. Is there
76  *   a way to know from the receiver that it's got all data? Some session timeout?
77  * - Implement extension support for Real / WMS if they support RECORD?
78  * - Add support for network clock synchronised streaming?
79  * - Fix crypto key nego so SAVP/SAVPF profiles work.
80  * - Test (&fix?) HTTP tunnel support
81  * - Add an address pool object for GstRTSPStreams to use for multicast
82  * - Test multicast UDP transport
83  */
84
85 #ifdef HAVE_CONFIG_H
86 #include "config.h"
87 #endif
88
89 #ifdef HAVE_UNISTD_H
90 #include <unistd.h>
91 #endif /* HAVE_UNISTD_H */
92 #include <stdlib.h>
93 #include <string.h>
94 #include <stdio.h>
95 #include <stdarg.h>
96
97 #include <gst/net/gstnet.h>
98 #include <gst/sdp/gstsdpmessage.h>
99 #include <gst/sdp/gstmikey.h>
100 #include <gst/rtp/rtp.h>
101
102 #include "gstrtspclientsink.h"
103
104 typedef struct _GstRtspClientSinkPad GstRtspClientSinkPad;
105 typedef GstGhostPadClass GstRtspClientSinkPadClass;
106
107 struct _GstRtspClientSinkPad
108 {
109   GstGhostPad parent;
110   GstElement *custom_payloader;
111   guint ulpfec_percentage;
112 };
113
114 enum
115 {
116   PROP_PAD_0,
117   PROP_PAD_PAYLOADER,
118   PROP_PAD_ULPFEC_PERCENTAGE
119 };
120
121 #define DEFAULT_PAD_ULPFEC_PERCENTAGE 0
122
123 static GType gst_rtsp_client_sink_pad_get_type (void);
124 G_DEFINE_TYPE (GstRtspClientSinkPad, gst_rtsp_client_sink_pad,
125     GST_TYPE_GHOST_PAD);
126 #define GST_TYPE_RTSP_CLIENT_SINK_PAD (gst_rtsp_client_sink_pad_get_type ())
127 #define GST_RTSP_CLIENT_SINK_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTSP_CLIENT_SINK_PAD,GstRtspClientSinkPad))
128
129 static void
130 gst_rtsp_client_sink_pad_set_property (GObject * object, guint prop_id,
131     const GValue * value, GParamSpec * pspec)
132 {
133   GstRtspClientSinkPad *pad;
134
135   pad = GST_RTSP_CLIENT_SINK_PAD (object);
136
137   switch (prop_id) {
138     case PROP_PAD_PAYLOADER:
139       GST_OBJECT_LOCK (pad);
140       if (pad->custom_payloader)
141         gst_object_unref (pad->custom_payloader);
142       pad->custom_payloader = g_value_get_object (value);
143       gst_object_ref_sink (pad->custom_payloader);
144       GST_OBJECT_UNLOCK (pad);
145       break;
146     case PROP_PAD_ULPFEC_PERCENTAGE:
147       GST_OBJECT_LOCK (pad);
148       pad->ulpfec_percentage = g_value_get_uint (value);
149       GST_OBJECT_UNLOCK (pad);
150       break;
151     default:
152       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
153       break;
154   }
155 }
156
157 static void
158 gst_rtsp_client_sink_pad_get_property (GObject * object, guint prop_id,
159     GValue * value, GParamSpec * pspec)
160 {
161   GstRtspClientSinkPad *pad;
162
163   pad = GST_RTSP_CLIENT_SINK_PAD (object);
164
165   switch (prop_id) {
166     case PROP_PAD_PAYLOADER:
167       GST_OBJECT_LOCK (pad);
168       g_value_set_object (value, pad->custom_payloader);
169       GST_OBJECT_UNLOCK (pad);
170       break;
171     case PROP_PAD_ULPFEC_PERCENTAGE:
172       GST_OBJECT_LOCK (pad);
173       g_value_set_uint (value, pad->ulpfec_percentage);
174       GST_OBJECT_UNLOCK (pad);
175       break;
176     default:
177       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
178       break;
179   }
180 }
181
182 static void
183 gst_rtsp_client_sink_pad_dispose (GObject * object)
184 {
185   GstRtspClientSinkPad *pad = GST_RTSP_CLIENT_SINK_PAD (object);
186
187   if (pad->custom_payloader)
188     gst_object_unref (pad->custom_payloader);
189
190   G_OBJECT_CLASS (gst_rtsp_client_sink_pad_parent_class)->dispose (object);
191 }
192
193 static void
194 gst_rtsp_client_sink_pad_class_init (GstRtspClientSinkPadClass * klass)
195 {
196   GObjectClass *gobject_klass;
197
198   gobject_klass = (GObjectClass *) klass;
199
200   gobject_klass->set_property = gst_rtsp_client_sink_pad_set_property;
201   gobject_klass->get_property = gst_rtsp_client_sink_pad_get_property;
202   gobject_klass->dispose = gst_rtsp_client_sink_pad_dispose;
203
204   g_object_class_install_property (gobject_klass, PROP_PAD_PAYLOADER,
205       g_param_spec_object ("payloader", "Payloader",
206           "The payloader element to use (NULL = default automatically selected)",
207           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
208
209   g_object_class_install_property (gobject_klass, PROP_PAD_ULPFEC_PERCENTAGE,
210       g_param_spec_uint ("ulpfec-percentage", "ULPFEC percentage",
211           "The percentage of ULP redundancy to apply", 0, 100,
212           DEFAULT_PAD_ULPFEC_PERCENTAGE,
213           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214 }
215
216 static void
217 gst_rtsp_client_sink_pad_init (GstRtspClientSinkPad * pad)
218 {
219 }
220
221 static GstPad *
222 gst_rtsp_client_sink_pad_new (const GstPadTemplate * pad_tmpl,
223     const gchar * name)
224 {
225   GstRtspClientSinkPad *ret;
226
227   ret =
228       g_object_new (GST_TYPE_RTSP_CLIENT_SINK_PAD, "direction", GST_PAD_SINK,
229       "template", pad_tmpl, "name", name, NULL);
230
231   return GST_PAD (ret);
232 }
233
234 GST_DEBUG_CATEGORY_STATIC (rtsp_client_sink_debug);
235 #define GST_CAT_DEFAULT (rtsp_client_sink_debug)
236
237 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
238     GST_PAD_SINK,
239     GST_PAD_REQUEST,
240     GST_STATIC_CAPS_ANY);       /* Actual caps come from available set of payloaders */
241
242 enum
243 {
244   SIGNAL_HANDLE_REQUEST,
245   SIGNAL_NEW_MANAGER,
246   SIGNAL_NEW_PAYLOADER,
247   SIGNAL_REQUEST_RTCP_KEY,
248   SIGNAL_ACCEPT_CERTIFICATE,
249   SIGNAL_UPDATE_SDP,
250   LAST_SIGNAL
251 };
252
253 enum _GstRTSPClientSinkNtpTimeSource
254 {
255   NTP_TIME_SOURCE_NTP,
256   NTP_TIME_SOURCE_UNIX,
257   NTP_TIME_SOURCE_RUNNING_TIME,
258   NTP_TIME_SOURCE_CLOCK_TIME
259 };
260
261 #define GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE (gst_rtsp_client_sink_ntp_time_source_get_type())
262 static GType
263 gst_rtsp_client_sink_ntp_time_source_get_type (void)
264 {
265   static GType ntp_time_source_type = 0;
266   static const GEnumValue ntp_time_source_values[] = {
267     {NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
268     {NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
269     {NTP_TIME_SOURCE_RUNNING_TIME,
270           "Running time based on pipeline clock",
271         "running-time"},
272     {NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
273     {0, NULL, NULL},
274   };
275
276   if (!ntp_time_source_type) {
277     ntp_time_source_type =
278         g_enum_register_static ("GstRTSPClientSinkNtpTimeSource",
279         ntp_time_source_values);
280   }
281   return ntp_time_source_type;
282 }
283
284 #define DEFAULT_LOCATION         NULL
285 #define DEFAULT_PROTOCOLS        GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP
286 #define DEFAULT_DEBUG            FALSE
287 #define DEFAULT_RETRY            20
288 #define DEFAULT_TIMEOUT          5000000
289 #define DEFAULT_UDP_BUFFER_SIZE  0x80000
290 #define DEFAULT_TCP_TIMEOUT      20000000
291 #define DEFAULT_LATENCY_MS       2000
292 #define DEFAULT_DO_RTSP_KEEP_ALIVE       TRUE
293 #define DEFAULT_PROXY            NULL
294 #define DEFAULT_RTP_BLOCKSIZE    0
295 #define DEFAULT_USER_ID          NULL
296 #define DEFAULT_USER_PW          NULL
297 #define DEFAULT_PORT_RANGE       NULL
298 #define DEFAULT_UDP_RECONNECT    TRUE
299 #define DEFAULT_MULTICAST_IFACE  NULL
300 #define DEFAULT_TLS_VALIDATION_FLAGS     G_TLS_CERTIFICATE_VALIDATE_ALL
301 #define DEFAULT_TLS_DATABASE     NULL
302 #define DEFAULT_TLS_INTERACTION     NULL
303 #define DEFAULT_NTP_TIME_SOURCE  NTP_TIME_SOURCE_NTP
304 #define DEFAULT_USER_AGENT       "GStreamer/" PACKAGE_VERSION
305 #define DEFAULT_PROFILES         GST_RTSP_PROFILE_AVP
306 #define DEFAULT_RTX_TIME_MS      500
307
308 enum
309 {
310   PROP_0,
311   PROP_LOCATION,
312   PROP_PROTOCOLS,
313   PROP_DEBUG,
314   PROP_RETRY,
315   PROP_TIMEOUT,
316   PROP_TCP_TIMEOUT,
317   PROP_LATENCY,
318   PROP_RTX_TIME,
319   PROP_DO_RTSP_KEEP_ALIVE,
320   PROP_PROXY,
321   PROP_PROXY_ID,
322   PROP_PROXY_PW,
323   PROP_RTP_BLOCKSIZE,
324   PROP_USER_ID,
325   PROP_USER_PW,
326   PROP_PORT_RANGE,
327   PROP_UDP_BUFFER_SIZE,
328   PROP_UDP_RECONNECT,
329   PROP_MULTICAST_IFACE,
330   PROP_SDES,
331   PROP_TLS_VALIDATION_FLAGS,
332   PROP_TLS_DATABASE,
333   PROP_TLS_INTERACTION,
334   PROP_NTP_TIME_SOURCE,
335   PROP_USER_AGENT,
336   PROP_PROFILES
337 };
338
339 static void gst_rtsp_client_sink_finalize (GObject * object);
340
341 static void gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
342     const GValue * value, GParamSpec * pspec);
343 static void gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
344     GValue * value, GParamSpec * pspec);
345
346 static GstClock *gst_rtsp_client_sink_provide_clock (GstElement * element);
347
348 static void gst_rtsp_client_sink_uri_handler_init (gpointer g_iface,
349     gpointer iface_data);
350
351 static gboolean gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp,
352     const gchar * proxy);
353 static void gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink *
354     rtsp_client_sink, guint64 timeout);
355
356 static GstStateChangeReturn gst_rtsp_client_sink_change_state (GstElement *
357     element, GstStateChange transition);
358 static void gst_rtsp_client_sink_handle_message (GstBin * bin,
359     GstMessage * message);
360
361 static gboolean gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
362     GstRTSPMessage * response);
363
364 static gboolean gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink,
365     gint cmd, gint mask);
366
367 static GstRTSPResult gst_rtsp_client_sink_open (GstRTSPClientSink * sink,
368     gboolean async);
369 static GstRTSPResult gst_rtsp_client_sink_record (GstRTSPClientSink * sink,
370     gboolean async);
371 static GstRTSPResult gst_rtsp_client_sink_pause (GstRTSPClientSink * sink,
372     gboolean async);
373 static GstRTSPResult gst_rtsp_client_sink_close (GstRTSPClientSink * sink,
374     gboolean async, gboolean only_close);
375 static gboolean gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink);
376
377 static gboolean gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler,
378     const gchar * uri, GError ** error);
379 static gchar *gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler);
380
381 static gboolean gst_rtsp_client_sink_loop (GstRTSPClientSink * sink);
382 static void gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink,
383     gboolean flush);
384
385 static GstPad *gst_rtsp_client_sink_request_new_pad (GstElement * element,
386     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
387 static void gst_rtsp_client_sink_release_pad (GstElement * element,
388     GstPad * pad);
389
390 /* commands we send to out loop to notify it of events */
391 #define CMD_OPEN        (1 << 0)
392 #define CMD_RECORD      (1 << 1)
393 #define CMD_PAUSE       (1 << 2)
394 #define CMD_CLOSE       (1 << 3)
395 #define CMD_WAIT        (1 << 4)
396 #define CMD_RECONNECT   (1 << 5)
397 #define CMD_LOOP        (1 << 6)
398
399 /* mask for all commands */
400 #define CMD_ALL         ((CMD_LOOP << 1) - 1)
401
402 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
403 G_STMT_START {                                          \
404   gchar *__txt = _gst_element_error_printf text;        \
405   gst_element_post_message (GST_ELEMENT_CAST (el),      \
406       gst_message_new_progress (GST_OBJECT_CAST (el),   \
407           GST_PROGRESS_TYPE_ ##type, code, __txt));     \
408   g_free (__txt);                                       \
409 } G_STMT_END
410
411 static guint gst_rtsp_client_sink_signals[LAST_SIGNAL] = { 0 };
412
413 /*********************************
414  * GstChildProxy implementation  *
415  *********************************/
416 static GObject *
417 gst_rtsp_client_sink_child_proxy_get_child_by_index (GstChildProxy *
418     child_proxy, guint index)
419 {
420   GObject *obj;
421   GstRTSPClientSink *cs = GST_RTSP_CLIENT_SINK (child_proxy);
422
423   GST_OBJECT_LOCK (cs);
424   if ((obj = g_list_nth_data (GST_ELEMENT (cs)->sinkpads, index)))
425     g_object_ref (obj);
426   GST_OBJECT_UNLOCK (cs);
427
428   return obj;
429 }
430
431 static guint
432 gst_rtsp_client_sink_child_proxy_get_children_count (GstChildProxy *
433     child_proxy)
434 {
435   guint count = 0;
436
437   GST_OBJECT_LOCK (child_proxy);
438   count = GST_ELEMENT (child_proxy)->numsinkpads;
439   GST_OBJECT_UNLOCK (child_proxy);
440
441   GST_INFO_OBJECT (child_proxy, "Children Count: %d", count);
442
443   return count;
444 }
445
446 static void
447 gst_rtsp_client_sink_child_proxy_init (gpointer g_iface, gpointer iface_data)
448 {
449   GstChildProxyInterface *iface = g_iface;
450
451   GST_INFO ("intializing child proxy interface");
452   iface->get_child_by_index =
453       gst_rtsp_client_sink_child_proxy_get_child_by_index;
454   iface->get_children_count =
455       gst_rtsp_client_sink_child_proxy_get_children_count;
456 }
457
458 #define gst_rtsp_client_sink_parent_class parent_class
459 G_DEFINE_TYPE_WITH_CODE (GstRTSPClientSink, gst_rtsp_client_sink, GST_TYPE_BIN,
460     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
461         gst_rtsp_client_sink_uri_handler_init);
462     G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY,
463         gst_rtsp_client_sink_child_proxy_init);
464     );
465
466 #ifndef GST_DISABLE_GST_DEBUG
467 static inline const gchar *
468 cmd_to_string (guint cmd)
469 {
470   switch (cmd) {
471     case CMD_OPEN:
472       return "OPEN";
473     case CMD_RECORD:
474       return "RECORD";
475     case CMD_PAUSE:
476       return "PAUSE";
477     case CMD_CLOSE:
478       return "CLOSE";
479     case CMD_WAIT:
480       return "WAIT";
481     case CMD_RECONNECT:
482       return "RECONNECT";
483     case CMD_LOOP:
484       return "LOOP";
485   }
486
487   return "unknown";
488 }
489 #endif
490
491 static void
492 gst_rtsp_client_sink_class_init (GstRTSPClientSinkClass * klass)
493 {
494   GObjectClass *gobject_class;
495   GstElementClass *gstelement_class;
496   GstBinClass *gstbin_class;
497
498   gobject_class = (GObjectClass *) klass;
499   gstelement_class = (GstElementClass *) klass;
500   gstbin_class = (GstBinClass *) klass;
501
502   GST_DEBUG_CATEGORY_INIT (rtsp_client_sink_debug, "rtspclientsink", 0,
503       "RTSP sink element");
504
505   gobject_class->set_property = gst_rtsp_client_sink_set_property;
506   gobject_class->get_property = gst_rtsp_client_sink_get_property;
507
508   gobject_class->finalize = gst_rtsp_client_sink_finalize;
509
510   g_object_class_install_property (gobject_class, PROP_LOCATION,
511       g_param_spec_string ("location", "RTSP Location",
512           "Location of the RTSP url to read",
513           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
514
515   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
516       g_param_spec_flags ("protocols", "Protocols",
517           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
518           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
519
520   g_object_class_install_property (gobject_class, PROP_PROFILES,
521       g_param_spec_flags ("profiles", "Profiles",
522           "Allowed RTSP profiles", GST_TYPE_RTSP_PROFILE,
523           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
524
525   g_object_class_install_property (gobject_class, PROP_DEBUG,
526       g_param_spec_boolean ("debug", "Debug",
527           "Dump request and response messages to stdout",
528           DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
529
530   g_object_class_install_property (gobject_class, PROP_RETRY,
531       g_param_spec_uint ("retry", "Retry",
532           "Max number of retries when allocating RTP ports.",
533           0, G_MAXUINT16, DEFAULT_RETRY,
534           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
535
536   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
537       g_param_spec_uint64 ("timeout", "Timeout",
538           "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
539           0, G_MAXUINT64, DEFAULT_TIMEOUT,
540           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
541
542   g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
543       g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
544           "Fail after timeout microseconds on TCP connections (0 = disabled)",
545           0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
546           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
547
548   g_object_class_install_property (gobject_class, PROP_LATENCY,
549       g_param_spec_uint ("latency", "Buffer latency in ms",
550           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
551           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
552
553   g_object_class_install_property (gobject_class, PROP_RTX_TIME,
554       g_param_spec_uint ("rtx-time", "Retransmission buffer in ms",
555           "Amount of ms to buffer for retransmission. 0 disables retransmission",
556           0, G_MAXUINT, DEFAULT_RTX_TIME_MS,
557           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
558
559   /**
560    * GstRTSPClientSink:do-rtsp-keep-alive:
561    *
562    * Enable RTSP keep alive support. Some old server don't like RTSP
563    * keep alive and then this property needs to be set to FALSE.
564    */
565   g_object_class_install_property (gobject_class, PROP_DO_RTSP_KEEP_ALIVE,
566       g_param_spec_boolean ("do-rtsp-keep-alive", "Do RTSP Keep Alive",
567           "Send RTSP keep alive packets, disable for old incompatible server.",
568           DEFAULT_DO_RTSP_KEEP_ALIVE,
569           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
570
571   /**
572    * GstRTSPClientSink:proxy:
573    *
574    * Set the proxy parameters. This has to be a string of the format
575    * [http://][user:passwd@]host[:port].
576    */
577   g_object_class_install_property (gobject_class, PROP_PROXY,
578       g_param_spec_string ("proxy", "Proxy",
579           "Proxy settings for HTTP tunneling. Format: [http://][user:passwd@]host[:port]",
580           DEFAULT_PROXY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
581   /**
582    * GstRTSPClientSink:proxy-id:
583    *
584    * Sets the proxy URI user id for authentication. If the URI set via the
585    * "proxy" property contains a user-id already, that will take precedence.
586    *
587    */
588   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
589       g_param_spec_string ("proxy-id", "proxy-id",
590           "HTTP proxy URI user id for authentication", "",
591           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
592   /**
593    * GstRTSPClientSink:proxy-pw:
594    *
595    * Sets the proxy URI password for authentication. If the URI set via the
596    * "proxy" property contains a password already, that will take precedence.
597    *
598    */
599   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
600       g_param_spec_string ("proxy-pw", "proxy-pw",
601           "HTTP proxy URI user password for authentication", "",
602           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
603
604   /**
605    * GstRTSPClientSink:rtp-blocksize:
606    *
607    * RTP package size to suggest to server.
608    */
609   g_object_class_install_property (gobject_class, PROP_RTP_BLOCKSIZE,
610       g_param_spec_uint ("rtp-blocksize", "RTP Blocksize",
611           "RTP package size to suggest to server (0 = disabled)",
612           0, 65536, DEFAULT_RTP_BLOCKSIZE,
613           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
614
615   g_object_class_install_property (gobject_class,
616       PROP_USER_ID,
617       g_param_spec_string ("user-id", "user-id",
618           "RTSP location URI user id for authentication", DEFAULT_USER_ID,
619           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
620   g_object_class_install_property (gobject_class, PROP_USER_PW,
621       g_param_spec_string ("user-pw", "user-pw",
622           "RTSP location URI user password for authentication", DEFAULT_USER_PW,
623           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
624
625   /**
626    * GstRTSPClientSink:port-range:
627    *
628    * Configure the client port numbers that can be used to receive
629    * RTCP.
630    */
631   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
632       g_param_spec_string ("port-range", "Port range",
633           "Client port range that can be used to receive RTCP data, "
634           "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
635           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
636
637   /**
638    * GstRTSPClientSink:udp-buffer-size:
639    *
640    * Size of the kernel UDP receive buffer in bytes.
641    */
642   g_object_class_install_property (gobject_class, PROP_UDP_BUFFER_SIZE,
643       g_param_spec_int ("udp-buffer-size", "UDP Buffer Size",
644           "Size of the kernel UDP receive buffer in bytes, 0=default",
645           0, G_MAXINT, DEFAULT_UDP_BUFFER_SIZE,
646           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
647
648   g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT,
649       g_param_spec_boolean ("udp-reconnect", "Reconnect to the server",
650           "Reconnect to the server if RTSP connection is closed when doing UDP",
651           DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
652
653   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
654       g_param_spec_string ("multicast-iface", "Multicast Interface",
655           "The network interface on which to join the multicast group",
656           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
657
658   g_object_class_install_property (gobject_class, PROP_SDES,
659       g_param_spec_boxed ("sdes", "SDES",
660           "The SDES items of this session",
661           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
662
663   /**
664    * GstRTSPClientSink::tls-validation-flags:
665    *
666    * TLS certificate validation flags used to validate server
667    * certificate.
668    *
669    */
670   g_object_class_install_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
671       g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
672           "TLS certificate validation flags used to validate the server certificate",
673           G_TYPE_TLS_CERTIFICATE_FLAGS, DEFAULT_TLS_VALIDATION_FLAGS,
674           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
675
676   /**
677    * GstRTSPClientSink::tls-database:
678    *
679    * TLS database with anchor certificate authorities used to validate
680    * the server certificate.
681    *
682    */
683   g_object_class_install_property (gobject_class, PROP_TLS_DATABASE,
684       g_param_spec_object ("tls-database", "TLS database",
685           "TLS database with anchor certificate authorities used to validate the server certificate",
686           G_TYPE_TLS_DATABASE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
687
688   /**
689    * GstRTSPClientSink::tls-interaction:
690    *
691    * A #GTlsInteraction object to be used when the connection or certificate
692    * database need to interact with the user. This will be used to prompt the
693    * user for passwords where necessary.
694    *
695    */
696   g_object_class_install_property (gobject_class, PROP_TLS_INTERACTION,
697       g_param_spec_object ("tls-interaction", "TLS interaction",
698           "A GTlsInteraction object to prompt the user for password or certificate",
699           G_TYPE_TLS_INTERACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
700
701   /**
702    * GstRTSPClientSink::ntp-time-source:
703    *
704    * allows to select the time source that should be used
705    * for the NTP time in outgoing packets
706    *
707    */
708   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
709       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
710           "NTP time source for RTCP packets",
711           GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
712           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
713
714   /**
715    * GstRTSPClientSink::user-agent:
716    *
717    * The string to set in the User-Agent header.
718    *
719    */
720   g_object_class_install_property (gobject_class, PROP_USER_AGENT,
721       g_param_spec_string ("user-agent", "User Agent",
722           "The User-Agent string to send to the server",
723           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
724
725   /**
726    * GstRTSPClientSink::handle-request:
727    * @rtsp_client_sink: a #GstRTSPClientSink
728    * @request: a #GstRTSPMessage
729    * @response: a #GstRTSPMessage
730    *
731    * Handle a server request in @request and prepare @response.
732    *
733    * This signal is called from the streaming thread, you should therefore not
734    * do any state changes on @rtsp_client_sink because this might deadlock. If you want
735    * to modify the state as a result of this signal, post a
736    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
737    * in some other way.
738    *
739    */
740   gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST] =
741       g_signal_new ("handle-request", G_TYPE_FROM_CLASS (klass), 0,
742       0, NULL, NULL, NULL, G_TYPE_NONE, 2,
743       GST_TYPE_RTSP_MESSAGE | G_SIGNAL_TYPE_STATIC_SCOPE,
744       GST_TYPE_RTSP_MESSAGE | G_SIGNAL_TYPE_STATIC_SCOPE);
745
746   /**
747    * GstRTSPClientSink::new-manager:
748    * @rtsp_client_sink: a #GstRTSPClientSink
749    * @manager: a #GstElement
750    *
751    * Emitted after a new manager (like rtpbin) was created and the default
752    * properties were configured.
753    *
754    */
755   gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER] =
756       g_signal_new_class_handler ("new-manager", G_TYPE_FROM_CLASS (klass),
757       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL, NULL,
758       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
759
760   /**
761    * GstRTSPClientSink::new-payloader:
762    * @rtsp_client_sink: a #GstRTSPClientSink
763    * @payloader: a #GstElement
764    *
765    * Emitted after a new RTP payloader was created and the default
766    * properties were configured.
767    *
768    */
769   gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER] =
770       g_signal_new_class_handler ("new-payloader", G_TYPE_FROM_CLASS (klass),
771       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL, NULL,
772       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
773
774   /**
775    * GstRTSPClientSink::request-rtcp-key:
776    * @rtsp_client_sink: a #GstRTSPClientSink
777    * @num: the stream number
778    *
779    * Signal emitted to get the crypto parameters relevant to the RTCP
780    * stream. User should provide the key and the RTCP encryption ciphers
781    * and authentication, and return them wrapped in a GstCaps.
782    *
783    */
784   gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY] =
785       g_signal_new ("request-rtcp-key", G_TYPE_FROM_CLASS (klass),
786       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
787
788   /**
789    * GstRTSPClientSink::accept-certificate:
790    * @rtsp_client_sink: a #GstRTSPClientSink
791    * @peer_cert: the peer's #GTlsCertificate
792    * @errors: the problems with @peer_cert
793    * @user_data: user data set when the signal handler was connected.
794    *
795    * This will directly map to #GTlsConnection 's "accept-certificate"
796    * signal and be performed after the default checks of #GstRTSPConnection
797    * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
798    * have failed. If no #GTlsDatabase is set on this connection, only this
799    * signal will be emitted.
800    *
801    * Since: 1.14
802    */
803   gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE] =
804       g_signal_new ("accept-certificate", G_TYPE_FROM_CLASS (klass),
805       G_SIGNAL_RUN_LAST, 0, g_signal_accumulator_true_handled, NULL, NULL,
806       G_TYPE_BOOLEAN, 3, G_TYPE_TLS_CONNECTION, G_TYPE_TLS_CERTIFICATE,
807       G_TYPE_TLS_CERTIFICATE_FLAGS);
808
809   /**
810    * GstRTSPClientSink::update-sdp:
811    * @rtsp_client_sink: a #GstRTSPClientSink
812    * @sdp: a #GstSDPMessage
813    *
814    * Emitted right before the ANNOUNCE request is sent to the server with the
815    * generated SDP. The SDP can be updated from signal handlers but the order
816    * and number of medias must not be changed.
817    *
818    * Since: 1.20
819    */
820   gst_rtsp_client_sink_signals[SIGNAL_UPDATE_SDP] =
821       g_signal_new_class_handler ("update-sdp", G_TYPE_FROM_CLASS (klass),
822       0, 0, NULL, NULL, NULL,
823       G_TYPE_NONE, 1, GST_TYPE_SDP_MESSAGE | G_SIGNAL_TYPE_STATIC_SCOPE);
824
825   gstelement_class->provide_clock = gst_rtsp_client_sink_provide_clock;
826   gstelement_class->change_state = gst_rtsp_client_sink_change_state;
827   gstelement_class->request_new_pad =
828       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_request_new_pad);
829   gstelement_class->release_pad =
830       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_release_pad);
831
832   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
833       &rtptemplate, GST_TYPE_RTSP_CLIENT_SINK_PAD);
834
835   gst_element_class_set_static_metadata (gstelement_class,
836       "RTSP RECORD client", "Sink/Network",
837       "Send data over the network via RTSP RECORD(RFC 2326)",
838       "Jan Schmidt <jan@centricular.com>");
839
840   gstbin_class->handle_message = gst_rtsp_client_sink_handle_message;
841
842   gst_type_mark_as_plugin_api (GST_TYPE_RTSP_CLIENT_SINK_PAD, 0);
843   gst_type_mark_as_plugin_api (GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE, 0);
844 }
845
846 static void
847 gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
848 {
849   sink->conninfo.location = g_strdup (DEFAULT_LOCATION);
850   sink->protocols = DEFAULT_PROTOCOLS;
851   sink->debug = DEFAULT_DEBUG;
852   sink->retry = DEFAULT_RETRY;
853   sink->udp_timeout = DEFAULT_TIMEOUT;
854   gst_rtsp_client_sink_set_tcp_timeout (sink, DEFAULT_TCP_TIMEOUT);
855   sink->latency = DEFAULT_LATENCY_MS;
856   sink->rtx_time = DEFAULT_RTX_TIME_MS;
857   sink->do_rtsp_keep_alive = DEFAULT_DO_RTSP_KEEP_ALIVE;
858   gst_rtsp_client_sink_set_proxy (sink, DEFAULT_PROXY);
859   sink->rtp_blocksize = DEFAULT_RTP_BLOCKSIZE;
860   sink->user_id = g_strdup (DEFAULT_USER_ID);
861   sink->user_pw = g_strdup (DEFAULT_USER_PW);
862   sink->client_port_range.min = 0;
863   sink->client_port_range.max = 0;
864   sink->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
865   sink->udp_reconnect = DEFAULT_UDP_RECONNECT;
866   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
867   sink->sdes = NULL;
868   sink->tls_validation_flags = DEFAULT_TLS_VALIDATION_FLAGS;
869   sink->tls_database = DEFAULT_TLS_DATABASE;
870   sink->tls_interaction = DEFAULT_TLS_INTERACTION;
871   sink->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
872   sink->user_agent = g_strdup (DEFAULT_USER_AGENT);
873
874   sink->profiles = DEFAULT_PROFILES;
875
876   /* protects the streaming thread in interleaved mode or the polling
877    * thread in UDP mode. */
878   g_rec_mutex_init (&sink->stream_rec_lock);
879
880   /* protects our state changes from multiple invocations */
881   g_rec_mutex_init (&sink->state_rec_lock);
882
883   g_mutex_init (&sink->send_lock);
884
885   g_mutex_init (&sink->preroll_lock);
886   g_cond_init (&sink->preroll_cond);
887
888   sink->state = GST_RTSP_STATE_INVALID;
889
890   g_mutex_init (&sink->conninfo.send_lock);
891   g_mutex_init (&sink->conninfo.recv_lock);
892
893   g_mutex_init (&sink->block_streams_lock);
894   g_cond_init (&sink->block_streams_cond);
895
896   g_mutex_init (&sink->open_conn_lock);
897   g_cond_init (&sink->open_conn_cond);
898
899   sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
900   g_object_set (sink->internal_bin, "async-handling", TRUE, NULL);
901   gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
902   gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
903
904   sink->next_dyn_pt = 96;
905
906   gst_sdp_message_init (&sink->cursdp);
907
908   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
909 }
910
911 static void
912 gst_rtsp_client_sink_finalize (GObject * object)
913 {
914   GstRTSPClientSink *rtsp_client_sink;
915
916   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
917
918   gst_sdp_message_uninit (&rtsp_client_sink->cursdp);
919
920   g_free (rtsp_client_sink->conninfo.location);
921   gst_rtsp_url_free (rtsp_client_sink->conninfo.url);
922   g_free (rtsp_client_sink->conninfo.url_str);
923   g_free (rtsp_client_sink->user_id);
924   g_free (rtsp_client_sink->user_pw);
925   g_free (rtsp_client_sink->multi_iface);
926   g_free (rtsp_client_sink->user_agent);
927
928   if (rtsp_client_sink->uri_sdp) {
929     gst_sdp_message_free (rtsp_client_sink->uri_sdp);
930     rtsp_client_sink->uri_sdp = NULL;
931   }
932   if (rtsp_client_sink->provided_clock)
933     gst_object_unref (rtsp_client_sink->provided_clock);
934
935   if (rtsp_client_sink->sdes)
936     gst_structure_free (rtsp_client_sink->sdes);
937
938   if (rtsp_client_sink->tls_database)
939     g_object_unref (rtsp_client_sink->tls_database);
940
941   if (rtsp_client_sink->tls_interaction)
942     g_object_unref (rtsp_client_sink->tls_interaction);
943
944   /* free locks */
945   g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
946   g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
947
948   g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
949   g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
950
951   g_mutex_clear (&rtsp_client_sink->send_lock);
952
953   g_mutex_clear (&rtsp_client_sink->preroll_lock);
954   g_cond_clear (&rtsp_client_sink->preroll_cond);
955
956   g_mutex_clear (&rtsp_client_sink->block_streams_lock);
957   g_cond_clear (&rtsp_client_sink->block_streams_cond);
958
959   g_mutex_clear (&rtsp_client_sink->open_conn_lock);
960   g_cond_clear (&rtsp_client_sink->open_conn_cond);
961
962   G_OBJECT_CLASS (parent_class)->finalize (object);
963 }
964
965 static gboolean
966 gst_rtp_payloader_filter_func (GstPluginFeature * feature, gpointer user_data)
967 {
968   GstElementFactory *factory = NULL;
969   const gchar *klass;
970
971   if (!GST_IS_ELEMENT_FACTORY (feature))
972     return FALSE;
973
974   factory = GST_ELEMENT_FACTORY (feature);
975
976   if (gst_plugin_feature_get_rank (feature) == GST_RANK_NONE)
977     return FALSE;
978
979   if (!gst_element_factory_list_is_type (factory,
980           GST_ELEMENT_FACTORY_TYPE_PAYLOADER))
981     return FALSE;
982
983   klass =
984       gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
985   if (strstr (klass, "Codec") == NULL)
986     return FALSE;
987   if (strstr (klass, "RTP") == NULL)
988     return FALSE;
989
990   return TRUE;
991 }
992
993 static gint
994 compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2)
995 {
996   gint diff;
997   const gchar *rname1, *rname2;
998   GstRank rank1, rank2;
999
1000   rname1 = gst_plugin_feature_get_name (f1);
1001   rname2 = gst_plugin_feature_get_name (f2);
1002
1003   rank1 = gst_plugin_feature_get_rank (f1);
1004   rank2 = gst_plugin_feature_get_rank (f2);
1005
1006   /* HACK: Prefer rtpmp4apay over rtpmp4gpay */
1007   if (g_str_equal (rname1, "rtpmp4apay"))
1008     rank1 = GST_RANK_SECONDARY + 1;
1009   if (g_str_equal (rname2, "rtpmp4apay"))
1010     rank2 = GST_RANK_SECONDARY + 1;
1011
1012   diff = rank2 - rank1;
1013   if (diff != 0)
1014     return diff;
1015
1016   diff = strcmp (rname2, rname1);
1017
1018   return diff;
1019 }
1020
1021 static GList *
1022 gst_rtsp_client_sink_get_factories (void)
1023 {
1024   static GList *payloader_factories = NULL;
1025
1026   if (g_once_init_enter (&payloader_factories)) {
1027     GList *all_factories;
1028
1029     all_factories =
1030         gst_registry_feature_filter (gst_registry_get (),
1031         gst_rtp_payloader_filter_func, FALSE, NULL);
1032
1033     all_factories = g_list_sort (all_factories, (GCompareFunc) compare_ranks);
1034
1035     g_once_init_leave (&payloader_factories, all_factories);
1036   }
1037
1038   return payloader_factories;
1039 }
1040
1041 static GstCaps *
1042 gst_rtsp_client_sink_get_payloader_caps (GstElementFactory * factory)
1043 {
1044   const GList *tmp;
1045   GstCaps *caps = gst_caps_new_empty ();
1046
1047   for (tmp = gst_element_factory_get_static_pad_templates (factory);
1048       tmp; tmp = g_list_next (tmp)) {
1049     GstStaticPadTemplate *template = tmp->data;
1050
1051     if (template->direction == GST_PAD_SINK) {
1052       GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1053
1054       GST_LOG ("Found pad template %s on factory %s",
1055           template->name_template, gst_plugin_feature_get_name (factory));
1056
1057       if (static_caps)
1058         caps = gst_caps_merge (caps, static_caps);
1059
1060       /* Early out, any is absorbing */
1061       if (gst_caps_is_any (caps))
1062         goto out;
1063     }
1064   }
1065
1066 out:
1067   return caps;
1068 }
1069
1070 static GstCaps *
1071 gst_rtsp_client_sink_get_all_payloaders_caps (void)
1072 {
1073   /* Cached caps result */
1074   static GstCaps *ret;
1075
1076   if (g_once_init_enter (&ret)) {
1077     GList *factories, *cur;
1078     GstCaps *caps = gst_caps_new_empty ();
1079
1080     factories = gst_rtsp_client_sink_get_factories ();
1081     for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1082       GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1083       GstCaps *payloader_caps =
1084           gst_rtsp_client_sink_get_payloader_caps (factory);
1085
1086       caps = gst_caps_merge (caps, payloader_caps);
1087
1088       /* Early out, any is absorbing */
1089       if (gst_caps_is_any (caps))
1090         goto out;
1091     }
1092
1093   out:
1094     g_once_init_leave (&ret, caps);
1095   }
1096
1097   /* Return cached result */
1098   return gst_caps_ref (ret);
1099 }
1100
1101 static GstElement *
1102 gst_rtsp_client_sink_make_payloader (GstCaps * caps)
1103 {
1104   GList *factories, *cur;
1105
1106   factories = gst_rtsp_client_sink_get_factories ();
1107   for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1108     GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1109     const GList *tmp;
1110
1111     for (tmp = gst_element_factory_get_static_pad_templates (factory);
1112         tmp; tmp = g_list_next (tmp)) {
1113       GstStaticPadTemplate *template = tmp->data;
1114
1115       if (template->direction == GST_PAD_SINK) {
1116         GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1117         GstElement *payloader = NULL;
1118
1119         if (gst_caps_can_intersect (static_caps, caps)) {
1120           GST_DEBUG ("caps %" GST_PTR_FORMAT " intersects with template %"
1121               GST_PTR_FORMAT " for payloader %s", caps, static_caps,
1122               gst_plugin_feature_get_name (factory));
1123           payloader = gst_element_factory_create (factory, NULL);
1124         }
1125
1126         gst_caps_unref (static_caps);
1127
1128         if (payloader)
1129           return payloader;
1130       }
1131     }
1132   }
1133
1134   return NULL;
1135 }
1136
1137 static GstRTSPStream *
1138 gst_rtsp_client_sink_create_stream (GstRTSPClientSink * sink,
1139     GstRTSPStreamContext * context, GstElement * payloader, GstPad * pad)
1140 {
1141   GstRTSPStream *stream = NULL;
1142   guint pt, aux_pt, ulpfec_pt;
1143
1144   GST_OBJECT_LOCK (sink);
1145
1146   g_object_get (G_OBJECT (payloader), "pt", &pt, NULL);
1147   if (pt >= 96 && pt <= sink->next_dyn_pt) {
1148     /* Payloader has a dynamic PT, but one that's already used */
1149     /* FIXME: Create a caps->ptmap instead? */
1150     pt = sink->next_dyn_pt;
1151
1152     if (pt > 127)
1153       goto no_free_pt;
1154
1155     GST_DEBUG_OBJECT (sink, "Assigning pt %u to stream %d", pt, context->index);
1156
1157     sink->next_dyn_pt++;
1158   } else {
1159     GST_DEBUG_OBJECT (sink, "Keeping existing pt %u for stream %d",
1160         pt, context->index);
1161   }
1162
1163   aux_pt = sink->next_dyn_pt;
1164   if (aux_pt > 127)
1165     goto no_free_pt;
1166   sink->next_dyn_pt++;
1167
1168   ulpfec_pt = sink->next_dyn_pt;
1169   if (ulpfec_pt > 127)
1170     goto no_free_pt;
1171   sink->next_dyn_pt++;
1172
1173   GST_OBJECT_UNLOCK (sink);
1174
1175
1176   g_object_set (G_OBJECT (payloader), "pt", pt, NULL);
1177
1178   stream = gst_rtsp_stream_new (context->index, payloader, pad);
1179
1180   gst_rtsp_stream_set_client_side (stream, TRUE);
1181   gst_rtsp_stream_set_retransmission_time (stream,
1182       (GstClockTime) (sink->rtx_time) * GST_MSECOND);
1183   gst_rtsp_stream_set_protocols (stream, sink->protocols);
1184   gst_rtsp_stream_set_profiles (stream, sink->profiles);
1185   gst_rtsp_stream_set_retransmission_pt (stream, aux_pt);
1186   gst_rtsp_stream_set_buffer_size (stream, sink->udp_buffer_size);
1187   if (sink->rtp_blocksize > 0)
1188     gst_rtsp_stream_set_mtu (stream, sink->rtp_blocksize);
1189   gst_rtsp_stream_set_multicast_iface (stream, sink->multi_iface);
1190
1191   gst_rtsp_stream_set_ulpfec_pt (stream, ulpfec_pt);
1192   gst_rtsp_stream_set_ulpfec_percentage (stream, context->ulpfec_percentage);
1193
1194 #if 0
1195   if (priv->pool)
1196     gst_rtsp_stream_set_address_pool (stream, priv->pool);
1197 #endif
1198
1199   return stream;
1200 no_free_pt:
1201   GST_OBJECT_UNLOCK (sink);
1202
1203   GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL),
1204       ("Ran out of dynamic payload types."));
1205
1206   return NULL;
1207 }
1208
1209 static GstPadProbeReturn
1210 handle_payloader_block (GstPad * pad, GstPadProbeInfo * info,
1211     GstRTSPStreamContext * context)
1212 {
1213   GstRTSPClientSink *sink = context->parent;
1214
1215   GST_INFO_OBJECT (sink, "Block on pad %" GST_PTR_FORMAT, pad);
1216
1217   g_mutex_lock (&sink->preroll_lock);
1218   context->prerolled = TRUE;
1219   g_cond_broadcast (&sink->preroll_cond);
1220   g_mutex_unlock (&sink->preroll_lock);
1221
1222   GST_INFO_OBJECT (sink, "Announced preroll on pad %" GST_PTR_FORMAT, pad);
1223
1224   return GST_PAD_PROBE_OK;
1225 }
1226
1227 static gboolean
1228 gst_rtsp_client_sink_setup_payloader (GstRTSPClientSink * sink, GstPad * pad,
1229     GstCaps * caps)
1230 {
1231   GstRTSPStreamContext *context;
1232   GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1233
1234   GstElement *payloader;
1235   GstPad *sinkpad, *srcpad, *ghostsink;
1236
1237   context = gst_pad_get_element_private (pad);
1238
1239   if (cspad->custom_payloader) {
1240     payloader = cspad->custom_payloader;
1241   } else {
1242     /* Find the payloader. */
1243     payloader = gst_rtsp_client_sink_make_payloader (caps);
1244   }
1245
1246   if (payloader == NULL)
1247     return FALSE;
1248
1249   GST_DEBUG_OBJECT (sink, "Configuring payloader %" GST_PTR_FORMAT
1250       " for pad %" GST_PTR_FORMAT, payloader, pad);
1251
1252   sinkpad = gst_element_get_static_pad (payloader, "sink");
1253   if (sinkpad == NULL)
1254     goto no_sinkpad;
1255
1256   srcpad = gst_element_get_static_pad (payloader, "src");
1257   if (srcpad == NULL)
1258     goto no_srcpad;
1259
1260   gst_bin_add (GST_BIN (sink->internal_bin), payloader);
1261   ghostsink = gst_ghost_pad_new (NULL, sinkpad);
1262   gst_pad_set_active (ghostsink, TRUE);
1263   gst_element_add_pad (GST_ELEMENT (sink->internal_bin), ghostsink);
1264
1265   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER], 0,
1266       payloader);
1267
1268   GST_RTSP_STATE_LOCK (sink);
1269   context->payloader_block_id =
1270       gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1271       (GstPadProbeCallback) handle_payloader_block, context, NULL);
1272   context->payloader = payloader;
1273
1274   payloader = gst_object_ref (payloader);
1275
1276   gst_ghost_pad_set_target (GST_GHOST_PAD (pad), ghostsink);
1277   gst_object_unref (GST_OBJECT (sinkpad));
1278   GST_RTSP_STATE_UNLOCK (sink);
1279
1280   context->ulpfec_percentage = cspad->ulpfec_percentage;
1281
1282   gst_element_sync_state_with_parent (payloader);
1283
1284   gst_object_unref (payloader);
1285   gst_object_unref (GST_OBJECT (srcpad));
1286
1287   return TRUE;
1288
1289 no_sinkpad:
1290   GST_ERROR_OBJECT (sink,
1291       "Could not find sink pad on payloader %" GST_PTR_FORMAT, payloader);
1292   if (!cspad->custom_payloader)
1293     gst_object_unref (payloader);
1294   return FALSE;
1295
1296 no_srcpad:
1297   GST_ERROR_OBJECT (sink,
1298       "Could not find src pad on payloader %" GST_PTR_FORMAT, payloader);
1299   gst_object_unref (GST_OBJECT (sinkpad));
1300   gst_object_unref (payloader);
1301   return TRUE;
1302 }
1303
1304 static gboolean
1305 gst_rtsp_client_sink_sinkpad_event (GstPad * pad, GstObject * parent,
1306     GstEvent * event)
1307 {
1308   if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
1309     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1310     if (target == NULL) {
1311       GstCaps *caps;
1312
1313       /* No target yet - choose a payloader and configure it */
1314       gst_event_parse_caps (event, &caps);
1315
1316       GST_DEBUG_OBJECT (parent,
1317           "Have set caps event on pad %" GST_PTR_FORMAT
1318           " caps %" GST_PTR_FORMAT, pad, caps);
1319
1320       if (!gst_rtsp_client_sink_setup_payloader (GST_RTSP_CLIENT_SINK (parent),
1321               pad, caps)) {
1322         GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1323         GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION,
1324             ("Could not create payloader"),
1325             ("Custom payloader: %p, caps: %" GST_PTR_FORMAT,
1326                 cspad->custom_payloader, caps));
1327         gst_event_unref (event);
1328         return FALSE;
1329       }
1330     } else {
1331       gst_object_unref (target);
1332     }
1333   }
1334
1335   return gst_pad_event_default (pad, parent, event);
1336 }
1337
1338 static gboolean
1339 gst_rtsp_client_sink_sinkpad_query (GstPad * pad, GstObject * parent,
1340     GstQuery * query)
1341 {
1342   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1343     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1344     if (target == NULL) {
1345       GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1346       GstCaps *caps;
1347
1348       if (cspad->custom_payloader) {
1349         GstPad *sinkpad =
1350             gst_element_get_static_pad (cspad->custom_payloader, "sink");
1351
1352         if (sinkpad) {
1353           caps = gst_pad_query_caps (sinkpad, NULL);
1354           gst_object_unref (sinkpad);
1355         } else {
1356           GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION, (NULL),
1357               ("Custom payloaders are expected to expose a sink pad named 'sink'"));
1358           return FALSE;
1359         }
1360       } else {
1361         /* No target yet - return the union of all payloader caps */
1362         caps = gst_rtsp_client_sink_get_all_payloaders_caps ();
1363       }
1364
1365       GST_TRACE_OBJECT (parent, "Returning payloader caps %" GST_PTR_FORMAT,
1366           caps);
1367
1368       gst_query_set_caps_result (query, caps);
1369       gst_caps_unref (caps);
1370
1371       return TRUE;
1372     }
1373     gst_object_unref (target);
1374   }
1375
1376   return gst_pad_query_default (pad, parent, query);
1377 }
1378
1379 static GstPad *
1380 gst_rtsp_client_sink_request_new_pad (GstElement * element,
1381     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1382 {
1383   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1384   GstPad *pad;
1385   GstRTSPStreamContext *context;
1386   guint idx = (guint) - 1;
1387   gchar *tmpname;
1388
1389   g_mutex_lock (&sink->preroll_lock);
1390   if (sink->streams_collected) {
1391     GST_WARNING_OBJECT (element, "Can't add streams to a running session");
1392     g_mutex_unlock (&sink->preroll_lock);
1393     return NULL;
1394   }
1395   g_mutex_unlock (&sink->preroll_lock);
1396
1397   GST_OBJECT_LOCK (sink);
1398   if (name) {
1399     if (!sscanf (name, "sink_%u", &idx)) {
1400       GST_OBJECT_UNLOCK (sink);
1401       GST_ERROR_OBJECT (element, "Invalid sink pad name %s", name);
1402       return NULL;
1403     }
1404
1405     if (idx >= sink->next_pad_id)
1406       sink->next_pad_id = idx + 1;
1407   }
1408   if (idx == (guint) - 1) {
1409     idx = sink->next_pad_id;
1410     sink->next_pad_id++;
1411   }
1412   GST_OBJECT_UNLOCK (sink);
1413
1414   tmpname = g_strdup_printf ("sink_%u", idx);
1415   pad = gst_rtsp_client_sink_pad_new (templ, tmpname);
1416   g_free (tmpname);
1417
1418   GST_DEBUG_OBJECT (element, "Creating request pad %" GST_PTR_FORMAT, pad);
1419
1420   gst_pad_set_event_function (pad,
1421       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_event));
1422   gst_pad_set_query_function (pad,
1423       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_query));
1424
1425   context = g_new0 (GstRTSPStreamContext, 1);
1426   context->parent = sink;
1427   context->index = idx;
1428
1429   gst_pad_set_element_private (pad, context);
1430
1431   /* The rest of the context is configured on a caps set */
1432   gst_pad_set_active (pad, TRUE);
1433   gst_element_add_pad (element, pad);
1434   gst_child_proxy_child_added (GST_CHILD_PROXY (element), G_OBJECT (pad),
1435       GST_PAD_NAME (pad));
1436
1437   (void) gst_rtsp_client_sink_get_factories ();
1438
1439   g_mutex_init (&context->conninfo.send_lock);
1440   g_mutex_init (&context->conninfo.recv_lock);
1441
1442   GST_RTSP_STATE_LOCK (sink);
1443   sink->contexts = g_list_prepend (sink->contexts, context);
1444   GST_RTSP_STATE_UNLOCK (sink);
1445
1446   return pad;
1447 }
1448
1449 static void
1450 gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
1451 {
1452   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1453   GstRTSPStreamContext *context;
1454
1455   context = gst_pad_get_element_private (pad);
1456
1457   /* FIXME: we may need to change our blocking state waiting for
1458    * GstRTSPStreamBlocking messages */
1459
1460   GST_RTSP_STATE_LOCK (sink);
1461   sink->contexts = g_list_remove (sink->contexts, context);
1462   GST_RTSP_STATE_UNLOCK (sink);
1463
1464   /* FIXME: Shut down and clean up streaming on this pad,
1465    * do teardown if needed */
1466   GST_LOG_OBJECT (sink,
1467       "Cleaning up payloader and stream for released pad %" GST_PTR_FORMAT,
1468       pad);
1469
1470   if (context->stream_transport) {
1471     gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1472     gst_object_unref (context->stream_transport);
1473     context->stream_transport = NULL;
1474   }
1475   if (context->stream) {
1476     if (context->joined) {
1477       gst_rtsp_stream_leave_bin (context->stream,
1478           GST_BIN (sink->internal_bin), sink->rtpbin);
1479       context->joined = FALSE;
1480     }
1481     gst_object_unref (context->stream);
1482     context->stream = NULL;
1483   }
1484   if (context->srtcpparams)
1485     gst_caps_unref (context->srtcpparams);
1486
1487   g_free (context->conninfo.location);
1488   context->conninfo.location = NULL;
1489
1490   g_mutex_clear (&context->conninfo.send_lock);
1491   g_mutex_clear (&context->conninfo.recv_lock);
1492
1493   g_free (context);
1494
1495   gst_element_remove_pad (element, pad);
1496 }
1497
1498 static GstClock *
1499 gst_rtsp_client_sink_provide_clock (GstElement * element)
1500 {
1501   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1502   GstClock *clock;
1503
1504   if ((clock = sink->provided_clock) != NULL)
1505     gst_object_ref (clock);
1506
1507   return clock;
1508 }
1509
1510 /* a proxy string of the format [user:passwd@]host[:port] */
1511 static gboolean
1512 gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp, const gchar * proxy)
1513 {
1514   gchar *p, *at, *col;
1515
1516   g_free (rtsp->proxy_user);
1517   rtsp->proxy_user = NULL;
1518   g_free (rtsp->proxy_passwd);
1519   rtsp->proxy_passwd = NULL;
1520   g_free (rtsp->proxy_host);
1521   rtsp->proxy_host = NULL;
1522   rtsp->proxy_port = 0;
1523
1524   p = (gchar *) proxy;
1525
1526   if (p == NULL)
1527     return TRUE;
1528
1529   /* we allow http:// in front but ignore it */
1530   if (g_str_has_prefix (p, "http://"))
1531     p += 7;
1532
1533   at = strchr (p, '@');
1534   if (at) {
1535     /* look for user:passwd */
1536     col = strchr (proxy, ':');
1537     if (col == NULL || col > at)
1538       return FALSE;
1539
1540     rtsp->proxy_user = g_strndup (p, col - p);
1541     col++;
1542     rtsp->proxy_passwd = g_strndup (col, at - col);
1543
1544     /* move to host */
1545     p = at + 1;
1546   } else {
1547     if (rtsp->prop_proxy_id != NULL && *rtsp->prop_proxy_id != '\0')
1548       rtsp->proxy_user = g_strdup (rtsp->prop_proxy_id);
1549     if (rtsp->prop_proxy_pw != NULL && *rtsp->prop_proxy_pw != '\0')
1550       rtsp->proxy_passwd = g_strdup (rtsp->prop_proxy_pw);
1551     if (rtsp->proxy_user != NULL || rtsp->proxy_passwd != NULL) {
1552       GST_LOG_OBJECT (rtsp, "set proxy user/pw from properties: %s:%s",
1553           GST_STR_NULL (rtsp->proxy_user), GST_STR_NULL (rtsp->proxy_passwd));
1554     }
1555   }
1556   col = strchr (p, ':');
1557
1558   if (col) {
1559     /* everything before the colon is the hostname */
1560     rtsp->proxy_host = g_strndup (p, col - p);
1561     p = col + 1;
1562     rtsp->proxy_port = strtoul (p, (char **) &p, 10);
1563   } else {
1564     rtsp->proxy_host = g_strdup (p);
1565     rtsp->proxy_port = 8080;
1566   }
1567   return TRUE;
1568 }
1569
1570 static void
1571 gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink * rtsp_client_sink,
1572     guint64 timeout)
1573 {
1574   rtsp_client_sink->tcp_timeout = timeout;
1575 }
1576
1577 static void
1578 gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
1579     const GValue * value, GParamSpec * pspec)
1580 {
1581   GstRTSPClientSink *rtsp_client_sink;
1582
1583   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1584
1585   switch (prop_id) {
1586     case PROP_LOCATION:
1587       gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (rtsp_client_sink),
1588           g_value_get_string (value), NULL);
1589       break;
1590     case PROP_PROTOCOLS:
1591       rtsp_client_sink->protocols = g_value_get_flags (value);
1592       break;
1593     case PROP_PROFILES:
1594       rtsp_client_sink->profiles = g_value_get_flags (value);
1595       break;
1596     case PROP_DEBUG:
1597       rtsp_client_sink->debug = g_value_get_boolean (value);
1598       break;
1599     case PROP_RETRY:
1600       rtsp_client_sink->retry = g_value_get_uint (value);
1601       break;
1602     case PROP_TIMEOUT:
1603       rtsp_client_sink->udp_timeout = g_value_get_uint64 (value);
1604       break;
1605     case PROP_TCP_TIMEOUT:
1606       gst_rtsp_client_sink_set_tcp_timeout (rtsp_client_sink,
1607           g_value_get_uint64 (value));
1608       break;
1609     case PROP_LATENCY:
1610       rtsp_client_sink->latency = g_value_get_uint (value);
1611       break;
1612     case PROP_RTX_TIME:
1613       rtsp_client_sink->rtx_time = g_value_get_uint (value);
1614       break;
1615     case PROP_DO_RTSP_KEEP_ALIVE:
1616       rtsp_client_sink->do_rtsp_keep_alive = g_value_get_boolean (value);
1617       break;
1618     case PROP_PROXY:
1619       gst_rtsp_client_sink_set_proxy (rtsp_client_sink,
1620           g_value_get_string (value));
1621       break;
1622     case PROP_PROXY_ID:
1623       if (rtsp_client_sink->prop_proxy_id)
1624         g_free (rtsp_client_sink->prop_proxy_id);
1625       rtsp_client_sink->prop_proxy_id = g_value_dup_string (value);
1626       break;
1627     case PROP_PROXY_PW:
1628       if (rtsp_client_sink->prop_proxy_pw)
1629         g_free (rtsp_client_sink->prop_proxy_pw);
1630       rtsp_client_sink->prop_proxy_pw = g_value_dup_string (value);
1631       break;
1632     case PROP_RTP_BLOCKSIZE:
1633       rtsp_client_sink->rtp_blocksize = g_value_get_uint (value);
1634       break;
1635     case PROP_USER_ID:
1636       if (rtsp_client_sink->user_id)
1637         g_free (rtsp_client_sink->user_id);
1638       rtsp_client_sink->user_id = g_value_dup_string (value);
1639       break;
1640     case PROP_USER_PW:
1641       if (rtsp_client_sink->user_pw)
1642         g_free (rtsp_client_sink->user_pw);
1643       rtsp_client_sink->user_pw = g_value_dup_string (value);
1644       break;
1645     case PROP_PORT_RANGE:
1646     {
1647       const gchar *str;
1648
1649       str = g_value_get_string (value);
1650       if (!str || !sscanf (str, "%u-%u",
1651               &rtsp_client_sink->client_port_range.min,
1652               &rtsp_client_sink->client_port_range.max)) {
1653         rtsp_client_sink->client_port_range.min = 0;
1654         rtsp_client_sink->client_port_range.max = 0;
1655       }
1656       break;
1657     }
1658     case PROP_UDP_BUFFER_SIZE:
1659       rtsp_client_sink->udp_buffer_size = g_value_get_int (value);
1660       break;
1661     case PROP_UDP_RECONNECT:
1662       rtsp_client_sink->udp_reconnect = g_value_get_boolean (value);
1663       break;
1664     case PROP_MULTICAST_IFACE:
1665       g_free (rtsp_client_sink->multi_iface);
1666
1667       if (g_value_get_string (value) == NULL)
1668         rtsp_client_sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1669       else
1670         rtsp_client_sink->multi_iface = g_value_dup_string (value);
1671       break;
1672     case PROP_SDES:
1673       rtsp_client_sink->sdes = g_value_dup_boxed (value);
1674       break;
1675     case PROP_TLS_VALIDATION_FLAGS:
1676       rtsp_client_sink->tls_validation_flags = g_value_get_flags (value);
1677       break;
1678     case PROP_TLS_DATABASE:
1679       g_clear_object (&rtsp_client_sink->tls_database);
1680       rtsp_client_sink->tls_database = g_value_dup_object (value);
1681       break;
1682     case PROP_TLS_INTERACTION:
1683       g_clear_object (&rtsp_client_sink->tls_interaction);
1684       rtsp_client_sink->tls_interaction = g_value_dup_object (value);
1685       break;
1686     case PROP_NTP_TIME_SOURCE:
1687       rtsp_client_sink->ntp_time_source = g_value_get_enum (value);
1688       break;
1689     case PROP_USER_AGENT:
1690       g_free (rtsp_client_sink->user_agent);
1691       rtsp_client_sink->user_agent = g_value_dup_string (value);
1692       break;
1693     default:
1694       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1695       break;
1696   }
1697 }
1698
1699 static void
1700 gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
1701     GValue * value, GParamSpec * pspec)
1702 {
1703   GstRTSPClientSink *rtsp_client_sink;
1704
1705   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1706
1707   switch (prop_id) {
1708     case PROP_LOCATION:
1709       g_value_set_string (value, rtsp_client_sink->conninfo.location);
1710       break;
1711     case PROP_PROTOCOLS:
1712       g_value_set_flags (value, rtsp_client_sink->protocols);
1713       break;
1714     case PROP_PROFILES:
1715       g_value_set_flags (value, rtsp_client_sink->profiles);
1716       break;
1717     case PROP_DEBUG:
1718       g_value_set_boolean (value, rtsp_client_sink->debug);
1719       break;
1720     case PROP_RETRY:
1721       g_value_set_uint (value, rtsp_client_sink->retry);
1722       break;
1723     case PROP_TIMEOUT:
1724       g_value_set_uint64 (value, rtsp_client_sink->udp_timeout);
1725       break;
1726     case PROP_TCP_TIMEOUT:
1727       g_value_set_uint64 (value, rtsp_client_sink->tcp_timeout);
1728       break;
1729     case PROP_LATENCY:
1730       g_value_set_uint (value, rtsp_client_sink->latency);
1731       break;
1732     case PROP_RTX_TIME:
1733       g_value_set_uint (value, rtsp_client_sink->rtx_time);
1734       break;
1735     case PROP_DO_RTSP_KEEP_ALIVE:
1736       g_value_set_boolean (value, rtsp_client_sink->do_rtsp_keep_alive);
1737       break;
1738     case PROP_PROXY:
1739     {
1740       gchar *str;
1741
1742       if (rtsp_client_sink->proxy_host) {
1743         str =
1744             g_strdup_printf ("%s:%d", rtsp_client_sink->proxy_host,
1745             rtsp_client_sink->proxy_port);
1746       } else {
1747         str = NULL;
1748       }
1749       g_value_take_string (value, str);
1750       break;
1751     }
1752     case PROP_PROXY_ID:
1753       g_value_set_string (value, rtsp_client_sink->prop_proxy_id);
1754       break;
1755     case PROP_PROXY_PW:
1756       g_value_set_string (value, rtsp_client_sink->prop_proxy_pw);
1757       break;
1758     case PROP_RTP_BLOCKSIZE:
1759       g_value_set_uint (value, rtsp_client_sink->rtp_blocksize);
1760       break;
1761     case PROP_USER_ID:
1762       g_value_set_string (value, rtsp_client_sink->user_id);
1763       break;
1764     case PROP_USER_PW:
1765       g_value_set_string (value, rtsp_client_sink->user_pw);
1766       break;
1767     case PROP_PORT_RANGE:
1768     {
1769       gchar *str;
1770
1771       if (rtsp_client_sink->client_port_range.min != 0) {
1772         str = g_strdup_printf ("%u-%u", rtsp_client_sink->client_port_range.min,
1773             rtsp_client_sink->client_port_range.max);
1774       } else {
1775         str = NULL;
1776       }
1777       g_value_take_string (value, str);
1778       break;
1779     }
1780     case PROP_UDP_BUFFER_SIZE:
1781       g_value_set_int (value, rtsp_client_sink->udp_buffer_size);
1782       break;
1783     case PROP_UDP_RECONNECT:
1784       g_value_set_boolean (value, rtsp_client_sink->udp_reconnect);
1785       break;
1786     case PROP_MULTICAST_IFACE:
1787       g_value_set_string (value, rtsp_client_sink->multi_iface);
1788       break;
1789     case PROP_SDES:
1790       g_value_set_boxed (value, rtsp_client_sink->sdes);
1791       break;
1792     case PROP_TLS_VALIDATION_FLAGS:
1793       g_value_set_flags (value, rtsp_client_sink->tls_validation_flags);
1794       break;
1795     case PROP_TLS_DATABASE:
1796       g_value_set_object (value, rtsp_client_sink->tls_database);
1797       break;
1798     case PROP_TLS_INTERACTION:
1799       g_value_set_object (value, rtsp_client_sink->tls_interaction);
1800       break;
1801     case PROP_NTP_TIME_SOURCE:
1802       g_value_set_enum (value, rtsp_client_sink->ntp_time_source);
1803       break;
1804     case PROP_USER_AGENT:
1805       g_value_set_string (value, rtsp_client_sink->user_agent);
1806       break;
1807     default:
1808       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1809       break;
1810   }
1811 }
1812
1813 static const gchar *
1814 get_aggregate_control (GstRTSPClientSink * sink)
1815 {
1816   const gchar *base;
1817
1818   if (sink->control)
1819     base = sink->control;
1820   else if (sink->content_base)
1821     base = sink->content_base;
1822   else if (sink->conninfo.url_str)
1823     base = sink->conninfo.url_str;
1824   else
1825     base = "/";
1826
1827   return base;
1828 }
1829
1830 static void
1831 gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
1832 {
1833   GList *walk;
1834
1835   GST_DEBUG_OBJECT (sink, "cleanup");
1836
1837   gst_element_set_state (GST_ELEMENT (sink->internal_bin), GST_STATE_NULL);
1838
1839   /* Clean up any left over stream objects */
1840   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
1841     GstRTSPStreamContext *context = (GstRTSPStreamContext *) (walk->data);
1842     if (context->stream_transport) {
1843       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1844       gst_object_unref (context->stream_transport);
1845       context->stream_transport = NULL;
1846     }
1847
1848     if (context->stream) {
1849       if (context->joined) {
1850         gst_rtsp_stream_leave_bin (context->stream,
1851             GST_BIN (sink->internal_bin), sink->rtpbin);
1852         context->joined = FALSE;
1853       }
1854       gst_object_unref (context->stream);
1855       context->stream = NULL;
1856     }
1857
1858     if (context->srtcpparams) {
1859       gst_caps_unref (context->srtcpparams);
1860       context->srtcpparams = NULL;
1861     }
1862     g_free (context->conninfo.location);
1863     context->conninfo.location = NULL;
1864   }
1865
1866   if (sink->rtpbin) {
1867     gst_element_set_state (sink->rtpbin, GST_STATE_NULL);
1868     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), sink->rtpbin);
1869     sink->rtpbin = NULL;
1870   }
1871
1872   g_free (sink->content_base);
1873   sink->content_base = NULL;
1874
1875   g_free (sink->control);
1876   sink->control = NULL;
1877
1878   if (sink->range)
1879     gst_rtsp_range_free (sink->range);
1880   sink->range = NULL;
1881
1882   /* don't clear the SDP when it was used in the url */
1883   if (sink->uri_sdp && !sink->from_sdp) {
1884     gst_sdp_message_free (sink->uri_sdp);
1885     sink->uri_sdp = NULL;
1886   }
1887
1888   if (sink->provided_clock) {
1889     gst_object_unref (sink->provided_clock);
1890     sink->provided_clock = NULL;
1891   }
1892
1893   g_free (sink->server_ip);
1894   sink->server_ip = NULL;
1895
1896   sink->next_pad_id = 0;
1897   sink->next_dyn_pt = 96;
1898 }
1899
1900 static GstRTSPResult
1901 gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
1902     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, gint64 timeout)
1903 {
1904   GstRTSPResult ret;
1905
1906   if (conninfo->connection) {
1907     g_mutex_lock (&conninfo->send_lock);
1908     ret =
1909         gst_rtsp_connection_send_usec (conninfo->connection, message, timeout);
1910     g_mutex_unlock (&conninfo->send_lock);
1911   } else {
1912     ret = GST_RTSP_ERROR;
1913   }
1914
1915   return ret;
1916 }
1917
1918 static GstRTSPResult
1919 gst_rtsp_client_sink_connection_send_messages (GstRTSPClientSink * sink,
1920     GstRTSPConnInfo * conninfo, GstRTSPMessage * messages, guint n_messages,
1921     gint64 timeout)
1922 {
1923   GstRTSPResult ret;
1924
1925   if (conninfo->connection) {
1926     g_mutex_lock (&conninfo->send_lock);
1927     ret =
1928         gst_rtsp_connection_send_messages_usec (conninfo->connection, messages,
1929         n_messages, timeout);
1930     g_mutex_unlock (&conninfo->send_lock);
1931   } else {
1932     ret = GST_RTSP_ERROR;
1933   }
1934
1935   return ret;
1936 }
1937
1938 static GstRTSPResult
1939 gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
1940     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, gint64 timeout)
1941 {
1942   GstRTSPResult ret;
1943
1944   if (conninfo->connection) {
1945     g_mutex_lock (&conninfo->recv_lock);
1946     ret = gst_rtsp_connection_receive_usec (conninfo->connection, message,
1947         timeout);
1948     g_mutex_unlock (&conninfo->recv_lock);
1949   } else {
1950     ret = GST_RTSP_ERROR;
1951   }
1952
1953   return ret;
1954 }
1955
1956 static gboolean
1957 accept_certificate_cb (GTlsConnection * conn, GTlsCertificate * peer_cert,
1958     GTlsCertificateFlags errors, gpointer user_data)
1959 {
1960   GstRTSPClientSink *sink = user_data;
1961   gboolean accept = FALSE;
1962
1963   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE],
1964       0, conn, peer_cert, errors, &accept);
1965
1966   return accept;
1967 }
1968
1969 static GstRTSPResult
1970 gst_rtsp_conninfo_connect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1971     gboolean async)
1972 {
1973   GstRTSPResult res;
1974
1975   if (info->connection == NULL) {
1976     if (info->url == NULL) {
1977       GST_DEBUG_OBJECT (sink, "parsing uri (%s)...", info->location);
1978       if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0)
1979         goto parse_error;
1980     }
1981
1982     /* create connection */
1983     GST_DEBUG_OBJECT (sink, "creating connection (%s)...", info->location);
1984     if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0)
1985       goto could_not_create;
1986
1987     if (info->url_str)
1988       g_free (info->url_str);
1989     info->url_str = gst_rtsp_url_get_request_uri (info->url);
1990
1991     GST_DEBUG_OBJECT (sink, "sanitized uri %s", info->url_str);
1992
1993     if (info->url->transports & GST_RTSP_LOWER_TRANS_TLS) {
1994       if (!gst_rtsp_connection_set_tls_validation_flags (info->connection,
1995               sink->tls_validation_flags))
1996         GST_WARNING_OBJECT (sink, "Unable to set TLS validation flags");
1997
1998       if (sink->tls_database)
1999         gst_rtsp_connection_set_tls_database (info->connection,
2000             sink->tls_database);
2001
2002       if (sink->tls_interaction)
2003         gst_rtsp_connection_set_tls_interaction (info->connection,
2004             sink->tls_interaction);
2005
2006       gst_rtsp_connection_set_accept_certificate_func (info->connection,
2007           accept_certificate_cb, sink, NULL);
2008     }
2009
2010     if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP)
2011       gst_rtsp_connection_set_tunneled (info->connection, TRUE);
2012
2013     if (sink->proxy_host) {
2014       GST_DEBUG_OBJECT (sink, "setting proxy %s:%d", sink->proxy_host,
2015           sink->proxy_port);
2016       gst_rtsp_connection_set_proxy (info->connection, sink->proxy_host,
2017           sink->proxy_port);
2018     }
2019   }
2020
2021   if (!info->connected) {
2022     /* connect */
2023     if (async)
2024       GST_ELEMENT_PROGRESS (sink, CONTINUE, "connect",
2025           ("Connecting to %s", info->location));
2026     GST_DEBUG_OBJECT (sink, "connecting (%s)...", info->location);
2027     if ((res =
2028             gst_rtsp_connection_connect_usec (info->connection,
2029                 sink->tcp_timeout)) < 0)
2030       goto could_not_connect;
2031
2032     info->connected = TRUE;
2033   }
2034   return GST_RTSP_OK;
2035
2036   /* ERRORS */
2037 parse_error:
2038   {
2039     GST_ERROR_OBJECT (sink, "No valid RTSP URL was provided");
2040     return res;
2041   }
2042 could_not_create:
2043   {
2044     gchar *str = gst_rtsp_strresult (res);
2045     GST_ERROR_OBJECT (sink, "Could not create connection. (%s)", str);
2046     g_free (str);
2047     return res;
2048   }
2049 could_not_connect:
2050   {
2051     gchar *str = gst_rtsp_strresult (res);
2052     GST_ERROR_OBJECT (sink, "Could not connect to server. (%s)", str);
2053     g_free (str);
2054     return res;
2055   }
2056 }
2057
2058 static GstRTSPResult
2059 gst_rtsp_conninfo_close (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
2060     gboolean free)
2061 {
2062   GST_RTSP_STATE_LOCK (sink);
2063   if (info->connected) {
2064     GST_DEBUG_OBJECT (sink, "closing connection...");
2065     gst_rtsp_connection_close (info->connection);
2066     info->connected = FALSE;
2067   }
2068   if (free && info->connection) {
2069     /* free connection */
2070     GST_DEBUG_OBJECT (sink, "freeing connection...");
2071     gst_rtsp_connection_free (info->connection);
2072     g_mutex_lock (&sink->preroll_lock);
2073     info->connection = NULL;
2074     g_cond_broadcast (&sink->preroll_cond);
2075     g_mutex_unlock (&sink->preroll_lock);
2076   }
2077   GST_RTSP_STATE_UNLOCK (sink);
2078   return GST_RTSP_OK;
2079 }
2080
2081 static GstRTSPResult
2082 gst_rtsp_conninfo_reconnect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
2083     gboolean async)
2084 {
2085   GstRTSPResult res;
2086
2087   GST_DEBUG_OBJECT (sink, "reconnecting connection...");
2088   gst_rtsp_conninfo_close (sink, info, FALSE);
2089   res = gst_rtsp_conninfo_connect (sink, info, async);
2090
2091   return res;
2092 }
2093
2094 static void
2095 gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink, gboolean flush)
2096 {
2097   GList *walk;
2098
2099   GST_DEBUG_OBJECT (sink, "set flushing %d", flush);
2100   g_mutex_lock (&sink->preroll_lock);
2101   if (sink->conninfo.connection && sink->conninfo.flushing != flush) {
2102     GST_DEBUG_OBJECT (sink, "connection flush");
2103     gst_rtsp_connection_flush (sink->conninfo.connection, flush);
2104     sink->conninfo.flushing = flush;
2105   }
2106   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
2107     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
2108     if (stream->conninfo.connection && stream->conninfo.flushing != flush) {
2109       GST_DEBUG_OBJECT (sink, "stream %p flush", stream);
2110       gst_rtsp_connection_flush (stream->conninfo.connection, flush);
2111       stream->conninfo.flushing = flush;
2112     }
2113   }
2114   g_cond_broadcast (&sink->preroll_cond);
2115   g_mutex_unlock (&sink->preroll_lock);
2116 }
2117
2118 static GstRTSPResult
2119 gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
2120     GstRTSPMessage * msg, GstRTSPMethod method, const gchar * uri)
2121 {
2122   GstRTSPResult res;
2123
2124   res = gst_rtsp_message_init_request (msg, method, uri);
2125   if (res < 0)
2126     return res;
2127
2128   /* set user-agent */
2129   if (sink->user_agent)
2130     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_USER_AGENT,
2131         sink->user_agent);
2132
2133   return res;
2134 }
2135
2136 /* FIXME, handle server request, reply with OK, for now */
2137 static GstRTSPResult
2138 gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
2139     GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
2140 {
2141   GstRTSPMessage response = { 0 };
2142   GstRTSPResult res;
2143
2144   GST_DEBUG_OBJECT (sink, "got server request message");
2145
2146   if (sink->debug)
2147     gst_rtsp_message_dump (request);
2148
2149   /* default implementation, send OK */
2150   GST_DEBUG_OBJECT (sink, "prepare OK reply");
2151   res =
2152       gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, "OK",
2153       request);
2154   if (res < 0)
2155     goto send_error;
2156
2157   /* let app parse and reply */
2158   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST],
2159       0, request, &response);
2160
2161   if (sink->debug)
2162     gst_rtsp_message_dump (&response);
2163
2164   res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, 0);
2165   if (res < 0)
2166     goto send_error;
2167
2168   gst_rtsp_message_unset (&response);
2169
2170   return GST_RTSP_OK;
2171
2172   /* ERRORS */
2173 send_error:
2174   {
2175     gst_rtsp_message_unset (&response);
2176     return res;
2177   }
2178 }
2179
2180 /* send server keep-alive */
2181 static GstRTSPResult
2182 gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
2183 {
2184   GstRTSPMessage request = { 0 };
2185   GstRTSPResult res;
2186   GstRTSPMethod method;
2187   const gchar *control;
2188
2189   if (sink->do_rtsp_keep_alive == FALSE) {
2190     GST_DEBUG_OBJECT (sink, "do-rtsp-keep-alive is FALSE, not sending.");
2191     gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2192     return GST_RTSP_OK;
2193   }
2194
2195   GST_DEBUG_OBJECT (sink, "creating server keep-alive");
2196
2197   /* find a method to use for keep-alive */
2198   if (sink->methods & GST_RTSP_GET_PARAMETER)
2199     method = GST_RTSP_GET_PARAMETER;
2200   else
2201     method = GST_RTSP_OPTIONS;
2202
2203   control = get_aggregate_control (sink);
2204   if (control == NULL)
2205     goto no_control;
2206
2207   res = gst_rtsp_client_sink_init_request (sink, &request, method, control);
2208   if (res < 0)
2209     goto send_error;
2210
2211   if (sink->debug)
2212     gst_rtsp_message_dump (&request);
2213
2214   res =
2215       gst_rtsp_client_sink_connection_send (sink, &sink->conninfo, &request, 0);
2216   if (res < 0)
2217     goto send_error;
2218
2219   gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2220   gst_rtsp_message_unset (&request);
2221
2222   return GST_RTSP_OK;
2223
2224   /* ERRORS */
2225 no_control:
2226   {
2227     GST_WARNING_OBJECT (sink, "no control url to send keepalive");
2228     return GST_RTSP_OK;
2229   }
2230 send_error:
2231   {
2232     gchar *str = gst_rtsp_strresult (res);
2233
2234     gst_rtsp_message_unset (&request);
2235     GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, (NULL),
2236         ("Could not send keep-alive. (%s)", str));
2237     g_free (str);
2238     return res;
2239   }
2240 }
2241
2242 static GstFlowReturn
2243 gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
2244 {
2245   GstRTSPResult res;
2246   GstRTSPMessage message = { 0 };
2247   gint retry = 0;
2248
2249   while (TRUE) {
2250     gint64 timeout;
2251
2252     /* get the next timeout interval */
2253     timeout = gst_rtsp_connection_next_timeout_usec (sink->conninfo.connection);
2254
2255     GST_DEBUG_OBJECT (sink, "doing receive with timeout %d seconds",
2256         (gint) timeout / G_USEC_PER_SEC);
2257
2258     gst_rtsp_message_unset (&message);
2259
2260     /* we should continue reading the TCP socket because the server might
2261      * send us requests. When the session timeout expires, we need to send a
2262      * keep-alive request to keep the session open. */
2263     res =
2264         gst_rtsp_client_sink_connection_receive (sink,
2265         &sink->conninfo, &message, timeout);
2266
2267     switch (res) {
2268       case GST_RTSP_OK:
2269         GST_DEBUG_OBJECT (sink, "we received a server message");
2270         break;
2271       case GST_RTSP_EINTR:
2272         /* we got interrupted, see what we have to do */
2273         goto interrupt;
2274       case GST_RTSP_ETIMEOUT:
2275         /* send keep-alive, ignore the result, a warning will be posted. */
2276         GST_DEBUG_OBJECT (sink, "timeout, sending keep-alive");
2277         if ((res =
2278                 gst_rtsp_client_sink_send_keep_alive (sink)) == GST_RTSP_EINTR)
2279           goto interrupt;
2280         continue;
2281       case GST_RTSP_EEOF:
2282         /* server closed the connection. not very fatal for UDP, reconnect and
2283          * see what happens. */
2284         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2285             ("The server closed the connection."));
2286         if (sink->udp_reconnect) {
2287           if ((res =
2288                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2289                       FALSE)) < 0)
2290             goto connect_error;
2291         } else {
2292           goto server_eof;
2293         }
2294         continue;
2295         break;
2296       case GST_RTSP_ENET:
2297         GST_DEBUG_OBJECT (sink, "An ethernet problem occured.");
2298       default:
2299         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2300             ("Unhandled return value %d.", res));
2301         goto receive_error;
2302     }
2303
2304     switch (message.type) {
2305       case GST_RTSP_MESSAGE_REQUEST:
2306         /* server sends us a request message, handle it */
2307         res =
2308             gst_rtsp_client_sink_handle_request (sink,
2309             &sink->conninfo, &message);
2310         if (res == GST_RTSP_EEOF)
2311           goto server_eof;
2312         else if (res < 0)
2313           goto handle_request_failed;
2314         break;
2315       case GST_RTSP_MESSAGE_RESPONSE:
2316         /* we ignore response and data messages */
2317         GST_DEBUG_OBJECT (sink, "ignoring response message");
2318         if (sink->debug)
2319           gst_rtsp_message_dump (&message);
2320         if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) {
2321           GST_DEBUG_OBJECT (sink, "but is Unauthorized response ...");
2322           if (gst_rtsp_client_sink_setup_auth (sink, &message) && !(retry++)) {
2323             GST_DEBUG_OBJECT (sink, "so retrying keep-alive");
2324             if ((res =
2325                     gst_rtsp_client_sink_send_keep_alive (sink)) ==
2326                 GST_RTSP_EINTR)
2327               goto interrupt;
2328           }
2329         } else {
2330           retry = 0;
2331         }
2332         break;
2333       case GST_RTSP_MESSAGE_DATA:
2334         /* we ignore response and data messages */
2335         GST_DEBUG_OBJECT (sink, "ignoring data message");
2336         break;
2337       default:
2338         GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2339             message.type);
2340         break;
2341     }
2342   }
2343   g_assert_not_reached ();
2344
2345   /* we get here when the connection got interrupted */
2346 interrupt:
2347   {
2348     gst_rtsp_message_unset (&message);
2349     GST_DEBUG_OBJECT (sink, "got interrupted");
2350     return GST_FLOW_FLUSHING;
2351   }
2352 connect_error:
2353   {
2354     gchar *str = gst_rtsp_strresult (res);
2355     GstFlowReturn ret;
2356
2357     sink->conninfo.connected = FALSE;
2358     if (res != GST_RTSP_EINTR) {
2359       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
2360           ("Could not connect to server. (%s)", str));
2361       g_free (str);
2362       ret = GST_FLOW_ERROR;
2363     } else {
2364       ret = GST_FLOW_FLUSHING;
2365     }
2366     return ret;
2367   }
2368 receive_error:
2369   {
2370     gchar *str = gst_rtsp_strresult (res);
2371
2372     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2373         ("Could not receive message. (%s)", str));
2374     g_free (str);
2375     return GST_FLOW_ERROR;
2376   }
2377 handle_request_failed:
2378   {
2379     gchar *str = gst_rtsp_strresult (res);
2380     GstFlowReturn ret;
2381
2382     gst_rtsp_message_unset (&message);
2383     if (res != GST_RTSP_EINTR) {
2384       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2385           ("Could not handle server message. (%s)", str));
2386       g_free (str);
2387       ret = GST_FLOW_ERROR;
2388     } else {
2389       ret = GST_FLOW_FLUSHING;
2390     }
2391     return ret;
2392   }
2393 server_eof:
2394   {
2395     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2396     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2397         ("The server closed the connection."));
2398     sink->conninfo.connected = FALSE;
2399     gst_rtsp_message_unset (&message);
2400     return GST_FLOW_EOS;
2401   }
2402 }
2403
2404 static GstRTSPResult
2405 gst_rtsp_client_sink_reconnect (GstRTSPClientSink * sink, gboolean async)
2406 {
2407   GstRTSPResult res = GST_RTSP_OK;
2408   gboolean restart = FALSE;
2409
2410   GST_DEBUG_OBJECT (sink, "doing reconnect");
2411
2412   GST_FIXME_OBJECT (sink, "Reconnection is not yet implemented");
2413
2414   /* no need to restart, we're done */
2415   if (!restart)
2416     goto done;
2417
2418   /* we can try only TCP now */
2419   sink->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
2420
2421   /* close and cleanup our state */
2422   if ((res = gst_rtsp_client_sink_close (sink, async, FALSE)) < 0)
2423     goto done;
2424
2425   /* see if we have TCP left to try. Also don't try TCP when we were configured
2426    * with an SDP. */
2427   if (!(sink->protocols & GST_RTSP_LOWER_TRANS_TCP) || sink->from_sdp)
2428     goto no_protocols;
2429
2430   /* We post a warning message now to inform the user
2431    * that nothing happened. It's most likely a firewall thing. */
2432   GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2433       ("Could not receive any UDP packets for %.4f seconds, maybe your "
2434           "firewall is blocking it. Retrying using a TCP connection.",
2435           gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2436
2437   /* open new connection using tcp */
2438   if (gst_rtsp_client_sink_open (sink, async) < 0)
2439     goto open_failed;
2440
2441   /* start recording */
2442   if (gst_rtsp_client_sink_record (sink, async) < 0)
2443     goto play_failed;
2444
2445 done:
2446   return res;
2447
2448   /* ERRORS */
2449 no_protocols:
2450   {
2451     sink->cur_protocols = 0;
2452     /* no transport possible, post an error and stop */
2453     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2454         ("Could not receive any UDP packets for %.4f seconds, maybe your "
2455             "firewall is blocking it. No other protocols to try.",
2456             gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2457     return GST_RTSP_ERROR;
2458   }
2459 open_failed:
2460   {
2461     GST_DEBUG_OBJECT (sink, "open failed");
2462     return GST_RTSP_OK;
2463   }
2464 play_failed:
2465   {
2466     GST_DEBUG_OBJECT (sink, "play failed");
2467     return GST_RTSP_OK;
2468   }
2469 }
2470
2471 static void
2472 gst_rtsp_client_sink_loop_start_cmd (GstRTSPClientSink * sink, gint cmd)
2473 {
2474   switch (cmd) {
2475     case CMD_OPEN:
2476       GST_ELEMENT_PROGRESS (sink, START, "open", ("Opening Stream"));
2477       break;
2478     case CMD_RECORD:
2479       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending RECORD request"));
2480       break;
2481     case CMD_PAUSE:
2482       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending PAUSE request"));
2483       break;
2484     case CMD_CLOSE:
2485       GST_ELEMENT_PROGRESS (sink, START, "close", ("Closing Stream"));
2486       break;
2487     default:
2488       break;
2489   }
2490 }
2491
2492 static void
2493 gst_rtsp_client_sink_loop_complete_cmd (GstRTSPClientSink * sink, gint cmd)
2494 {
2495   switch (cmd) {
2496     case CMD_OPEN:
2497       GST_ELEMENT_PROGRESS (sink, COMPLETE, "open", ("Opened Stream"));
2498       break;
2499     case CMD_RECORD:
2500       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent RECORD request"));
2501       break;
2502     case CMD_PAUSE:
2503       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent PAUSE request"));
2504       break;
2505     case CMD_CLOSE:
2506       GST_ELEMENT_PROGRESS (sink, COMPLETE, "close", ("Closed Stream"));
2507       break;
2508     default:
2509       break;
2510   }
2511 }
2512
2513 static void
2514 gst_rtsp_client_sink_loop_cancel_cmd (GstRTSPClientSink * sink, gint cmd)
2515 {
2516   switch (cmd) {
2517     case CMD_OPEN:
2518       GST_ELEMENT_PROGRESS (sink, CANCELED, "open", ("Open canceled"));
2519       break;
2520     case CMD_RECORD:
2521       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("RECORD canceled"));
2522       break;
2523     case CMD_PAUSE:
2524       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("PAUSE canceled"));
2525       break;
2526     case CMD_CLOSE:
2527       GST_ELEMENT_PROGRESS (sink, CANCELED, "close", ("Close canceled"));
2528       break;
2529     default:
2530       break;
2531   }
2532 }
2533
2534 static void
2535 gst_rtsp_client_sink_loop_error_cmd (GstRTSPClientSink * sink, gint cmd)
2536 {
2537   switch (cmd) {
2538     case CMD_OPEN:
2539       GST_ELEMENT_PROGRESS (sink, ERROR, "open", ("Open failed"));
2540       break;
2541     case CMD_RECORD:
2542       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("RECORD failed"));
2543       break;
2544     case CMD_PAUSE:
2545       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("PAUSE failed"));
2546       break;
2547     case CMD_CLOSE:
2548       GST_ELEMENT_PROGRESS (sink, ERROR, "close", ("Close failed"));
2549       break;
2550     default:
2551       break;
2552   }
2553 }
2554
2555 static void
2556 gst_rtsp_client_sink_loop_end_cmd (GstRTSPClientSink * sink, gint cmd,
2557     GstRTSPResult ret)
2558 {
2559   if (ret == GST_RTSP_OK)
2560     gst_rtsp_client_sink_loop_complete_cmd (sink, cmd);
2561   else if (ret == GST_RTSP_EINTR)
2562     gst_rtsp_client_sink_loop_cancel_cmd (sink, cmd);
2563   else
2564     gst_rtsp_client_sink_loop_error_cmd (sink, cmd);
2565 }
2566
2567 static gboolean
2568 gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink, gint cmd,
2569     gint mask)
2570 {
2571   gint old;
2572   gboolean flushed = FALSE;
2573
2574   /* start new request */
2575   gst_rtsp_client_sink_loop_start_cmd (sink, cmd);
2576
2577   GST_DEBUG_OBJECT (sink, "sending cmd %s", cmd_to_string (cmd));
2578
2579   GST_OBJECT_LOCK (sink);
2580   old = sink->pending_cmd;
2581   if (old == CMD_RECONNECT) {
2582     GST_DEBUG_OBJECT (sink, "ignore, we were reconnecting");
2583     cmd = CMD_RECONNECT;
2584   }
2585   if (old != CMD_WAIT) {
2586     sink->pending_cmd = CMD_WAIT;
2587     GST_OBJECT_UNLOCK (sink);
2588     /* cancel previous request */
2589     GST_DEBUG_OBJECT (sink, "cancel previous request %s", cmd_to_string (old));
2590     gst_rtsp_client_sink_loop_cancel_cmd (sink, old);
2591     GST_OBJECT_LOCK (sink);
2592   }
2593   sink->pending_cmd = cmd;
2594   /* interrupt if allowed */
2595   if (sink->busy_cmd & mask) {
2596     GST_DEBUG_OBJECT (sink, "connection flush busy %s",
2597         cmd_to_string (sink->busy_cmd));
2598     gst_rtsp_client_sink_connection_flush (sink, TRUE);
2599     flushed = TRUE;
2600   } else {
2601     GST_DEBUG_OBJECT (sink, "not interrupting busy cmd %s",
2602         cmd_to_string (sink->busy_cmd));
2603   }
2604   if (sink->task)
2605     gst_task_start (sink->task);
2606   GST_OBJECT_UNLOCK (sink);
2607
2608   return flushed;
2609 }
2610
2611 static gboolean
2612 gst_rtsp_client_sink_loop (GstRTSPClientSink * sink)
2613 {
2614   GstFlowReturn ret;
2615
2616   if (!sink->conninfo.connection || !sink->conninfo.connected)
2617     goto no_connection;
2618
2619   ret = gst_rtsp_client_sink_loop_rx (sink);
2620   if (ret != GST_FLOW_OK)
2621     goto pause;
2622
2623   return TRUE;
2624
2625   /* ERRORS */
2626 no_connection:
2627   {
2628     GST_WARNING_OBJECT (sink, "we are not connected");
2629     ret = GST_FLOW_FLUSHING;
2630     goto pause;
2631   }
2632 pause:
2633   {
2634     const gchar *reason = gst_flow_get_name (ret);
2635
2636     GST_DEBUG_OBJECT (sink, "pausing task, reason %s", reason);
2637     gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_LOOP);
2638     return FALSE;
2639   }
2640 }
2641
2642 #ifndef GST_DISABLE_GST_DEBUG
2643 static const gchar *
2644 gst_rtsp_auth_method_to_string (GstRTSPAuthMethod method)
2645 {
2646   gint index = 0;
2647
2648   while (method != 0) {
2649     index++;
2650     method >>= 1;
2651   }
2652   switch (index) {
2653     case 0:
2654       return "None";
2655     case 1:
2656       return "Basic";
2657     case 2:
2658       return "Digest";
2659   }
2660
2661   return "Unknown";
2662 }
2663 #endif
2664
2665 /* Parse a WWW-Authenticate Response header and determine the
2666  * available authentication methods
2667  *
2668  * This code should also cope with the fact that each WWW-Authenticate
2669  * header can contain multiple challenge methods + tokens
2670  *
2671  * At the moment, for Basic auth, we just do a minimal check and don't
2672  * even parse out the realm */
2673 static void
2674 gst_rtsp_client_sink_parse_auth_hdr (GstRTSPMessage * response,
2675     GstRTSPAuthMethod * methods, GstRTSPConnection * conn, gboolean * stale)
2676 {
2677   GstRTSPAuthCredential **credentials, **credential;
2678
2679   g_return_if_fail (response != NULL);
2680   g_return_if_fail (methods != NULL);
2681   g_return_if_fail (stale != NULL);
2682
2683   credentials =
2684       gst_rtsp_message_parse_auth_credentials (response,
2685       GST_RTSP_HDR_WWW_AUTHENTICATE);
2686   if (!credentials)
2687     return;
2688
2689   credential = credentials;
2690   while (*credential) {
2691     if ((*credential)->scheme == GST_RTSP_AUTH_BASIC) {
2692       *methods |= GST_RTSP_AUTH_BASIC;
2693     } else if ((*credential)->scheme == GST_RTSP_AUTH_DIGEST) {
2694       GstRTSPAuthParam **param = (*credential)->params;
2695
2696       *methods |= GST_RTSP_AUTH_DIGEST;
2697
2698       gst_rtsp_connection_clear_auth_params (conn);
2699       *stale = FALSE;
2700
2701       while (*param) {
2702         if (strcmp ((*param)->name, "stale") == 0
2703             && g_ascii_strcasecmp ((*param)->value, "TRUE") == 0)
2704           *stale = TRUE;
2705         gst_rtsp_connection_set_auth_param (conn, (*param)->name,
2706             (*param)->value);
2707         param++;
2708       }
2709     }
2710
2711     credential++;
2712   }
2713
2714   gst_rtsp_auth_credentials_free (credentials);
2715 }
2716
2717 /**
2718  * gst_rtsp_client_sink_setup_auth:
2719  * @src: the rtsp source
2720  *
2721  * Configure a username and password and auth method on the
2722  * connection object based on a response we received from the
2723  * peer.
2724  *
2725  * Currently, this requires that a username and password were supplied
2726  * in the uri. In the future, they may be requested on demand by sending
2727  * a message up the bus.
2728  *
2729  * Returns: TRUE if authentication information could be set up correctly.
2730  */
2731 static gboolean
2732 gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
2733     GstRTSPMessage * response)
2734 {
2735   gchar *user = NULL;
2736   gchar *pass = NULL;
2737   GstRTSPAuthMethod avail_methods = GST_RTSP_AUTH_NONE;
2738   GstRTSPAuthMethod method;
2739   GstRTSPResult auth_result;
2740   GstRTSPUrl *url;
2741   GstRTSPConnection *conn;
2742   gboolean stale = FALSE;
2743
2744   conn = sink->conninfo.connection;
2745
2746   /* Identify the available auth methods and see if any are supported */
2747   gst_rtsp_client_sink_parse_auth_hdr (response, &avail_methods, conn, &stale);
2748
2749   if (avail_methods == GST_RTSP_AUTH_NONE)
2750     goto no_auth_available;
2751
2752   /* For digest auth, if the response indicates that the session
2753    * data are stale, we just update them in the connection object and
2754    * return TRUE to retry the request */
2755   if (stale)
2756     sink->tried_url_auth = FALSE;
2757
2758   url = gst_rtsp_connection_get_url (conn);
2759
2760   /* Do we have username and password available? */
2761   if (url != NULL && !sink->tried_url_auth && url->user != NULL
2762       && url->passwd != NULL) {
2763     user = url->user;
2764     pass = url->passwd;
2765     sink->tried_url_auth = TRUE;
2766     GST_DEBUG_OBJECT (sink,
2767         "Attempting authentication using credentials from the URL");
2768   } else {
2769     user = sink->user_id;
2770     pass = sink->user_pw;
2771     GST_DEBUG_OBJECT (sink,
2772         "Attempting authentication using credentials from the properties");
2773   }
2774
2775   /* FIXME: If the url didn't contain username and password or we tried them
2776    * already, request a username and passwd from the application via some kind
2777    * of credentials request message */
2778
2779   /* If we don't have a username and passwd at this point, bail out. */
2780   if (user == NULL || pass == NULL)
2781     goto no_user_pass;
2782
2783   /* Try to configure for each available authentication method, strongest to
2784    * weakest */
2785   for (method = GST_RTSP_AUTH_MAX; method != GST_RTSP_AUTH_NONE; method >>= 1) {
2786     /* Check if this method is available on the server */
2787     if ((method & avail_methods) == 0)
2788       continue;
2789
2790     /* Pass the credentials to the connection to try on the next request */
2791     auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass);
2792     /* INVAL indicates an invalid username/passwd were supplied, so we'll just
2793      * ignore it and end up retrying later */
2794     if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) {
2795       GST_DEBUG_OBJECT (sink, "Attempting %s authentication",
2796           gst_rtsp_auth_method_to_string (method));
2797       break;
2798     }
2799   }
2800
2801   if (method == GST_RTSP_AUTH_NONE)
2802     goto no_auth_available;
2803
2804   return TRUE;
2805
2806 no_auth_available:
2807   {
2808     /* Output an error indicating that we couldn't connect because there were
2809      * no supported authentication protocols */
2810     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
2811         ("No supported authentication protocol was found"));
2812     return FALSE;
2813   }
2814 no_user_pass:
2815   {
2816     /* We don't fire an error message, we just return FALSE and let the
2817      * normal NOT_AUTHORIZED error be propagated */
2818     return FALSE;
2819   }
2820 }
2821
2822 static GstRTSPResult
2823 gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
2824     GstRTSPConnInfo * conninfo, GstRTSPMessage * requests,
2825     guint n_requests, GstRTSPMessage * response, GstRTSPStatusCode * code)
2826 {
2827   GstRTSPResult res;
2828   GstRTSPStatusCode thecode;
2829   gchar *content_base = NULL;
2830   gint try = 0;
2831
2832   g_assert (n_requests == 1 || response == NULL);
2833
2834 again:
2835   GST_DEBUG_OBJECT (sink, "sending message");
2836
2837   if (sink->debug && n_requests == 1)
2838     gst_rtsp_message_dump (&requests[0]);
2839
2840   g_mutex_lock (&sink->send_lock);
2841
2842   res =
2843       gst_rtsp_client_sink_connection_send_messages (sink, conninfo, requests,
2844       n_requests, sink->tcp_timeout);
2845   if (res < 0) {
2846     g_mutex_unlock (&sink->send_lock);
2847     goto send_error;
2848   }
2849
2850   gst_rtsp_connection_reset_timeout (conninfo->connection);
2851
2852   /* See if we should handle the response */
2853   if (response == NULL) {
2854     g_mutex_unlock (&sink->send_lock);
2855     return GST_RTSP_OK;
2856   }
2857 next:
2858   res =
2859       gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
2860       sink->tcp_timeout);
2861
2862   g_mutex_unlock (&sink->send_lock);
2863
2864   if (res < 0)
2865     goto receive_error;
2866
2867   if (sink->debug)
2868     gst_rtsp_message_dump (response);
2869
2870
2871   switch (response->type) {
2872     case GST_RTSP_MESSAGE_REQUEST:
2873       res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
2874       if (res == GST_RTSP_EEOF)
2875         goto server_eof;
2876       else if (res < 0)
2877         goto handle_request_failed;
2878       g_mutex_lock (&sink->send_lock);
2879       goto next;
2880     case GST_RTSP_MESSAGE_RESPONSE:
2881       /* ok, a response is good */
2882       GST_DEBUG_OBJECT (sink, "received response message");
2883       break;
2884     case GST_RTSP_MESSAGE_DATA:
2885       /* we ignore data messages */
2886       GST_DEBUG_OBJECT (sink, "ignoring data message");
2887       g_mutex_lock (&sink->send_lock);
2888       goto next;
2889     default:
2890       GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2891           response->type);
2892       g_mutex_lock (&sink->send_lock);
2893       goto next;
2894   }
2895
2896   thecode = response->type_data.response.code;
2897
2898   GST_DEBUG_OBJECT (sink, "got response message %d", thecode);
2899
2900   /* if the caller wanted the result code, we store it. */
2901   if (code)
2902     *code = thecode;
2903
2904   /* If the request didn't succeed, bail out before doing any more */
2905   if (thecode != GST_RTSP_STS_OK)
2906     return GST_RTSP_OK;
2907
2908   /* store new content base if any */
2909   gst_rtsp_message_get_header (response, GST_RTSP_HDR_CONTENT_BASE,
2910       &content_base, 0);
2911   if (content_base) {
2912     g_free (sink->content_base);
2913     sink->content_base = g_strdup (content_base);
2914   }
2915
2916   return GST_RTSP_OK;
2917
2918   /* ERRORS */
2919 send_error:
2920   {
2921     gchar *str = gst_rtsp_strresult (res);
2922
2923     if (res != GST_RTSP_EINTR) {
2924       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2925           ("Could not send message. (%s)", str));
2926     } else {
2927       GST_WARNING_OBJECT (sink, "send interrupted");
2928     }
2929     g_free (str);
2930     return res;
2931   }
2932 receive_error:
2933   {
2934     switch (res) {
2935       case GST_RTSP_EEOF:
2936         GST_WARNING_OBJECT (sink, "server closed connection");
2937         if ((try == 0) && !sink->interleaved && sink->udp_reconnect) {
2938           try++;
2939           /* if reconnect succeeds, try again */
2940           if ((res =
2941                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2942                       FALSE)) == 0)
2943             goto again;
2944         }
2945         /* only try once after reconnect, then fallthrough and error out */
2946       default:
2947       {
2948         gchar *str = gst_rtsp_strresult (res);
2949
2950         if (res != GST_RTSP_EINTR) {
2951           GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2952               ("Could not receive message. (%s)", str));
2953         } else {
2954           GST_WARNING_OBJECT (sink, "receive interrupted");
2955         }
2956         g_free (str);
2957         break;
2958       }
2959     }
2960     return res;
2961   }
2962 handle_request_failed:
2963   {
2964     /* ERROR was posted */
2965     gst_rtsp_message_unset (response);
2966     return res;
2967   }
2968 server_eof:
2969   {
2970     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2971     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2972         ("The server closed the connection."));
2973     gst_rtsp_message_unset (response);
2974     return res;
2975   }
2976 }
2977
2978 static void
2979 gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
2980 {
2981   GST_DEBUG_OBJECT (sink, "Setting internal state to %s",
2982       gst_element_state_get_name (state));
2983   gst_element_set_state (GST_ELEMENT (sink->internal_bin), state);
2984 }
2985
2986 /**
2987  * gst_rtsp_client_sink_send:
2988  * @src: the rtsp source
2989  * @conn: the connection to send on
2990  * @request: must point to a valid request
2991  * @response: must point to an empty #GstRTSPMessage
2992  * @code: an optional code result
2993  *
2994  * send @request and retrieve the response in @response. optionally @code can be
2995  * non-NULL in which case it will contain the status code of the response.
2996  *
2997  * If This function returns #GST_RTSP_OK, @response will contain a valid response
2998  * message that should be cleaned with gst_rtsp_message_unset() after usage.
2999  *
3000  * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid
3001  * @response message) if the response code was not 200 (OK).
3002  *
3003  * If the attempt results in an authentication failure, then this will attempt
3004  * to retrieve authentication credentials via gst_rtsp_client_sink_setup_auth and retry
3005  * the request.
3006  *
3007  * Returns: #GST_RTSP_OK if the processing was successful.
3008  */
3009 static GstRTSPResult
3010 gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
3011     GstRTSPMessage * request, GstRTSPMessage * response,
3012     GstRTSPStatusCode * code)
3013 {
3014   GstRTSPStatusCode int_code = GST_RTSP_STS_OK;
3015   GstRTSPResult res = GST_RTSP_ERROR;
3016   gint count;
3017   gboolean retry;
3018   GstRTSPMethod method = GST_RTSP_INVALID;
3019
3020   count = 0;
3021   do {
3022     retry = FALSE;
3023
3024     /* make sure we don't loop forever */
3025     if (count++ > 8)
3026       break;
3027
3028     /* save method so we can disable it when the server complains */
3029     method = request->type_data.request.method;
3030
3031     if ((res =
3032             gst_rtsp_client_sink_try_send (sink, conninfo, request, 1, response,
3033                 &int_code)) < 0)
3034       goto error;
3035
3036     switch (int_code) {
3037       case GST_RTSP_STS_UNAUTHORIZED:
3038         if (gst_rtsp_client_sink_setup_auth (sink, response)) {
3039           /* Try the request/response again after configuring the auth info
3040            * and loop again */
3041           retry = TRUE;
3042         }
3043         break;
3044       default:
3045         break;
3046     }
3047   } while (retry == TRUE);
3048
3049   /* If the user requested the code, let them handle errors, otherwise
3050    * post an error below */
3051   if (code != NULL)
3052     *code = int_code;
3053   else if (int_code != GST_RTSP_STS_OK)
3054     goto error_response;
3055
3056   return res;
3057
3058   /* ERRORS */
3059 error:
3060   {
3061     GST_DEBUG_OBJECT (sink, "got error %d", res);
3062     return res;
3063   }
3064 error_response:
3065   {
3066     res = GST_RTSP_ERROR;
3067
3068     switch (response->type_data.response.code) {
3069       case GST_RTSP_STS_NOT_FOUND:
3070         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL), ("%s",
3071                 response->type_data.response.reason));
3072         break;
3073       case GST_RTSP_STS_UNAUTHORIZED:
3074         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_AUTHORIZED, (NULL), ("%s",
3075                 response->type_data.response.reason));
3076         break;
3077       case GST_RTSP_STS_MOVED_PERMANENTLY:
3078       case GST_RTSP_STS_MOVE_TEMPORARILY:
3079       {
3080         gchar *new_location;
3081         GstRTSPLowerTrans transports;
3082
3083         GST_DEBUG_OBJECT (sink, "got redirection");
3084         /* if we don't have a Location Header, we must error */
3085         if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_LOCATION,
3086                 &new_location, 0) < 0)
3087           break;
3088
3089         /* When we receive a redirect result, we go back to the INIT state after
3090          * parsing the new URI. The caller should do the needed steps to issue
3091          * a new setup when it detects this state change. */
3092         GST_DEBUG_OBJECT (sink, "redirection to %s", new_location);
3093
3094         /* save current transports */
3095         if (sink->conninfo.url)
3096           transports = sink->conninfo.url->transports;
3097         else
3098           transports = GST_RTSP_LOWER_TRANS_UNKNOWN;
3099
3100         gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (sink), new_location,
3101             NULL);
3102
3103         /* set old transports */
3104         if (sink->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN)
3105           sink->conninfo.url->transports = transports;
3106
3107         sink->need_redirect = TRUE;
3108         sink->state = GST_RTSP_STATE_INIT;
3109         res = GST_RTSP_OK;
3110         break;
3111       }
3112       case GST_RTSP_STS_NOT_ACCEPTABLE:
3113       case GST_RTSP_STS_NOT_IMPLEMENTED:
3114       case GST_RTSP_STS_METHOD_NOT_ALLOWED:
3115         GST_WARNING_OBJECT (sink, "got NOT IMPLEMENTED, disable method %s",
3116             gst_rtsp_method_as_text (method));
3117         sink->methods &= ~method;
3118         res = GST_RTSP_OK;
3119         break;
3120       default:
3121         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3122             ("Got error response: %d (%s).", response->type_data.response.code,
3123                 response->type_data.response.reason));
3124         break;
3125     }
3126     /* if we return ERROR we should unset the response ourselves */
3127     if (res == GST_RTSP_ERROR)
3128       gst_rtsp_message_unset (response);
3129
3130     return res;
3131   }
3132 }
3133
3134 /* parse the response and collect all the supported methods. We need this
3135  * information so that we don't try to send an unsupported request to the
3136  * server.
3137  */
3138 static gboolean
3139 gst_rtsp_client_sink_parse_methods (GstRTSPClientSink * sink,
3140     GstRTSPMessage * response)
3141 {
3142   GstRTSPHeaderField field;
3143   gchar *respoptions;
3144   gint indx = 0;
3145
3146   /* reset supported methods */
3147   sink->methods = 0;
3148
3149   /* Try Allow Header first */
3150   field = GST_RTSP_HDR_ALLOW;
3151   while (TRUE) {
3152     respoptions = NULL;
3153     gst_rtsp_message_get_header (response, field, &respoptions, indx);
3154     if (indx == 0 && !respoptions) {
3155       /* if no Allow header was found then try the Public header... */
3156       field = GST_RTSP_HDR_PUBLIC;
3157       gst_rtsp_message_get_header (response, field, &respoptions, indx);
3158     }
3159     if (!respoptions)
3160       break;
3161
3162     sink->methods |= gst_rtsp_options_from_text (respoptions);
3163
3164     indx++;
3165   }
3166
3167   if (sink->methods == 0) {
3168     /* neither Allow nor Public are required, assume the server supports
3169      * at least SETUP. */
3170     GST_DEBUG_OBJECT (sink, "could not get OPTIONS");
3171     sink->methods = GST_RTSP_SETUP;
3172   }
3173
3174   /* Even if the server replied, and didn't say it supports
3175    * RECORD|ANNOUNCE, try anyway by assuming it does */
3176   sink->methods |= GST_RTSP_ANNOUNCE | GST_RTSP_RECORD;
3177
3178   if (!(sink->methods & GST_RTSP_SETUP))
3179     goto no_setup;
3180
3181   return TRUE;
3182
3183   /* ERRORS */
3184 no_setup:
3185   {
3186     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
3187         ("Server does not support SETUP."));
3188     return FALSE;
3189   }
3190 }
3191
3192 static GstRTSPResult
3193 gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
3194     gboolean async)
3195 {
3196   GstRTSPResult res;
3197   GstRTSPMessage request = { 0 };
3198   GstRTSPMessage response = { 0 };
3199   GSocket *conn_socket;
3200   GSocketAddress *sa;
3201   GInetAddress *ia;
3202
3203   sink->need_redirect = FALSE;
3204
3205   /* can't continue without a valid url */
3206   if (G_UNLIKELY (sink->conninfo.url == NULL)) {
3207     res = GST_RTSP_EINVAL;
3208     goto no_url;
3209   }
3210   sink->tried_url_auth = FALSE;
3211
3212   if ((res = gst_rtsp_conninfo_connect (sink, &sink->conninfo, async)) < 0)
3213     goto connect_failed;
3214
3215   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
3216   sa = g_socket_get_remote_address (conn_socket, NULL);
3217   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
3218
3219   sink->server_ip = g_inet_address_to_string (ia);
3220
3221   g_object_unref (sa);
3222
3223   /* create OPTIONS */
3224   GST_DEBUG_OBJECT (sink, "create options...");
3225   res =
3226       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_OPTIONS,
3227       sink->conninfo.url_str);
3228   if (res < 0)
3229     goto create_request_failed;
3230
3231   /* send OPTIONS */
3232   GST_DEBUG_OBJECT (sink, "send options...");
3233
3234   if (async)
3235     GST_ELEMENT_PROGRESS (sink, CONTINUE, "open",
3236         ("Retrieving server options"));
3237
3238   if ((res =
3239           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
3240               &response, NULL)) < 0)
3241     goto send_error;
3242
3243   /* parse OPTIONS */
3244   if (!gst_rtsp_client_sink_parse_methods (sink, &response))
3245     goto methods_error;
3246
3247   /* FIXME: Do we need to handle REDIRECT responses for OPTIONS? */
3248
3249   /* clean up any messages */
3250   gst_rtsp_message_unset (&request);
3251   gst_rtsp_message_unset (&response);
3252
3253   return res;
3254
3255   /* ERRORS */
3256 no_url:
3257   {
3258     GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
3259         ("No valid RTSP URL was provided"));
3260     goto cleanup_error;
3261   }
3262 connect_failed:
3263   {
3264     gchar *str = gst_rtsp_strresult (res);
3265
3266     if (res != GST_RTSP_EINTR) {
3267       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
3268           ("Failed to connect. (%s)", str));
3269     } else {
3270       GST_WARNING_OBJECT (sink, "connect interrupted");
3271     }
3272     g_free (str);
3273     goto cleanup_error;
3274   }
3275 create_request_failed:
3276   {
3277     gchar *str = gst_rtsp_strresult (res);
3278
3279     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3280         ("Could not create request. (%s)", str));
3281     g_free (str);
3282     goto cleanup_error;
3283   }
3284 send_error:
3285   {
3286     /* Don't post a message - the rtsp_send method will have
3287      * taken care of it because we passed NULL for the response code */
3288     goto cleanup_error;
3289   }
3290 methods_error:
3291   {
3292     /* error was posted */
3293     res = GST_RTSP_ERROR;
3294     goto cleanup_error;
3295   }
3296 cleanup_error:
3297   {
3298     if (sink->conninfo.connection) {
3299       GST_DEBUG_OBJECT (sink, "free connection");
3300       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3301     }
3302     gst_rtsp_message_unset (&request);
3303     gst_rtsp_message_unset (&response);
3304     return res;
3305   }
3306 }
3307
3308 static GstRTSPResult
3309 gst_rtsp_client_sink_open (GstRTSPClientSink * sink, gboolean async)
3310 {
3311   GstRTSPResult ret;
3312
3313   sink->methods =
3314       GST_RTSP_SETUP | GST_RTSP_RECORD | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN;
3315
3316   g_mutex_lock (&sink->open_conn_lock);
3317   sink->open_conn_start = TRUE;
3318   g_cond_broadcast (&sink->open_conn_cond);
3319   GST_DEBUG_OBJECT (sink, "connection to server started");
3320   g_mutex_unlock (&sink->open_conn_lock);
3321
3322   if ((ret = gst_rtsp_client_sink_connect_to_server (sink, async)) < 0)
3323     goto open_failed;
3324
3325   if (async)
3326     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3327
3328   return ret;
3329
3330   /* ERRORS */
3331 open_failed:
3332   {
3333     GST_WARNING_OBJECT (sink, "Failed to connect to server");
3334     sink->open_error = TRUE;
3335     if (async)
3336       gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3337     return ret;
3338   }
3339 }
3340
3341 static GstRTSPResult
3342 gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
3343     gboolean only_close)
3344 {
3345   GstRTSPMessage request = { 0 };
3346   GstRTSPMessage response = { 0 };
3347   GstRTSPResult res = GST_RTSP_OK;
3348   GList *walk;
3349   const gchar *control;
3350
3351   GST_DEBUG_OBJECT (sink, "TEARDOWN...");
3352
3353   gst_rtsp_client_sink_set_state (sink, GST_STATE_NULL);
3354
3355   if (sink->state < GST_RTSP_STATE_READY) {
3356     GST_DEBUG_OBJECT (sink, "not ready, doing cleanup");
3357     goto close;
3358   }
3359
3360   if (only_close)
3361     goto close;
3362
3363   /* construct a control url */
3364   control = get_aggregate_control (sink);
3365
3366   if (!(sink->methods & (GST_RTSP_RECORD | GST_RTSP_TEARDOWN)))
3367     goto not_supported;
3368
3369   /* stop streaming */
3370   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3371     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3372
3373     if (context->stream_transport) {
3374       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
3375       gst_object_unref (context->stream_transport);
3376       context->stream_transport = NULL;
3377     }
3378
3379     if (context->joined) {
3380       gst_rtsp_stream_leave_bin (context->stream, GST_BIN (sink->internal_bin),
3381           sink->rtpbin);
3382       context->joined = FALSE;
3383     }
3384   }
3385
3386   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3387     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3388     const gchar *setup_url;
3389     GstRTSPConnInfo *info;
3390
3391     GST_DEBUG_OBJECT (sink, "Looking at stream %p for teardown",
3392         context->stream);
3393
3394     /* try aggregate control first but do non-aggregate control otherwise */
3395     if (control)
3396       setup_url = control;
3397     else if ((setup_url = context->conninfo.location) == NULL) {
3398       GST_DEBUG_OBJECT (sink, "Skipping TEARDOWN stream %p - no setup URL",
3399           context->stream);
3400       continue;
3401     }
3402
3403     if (sink->conninfo.connection) {
3404       info = &sink->conninfo;
3405     } else if (context->conninfo.connection) {
3406       info = &context->conninfo;
3407     } else {
3408       continue;
3409     }
3410     if (!info->connected)
3411       goto next;
3412
3413     /* do TEARDOWN */
3414     GST_DEBUG_OBJECT (sink, "Sending teardown for stream %p at URL %s",
3415         context->stream, setup_url);
3416     res =
3417         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_TEARDOWN,
3418         setup_url);
3419     if (res < 0)
3420       goto create_request_failed;
3421
3422     if (async)
3423       GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
3424
3425     if ((res =
3426             gst_rtsp_client_sink_send (sink, info, &request,
3427                 &response, NULL)) < 0)
3428       goto send_error;
3429
3430     /* FIXME, parse result? */
3431     gst_rtsp_message_unset (&request);
3432     gst_rtsp_message_unset (&response);
3433
3434   next:
3435     /* early exit when we did aggregate control */
3436     if (control)
3437       break;
3438   }
3439
3440 close:
3441   /* close connections */
3442   GST_DEBUG_OBJECT (sink, "closing connection...");
3443   gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3444   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3445     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
3446     gst_rtsp_conninfo_close (sink, &stream->conninfo, TRUE);
3447   }
3448
3449   /* cleanup */
3450   gst_rtsp_client_sink_cleanup (sink);
3451
3452   sink->state = GST_RTSP_STATE_INVALID;
3453
3454   if (async)
3455     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_CLOSE, res);
3456
3457   return res;
3458
3459   /* ERRORS */
3460 create_request_failed:
3461   {
3462     gchar *str = gst_rtsp_strresult (res);
3463
3464     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3465         ("Could not create request. (%s)", str));
3466     g_free (str);
3467     goto close;
3468   }
3469 send_error:
3470   {
3471     gchar *str = gst_rtsp_strresult (res);
3472
3473     gst_rtsp_message_unset (&request);
3474     if (res != GST_RTSP_EINTR) {
3475       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3476           ("Could not send message. (%s)", str));
3477     } else {
3478       GST_WARNING_OBJECT (sink, "TEARDOWN interrupted");
3479     }
3480     g_free (str);
3481     goto close;
3482   }
3483 not_supported:
3484   {
3485     GST_DEBUG_OBJECT (sink,
3486         "TEARDOWN and PLAY not supported, can't do TEARDOWN");
3487     goto close;
3488   }
3489 }
3490
3491 static gboolean
3492 gst_rtsp_client_sink_configure_manager (GstRTSPClientSink * sink)
3493 {
3494   GstElement *rtpbin;
3495   GstStateChangeReturn ret;
3496
3497   rtpbin = sink->rtpbin;
3498
3499   if (rtpbin == NULL) {
3500     GObjectClass *klass;
3501
3502     rtpbin = gst_element_factory_make ("rtpbin", NULL);
3503     if (rtpbin == NULL)
3504       goto no_rtpbin;
3505
3506     gst_bin_add (GST_BIN_CAST (sink->internal_bin), rtpbin);
3507
3508     sink->rtpbin = rtpbin;
3509
3510     /* Any more settings we should configure on rtpbin here? */
3511     g_object_set (sink->rtpbin, "latency", sink->latency, NULL);
3512
3513     klass = G_OBJECT_GET_CLASS (G_OBJECT (rtpbin));
3514
3515     if (g_object_class_find_property (klass, "ntp-time-source")) {
3516       g_object_set (sink->rtpbin, "ntp-time-source", sink->ntp_time_source,
3517           NULL);
3518     }
3519
3520     if (sink->sdes && g_object_class_find_property (klass, "sdes")) {
3521       g_object_set (sink->rtpbin, "sdes", sink->sdes, NULL);
3522     }
3523
3524     g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER], 0,
3525         sink->rtpbin);
3526   }
3527
3528   ret = gst_element_set_state (rtpbin, GST_STATE_PAUSED);
3529   if (ret == GST_STATE_CHANGE_FAILURE)
3530     goto start_manager_failure;
3531
3532   return TRUE;
3533
3534 no_rtpbin:
3535   {
3536     GST_WARNING ("no rtpbin element");
3537     g_warning ("failed to create element 'rtpbin', check your installation");
3538     return FALSE;
3539   }
3540 start_manager_failure:
3541   {
3542     GST_DEBUG_OBJECT (sink, "could not start session manager");
3543     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), rtpbin);
3544     return FALSE;
3545   }
3546 }
3547
3548 static GstElement *
3549 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPClientSink * sink)
3550 {
3551   GstRTSPStream *stream = NULL;
3552   GstElement *ret = NULL;
3553   GList *walk;
3554
3555   GST_RTSP_STATE_LOCK (sink);
3556   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3557     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3558
3559     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3560       stream = context->stream;
3561       break;
3562     }
3563   }
3564
3565   if (stream != NULL) {
3566     GST_DEBUG_OBJECT (sink, "Creating aux sender for stream %u", sessid);
3567     ret = gst_rtsp_stream_request_aux_sender (stream, sessid);
3568   }
3569
3570   GST_RTSP_STATE_UNLOCK (sink);
3571
3572   return ret;
3573 }
3574
3575 static GstElement *
3576 request_fec_encoder (GstElement * rtpbin, guint sessid,
3577     GstRTSPClientSink * sink)
3578 {
3579   GstRTSPStream *stream = NULL;
3580   GstElement *ret = NULL;
3581   GList *walk;
3582
3583   GST_RTSP_STATE_LOCK (sink);
3584   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3585     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3586
3587     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3588       stream = context->stream;
3589       break;
3590     }
3591   }
3592
3593   if (stream != NULL) {
3594     ret = gst_rtsp_stream_request_ulpfec_encoder (stream, sessid);
3595   }
3596
3597   GST_RTSP_STATE_UNLOCK (sink);
3598
3599   return ret;
3600 }
3601
3602 static gboolean
3603 gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink)
3604 {
3605   GstRTSPStreamContext *context;
3606   GList *walk;
3607   const gchar *base;
3608   gchar *stream_path;
3609   GstUri *base_uri, *uri;
3610
3611   GST_DEBUG_OBJECT (sink, "Collecting stream information");
3612
3613   if (!gst_rtsp_client_sink_configure_manager (sink))
3614     return FALSE;
3615
3616   base = get_aggregate_control (sink);
3617
3618   base_uri = gst_uri_from_string (base);
3619   if (!base_uri) {
3620     GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
3621         ("Could not parse uri %s", base));
3622     return FALSE;
3623   }
3624
3625   g_mutex_lock (&sink->preroll_lock);
3626   while (sink->contexts == NULL && !sink->conninfo.flushing) {
3627     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3628   }
3629   g_mutex_unlock (&sink->preroll_lock);
3630
3631   /* FIXME: Need different locking - need to protect against pad releases
3632    * and potential state changes ruining things here */
3633   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3634     GstPad *srcpad;
3635
3636     context = (GstRTSPStreamContext *) walk->data;
3637     if (context->stream)
3638       continue;
3639
3640     g_mutex_lock (&sink->preroll_lock);
3641     while (!context->prerolled && !sink->conninfo.flushing) {
3642       GST_DEBUG_OBJECT (sink, "Waiting for caps on stream %d", context->index);
3643       g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3644     }
3645     if (sink->conninfo.flushing) {
3646       g_mutex_unlock (&sink->preroll_lock);
3647       break;
3648     }
3649     g_mutex_unlock (&sink->preroll_lock);
3650
3651     if (context->payloader == NULL)
3652       continue;
3653
3654     srcpad = gst_element_get_static_pad (context->payloader, "src");
3655
3656     GST_DEBUG_OBJECT (sink, "Creating stream object for stream %d",
3657         context->index);
3658     context->stream =
3659         gst_rtsp_client_sink_create_stream (sink, context, context->payloader,
3660         srcpad);
3661
3662     /* append stream index to uri path */
3663     g_free (context->conninfo.location);
3664
3665     stream_path = g_strdup_printf ("stream=%d", context->index);
3666     uri = gst_uri_copy (base_uri);
3667     gst_uri_append_path (uri, stream_path);
3668
3669     context->conninfo.location = gst_uri_to_string (uri);
3670     gst_uri_unref (uri);
3671     g_free (stream_path);
3672
3673     if (sink->rtx_time > 0) {
3674       /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
3675       g_signal_connect (sink->rtpbin, "request-aux-sender",
3676           (GCallback) request_aux_sender, sink);
3677     }
3678
3679     g_signal_connect (sink->rtpbin, "request-fec-encoder",
3680         (GCallback) request_fec_encoder, sink);
3681
3682     if (!gst_rtsp_stream_join_bin (context->stream,
3683             GST_BIN (sink->internal_bin), sink->rtpbin, GST_STATE_PAUSED)) {
3684       goto join_bin_failed;
3685     }
3686     context->joined = TRUE;
3687
3688     /* Block the stream, as it does not have any transport parts yet */
3689     gst_rtsp_stream_set_blocked (context->stream, TRUE);
3690
3691     /* Let the stream object receive data */
3692     gst_pad_remove_probe (srcpad, context->payloader_block_id);
3693
3694     gst_object_unref (srcpad);
3695   }
3696
3697   /* Now wait for the preroll of the rtp bin */
3698   g_mutex_lock (&sink->preroll_lock);
3699   while (!sink->prerolled && sink->conninfo.connection
3700       && !sink->conninfo.flushing) {
3701     GST_LOG_OBJECT (sink, "Waiting for preroll before continuing");
3702     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3703   }
3704   GST_LOG_OBJECT (sink, "Marking streams as collected");
3705   sink->streams_collected = TRUE;
3706   g_mutex_unlock (&sink->preroll_lock);
3707
3708   gst_uri_unref (base_uri);
3709   return TRUE;
3710
3711 join_bin_failed:
3712
3713   gst_uri_unref (base_uri);
3714   GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3715       ("Could not start stream %d", context->index));
3716   return FALSE;
3717 }
3718
3719 static GstRTSPResult
3720 gst_rtsp_client_sink_create_transports_string (GstRTSPClientSink * sink,
3721     GstRTSPStreamContext * context, GSocketFamily family,
3722     GstRTSPLowerTrans protocols, GstRTSPProfile profiles, gchar ** transports)
3723 {
3724   GString *result;
3725   GstRTSPStream *stream = context->stream;
3726   gboolean first = TRUE;
3727
3728   /* the default RTSP transports */
3729   result = g_string_new ("RTP");
3730
3731   while (profiles != 0) {
3732     if (!first)
3733       g_string_append (result, ",RTP");
3734
3735     if (profiles & GST_RTSP_PROFILE_SAVPF) {
3736       g_string_append (result, "/SAVPF");
3737       profiles &= ~GST_RTSP_PROFILE_SAVPF;
3738     } else if (profiles & GST_RTSP_PROFILE_SAVP) {
3739       g_string_append (result, "/SAVP");
3740       profiles &= ~GST_RTSP_PROFILE_SAVP;
3741     } else if (profiles & GST_RTSP_PROFILE_AVPF) {
3742       g_string_append (result, "/AVPF");
3743       profiles &= ~GST_RTSP_PROFILE_AVPF;
3744     } else if (profiles & GST_RTSP_PROFILE_AVP) {
3745       g_string_append (result, "/AVP");
3746       profiles &= ~GST_RTSP_PROFILE_AVP;
3747     } else {
3748       GST_WARNING_OBJECT (sink, "Unimplemented profile(s) 0x%x", profiles);
3749       break;
3750     }
3751
3752     if (protocols & GST_RTSP_LOWER_TRANS_UDP) {
3753       GstRTSPRange ports;
3754
3755       GST_DEBUG_OBJECT (sink, "adding UDP unicast");
3756       gst_rtsp_stream_get_server_port (stream, &ports, family);
3757
3758       g_string_append_printf (result, "/UDP;unicast;client_port=%d-%d",
3759           ports.min, ports.max);
3760     } else if (protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3761       GstRTSPAddress *addr =
3762           gst_rtsp_stream_get_multicast_address (stream, family);
3763       if (addr) {
3764         GST_DEBUG_OBJECT (sink, "adding UDP multicast");
3765         g_string_append_printf (result, "/UDP;multicast;client_port=%d-%d",
3766             addr->port, addr->port + addr->n_ports - 1);
3767         gst_rtsp_address_free (addr);
3768       }
3769     } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) {
3770       GST_DEBUG_OBJECT (sink, "adding TCP");
3771       g_string_append_printf (result, "/TCP;unicast;interleaved=%d-%d",
3772           sink->free_channel, sink->free_channel + 1);
3773     }
3774
3775     g_string_append (result, ";mode=RECORD");
3776     /* FIXME: Support appending too:
3777        if (sink->append)
3778        g_string_append (result, ";append");
3779      */
3780
3781     first = FALSE;
3782   }
3783
3784   if (first) {
3785     /* No valid transport could be constructed */
3786     GST_ERROR_OBJECT (sink, "No supported profiles configured");
3787     goto fail;
3788   }
3789
3790   *transports = g_string_free (result, FALSE);
3791
3792   GST_DEBUG_OBJECT (sink, "prepared transports %s", GST_STR_NULL (*transports));
3793
3794   return GST_RTSP_OK;
3795 fail:
3796   g_string_free (result, TRUE);
3797   return GST_RTSP_ERROR;
3798 }
3799
3800 static GstCaps *
3801 signal_get_srtcp_params (GstRTSPClientSink * sink,
3802     GstRTSPStreamContext * context)
3803 {
3804   GstCaps *caps = NULL;
3805
3806   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY], 0,
3807       context->index, &caps);
3808
3809   if (caps != NULL)
3810     GST_DEBUG_OBJECT (sink, "SRTP parameters received");
3811
3812   return caps;
3813 }
3814
3815 static gchar *
3816 gst_rtsp_client_sink_stream_make_keymgmt (GstRTSPClientSink * sink,
3817     GstRTSPStreamContext * context)
3818 {
3819   gchar *base64, *result = NULL;
3820   GstMIKEYMessage *mikey_msg;
3821
3822   context->srtcpparams = signal_get_srtcp_params (sink, context);
3823   if (context->srtcpparams == NULL)
3824     context->srtcpparams = gst_rtsp_stream_get_caps (context->stream);
3825
3826   mikey_msg = gst_mikey_message_new_from_caps (context->srtcpparams);
3827   if (mikey_msg) {
3828     guint send_ssrc, send_rtx_ssrc;
3829     const GstStructure *s = gst_caps_get_structure (context->srtcpparams, 0);
3830
3831     /* add policy '0' for our SSRC */
3832     gst_rtsp_stream_get_ssrc (context->stream, &send_ssrc);
3833     GST_LOG_OBJECT (sink, "Stream %p ssrc %x", context->stream, send_ssrc);
3834     gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_ssrc, 0);
3835
3836     if (gst_structure_get_uint (s, "rtx-ssrc", &send_rtx_ssrc))
3837       gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_rtx_ssrc, 0);
3838
3839     base64 = gst_mikey_message_base64_encode (mikey_msg);
3840     gst_mikey_message_unref (mikey_msg);
3841
3842     if (base64) {
3843       result = gst_sdp_make_keymgmt (context->conninfo.location, base64);
3844       g_free (base64);
3845     }
3846   }
3847
3848   return result;
3849 }
3850
3851 /* masks to be kept in sync with the hardcoded protocol order of preference
3852  * in code below */
3853 static const guint protocol_masks[] = {
3854   GST_RTSP_LOWER_TRANS_UDP,
3855   GST_RTSP_LOWER_TRANS_UDP_MCAST,
3856   GST_RTSP_LOWER_TRANS_TCP,
3857   0
3858 };
3859
3860 /* Same for profile_masks */
3861 static const guint profile_masks[] = {
3862   GST_RTSP_PROFILE_SAVPF,
3863   GST_RTSP_PROFILE_SAVP,
3864   GST_RTSP_PROFILE_AVPF,
3865   GST_RTSP_PROFILE_AVP,
3866   0
3867 };
3868
3869 static gboolean
3870 do_send_data (GstBuffer * buffer, guint8 channel,
3871     GstRTSPStreamContext * context)
3872 {
3873   GstRTSPClientSink *sink = context->parent;
3874   GstRTSPMessage message = { 0 };
3875   GstRTSPResult res = GST_RTSP_OK;
3876
3877   gst_rtsp_message_init_data (&message, channel);
3878
3879   gst_rtsp_message_set_body_buffer (&message, buffer);
3880
3881   res =
3882       gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message, 1,
3883       NULL, NULL);
3884
3885   gst_rtsp_message_unset (&message);
3886
3887   gst_rtsp_stream_transport_message_sent (context->stream_transport);
3888
3889   return res == GST_RTSP_OK;
3890 }
3891
3892 static gboolean
3893 do_send_data_list (GstBufferList * buffer_list, guint8 channel,
3894     GstRTSPStreamContext * context)
3895 {
3896   GstRTSPClientSink *sink = context->parent;
3897   GstRTSPResult res = GST_RTSP_OK;
3898   guint i, n = gst_buffer_list_length (buffer_list);
3899   GstRTSPMessage *messages = g_newa (GstRTSPMessage, n);
3900
3901   memset (messages, 0, n * sizeof (GstRTSPMessage));
3902
3903   for (i = 0; i < n; i++) {
3904     GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
3905
3906     gst_rtsp_message_init_data (&messages[i], channel);
3907
3908     gst_rtsp_message_set_body_buffer (&messages[i], buffer);
3909   }
3910
3911   res =
3912       gst_rtsp_client_sink_try_send (sink, &sink->conninfo, messages, n,
3913       NULL, NULL);
3914
3915   for (i = 0; i < n; i++) {
3916     gst_rtsp_message_unset (&messages[i]);
3917     gst_rtsp_stream_transport_message_sent (context->stream_transport);
3918   }
3919
3920   return res == GST_RTSP_OK;
3921 }
3922
3923 static GstRTSPResult
3924 gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
3925 {
3926   GstRTSPResult res = GST_RTSP_ERROR;
3927   GstRTSPMessage request = { 0 };
3928   GstRTSPMessage response = { 0 };
3929   GstRTSPLowerTrans protocols;
3930   GstRTSPStatusCode code;
3931   GSocketFamily family;
3932   GSocketAddress *sa;
3933   GSocket *conn_socket;
3934   GstRTSPUrl *url;
3935   GList *walk;
3936   gchar *hval;
3937
3938   if (sink->conninfo.connection) {
3939     url = gst_rtsp_connection_get_url (sink->conninfo.connection);
3940     /* we initially allow all configured lower transports. based on the URL
3941      * transports and the replies from the server we narrow them down. */
3942     protocols = url->transports & sink->cur_protocols;
3943   } else {
3944     url = NULL;
3945     protocols = sink->cur_protocols;
3946   }
3947
3948   if (protocols == 0)
3949     goto no_protocols;
3950
3951   GST_RTSP_STATE_LOCK (sink);
3952
3953   if (G_UNLIKELY (sink->contexts == NULL))
3954     goto no_streams;
3955
3956   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3957     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3958     GstRTSPStream *stream;
3959
3960     GstRTSPConnInfo *info;
3961     GstRTSPProfile profiles;
3962     GstRTSPProfile cur_profile;
3963     gchar *transports;
3964     gint retry = 0;
3965     guint profile_mask = 0;
3966     guint mask = 0;
3967     GstCaps *caps;
3968     const GstSDPMedia *media;
3969
3970     stream = context->stream;
3971     profiles = gst_rtsp_stream_get_profiles (stream);
3972
3973     caps = gst_rtsp_stream_get_caps (stream);
3974     if (caps == NULL) {
3975       GST_DEBUG_OBJECT (sink, "skipping stream %p, no caps", stream);
3976       continue;
3977     }
3978     gst_caps_unref (caps);
3979     media = gst_sdp_message_get_media (&sink->cursdp, context->sdp_index);
3980     if (media == NULL) {
3981       GST_DEBUG_OBJECT (sink, "skipping stream %p, no SDP info", stream);
3982       continue;
3983     }
3984
3985     /* skip setup if we have no URL for it */
3986     if (context->conninfo.location == NULL) {
3987       GST_DEBUG_OBJECT (sink, "skipping stream %p, no setup", stream);
3988       continue;
3989     }
3990
3991     if (sink->conninfo.connection == NULL) {
3992       if (!gst_rtsp_conninfo_connect (sink, &context->conninfo, async)) {
3993         GST_DEBUG_OBJECT (sink, "skipping stream %p, failed to connect",
3994             stream);
3995         continue;
3996       }
3997       info = &context->conninfo;
3998     } else {
3999       info = &sink->conninfo;
4000     }
4001     GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
4002         context->conninfo.location);
4003
4004     conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
4005     sa = g_socket_get_local_address (conn_socket, NULL);
4006     family = g_socket_address_get_family (sa);
4007     g_object_unref (sa);
4008
4009   next_protocol:
4010     /* first selectable profile */
4011     while (profile_masks[profile_mask]
4012         && !(profiles & profile_masks[profile_mask]))
4013       profile_mask++;
4014     if (!profile_masks[profile_mask])
4015       goto no_profiles;
4016
4017     /* first selectable protocol */
4018     while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
4019       mask++;
4020     if (!protocol_masks[mask])
4021       goto no_protocols;
4022
4023   retry:
4024     GST_DEBUG_OBJECT (sink, "protocols = 0x%x, protocol mask = 0x%x", protocols,
4025         protocol_masks[mask]);
4026     /* create a string with first transport in line */
4027     transports = NULL;
4028     cur_profile = profiles & profile_masks[profile_mask];
4029     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
4030         protocols & protocol_masks[mask], cur_profile, &transports);
4031     if (res < 0 || transports == NULL)
4032       goto setup_transport_failed;
4033
4034     if (strlen (transports) == 0) {
4035       g_free (transports);
4036       GST_DEBUG_OBJECT (sink, "no transports found");
4037       mask++;
4038       profile_mask = 0;
4039       goto next_protocol;
4040     }
4041
4042     GST_DEBUG_OBJECT (sink, "transport is %s", GST_STR_NULL (transports));
4043
4044     /* create SETUP request */
4045     res =
4046         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_SETUP,
4047         context->conninfo.location);
4048     if (res < 0) {
4049       g_free (transports);
4050       goto create_request_failed;
4051     }
4052
4053     /* set up keys */
4054     if (cur_profile == GST_RTSP_PROFILE_SAVP ||
4055         cur_profile == GST_RTSP_PROFILE_SAVPF) {
4056       hval = gst_rtsp_client_sink_stream_make_keymgmt (sink, context);
4057       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_KEYMGMT, hval);
4058     }
4059
4060     /* if the user wants a non default RTP packet size we add the blocksize
4061      * parameter */
4062     if (sink->rtp_blocksize > 0) {
4063       hval = g_strdup_printf ("%d", sink->rtp_blocksize);
4064       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_BLOCKSIZE, hval);
4065     }
4066
4067     if (async)
4068       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request", ("SETUP stream %d",
4069               context->index));
4070
4071     {
4072       GstRTSPTransport *transport;
4073
4074       gst_rtsp_transport_new (&transport);
4075       if (gst_rtsp_transport_parse (transports, transport) != GST_RTSP_OK)
4076         goto parse_transport_failed;
4077       if (transport->lower_transport != GST_RTSP_LOWER_TRANS_TCP) {
4078         if (!gst_rtsp_stream_allocate_udp_sockets (stream, family, transport,
4079                 FALSE)) {
4080           gst_rtsp_transport_free (transport);
4081           goto allocate_udp_ports_failed;
4082         }
4083       }
4084       if (!gst_rtsp_stream_complete_stream (stream, transport)) {
4085         gst_rtsp_transport_free (transport);
4086         goto complete_stream_failed;
4087       }
4088
4089       gst_rtsp_transport_free (transport);
4090       gst_rtsp_stream_set_blocked (stream, FALSE);
4091     }
4092
4093     /* FIXME:
4094      * the creation of the transports string depends on
4095      * calling stream_get_server_port, which only starts returning
4096      * something meaningful after a call to stream_allocate_udp_sockets
4097      * has been made, this function expects a transport that we parse
4098      * from the transport string ...
4099      *
4100      * Significant refactoring is in order, but does not look entirely
4101      * trivial, for now we put a band aid on and create a second transport
4102      * string after the stream has been completed, to pass it in
4103      * the request headers instead of the previous, incomplete one.
4104      */
4105     g_free (transports);
4106     transports = NULL;
4107     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
4108         protocols & protocol_masks[mask], cur_profile, &transports);
4109
4110     if (res < 0 || transports == NULL)
4111       goto setup_transport_failed;
4112
4113     /* select transport */
4114     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_TRANSPORT, transports);
4115
4116     /* handle the code ourselves */
4117     res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
4118     if (res < 0)
4119       goto send_error;
4120
4121     switch (code) {
4122       case GST_RTSP_STS_OK:
4123         break;
4124       case GST_RTSP_STS_UNSUPPORTED_TRANSPORT:
4125         gst_rtsp_message_unset (&request);
4126         gst_rtsp_message_unset (&response);
4127
4128         /* Try another profile. If no more, move to the next protocol */
4129         profile_mask++;
4130         while (profile_masks[profile_mask]
4131             && !(profiles & profile_masks[profile_mask]))
4132           profile_mask++;
4133         if (profile_masks[profile_mask])
4134           goto retry;
4135
4136         /* select next available protocol, give up on this stream if none */
4137         /* Reset profiles to try: */
4138         profile_mask = 0;
4139
4140         mask++;
4141         while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
4142           mask++;
4143         if (!protocol_masks[mask])
4144           continue;
4145         else
4146           goto retry;
4147       default:
4148         goto response_error;
4149     }
4150
4151     /* parse response transport */
4152     {
4153       gchar *resptrans = NULL;
4154       GstRTSPTransport *transport;
4155
4156       gst_rtsp_message_get_header (&response, GST_RTSP_HDR_TRANSPORT,
4157           &resptrans, 0);
4158       if (!resptrans) {
4159         goto no_transport;
4160       }
4161
4162       gst_rtsp_transport_new (&transport);
4163
4164       /* parse transport, go to next stream on parse error */
4165       if (gst_rtsp_transport_parse (resptrans, transport) != GST_RTSP_OK) {
4166         GST_WARNING_OBJECT (sink, "failed to parse transport %s", resptrans);
4167         goto next;
4168       }
4169
4170       /* update allowed transports for other streams. once the transport of
4171        * one stream has been determined, we make sure that all other streams
4172        * are configured in the same way */
4173       switch (transport->lower_transport) {
4174         case GST_RTSP_LOWER_TRANS_TCP:
4175           GST_DEBUG_OBJECT (sink, "stream %p as TCP interleaved", stream);
4176           protocols = GST_RTSP_LOWER_TRANS_TCP;
4177           sink->interleaved = TRUE;
4178           /* update free channels */
4179           sink->free_channel =
4180               MAX (transport->interleaved.min, sink->free_channel);
4181           sink->free_channel =
4182               MAX (transport->interleaved.max, sink->free_channel);
4183           sink->free_channel++;
4184           break;
4185         case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4186           /* only allow multicast for other streams */
4187           GST_DEBUG_OBJECT (sink, "stream %p as UDP multicast", stream);
4188           protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST;
4189           break;
4190         case GST_RTSP_LOWER_TRANS_UDP:
4191           /* only allow unicast for other streams */
4192           GST_DEBUG_OBJECT (sink, "stream %p as UDP unicast", stream);
4193           protocols = GST_RTSP_LOWER_TRANS_UDP;
4194           /* Update transport with server destination if not provided by the server */
4195           if (transport->destination == NULL) {
4196             transport->destination = g_strdup (sink->server_ip);
4197           }
4198           break;
4199         default:
4200           GST_DEBUG_OBJECT (sink, "stream %p unknown transport %d", stream,
4201               transport->lower_transport);
4202           break;
4203       }
4204
4205       if (!retry) {
4206         GST_DEBUG ("Configuring the stream transport for stream %d",
4207             context->index);
4208         if (context->stream_transport == NULL)
4209           context->stream_transport =
4210               gst_rtsp_stream_transport_new (stream, transport);
4211         else
4212           gst_rtsp_stream_transport_set_transport (context->stream_transport,
4213               transport);
4214
4215         if (transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
4216           /* our callbacks to send data on this TCP connection */
4217           gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
4218               (GstRTSPSendFunc) do_send_data,
4219               (GstRTSPSendFunc) do_send_data, context, NULL);
4220           gst_rtsp_stream_transport_set_list_callbacks
4221               (context->stream_transport,
4222               (GstRTSPSendListFunc) do_send_data_list,
4223               (GstRTSPSendListFunc) do_send_data_list, context, NULL);
4224         }
4225
4226         /* The stream_transport now owns the transport */
4227         transport = NULL;
4228
4229         gst_rtsp_stream_transport_set_active (context->stream_transport, TRUE);
4230       }
4231     next:
4232       if (transport)
4233         gst_rtsp_transport_free (transport);
4234       /* clean up used RTSP messages */
4235       gst_rtsp_message_unset (&request);
4236       gst_rtsp_message_unset (&response);
4237     }
4238   }
4239   GST_RTSP_STATE_UNLOCK (sink);
4240
4241   /* store the transport protocol that was configured */
4242   sink->cur_protocols = protocols;
4243
4244   return res;
4245
4246 no_streams:
4247   {
4248     GST_RTSP_STATE_UNLOCK (sink);
4249     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4250         ("SDP contains no streams"));
4251     return GST_RTSP_ERROR;
4252   }
4253 setup_transport_failed:
4254   {
4255     GST_RTSP_STATE_UNLOCK (sink);
4256     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4257         ("Could not setup transport."));
4258     res = GST_RTSP_ERROR;
4259     goto cleanup_error;
4260   }
4261 no_profiles:
4262   {
4263     GST_RTSP_STATE_UNLOCK (sink);
4264     /* no transport possible, post an error and stop */
4265     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4266         ("Could not connect to server, no profiles left"));
4267     return GST_RTSP_ERROR;
4268   }
4269 no_protocols:
4270   {
4271     GST_RTSP_STATE_UNLOCK (sink);
4272     /* no transport possible, post an error and stop */
4273     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4274         ("Could not connect to server, no protocols left"));
4275     return GST_RTSP_ERROR;
4276   }
4277 no_transport:
4278   {
4279     GST_RTSP_STATE_UNLOCK (sink);
4280     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4281         ("Server did not select transport."));
4282     res = GST_RTSP_ERROR;
4283     goto cleanup_error;
4284   }
4285 create_request_failed:
4286   {
4287     gchar *str = gst_rtsp_strresult (res);
4288
4289     GST_RTSP_STATE_UNLOCK (sink);
4290     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4291         ("Could not create request. (%s)", str));
4292     g_free (str);
4293     goto cleanup_error;
4294   }
4295 parse_transport_failed:
4296   {
4297     GST_RTSP_STATE_UNLOCK (sink);
4298     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4299         ("Could not parse transport."));
4300     res = GST_RTSP_ERROR;
4301     goto cleanup_error;
4302   }
4303 allocate_udp_ports_failed:
4304   {
4305     GST_RTSP_STATE_UNLOCK (sink);
4306     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4307         ("Could not parse transport."));
4308     res = GST_RTSP_ERROR;
4309     goto cleanup_error;
4310   }
4311 complete_stream_failed:
4312   {
4313     GST_RTSP_STATE_UNLOCK (sink);
4314     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4315         ("Could not parse transport."));
4316     res = GST_RTSP_ERROR;
4317     goto cleanup_error;
4318   }
4319 send_error:
4320   {
4321     gchar *str = gst_rtsp_strresult (res);
4322
4323     GST_RTSP_STATE_UNLOCK (sink);
4324     if (res != GST_RTSP_EINTR) {
4325       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4326           ("Could not send message. (%s)", str));
4327     } else {
4328       GST_WARNING_OBJECT (sink, "send interrupted");
4329     }
4330     g_free (str);
4331     goto cleanup_error;
4332   }
4333 response_error:
4334   {
4335     const gchar *str = gst_rtsp_status_as_text (code);
4336
4337     GST_RTSP_STATE_UNLOCK (sink);
4338     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4339         ("Error (%d): %s", code, GST_STR_NULL (str)));
4340     res = GST_RTSP_ERROR;
4341     goto cleanup_error;
4342   }
4343 cleanup_error:
4344   {
4345     gst_rtsp_message_unset (&request);
4346     gst_rtsp_message_unset (&response);
4347     return res;
4348   }
4349 }
4350
4351 static GstRTSPResult
4352 gst_rtsp_client_sink_ensure_open (GstRTSPClientSink * sink, gboolean async)
4353 {
4354   GstRTSPResult res = GST_RTSP_OK;
4355
4356   if (sink->state < GST_RTSP_STATE_READY) {
4357     res = GST_RTSP_ERROR;
4358     if (sink->open_error) {
4359       GST_DEBUG_OBJECT (sink, "the stream was in error");
4360       goto done;
4361     }
4362     if (async)
4363       gst_rtsp_client_sink_loop_start_cmd (sink, CMD_OPEN);
4364
4365     if ((res = gst_rtsp_client_sink_open (sink, async)) < 0) {
4366       GST_DEBUG_OBJECT (sink, "failed to open stream");
4367       goto done;
4368     }
4369   }
4370
4371 done:
4372   return res;
4373 }
4374
4375 static GstRTSPResult
4376 gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
4377 {
4378   GstRTSPMessage request = { 0 };
4379   GstRTSPMessage response = { 0 };
4380   GstRTSPResult res = GST_RTSP_OK;
4381   GstSDPMessage *sdp;
4382   guint sdp_index = 0;
4383   GstSDPInfo info = { 0, };
4384   gchar *keymgmt;
4385   guint i;
4386
4387   const gchar *proto;
4388   gchar *sess_id, *client_ip, *str;
4389   GSocketAddress *sa;
4390   GInetAddress *ia;
4391   GSocket *conn_socket;
4392   GList *walk;
4393
4394   g_mutex_lock (&sink->preroll_lock);
4395   if (sink->state == GST_RTSP_STATE_PLAYING) {
4396     /* Already recording, don't send another request */
4397     GST_LOG_OBJECT (sink, "Already in RECORD. Skipping duplicate request.");
4398     g_mutex_unlock (&sink->preroll_lock);
4399     goto done;
4400   }
4401   g_mutex_unlock (&sink->preroll_lock);
4402
4403   /* Collect all our input streams and create
4404    * stream objects before actually returning.
4405    * The streams are blocked at this point as we do not have any transport
4406    * parts yet. */
4407   gst_rtsp_client_sink_collect_streams (sink);
4408
4409   g_mutex_lock (&sink->block_streams_lock);
4410   /* Wait for streams to be blocked */
4411   while (sink->n_streams_blocked < g_list_length (sink->contexts)) {
4412     GST_DEBUG_OBJECT (sink, "waiting for streams to be blocked");
4413     g_cond_wait (&sink->block_streams_cond, &sink->block_streams_lock);
4414   }
4415   g_mutex_unlock (&sink->block_streams_lock);
4416
4417   /* Send announce, then setup for all streams */
4418   gst_sdp_message_init (&sink->cursdp);
4419   sdp = &sink->cursdp;
4420
4421   /* some standard things first */
4422   gst_sdp_message_set_version (sdp, "0");
4423
4424   /* session ID doesn't have to be super-unique in this case */
4425   sess_id = g_strdup_printf ("%u", g_random_int ());
4426
4427   if (sink->conninfo.connection == NULL)
4428     return GST_RTSP_ERROR;
4429
4430   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
4431
4432   sa = g_socket_get_local_address (conn_socket, NULL);
4433   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
4434   client_ip = g_inet_address_to_string (ia);
4435   if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV6) {
4436     info.is_ipv6 = TRUE;
4437     proto = "IP6";
4438   } else if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV4)
4439     proto = "IP4";
4440   else
4441     g_assert_not_reached ();
4442   g_object_unref (sa);
4443
4444   /* FIXME: Should this actually be the server's IP or ours? */
4445   info.server_ip = sink->server_ip;
4446
4447   gst_sdp_message_set_origin (sdp, "-", sess_id, "1", "IN", proto, client_ip);
4448
4449   gst_sdp_message_set_session_name (sdp, "Session streamed with GStreamer");
4450   gst_sdp_message_set_information (sdp, "rtspclientsink");
4451   gst_sdp_message_add_time (sdp, "0", "0", NULL);
4452   gst_sdp_message_add_attribute (sdp, "tool", "GStreamer");
4453
4454   /* add stream */
4455   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4456     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4457
4458     gst_rtsp_sdp_from_stream (sdp, &info, context->stream);
4459     context->sdp_index = sdp_index++;
4460   }
4461
4462   g_free (sess_id);
4463   g_free (client_ip);
4464
4465   /* send ANNOUNCE request */
4466   GST_DEBUG_OBJECT (sink, "create ANNOUNCE request...");
4467   res =
4468       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_ANNOUNCE,
4469       sink->conninfo.url_str);
4470   if (res < 0)
4471     goto create_request_failed;
4472
4473   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_UPDATE_SDP], 0, sdp);
4474
4475   gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
4476       "application/sdp");
4477
4478   /* add SDP to the request body */
4479   str = gst_sdp_message_as_text (sdp);
4480   gst_rtsp_message_take_body (&request, (guint8 *) str, strlen (str));
4481
4482   /* send ANNOUNCE */
4483   GST_DEBUG_OBJECT (sink, "sending announce...");
4484
4485   if (async)
4486     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record",
4487         ("Sending server stream info"));
4488
4489   if ((res =
4490           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4491               &response, NULL)) < 0)
4492     goto send_error;
4493
4494   /* parse the keymgmt */
4495   i = 0;
4496   walk = sink->contexts;
4497   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_KEYMGMT,
4498           &keymgmt, i++) == GST_RTSP_OK) {
4499     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4500     walk = g_list_next (walk);
4501     if (!gst_rtsp_stream_handle_keymgmt (context->stream, keymgmt))
4502       goto keymgmt_error;
4503   }
4504
4505   /* send setup for all streams */
4506   if ((res = gst_rtsp_client_sink_setup_streams (sink, async)) < 0)
4507     goto setup_failed;
4508
4509   res = gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_RECORD,
4510       sink->conninfo.url_str);
4511
4512   if (res < 0)
4513     goto create_request_failed;
4514
4515 #if 0                           /* FIXME: Configure a range based on input segments? */
4516   if (src->need_range) {
4517     hval = gen_range_header (src, segment);
4518
4519     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_RANGE, hval);
4520   }
4521
4522   if (segment->rate != 1.0) {
4523     gchar hval[G_ASCII_DTOSTR_BUF_SIZE];
4524
4525     g_ascii_dtostr (hval, sizeof (hval), segment->rate);
4526     if (src->skip)
4527       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SCALE, hval);
4528     else
4529       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval);
4530   }
4531 #endif
4532
4533   if (async)
4534     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
4535   if ((res =
4536           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4537               &response, NULL)) < 0)
4538     goto send_error;
4539
4540 #if 0                           /* FIXME: Check if servers return these for record: */
4541   /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
4542    * for the RTP packets. If this is not present, we assume all starts from 0...
4543    * This is info for the RTP session manager that we pass to it in caps. */
4544   hval_idx = 0;
4545   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTP_INFO,
4546           &hval, hval_idx++) == GST_RTSP_OK)
4547     gst_rtspsrc_parse_rtpinfo (src, hval);
4548
4549   /* some servers indicate RTCP parameters in PLAY response,
4550    * rather than properly in SDP */
4551   if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTCP_INTERVAL,
4552           &hval, 0) == GST_RTSP_OK)
4553     gst_rtspsrc_handle_rtcp_interval (src, hval);
4554 #endif
4555
4556   gst_rtsp_client_sink_set_state (sink, GST_STATE_PLAYING);
4557   sink->state = GST_RTSP_STATE_PLAYING;
4558
4559   /* clean up any messages */
4560   gst_rtsp_message_unset (&request);
4561   gst_rtsp_message_unset (&response);
4562
4563 done:
4564   return res;
4565
4566 create_request_failed:
4567   {
4568     gchar *str = gst_rtsp_strresult (res);
4569
4570     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4571         ("Could not create request. (%s)", str));
4572     g_free (str);
4573     goto cleanup_error;
4574   }
4575 send_error:
4576   {
4577     /* Don't post a message - the rtsp_send method will have
4578      * taken care of it because we passed NULL for the response code */
4579     goto cleanup_error;
4580   }
4581 keymgmt_error:
4582   {
4583     GST_ELEMENT_ERROR (sink, STREAM, DECRYPT_NOKEY, (NULL),
4584         ("Could not handle KeyMgmt"));
4585   }
4586 setup_failed:
4587   {
4588     GST_ERROR_OBJECT (sink, "setup failed");
4589     goto cleanup_error;
4590   }
4591 cleanup_error:
4592   {
4593     if (sink->conninfo.connection) {
4594       GST_DEBUG_OBJECT (sink, "free connection");
4595       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
4596     }
4597     gst_rtsp_message_unset (&request);
4598     gst_rtsp_message_unset (&response);
4599     return res;
4600   }
4601 }
4602
4603 static GstRTSPResult
4604 gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
4605 {
4606   GstRTSPResult res = GST_RTSP_OK;
4607   GstRTSPMessage request = { 0 };
4608   GstRTSPMessage response = { 0 };
4609   GList *walk;
4610   const gchar *control;
4611
4612   GST_DEBUG_OBJECT (sink, "PAUSE...");
4613
4614   if ((res = gst_rtsp_client_sink_ensure_open (sink, async)) < 0)
4615     goto open_failed;
4616
4617   if (!(sink->methods & GST_RTSP_PAUSE))
4618     goto not_supported;
4619
4620   if (sink->state == GST_RTSP_STATE_READY)
4621     goto was_paused;
4622
4623   if (!sink->conninfo.connection || !sink->conninfo.connected)
4624     goto no_connection;
4625
4626   /* construct a control url */
4627   control = get_aggregate_control (sink);
4628
4629   /* loop over the streams. We might exit the loop early when we could do an
4630    * aggregate control */
4631   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4632     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
4633     GstRTSPConnInfo *info;
4634     const gchar *setup_url;
4635
4636     /* try aggregate control first but do non-aggregate control otherwise */
4637     if (control)
4638       setup_url = control;
4639     else if ((setup_url = stream->conninfo.location) == NULL)
4640       continue;
4641
4642     if (sink->conninfo.connection) {
4643       info = &sink->conninfo;
4644     } else if (stream->conninfo.connection) {
4645       info = &stream->conninfo;
4646     } else {
4647       continue;
4648     }
4649
4650     if (async)
4651       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request",
4652           ("Sending PAUSE request"));
4653
4654     if ((res =
4655             gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_PAUSE,
4656                 setup_url)) < 0)
4657       goto create_request_failed;
4658
4659     if ((res =
4660             gst_rtsp_client_sink_send (sink, info, &request, &response,
4661                 NULL)) < 0)
4662       goto send_error;
4663
4664     gst_rtsp_message_unset (&request);
4665     gst_rtsp_message_unset (&response);
4666
4667     /* exit early when we did agregate control */
4668     if (control)
4669       break;
4670   }
4671
4672   /* change element states now */
4673   gst_rtsp_client_sink_set_state (sink, GST_STATE_PAUSED);
4674
4675 no_connection:
4676   sink->state = GST_RTSP_STATE_READY;
4677
4678 done:
4679   if (async)
4680     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_PAUSE, res);
4681
4682   return res;
4683
4684   /* ERRORS */
4685 open_failed:
4686   {
4687     GST_DEBUG_OBJECT (sink, "failed to open stream");
4688     goto done;
4689   }
4690 not_supported:
4691   {
4692     GST_DEBUG_OBJECT (sink, "PAUSE is not supported");
4693     goto done;
4694   }
4695 was_paused:
4696   {
4697     GST_DEBUG_OBJECT (sink, "we were already PAUSED");
4698     goto done;
4699   }
4700 create_request_failed:
4701   {
4702     gchar *str = gst_rtsp_strresult (res);
4703
4704     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4705         ("Could not create request. (%s)", str));
4706     g_free (str);
4707     goto done;
4708   }
4709 send_error:
4710   {
4711     gchar *str = gst_rtsp_strresult (res);
4712
4713     gst_rtsp_message_unset (&request);
4714     if (res != GST_RTSP_EINTR) {
4715       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4716           ("Could not send message. (%s)", str));
4717     } else {
4718       GST_WARNING_OBJECT (sink, "PAUSE interrupted");
4719     }
4720     g_free (str);
4721     goto done;
4722   }
4723 }
4724
4725 static void
4726 gst_rtsp_client_sink_handle_message (GstBin * bin, GstMessage * message)
4727 {
4728   GstRTSPClientSink *rtsp_client_sink;
4729
4730   rtsp_client_sink = GST_RTSP_CLIENT_SINK (bin);
4731
4732   switch (GST_MESSAGE_TYPE (message)) {
4733     case GST_MESSAGE_ELEMENT:
4734     {
4735       const GstStructure *s = gst_message_get_structure (message);
4736
4737       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
4738         gboolean ignore_timeout;
4739
4740         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
4741
4742         GST_OBJECT_LOCK (rtsp_client_sink);
4743         ignore_timeout = rtsp_client_sink->ignore_timeout;
4744         rtsp_client_sink->ignore_timeout = TRUE;
4745         GST_OBJECT_UNLOCK (rtsp_client_sink);
4746
4747         /* we only act on the first udp timeout message, others are irrelevant
4748          * and can be ignored. */
4749         if (!ignore_timeout)
4750           gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECONNECT,
4751               CMD_LOOP);
4752         /* eat and free */
4753         gst_message_unref (message);
4754         return;
4755       } else if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
4756         /* An RTSPStream has prerolled */
4757         GST_DEBUG_OBJECT (rtsp_client_sink, "received GstRTSPStreamBlocking");
4758         g_mutex_lock (&rtsp_client_sink->block_streams_lock);
4759         rtsp_client_sink->n_streams_blocked++;
4760         g_cond_broadcast (&rtsp_client_sink->block_streams_cond);
4761         g_mutex_unlock (&rtsp_client_sink->block_streams_lock);
4762       }
4763       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4764       break;
4765     }
4766     case GST_MESSAGE_ASYNC_START:{
4767       GstObject *sender;
4768
4769       sender = GST_MESSAGE_SRC (message);
4770
4771       GST_LOG_OBJECT (rtsp_client_sink,
4772           "Have async-start from %" GST_PTR_FORMAT, sender);
4773       if (sender == GST_OBJECT (rtsp_client_sink->internal_bin)) {
4774         GST_LOG_OBJECT (rtsp_client_sink, "child bin is now ASYNC");
4775       }
4776       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4777       break;
4778     }
4779     case GST_MESSAGE_ASYNC_DONE:
4780     {
4781       GstObject *sender;
4782       gboolean need_async_done;
4783
4784       sender = GST_MESSAGE_SRC (message);
4785       GST_LOG_OBJECT (rtsp_client_sink, "Have async-done from %" GST_PTR_FORMAT,
4786           sender);
4787
4788       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4789       if (sender == GST_OBJECT_CAST (rtsp_client_sink->internal_bin)) {
4790         GST_LOG_OBJECT (rtsp_client_sink, "child bin is no longer ASYNC");
4791       }
4792       need_async_done = rtsp_client_sink->in_async;
4793       if (rtsp_client_sink->in_async) {
4794         rtsp_client_sink->in_async = FALSE;
4795         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4796       }
4797       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4798
4799       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4800
4801       if (need_async_done) {
4802         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-DONE");
4803         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4804             gst_message_new_async_done (GST_OBJECT_CAST (rtsp_client_sink),
4805                 GST_CLOCK_TIME_NONE));
4806       }
4807       break;
4808     }
4809     case GST_MESSAGE_ERROR:
4810     {
4811       GstObject *sender;
4812
4813       sender = GST_MESSAGE_SRC (message);
4814
4815       GST_DEBUG_OBJECT (rtsp_client_sink, "got error from %s",
4816           GST_ELEMENT_NAME (sender));
4817
4818       /* FIXME: Ignore errors on RTCP? */
4819       /* fatal but not our message, forward */
4820       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4821       break;
4822     }
4823     case GST_MESSAGE_STATE_CHANGED:
4824     {
4825       if (GST_MESSAGE_SRC (message) ==
4826           (GstObject *) rtsp_client_sink->internal_bin) {
4827         GstState newstate, pending;
4828         gst_message_parse_state_changed (message, NULL, &newstate, &pending);
4829         g_mutex_lock (&rtsp_client_sink->preroll_lock);
4830         rtsp_client_sink->prerolled = (newstate >= GST_STATE_PAUSED)
4831             && pending == GST_STATE_VOID_PENDING;
4832         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4833         g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4834         GST_DEBUG_OBJECT (bin,
4835             "Internal bin changed state to %s (pending %s). Prerolled now %d",
4836             gst_element_state_get_name (newstate),
4837             gst_element_state_get_name (pending), rtsp_client_sink->prerolled);
4838       }
4839       /* fallthrough */
4840     }
4841     default:
4842     {
4843       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4844       break;
4845     }
4846   }
4847 }
4848
4849 /* the thread where everything happens */
4850 static void
4851 gst_rtsp_client_sink_thread (GstRTSPClientSink * sink)
4852 {
4853   gint cmd;
4854
4855   GST_OBJECT_LOCK (sink);
4856   cmd = sink->pending_cmd;
4857   if (cmd == CMD_RECONNECT || cmd == CMD_RECORD || cmd == CMD_PAUSE
4858       || cmd == CMD_LOOP || cmd == CMD_OPEN)
4859     sink->pending_cmd = CMD_LOOP;
4860   else
4861     sink->pending_cmd = CMD_WAIT;
4862   GST_DEBUG_OBJECT (sink, "got command %s", cmd_to_string (cmd));
4863
4864   /* we got the message command, so ensure communication is possible again */
4865   gst_rtsp_client_sink_connection_flush (sink, FALSE);
4866
4867   sink->busy_cmd = cmd;
4868   GST_OBJECT_UNLOCK (sink);
4869
4870   switch (cmd) {
4871     case CMD_OPEN:
4872       if (gst_rtsp_client_sink_open (sink, TRUE) == GST_RTSP_ERROR)
4873         gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT,
4874             CMD_ALL & ~CMD_CLOSE);
4875       break;
4876     case CMD_RECORD:
4877       gst_rtsp_client_sink_record (sink, TRUE);
4878       break;
4879     case CMD_PAUSE:
4880       gst_rtsp_client_sink_pause (sink, TRUE);
4881       break;
4882     case CMD_CLOSE:
4883       gst_rtsp_client_sink_close (sink, TRUE, FALSE);
4884       break;
4885     case CMD_LOOP:
4886       gst_rtsp_client_sink_loop (sink);
4887       break;
4888     case CMD_RECONNECT:
4889       gst_rtsp_client_sink_reconnect (sink, FALSE);
4890       break;
4891     default:
4892       break;
4893   }
4894
4895   GST_OBJECT_LOCK (sink);
4896   /* and go back to sleep */
4897   if (sink->pending_cmd == CMD_WAIT) {
4898     if (sink->task)
4899       gst_task_pause (sink->task);
4900   }
4901   /* reset waiting */
4902   sink->busy_cmd = CMD_WAIT;
4903   GST_OBJECT_UNLOCK (sink);
4904 }
4905
4906 static gboolean
4907 gst_rtsp_client_sink_start (GstRTSPClientSink * sink)
4908 {
4909   GST_DEBUG_OBJECT (sink, "starting");
4910
4911   sink->streams_collected = FALSE;
4912   gst_element_set_locked_state (GST_ELEMENT (sink->internal_bin), TRUE);
4913
4914   gst_rtsp_client_sink_set_state (sink, GST_STATE_READY);
4915
4916   GST_OBJECT_LOCK (sink);
4917   sink->pending_cmd = CMD_WAIT;
4918
4919   if (sink->task == NULL) {
4920     sink->task =
4921         gst_task_new ((GstTaskFunction) gst_rtsp_client_sink_thread, sink,
4922         NULL);
4923     if (sink->task == NULL)
4924       goto task_error;
4925
4926     gst_task_set_lock (sink->task, GST_RTSP_STREAM_GET_LOCK (sink));
4927   }
4928   GST_OBJECT_UNLOCK (sink);
4929
4930   return TRUE;
4931
4932   /* ERRORS */
4933 task_error:
4934   {
4935     GST_OBJECT_UNLOCK (sink);
4936     GST_ERROR_OBJECT (sink, "failed to create task");
4937     return FALSE;
4938   }
4939 }
4940
4941 static gboolean
4942 gst_rtsp_client_sink_stop (GstRTSPClientSink * sink)
4943 {
4944   GstTask *task;
4945
4946   GST_DEBUG_OBJECT (sink, "stopping");
4947
4948   /* also cancels pending task */
4949   gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_ALL & ~CMD_CLOSE);
4950
4951   GST_OBJECT_LOCK (sink);
4952   if ((task = sink->task)) {
4953     sink->task = NULL;
4954     GST_OBJECT_UNLOCK (sink);
4955
4956     gst_task_stop (task);
4957
4958     /* make sure it is not running */
4959     GST_RTSP_STREAM_LOCK (sink);
4960     GST_RTSP_STREAM_UNLOCK (sink);
4961
4962     /* now wait for the task to finish */
4963     gst_task_join (task);
4964
4965     /* and free the task */
4966     gst_object_unref (GST_OBJECT (task));
4967
4968     GST_OBJECT_LOCK (sink);
4969   }
4970   GST_OBJECT_UNLOCK (sink);
4971
4972   /* ensure synchronously all is closed and clean */
4973   gst_rtsp_client_sink_close (sink, FALSE, TRUE);
4974
4975   return TRUE;
4976 }
4977
4978 static GstStateChangeReturn
4979 gst_rtsp_client_sink_change_state (GstElement * element,
4980     GstStateChange transition)
4981 {
4982   GstRTSPClientSink *rtsp_client_sink;
4983   GstStateChangeReturn ret;
4984
4985   rtsp_client_sink = GST_RTSP_CLIENT_SINK (element);
4986
4987   switch (transition) {
4988     case GST_STATE_CHANGE_NULL_TO_READY:
4989       if (!gst_rtsp_client_sink_start (rtsp_client_sink))
4990         goto start_failed;
4991       break;
4992     case GST_STATE_CHANGE_READY_TO_PAUSED:
4993       /* init some state */
4994       rtsp_client_sink->cur_protocols = rtsp_client_sink->protocols;
4995       /* first attempt, don't ignore timeouts */
4996       rtsp_client_sink->ignore_timeout = FALSE;
4997       rtsp_client_sink->open_error = FALSE;
4998
4999       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_PAUSED);
5000
5001       g_mutex_lock (&rtsp_client_sink->preroll_lock);
5002       if (rtsp_client_sink->in_async) {
5003         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-START");
5004         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
5005             gst_message_new_async_start (GST_OBJECT_CAST (rtsp_client_sink)));
5006       }
5007       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
5008
5009       break;
5010     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5011       /* fall-through */
5012     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5013       /* unblock the tcp tasks and make the loop waiting */
5014       if (gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_WAIT,
5015               CMD_LOOP)) {
5016         /* make sure it is waiting before we send PLAY below */
5017         GST_RTSP_STREAM_LOCK (rtsp_client_sink);
5018         GST_RTSP_STREAM_UNLOCK (rtsp_client_sink);
5019       }
5020       break;
5021     case GST_STATE_CHANGE_PAUSED_TO_READY:
5022       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_READY);
5023       break;
5024     default:
5025       break;
5026   }
5027
5028   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5029   if (ret == GST_STATE_CHANGE_FAILURE)
5030     goto done;
5031
5032   switch (transition) {
5033     case GST_STATE_CHANGE_NULL_TO_READY:
5034       ret = GST_STATE_CHANGE_SUCCESS;
5035       break;
5036     case GST_STATE_CHANGE_READY_TO_PAUSED:
5037       /* Return ASYNC and preroll input streams */
5038       g_mutex_lock (&rtsp_client_sink->preroll_lock);
5039       if (rtsp_client_sink->in_async)
5040         ret = GST_STATE_CHANGE_ASYNC;
5041       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
5042       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_OPEN, 0);
5043
5044       /* CMD_OPEN has been scheduled. Wait until the sink thread starts
5045        * opening connection to the server */
5046       g_mutex_lock (&rtsp_client_sink->open_conn_lock);
5047       while (!rtsp_client_sink->open_conn_start) {
5048         GST_DEBUG_OBJECT (rtsp_client_sink,
5049             "wait for connection to be started");
5050         g_cond_wait (&rtsp_client_sink->open_conn_cond,
5051             &rtsp_client_sink->open_conn_lock);
5052       }
5053       rtsp_client_sink->open_conn_start = FALSE;
5054       g_mutex_unlock (&rtsp_client_sink->open_conn_lock);
5055       break;
5056     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{
5057       GST_DEBUG_OBJECT (rtsp_client_sink,
5058           "Switching to playing -sending RECORD");
5059       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECORD, 0);
5060       ret = GST_STATE_CHANGE_SUCCESS;
5061       break;
5062     }
5063     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5064       /* send pause request and keep the idle task around */
5065       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_PAUSE,
5066           CMD_LOOP);
5067       ret = GST_STATE_CHANGE_NO_PREROLL;
5068       break;
5069     case GST_STATE_CHANGE_PAUSED_TO_READY:
5070       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_CLOSE,
5071           CMD_PAUSE);
5072       ret = GST_STATE_CHANGE_SUCCESS;
5073       break;
5074     case GST_STATE_CHANGE_READY_TO_NULL:
5075       gst_rtsp_client_sink_stop (rtsp_client_sink);
5076       ret = GST_STATE_CHANGE_SUCCESS;
5077       break;
5078     default:
5079       break;
5080   }
5081
5082 done:
5083   return ret;
5084
5085 start_failed:
5086   {
5087     GST_DEBUG_OBJECT (rtsp_client_sink, "start failed");
5088     return GST_STATE_CHANGE_FAILURE;
5089   }
5090 }
5091
5092 /*** GSTURIHANDLER INTERFACE *************************************************/
5093
5094 static GstURIType
5095 gst_rtsp_client_sink_uri_get_type (GType type)
5096 {
5097   return GST_URI_SINK;
5098 }
5099
5100 static const gchar *const *
5101 gst_rtsp_client_sink_uri_get_protocols (GType type)
5102 {
5103   static const gchar *protocols[] =
5104       { "rtsp", "rtspu", "rtspt", "rtsph", "rtsp-sdp",
5105     "rtsps", "rtspsu", "rtspst", "rtspsh", NULL
5106   };
5107
5108   return protocols;
5109 }
5110
5111 static gchar *
5112 gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler)
5113 {
5114   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (handler);
5115
5116   /* FIXME: make thread-safe */
5117   return g_strdup (sink->conninfo.location);
5118 }
5119
5120 static gboolean
5121 gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
5122     GError ** error)
5123 {
5124   GstRTSPClientSink *sink;
5125   GstRTSPResult res;
5126   GstSDPResult sres;
5127   GstRTSPUrl *newurl = NULL;
5128   GstSDPMessage *sdp = NULL;
5129
5130   sink = GST_RTSP_CLIENT_SINK (handler);
5131
5132   /* same URI, we're fine */
5133   if (sink->conninfo.location && uri && !strcmp (uri, sink->conninfo.location))
5134     goto was_ok;
5135
5136   if (g_str_has_prefix (uri, "rtsp-sdp://")) {
5137     sres = gst_sdp_message_new (&sdp);
5138     if (sres < 0)
5139       goto sdp_failed;
5140
5141     GST_DEBUG_OBJECT (sink, "parsing SDP message");
5142     sres = gst_sdp_message_parse_uri (uri, sdp);
5143     if (sres < 0)
5144       goto invalid_sdp;
5145   } else {
5146     /* try to parse */
5147     GST_DEBUG_OBJECT (sink, "parsing URI");
5148     if ((res = gst_rtsp_url_parse (uri, &newurl)) < 0)
5149       goto parse_error;
5150   }
5151
5152   /* if worked, free previous and store new url object along with the original
5153    * location. */
5154   GST_DEBUG_OBJECT (sink, "configuring URI");
5155   g_free (sink->conninfo.location);
5156   sink->conninfo.location = g_strdup (uri);
5157   gst_rtsp_url_free (sink->conninfo.url);
5158   sink->conninfo.url = newurl;
5159   g_free (sink->conninfo.url_str);
5160   if (newurl)
5161     sink->conninfo.url_str = gst_rtsp_url_get_request_uri (sink->conninfo.url);
5162   else
5163     sink->conninfo.url_str = NULL;
5164
5165   if (sink->uri_sdp)
5166     gst_sdp_message_free (sink->uri_sdp);
5167   sink->uri_sdp = sdp;
5168   sink->from_sdp = sdp != NULL;
5169
5170   GST_DEBUG_OBJECT (sink, "set uri: %s", GST_STR_NULL (uri));
5171   GST_DEBUG_OBJECT (sink, "request uri is: %s",
5172       GST_STR_NULL (sink->conninfo.url_str));
5173
5174   return TRUE;
5175
5176   /* Special cases */
5177 was_ok:
5178   {
5179     GST_DEBUG_OBJECT (sink, "URI was ok: '%s'", GST_STR_NULL (uri));
5180     return TRUE;
5181   }
5182 sdp_failed:
5183   {
5184     GST_ERROR_OBJECT (sink, "Could not create new SDP (%d)", sres);
5185     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5186         "Could not create SDP");
5187     return FALSE;
5188   }
5189 invalid_sdp:
5190   {
5191     GST_ERROR_OBJECT (sink, "Not a valid SDP (%d) '%s'", sres,
5192         GST_STR_NULL (uri));
5193     gst_sdp_message_free (sdp);
5194     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5195         "Invalid SDP");
5196     return FALSE;
5197   }
5198 parse_error:
5199   {
5200     GST_ERROR_OBJECT (sink, "Not a valid RTSP url '%s' (%d)",
5201         GST_STR_NULL (uri), res);
5202     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5203         "Invalid RTSP URI");
5204     return FALSE;
5205   }
5206 }
5207
5208 static void
5209 gst_rtsp_client_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
5210 {
5211   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
5212
5213   iface->get_type = gst_rtsp_client_sink_uri_get_type;
5214   iface->get_protocols = gst_rtsp_client_sink_uri_get_protocols;
5215   iface->get_uri = gst_rtsp_client_sink_uri_get_uri;
5216   iface->set_uri = gst_rtsp_client_sink_uri_set_uri;
5217 }