rtspclientsink: Don't deadlock in preroll on early close
[platform/upstream/gstreamer.git] / gst / rtsp-sink / gstrtspclientsink.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim at fluendo dot com>
3  *               <2006> Lutz Mueller <lutz at topfrose dot de>
4  *               <2015> Jan Schmidt <jan at centricular dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /*
22  * Unless otherwise indicated, Source Code is licensed under MIT license.
23  * See further explanation attached in License Statement (distributed in the file
24  * LICENSE).
25  *
26  * Permission is hereby granted, free of charge, to any person obtaining a copy of
27  * this software and associated documentation files (the "Software"), to deal in
28  * the Software without restriction, including without limitation the rights to
29  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
30  * of the Software, and to permit persons to whom the Software is furnished to do
31  * so, subject to the following conditions:
32  *
33  * The above copyright notice and this permission notice shall be included in all
34  * copies or substantial portions of the Software.
35  *
36  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
37  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
38  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
39  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
40  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
41  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
42  * SOFTWARE.
43  */
44 /**
45  * SECTION:element-rtspclientsink
46  *
47  * Makes a connection to an RTSP server and send data via RTSP RECORD.
48  * rtspclientsink strictly follows RFC 2326
49  *
50  * RTSP supports transport over TCP or UDP in unicast or multicast mode. By
51  * default rtspclientsink will negotiate a connection in the following order:
52  * UDP unicast/UDP multicast/TCP. The order cannot be changed but the allowed
53  * protocols can be controlled with the #GstRTSPClientSink:protocols property.
54  *
55  * rtspclientsink will internally instantiate an RTP session manager element
56  * that will handle the RTCP messages to and from the server, jitter removal,
57  * and packet reordering.
58  * This feature is implemented using the gstrtpbin element.
59  *
60  * rtspclientsink accepts any stream for which there is an installed payloader,
61  * creates the payloader and manages payload-types, as well as RTX setup.
62  * The new-payloader signal is fired when a payloader is created, in case
63  * an app wants to do custom configuration (such as for MTU).
64  *
65  * <refsect2>
66  * <title>Example launch line</title>
67  * |[
68  * gst-launch-1.0 videotestsrc ! jpegenc ! rtspclientsink location=rtsp://some.server/url
69  * ]| Establish a connection to an RTSP server and send JPEG encoded video packets
70  * </refsect2>
71  */
72
73 /* FIXMEs
74  * - Handle EOS properly and shutdown. The problem with EOS is we don't know
75  *   when the server has received all data, so we don't know when to do teardown.
76  *   At the moment, we forward EOS to the app as soon as we stop sending. Is there
77  *   a way to know from the receiver that it's got all data? Some session timeout?
78  * - Implement extension support for Real / WMS if they support RECORD?
79  * - Add support for network clock synchronised streaming?
80  * - Fix crypto key nego so SAVP/SAVPF profiles work.
81  * - Test (&fix?) HTTP tunnel support
82  * - Add an address pool object for GstRTSPStreams to use for multicast
83  * - Test multicast UDP transport
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #include "config.h"
88 #endif
89
90 #ifdef HAVE_UNISTD_H
91 #include <unistd.h>
92 #endif /* HAVE_UNISTD_H */
93 #include <stdlib.h>
94 #include <string.h>
95 #include <stdio.h>
96 #include <stdarg.h>
97
98 #include <gst/net/gstnet.h>
99 #include <gst/sdp/gstsdpmessage.h>
100 #include <gst/sdp/gstmikey.h>
101 #include <gst/rtp/rtp.h>
102
103 #include "gstrtspclientsink.h"
104
105 typedef struct _GstRtspClientSinkPad GstRtspClientSinkPad;
106 typedef GstGhostPadClass GstRtspClientSinkPadClass;
107
108 struct _GstRtspClientSinkPad
109 {
110   GstGhostPad parent;
111   GstElement *custom_payloader;
112   guint ulpfec_percentage;
113 };
114
115 enum
116 {
117   PROP_PAD_0,
118   PROP_PAD_PAYLOADER,
119   PROP_PAD_ULPFEC_PERCENTAGE
120 };
121
122 #define DEFAULT_PAD_ULPFEC_PERCENTAGE 0
123
124 static GType gst_rtsp_client_sink_pad_get_type (void);
125 G_DEFINE_TYPE (GstRtspClientSinkPad, gst_rtsp_client_sink_pad,
126     GST_TYPE_GHOST_PAD);
127 #define GST_TYPE_RTSP_CLIENT_SINK_PAD (gst_rtsp_client_sink_pad_get_type ())
128 #define GST_RTSP_CLIENT_SINK_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTSP_CLIENT_SINK_PAD,GstRtspClientSinkPad))
129
130 static void
131 gst_rtsp_client_sink_pad_set_property (GObject * object, guint prop_id,
132     const GValue * value, GParamSpec * pspec)
133 {
134   GstRtspClientSinkPad *pad;
135
136   pad = GST_RTSP_CLIENT_SINK_PAD (object);
137
138   switch (prop_id) {
139     case PROP_PAD_PAYLOADER:
140       GST_OBJECT_LOCK (pad);
141       if (pad->custom_payloader)
142         gst_object_unref (pad->custom_payloader);
143       pad->custom_payloader = g_value_get_object (value);
144       gst_object_ref_sink (pad->custom_payloader);
145       GST_OBJECT_UNLOCK (pad);
146       break;
147     case PROP_PAD_ULPFEC_PERCENTAGE:
148       GST_OBJECT_LOCK (pad);
149       pad->ulpfec_percentage = g_value_get_uint (value);
150       GST_OBJECT_UNLOCK (pad);
151       break;
152     default:
153       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
154       break;
155   }
156 }
157
158 static void
159 gst_rtsp_client_sink_pad_get_property (GObject * object, guint prop_id,
160     GValue * value, GParamSpec * pspec)
161 {
162   GstRtspClientSinkPad *pad;
163
164   pad = GST_RTSP_CLIENT_SINK_PAD (object);
165
166   switch (prop_id) {
167     case PROP_PAD_PAYLOADER:
168       GST_OBJECT_LOCK (pad);
169       g_value_set_object (value, pad->custom_payloader);
170       GST_OBJECT_UNLOCK (pad);
171       break;
172     case PROP_PAD_ULPFEC_PERCENTAGE:
173       GST_OBJECT_LOCK (pad);
174       g_value_set_uint (value, pad->ulpfec_percentage);
175       GST_OBJECT_UNLOCK (pad);
176       break;
177     default:
178       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
179       break;
180   }
181 }
182
183 static void
184 gst_rtsp_client_sink_pad_dispose (GObject * object)
185 {
186   GstRtspClientSinkPad *pad = GST_RTSP_CLIENT_SINK_PAD (object);
187
188   if (pad->custom_payloader)
189     gst_object_unref (pad->custom_payloader);
190
191   G_OBJECT_CLASS (gst_rtsp_client_sink_pad_parent_class)->dispose (object);
192 }
193
194 static void
195 gst_rtsp_client_sink_pad_class_init (GstRtspClientSinkPadClass * klass)
196 {
197   GObjectClass *gobject_klass;
198
199   gobject_klass = (GObjectClass *) klass;
200
201   gobject_klass->set_property = gst_rtsp_client_sink_pad_set_property;
202   gobject_klass->get_property = gst_rtsp_client_sink_pad_get_property;
203   gobject_klass->dispose = gst_rtsp_client_sink_pad_dispose;
204
205   g_object_class_install_property (gobject_klass, PROP_PAD_PAYLOADER,
206       g_param_spec_object ("payloader", "Payloader",
207           "The payloader element to use (NULL = default automatically selected)",
208           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_klass, PROP_PAD_ULPFEC_PERCENTAGE,
211       g_param_spec_uint ("ulpfec-percentage", "ULPFEC percentage",
212           "The percentage of ULP redundancy to apply", 0, 100,
213           DEFAULT_PAD_ULPFEC_PERCENTAGE,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215 }
216
217 static void
218 gst_rtsp_client_sink_pad_init (GstRtspClientSinkPad * pad)
219 {
220 }
221
222 static GstPad *
223 gst_rtsp_client_sink_pad_new (const GstPadTemplate * pad_tmpl,
224     const gchar * name)
225 {
226   GstRtspClientSinkPad *ret;
227
228   ret =
229       g_object_new (GST_TYPE_RTSP_CLIENT_SINK_PAD, "direction", GST_PAD_SINK,
230       "template", pad_tmpl, "name", name, NULL);
231   gst_ghost_pad_construct (GST_GHOST_PAD_CAST (ret));
232
233   return GST_PAD (ret);
234 }
235
236 GST_DEBUG_CATEGORY_STATIC (rtsp_client_sink_debug);
237 #define GST_CAT_DEFAULT (rtsp_client_sink_debug)
238
239 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
240     GST_PAD_SINK,
241     GST_PAD_REQUEST,
242     GST_STATIC_CAPS_ANY);       /* Actual caps come from available set of payloaders */
243
244 enum
245 {
246   SIGNAL_HANDLE_REQUEST,
247   SIGNAL_NEW_MANAGER,
248   SIGNAL_NEW_PAYLOADER,
249   SIGNAL_REQUEST_RTCP_KEY,
250   SIGNAL_ACCEPT_CERTIFICATE,
251   LAST_SIGNAL
252 };
253
254 enum _GstRTSPClientSinkNtpTimeSource
255 {
256   NTP_TIME_SOURCE_NTP,
257   NTP_TIME_SOURCE_UNIX,
258   NTP_TIME_SOURCE_RUNNING_TIME,
259   NTP_TIME_SOURCE_CLOCK_TIME
260 };
261
262 #define GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE (gst_rtsp_client_sink_ntp_time_source_get_type())
263 static GType
264 gst_rtsp_client_sink_ntp_time_source_get_type (void)
265 {
266   static GType ntp_time_source_type = 0;
267   static const GEnumValue ntp_time_source_values[] = {
268     {NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
269     {NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
270     {NTP_TIME_SOURCE_RUNNING_TIME,
271           "Running time based on pipeline clock",
272         "running-time"},
273     {NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
274     {0, NULL, NULL},
275   };
276
277   if (!ntp_time_source_type) {
278     ntp_time_source_type =
279         g_enum_register_static ("GstRTSPClientSinkNtpTimeSource",
280         ntp_time_source_values);
281   }
282   return ntp_time_source_type;
283 }
284
285 #define DEFAULT_LOCATION         NULL
286 #define DEFAULT_PROTOCOLS        GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP
287 #define DEFAULT_DEBUG            FALSE
288 #define DEFAULT_RETRY            20
289 #define DEFAULT_TIMEOUT          5000000
290 #define DEFAULT_UDP_BUFFER_SIZE  0x80000
291 #define DEFAULT_TCP_TIMEOUT      20000000
292 #define DEFAULT_LATENCY_MS       2000
293 #define DEFAULT_DO_RTSP_KEEP_ALIVE       TRUE
294 #define DEFAULT_PROXY            NULL
295 #define DEFAULT_RTP_BLOCKSIZE    0
296 #define DEFAULT_USER_ID          NULL
297 #define DEFAULT_USER_PW          NULL
298 #define DEFAULT_PORT_RANGE       NULL
299 #define DEFAULT_UDP_RECONNECT    TRUE
300 #define DEFAULT_MULTICAST_IFACE  NULL
301 #define DEFAULT_TLS_VALIDATION_FLAGS     G_TLS_CERTIFICATE_VALIDATE_ALL
302 #define DEFAULT_TLS_DATABASE     NULL
303 #define DEFAULT_TLS_INTERACTION     NULL
304 #define DEFAULT_NTP_TIME_SOURCE  NTP_TIME_SOURCE_NTP
305 #define DEFAULT_USER_AGENT       "GStreamer/" PACKAGE_VERSION
306 #define DEFAULT_PROFILES         GST_RTSP_PROFILE_AVP
307 #define DEFAULT_RTX_TIME_MS      500
308
309 enum
310 {
311   PROP_0,
312   PROP_LOCATION,
313   PROP_PROTOCOLS,
314   PROP_DEBUG,
315   PROP_RETRY,
316   PROP_TIMEOUT,
317   PROP_TCP_TIMEOUT,
318   PROP_LATENCY,
319   PROP_RTX_TIME,
320   PROP_DO_RTSP_KEEP_ALIVE,
321   PROP_PROXY,
322   PROP_PROXY_ID,
323   PROP_PROXY_PW,
324   PROP_RTP_BLOCKSIZE,
325   PROP_USER_ID,
326   PROP_USER_PW,
327   PROP_PORT_RANGE,
328   PROP_UDP_BUFFER_SIZE,
329   PROP_UDP_RECONNECT,
330   PROP_MULTICAST_IFACE,
331   PROP_SDES,
332   PROP_TLS_VALIDATION_FLAGS,
333   PROP_TLS_DATABASE,
334   PROP_TLS_INTERACTION,
335   PROP_NTP_TIME_SOURCE,
336   PROP_USER_AGENT,
337   PROP_PROFILES
338 };
339
340 static void gst_rtsp_client_sink_finalize (GObject * object);
341
342 static void gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
343     const GValue * value, GParamSpec * pspec);
344 static void gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
345     GValue * value, GParamSpec * pspec);
346
347 static GstClock *gst_rtsp_client_sink_provide_clock (GstElement * element);
348
349 static void gst_rtsp_client_sink_uri_handler_init (gpointer g_iface,
350     gpointer iface_data);
351
352 static gboolean gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp,
353     const gchar * proxy);
354 static void gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink *
355     rtsp_client_sink, guint64 timeout);
356
357 static GstStateChangeReturn gst_rtsp_client_sink_change_state (GstElement *
358     element, GstStateChange transition);
359 static void gst_rtsp_client_sink_handle_message (GstBin * bin,
360     GstMessage * message);
361
362 static gboolean gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
363     GstRTSPMessage * response);
364
365 static gboolean gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink,
366     gint cmd, gint mask);
367
368 static GstRTSPResult gst_rtsp_client_sink_open (GstRTSPClientSink * sink,
369     gboolean async);
370 static GstRTSPResult gst_rtsp_client_sink_record (GstRTSPClientSink * sink,
371     gboolean async);
372 static GstRTSPResult gst_rtsp_client_sink_pause (GstRTSPClientSink * sink,
373     gboolean async);
374 static GstRTSPResult gst_rtsp_client_sink_close (GstRTSPClientSink * sink,
375     gboolean async, gboolean only_close);
376 static gboolean gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink);
377
378 static gboolean gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler,
379     const gchar * uri, GError ** error);
380 static gchar *gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler);
381
382 static gboolean gst_rtsp_client_sink_loop (GstRTSPClientSink * sink);
383 static void gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink,
384     gboolean flush);
385
386 static GstPad *gst_rtsp_client_sink_request_new_pad (GstElement * element,
387     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
388 static void gst_rtsp_client_sink_release_pad (GstElement * element,
389     GstPad * pad);
390
391 /* commands we send to out loop to notify it of events */
392 #define CMD_OPEN        (1 << 0)
393 #define CMD_RECORD      (1 << 1)
394 #define CMD_PAUSE       (1 << 2)
395 #define CMD_CLOSE       (1 << 3)
396 #define CMD_WAIT        (1 << 4)
397 #define CMD_RECONNECT   (1 << 5)
398 #define CMD_LOOP        (1 << 6)
399
400 /* mask for all commands */
401 #define CMD_ALL         ((CMD_LOOP << 1) - 1)
402
403 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
404 G_STMT_START {                                          \
405   gchar *__txt = _gst_element_error_printf text;        \
406   gst_element_post_message (GST_ELEMENT_CAST (el),      \
407       gst_message_new_progress (GST_OBJECT_CAST (el),   \
408           GST_PROGRESS_TYPE_ ##type, code, __txt));     \
409   g_free (__txt);                                       \
410 } G_STMT_END
411
412 static guint gst_rtsp_client_sink_signals[LAST_SIGNAL] = { 0 };
413
414 /*********************************
415  * GstChildProxy implementation  *
416  *********************************/
417 static GObject *
418 gst_rtsp_client_sink_child_proxy_get_child_by_index (GstChildProxy *
419     child_proxy, guint index)
420 {
421   GObject *obj;
422   GstRTSPClientSink *cs = GST_RTSP_CLIENT_SINK (child_proxy);
423
424   GST_OBJECT_LOCK (cs);
425   if ((obj = g_list_nth_data (GST_ELEMENT (cs)->sinkpads, index)))
426     g_object_ref (obj);
427   GST_OBJECT_UNLOCK (cs);
428
429   return obj;
430 }
431
432 static guint
433 gst_rtsp_client_sink_child_proxy_get_children_count (GstChildProxy *
434     child_proxy)
435 {
436   guint count = 0;
437
438   GST_OBJECT_LOCK (child_proxy);
439   count = GST_ELEMENT (child_proxy)->numsinkpads;
440   GST_OBJECT_UNLOCK (child_proxy);
441
442   GST_INFO_OBJECT (child_proxy, "Children Count: %d", count);
443
444   return count;
445 }
446
447 static void
448 gst_rtsp_client_sink_child_proxy_init (gpointer g_iface, gpointer iface_data)
449 {
450   GstChildProxyInterface *iface = g_iface;
451
452   GST_INFO ("intializing child proxy interface");
453   iface->get_child_by_index =
454       gst_rtsp_client_sink_child_proxy_get_child_by_index;
455   iface->get_children_count =
456       gst_rtsp_client_sink_child_proxy_get_children_count;
457 }
458
459 #define gst_rtsp_client_sink_parent_class parent_class
460 G_DEFINE_TYPE_WITH_CODE (GstRTSPClientSink, gst_rtsp_client_sink, GST_TYPE_BIN,
461     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
462         gst_rtsp_client_sink_uri_handler_init);
463     G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY,
464         gst_rtsp_client_sink_child_proxy_init);
465     );
466
467 #ifndef GST_DISABLE_GST_DEBUG
468 static inline const gchar *
469 cmd_to_string (guint cmd)
470 {
471   switch (cmd) {
472     case CMD_OPEN:
473       return "OPEN";
474     case CMD_RECORD:
475       return "RECORD";
476     case CMD_PAUSE:
477       return "PAUSE";
478     case CMD_CLOSE:
479       return "CLOSE";
480     case CMD_WAIT:
481       return "WAIT";
482     case CMD_RECONNECT:
483       return "RECONNECT";
484     case CMD_LOOP:
485       return "LOOP";
486   }
487
488   return "unknown";
489 }
490 #endif
491
492 static void
493 gst_rtsp_client_sink_class_init (GstRTSPClientSinkClass * klass)
494 {
495   GObjectClass *gobject_class;
496   GstElementClass *gstelement_class;
497   GstBinClass *gstbin_class;
498
499   gobject_class = (GObjectClass *) klass;
500   gstelement_class = (GstElementClass *) klass;
501   gstbin_class = (GstBinClass *) klass;
502
503   GST_DEBUG_CATEGORY_INIT (rtsp_client_sink_debug, "rtspclientsink", 0,
504       "RTSP sink element");
505
506   gobject_class->set_property = gst_rtsp_client_sink_set_property;
507   gobject_class->get_property = gst_rtsp_client_sink_get_property;
508
509   gobject_class->finalize = gst_rtsp_client_sink_finalize;
510
511   g_object_class_install_property (gobject_class, PROP_LOCATION,
512       g_param_spec_string ("location", "RTSP Location",
513           "Location of the RTSP url to read",
514           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
515
516   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
517       g_param_spec_flags ("protocols", "Protocols",
518           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
519           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
520
521   g_object_class_install_property (gobject_class, PROP_PROFILES,
522       g_param_spec_flags ("profiles", "Profiles",
523           "Allowed RTSP profiles", GST_TYPE_RTSP_PROFILE,
524           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
525
526   g_object_class_install_property (gobject_class, PROP_DEBUG,
527       g_param_spec_boolean ("debug", "Debug",
528           "Dump request and response messages to stdout",
529           DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
530
531   g_object_class_install_property (gobject_class, PROP_RETRY,
532       g_param_spec_uint ("retry", "Retry",
533           "Max number of retries when allocating RTP ports.",
534           0, G_MAXUINT16, DEFAULT_RETRY,
535           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536
537   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
538       g_param_spec_uint64 ("timeout", "Timeout",
539           "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
540           0, G_MAXUINT64, DEFAULT_TIMEOUT,
541           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
542
543   g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
544       g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
545           "Fail after timeout microseconds on TCP connections (0 = disabled)",
546           0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
547           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548
549   g_object_class_install_property (gobject_class, PROP_LATENCY,
550       g_param_spec_uint ("latency", "Buffer latency in ms",
551           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
552           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553
554   g_object_class_install_property (gobject_class, PROP_RTX_TIME,
555       g_param_spec_uint ("rtx-time", "Retransmission buffer in ms",
556           "Amount of ms to buffer for retransmission. 0 disables retransmission",
557           0, G_MAXUINT, DEFAULT_RTX_TIME_MS,
558           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
559
560   /**
561    * GstRTSPClientSink:do-rtsp-keep-alive:
562    *
563    * Enable RTSP keep alive support. Some old server don't like RTSP
564    * keep alive and then this property needs to be set to FALSE.
565    */
566   g_object_class_install_property (gobject_class, PROP_DO_RTSP_KEEP_ALIVE,
567       g_param_spec_boolean ("do-rtsp-keep-alive", "Do RTSP Keep Alive",
568           "Send RTSP keep alive packets, disable for old incompatible server.",
569           DEFAULT_DO_RTSP_KEEP_ALIVE,
570           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
571
572   /**
573    * GstRTSPClientSink:proxy:
574    *
575    * Set the proxy parameters. This has to be a string of the format
576    * [http://][user:passwd@]host[:port].
577    */
578   g_object_class_install_property (gobject_class, PROP_PROXY,
579       g_param_spec_string ("proxy", "Proxy",
580           "Proxy settings for HTTP tunneling. Format: [http://][user:passwd@]host[:port]",
581           DEFAULT_PROXY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
582   /**
583    * GstRTSPClientSink:proxy-id:
584    *
585    * Sets the proxy URI user id for authentication. If the URI set via the
586    * "proxy" property contains a user-id already, that will take precedence.
587    *
588    */
589   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
590       g_param_spec_string ("proxy-id", "proxy-id",
591           "HTTP proxy URI user id for authentication", "",
592           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
593   /**
594    * GstRTSPClientSink:proxy-pw:
595    *
596    * Sets the proxy URI password for authentication. If the URI set via the
597    * "proxy" property contains a password already, that will take precedence.
598    *
599    */
600   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
601       g_param_spec_string ("proxy-pw", "proxy-pw",
602           "HTTP proxy URI user password for authentication", "",
603           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604
605   /**
606    * GstRTSPClientSink:rtp-blocksize:
607    *
608    * RTP package size to suggest to server.
609    */
610   g_object_class_install_property (gobject_class, PROP_RTP_BLOCKSIZE,
611       g_param_spec_uint ("rtp-blocksize", "RTP Blocksize",
612           "RTP package size to suggest to server (0 = disabled)",
613           0, 65536, DEFAULT_RTP_BLOCKSIZE,
614           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615
616   g_object_class_install_property (gobject_class,
617       PROP_USER_ID,
618       g_param_spec_string ("user-id", "user-id",
619           "RTSP location URI user id for authentication", DEFAULT_USER_ID,
620           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
621   g_object_class_install_property (gobject_class, PROP_USER_PW,
622       g_param_spec_string ("user-pw", "user-pw",
623           "RTSP location URI user password for authentication", DEFAULT_USER_PW,
624           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
625
626   /**
627    * GstRTSPClientSink:port-range:
628    *
629    * Configure the client port numbers that can be used to receive
630    * RTCP.
631    */
632   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
633       g_param_spec_string ("port-range", "Port range",
634           "Client port range that can be used to receive RTCP data, "
635           "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
636           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
637
638   /**
639    * GstRTSPClientSink:udp-buffer-size:
640    *
641    * Size of the kernel UDP receive buffer in bytes.
642    */
643   g_object_class_install_property (gobject_class, PROP_UDP_BUFFER_SIZE,
644       g_param_spec_int ("udp-buffer-size", "UDP Buffer Size",
645           "Size of the kernel UDP receive buffer in bytes, 0=default",
646           0, G_MAXINT, DEFAULT_UDP_BUFFER_SIZE,
647           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
648
649   g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT,
650       g_param_spec_boolean ("udp-reconnect", "Reconnect to the server",
651           "Reconnect to the server if RTSP connection is closed when doing UDP",
652           DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
653
654   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
655       g_param_spec_string ("multicast-iface", "Multicast Interface",
656           "The network interface on which to join the multicast group",
657           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
658
659   g_object_class_install_property (gobject_class, PROP_SDES,
660       g_param_spec_boxed ("sdes", "SDES",
661           "The SDES items of this session",
662           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
663
664   /**
665    * GstRTSPClientSink::tls-validation-flags:
666    *
667    * TLS certificate validation flags used to validate server
668    * certificate.
669    *
670    */
671   g_object_class_install_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
672       g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
673           "TLS certificate validation flags used to validate the server certificate",
674           G_TYPE_TLS_CERTIFICATE_FLAGS, DEFAULT_TLS_VALIDATION_FLAGS,
675           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
676
677   /**
678    * GstRTSPClientSink::tls-database:
679    *
680    * TLS database with anchor certificate authorities used to validate
681    * the server certificate.
682    *
683    */
684   g_object_class_install_property (gobject_class, PROP_TLS_DATABASE,
685       g_param_spec_object ("tls-database", "TLS database",
686           "TLS database with anchor certificate authorities used to validate the server certificate",
687           G_TYPE_TLS_DATABASE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
688
689   /**
690    * GstRTSPClientSink::tls-interaction:
691    *
692    * A #GTlsInteraction object to be used when the connection or certificate
693    * database need to interact with the user. This will be used to prompt the
694    * user for passwords where necessary.
695    *
696    */
697   g_object_class_install_property (gobject_class, PROP_TLS_INTERACTION,
698       g_param_spec_object ("tls-interaction", "TLS interaction",
699           "A GTlsInteraction object to prompt the user for password or certificate",
700           G_TYPE_TLS_INTERACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
701
702   /**
703    * GstRTSPClientSink::ntp-time-source:
704    *
705    * allows to select the time source that should be used
706    * for the NTP time in outgoing packets
707    *
708    */
709   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
710       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
711           "NTP time source for RTCP packets",
712           GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
713           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
714
715   /**
716    * GstRTSPClientSink::user-agent:
717    *
718    * The string to set in the User-Agent header.
719    *
720    */
721   g_object_class_install_property (gobject_class, PROP_USER_AGENT,
722       g_param_spec_string ("user-agent", "User Agent",
723           "The User-Agent string to send to the server",
724           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
725
726   /**
727    * GstRTSPClientSink::handle-request:
728    * @rtsp_client_sink: a #GstRTSPClientSink
729    * @request: a #GstRTSPMessage
730    * @response: a #GstRTSPMessage
731    *
732    * Handle a server request in @request and prepare @response.
733    *
734    * This signal is called from the streaming thread, you should therefore not
735    * do any state changes on @rtsp_client_sink because this might deadlock. If you want
736    * to modify the state as a result of this signal, post a
737    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
738    * in some other way.
739    *
740    */
741   gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST] =
742       g_signal_new ("handle-request", G_TYPE_FROM_CLASS (klass), 0,
743       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
744       G_TYPE_POINTER, G_TYPE_POINTER);
745
746   /**
747    * GstRTSPClientSink::new-manager:
748    * @rtsp_client_sink: a #GstRTSPClientSink
749    * @manager: a #GstElement
750    *
751    * Emitted after a new manager (like rtpbin) was created and the default
752    * properties were configured.
753    *
754    */
755   gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER] =
756       g_signal_new_class_handler ("new-manager", G_TYPE_FROM_CLASS (klass),
757       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
758       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
759
760   /**
761    * GstRTSPClientSink::new-payloader:
762    * @rtsp_client_sink: a #GstRTSPClientSink
763    * @payloader: a #GstElement
764    *
765    * Emitted after a new RTP payloader was created and the default
766    * properties were configured.
767    *
768    */
769   gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER] =
770       g_signal_new_class_handler ("new-payloader", G_TYPE_FROM_CLASS (klass),
771       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
772       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
773
774   /**
775    * GstRTSPClientSink::request-rtcp-key:
776    * @rtsp_client_sink: a #GstRTSPClientSink
777    * @num: the stream number
778    *
779    * Signal emitted to get the crypto parameters relevant to the RTCP
780    * stream. User should provide the key and the RTCP encryption ciphers
781    * and authentication, and return them wrapped in a GstCaps.
782    *
783    */
784   gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY] =
785       g_signal_new ("request-rtcp-key", G_TYPE_FROM_CLASS (klass),
786       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
787
788   /**
789    * GstRTSPClientSink::accept-certificate:
790    * @rtsp_client_sink: a #GstRTSPClientSink
791    * @peer_cert: the peer's #GTlsCertificate
792    * @errors: the problems with @peer_cert
793    * @user_data: user data set when the signal handler was connected.
794    *
795    * This will directly map to #GTlsConnection 's "accept-certificate"
796    * signal and be performed after the default checks of #GstRTSPConnection
797    * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
798    * have failed. If no #GTlsDatabase is set on this connection, only this
799    * signal will be emitted.
800    *
801    * Since: 1.14
802    */
803   gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE] =
804       g_signal_new ("accept-certificate", G_TYPE_FROM_CLASS (klass),
805       G_SIGNAL_RUN_LAST, 0, g_signal_accumulator_true_handled, NULL, NULL,
806       G_TYPE_BOOLEAN, 3, G_TYPE_TLS_CONNECTION, G_TYPE_TLS_CERTIFICATE,
807       G_TYPE_TLS_CERTIFICATE_FLAGS);
808
809   gstelement_class->provide_clock = gst_rtsp_client_sink_provide_clock;
810   gstelement_class->change_state = gst_rtsp_client_sink_change_state;
811   gstelement_class->request_new_pad =
812       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_request_new_pad);
813   gstelement_class->release_pad =
814       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_release_pad);
815
816   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
817       &rtptemplate, GST_TYPE_RTSP_CLIENT_SINK_PAD);
818
819   gst_element_class_set_static_metadata (gstelement_class,
820       "RTSP RECORD client", "Sink/Network",
821       "Send data over the network via RTSP RECORD(RFC 2326)",
822       "Jan Schmidt <jan@centricular.com>");
823
824   gstbin_class->handle_message = gst_rtsp_client_sink_handle_message;
825 }
826
827 static void
828 gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
829 {
830   sink->conninfo.location = g_strdup (DEFAULT_LOCATION);
831   sink->protocols = DEFAULT_PROTOCOLS;
832   sink->debug = DEFAULT_DEBUG;
833   sink->retry = DEFAULT_RETRY;
834   sink->udp_timeout = DEFAULT_TIMEOUT;
835   gst_rtsp_client_sink_set_tcp_timeout (sink, DEFAULT_TCP_TIMEOUT);
836   sink->latency = DEFAULT_LATENCY_MS;
837   sink->rtx_time = DEFAULT_RTX_TIME_MS;
838   sink->do_rtsp_keep_alive = DEFAULT_DO_RTSP_KEEP_ALIVE;
839   gst_rtsp_client_sink_set_proxy (sink, DEFAULT_PROXY);
840   sink->rtp_blocksize = DEFAULT_RTP_BLOCKSIZE;
841   sink->user_id = g_strdup (DEFAULT_USER_ID);
842   sink->user_pw = g_strdup (DEFAULT_USER_PW);
843   sink->client_port_range.min = 0;
844   sink->client_port_range.max = 0;
845   sink->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
846   sink->udp_reconnect = DEFAULT_UDP_RECONNECT;
847   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
848   sink->sdes = NULL;
849   sink->tls_validation_flags = DEFAULT_TLS_VALIDATION_FLAGS;
850   sink->tls_database = DEFAULT_TLS_DATABASE;
851   sink->tls_interaction = DEFAULT_TLS_INTERACTION;
852   sink->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
853   sink->user_agent = g_strdup (DEFAULT_USER_AGENT);
854
855   sink->profiles = DEFAULT_PROFILES;
856
857   /* protects the streaming thread in interleaved mode or the polling
858    * thread in UDP mode. */
859   g_rec_mutex_init (&sink->stream_rec_lock);
860
861   /* protects our state changes from multiple invocations */
862   g_rec_mutex_init (&sink->state_rec_lock);
863
864   g_mutex_init (&sink->send_lock);
865
866   g_mutex_init (&sink->preroll_lock);
867   g_cond_init (&sink->preroll_cond);
868
869   sink->state = GST_RTSP_STATE_INVALID;
870
871   g_mutex_init (&sink->conninfo.send_lock);
872   g_mutex_init (&sink->conninfo.recv_lock);
873
874   g_mutex_init (&sink->block_streams_lock);
875   g_cond_init (&sink->block_streams_cond);
876
877   g_mutex_init (&sink->open_conn_lock);
878   g_cond_init (&sink->open_conn_cond);
879
880   sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
881   gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
882   gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
883
884   sink->next_dyn_pt = 96;
885
886   gst_sdp_message_init (&sink->cursdp);
887
888   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
889 }
890
891 static void
892 gst_rtsp_client_sink_finalize (GObject * object)
893 {
894   GstRTSPClientSink *rtsp_client_sink;
895
896   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
897
898   gst_sdp_message_uninit (&rtsp_client_sink->cursdp);
899
900   g_free (rtsp_client_sink->conninfo.location);
901   gst_rtsp_url_free (rtsp_client_sink->conninfo.url);
902   g_free (rtsp_client_sink->conninfo.url_str);
903   g_free (rtsp_client_sink->user_id);
904   g_free (rtsp_client_sink->user_pw);
905   g_free (rtsp_client_sink->multi_iface);
906   g_free (rtsp_client_sink->user_agent);
907
908   if (rtsp_client_sink->uri_sdp) {
909     gst_sdp_message_free (rtsp_client_sink->uri_sdp);
910     rtsp_client_sink->uri_sdp = NULL;
911   }
912   if (rtsp_client_sink->provided_clock)
913     gst_object_unref (rtsp_client_sink->provided_clock);
914
915   if (rtsp_client_sink->sdes)
916     gst_structure_free (rtsp_client_sink->sdes);
917
918   if (rtsp_client_sink->tls_database)
919     g_object_unref (rtsp_client_sink->tls_database);
920
921   if (rtsp_client_sink->tls_interaction)
922     g_object_unref (rtsp_client_sink->tls_interaction);
923
924   /* free locks */
925   g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
926   g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
927
928   g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
929   g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
930
931   g_mutex_clear (&rtsp_client_sink->send_lock);
932
933   g_mutex_clear (&rtsp_client_sink->preroll_lock);
934   g_cond_clear (&rtsp_client_sink->preroll_cond);
935
936   g_mutex_clear (&rtsp_client_sink->block_streams_lock);
937   g_cond_clear (&rtsp_client_sink->block_streams_cond);
938
939   g_mutex_clear (&rtsp_client_sink->open_conn_lock);
940   g_cond_clear (&rtsp_client_sink->open_conn_cond);
941
942   G_OBJECT_CLASS (parent_class)->finalize (object);
943 }
944
945 static gboolean
946 gst_rtp_payloader_filter_func (GstPluginFeature * feature, gpointer user_data)
947 {
948   GstElementFactory *factory = NULL;
949   const gchar *klass;
950
951   if (!GST_IS_ELEMENT_FACTORY (feature))
952     return FALSE;
953
954   factory = GST_ELEMENT_FACTORY (feature);
955
956   if (gst_plugin_feature_get_rank (feature) == GST_RANK_NONE)
957     return FALSE;
958
959   if (!gst_element_factory_list_is_type (factory,
960           GST_ELEMENT_FACTORY_TYPE_PAYLOADER))
961     return FALSE;
962
963   klass =
964       gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
965   if (strstr (klass, "Codec") == NULL)
966     return FALSE;
967   if (strstr (klass, "RTP") == NULL)
968     return FALSE;
969
970   return TRUE;
971 }
972
973 static gint
974 compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2)
975 {
976   gint diff;
977   const gchar *rname1, *rname2;
978   GstRank rank1, rank2;
979
980   rname1 = gst_plugin_feature_get_name (f1);
981   rname2 = gst_plugin_feature_get_name (f2);
982
983   rank1 = gst_plugin_feature_get_rank (f1);
984   rank2 = gst_plugin_feature_get_rank (f2);
985
986   /* HACK: Prefer rtpmp4apay over rtpmp4gpay */
987   if (g_str_equal (rname1, "rtpmp4apay"))
988     rank1 = GST_RANK_SECONDARY + 1;
989   if (g_str_equal (rname2, "rtpmp4apay"))
990     rank2 = GST_RANK_SECONDARY + 1;
991
992   diff = rank2 - rank1;
993   if (diff != 0)
994     return diff;
995
996   diff = strcmp (rname2, rname1);
997
998   return diff;
999 }
1000
1001 static GList *
1002 gst_rtsp_client_sink_get_factories (void)
1003 {
1004   static GList *payloader_factories = NULL;
1005
1006   if (g_once_init_enter (&payloader_factories)) {
1007     GList *all_factories;
1008
1009     all_factories =
1010         gst_registry_feature_filter (gst_registry_get (),
1011         gst_rtp_payloader_filter_func, FALSE, NULL);
1012
1013     all_factories = g_list_sort (all_factories, (GCompareFunc) compare_ranks);
1014
1015     g_once_init_leave (&payloader_factories, all_factories);
1016   }
1017
1018   return payloader_factories;
1019 }
1020
1021 static GstCaps *
1022 gst_rtsp_client_sink_get_payloader_caps (GstElementFactory * factory)
1023 {
1024   const GList *tmp;
1025   GstCaps *caps = gst_caps_new_empty ();
1026
1027   for (tmp = gst_element_factory_get_static_pad_templates (factory);
1028       tmp; tmp = g_list_next (tmp)) {
1029     GstStaticPadTemplate *template = tmp->data;
1030
1031     if (template->direction == GST_PAD_SINK) {
1032       GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1033
1034       GST_LOG ("Found pad template %s on factory %s",
1035           template->name_template, gst_plugin_feature_get_name (factory));
1036
1037       if (static_caps)
1038         caps = gst_caps_merge (caps, static_caps);
1039
1040       /* Early out, any is absorbing */
1041       if (gst_caps_is_any (caps))
1042         goto out;
1043     }
1044   }
1045
1046 out:
1047   return caps;
1048 }
1049
1050 static GstCaps *
1051 gst_rtsp_client_sink_get_all_payloaders_caps (void)
1052 {
1053   /* Cached caps result */
1054   static GstCaps *ret;
1055
1056   if (g_once_init_enter (&ret)) {
1057     GList *factories, *cur;
1058     GstCaps *caps = gst_caps_new_empty ();
1059
1060     factories = gst_rtsp_client_sink_get_factories ();
1061     for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1062       GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1063       GstCaps *payloader_caps =
1064           gst_rtsp_client_sink_get_payloader_caps (factory);
1065
1066       caps = gst_caps_merge (caps, payloader_caps);
1067
1068       /* Early out, any is absorbing */
1069       if (gst_caps_is_any (caps))
1070         goto out;
1071     }
1072
1073   out:
1074     g_once_init_leave (&ret, caps);
1075   }
1076
1077   /* Return cached result */
1078   return gst_caps_ref (ret);
1079 }
1080
1081 static GstElement *
1082 gst_rtsp_client_sink_make_payloader (GstCaps * caps)
1083 {
1084   GList *factories, *cur;
1085
1086   factories = gst_rtsp_client_sink_get_factories ();
1087   for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
1088     GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
1089     const GList *tmp;
1090
1091     for (tmp = gst_element_factory_get_static_pad_templates (factory);
1092         tmp; tmp = g_list_next (tmp)) {
1093       GstStaticPadTemplate *template = tmp->data;
1094
1095       if (template->direction == GST_PAD_SINK) {
1096         GstCaps *static_caps = gst_static_pad_template_get_caps (template);
1097         GstElement *payloader = NULL;
1098
1099         if (gst_caps_can_intersect (static_caps, caps)) {
1100           GST_DEBUG ("caps %" GST_PTR_FORMAT " intersects with template %"
1101               GST_PTR_FORMAT " for payloader %s", caps, static_caps,
1102               gst_plugin_feature_get_name (factory));
1103           payloader = gst_element_factory_create (factory, NULL);
1104         }
1105
1106         gst_caps_unref (static_caps);
1107
1108         if (payloader)
1109           return payloader;
1110       }
1111     }
1112   }
1113
1114   return NULL;
1115 }
1116
1117 static GstRTSPStream *
1118 gst_rtsp_client_sink_create_stream (GstRTSPClientSink * sink,
1119     GstRTSPStreamContext * context, GstElement * payloader, GstPad * pad)
1120 {
1121   GstRTSPStream *stream = NULL;
1122   guint pt, aux_pt, ulpfec_pt;
1123
1124   GST_OBJECT_LOCK (sink);
1125
1126   g_object_get (G_OBJECT (payloader), "pt", &pt, NULL);
1127   if (pt >= 96 && pt <= sink->next_dyn_pt) {
1128     /* Payloader has a dynamic PT, but one that's already used */
1129     /* FIXME: Create a caps->ptmap instead? */
1130     pt = sink->next_dyn_pt;
1131
1132     if (pt > 127)
1133       goto no_free_pt;
1134
1135     GST_DEBUG_OBJECT (sink, "Assigning pt %u to stream %d", pt, context->index);
1136
1137     sink->next_dyn_pt++;
1138   } else {
1139     GST_DEBUG_OBJECT (sink, "Keeping existing pt %u for stream %d",
1140         pt, context->index);
1141   }
1142
1143   aux_pt = sink->next_dyn_pt;
1144   if (aux_pt > 127)
1145     goto no_free_pt;
1146   sink->next_dyn_pt++;
1147
1148   ulpfec_pt = sink->next_dyn_pt;
1149   if (ulpfec_pt > 127)
1150     goto no_free_pt;
1151   sink->next_dyn_pt++;
1152
1153   GST_OBJECT_UNLOCK (sink);
1154
1155
1156   g_object_set (G_OBJECT (payloader), "pt", pt, NULL);
1157
1158   stream = gst_rtsp_stream_new (context->index, payloader, pad);
1159
1160   gst_rtsp_stream_set_client_side (stream, TRUE);
1161   gst_rtsp_stream_set_retransmission_time (stream,
1162       (GstClockTime) (sink->rtx_time) * GST_MSECOND);
1163   gst_rtsp_stream_set_protocols (stream, sink->protocols);
1164   gst_rtsp_stream_set_profiles (stream, sink->profiles);
1165   gst_rtsp_stream_set_retransmission_pt (stream, aux_pt);
1166   gst_rtsp_stream_set_buffer_size (stream, sink->udp_buffer_size);
1167   if (sink->rtp_blocksize > 0)
1168     gst_rtsp_stream_set_mtu (stream, sink->rtp_blocksize);
1169   gst_rtsp_stream_set_multicast_iface (stream, sink->multi_iface);
1170
1171   gst_rtsp_stream_set_ulpfec_pt (stream, ulpfec_pt);
1172   gst_rtsp_stream_set_ulpfec_percentage (stream, context->ulpfec_percentage);
1173
1174 #if 0
1175   if (priv->pool)
1176     gst_rtsp_stream_set_address_pool (stream, priv->pool);
1177 #endif
1178
1179   return stream;
1180 no_free_pt:
1181   GST_OBJECT_UNLOCK (sink);
1182
1183   GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL),
1184       ("Ran out of dynamic payload types."));
1185
1186   return NULL;
1187 }
1188
1189 static GstPadProbeReturn
1190 handle_payloader_block (GstPad * pad, GstPadProbeInfo * info,
1191     GstRTSPStreamContext * context)
1192 {
1193   GstRTSPClientSink *sink = context->parent;
1194
1195   GST_INFO_OBJECT (sink, "Block on pad %" GST_PTR_FORMAT, pad);
1196
1197   g_mutex_lock (&sink->preroll_lock);
1198   context->prerolled = TRUE;
1199   g_cond_broadcast (&sink->preroll_cond);
1200   g_mutex_unlock (&sink->preroll_lock);
1201
1202   GST_INFO_OBJECT (sink, "Announced preroll on pad %" GST_PTR_FORMAT, pad);
1203
1204   return GST_PAD_PROBE_OK;
1205 }
1206
1207 static gboolean
1208 gst_rtsp_client_sink_setup_payloader (GstRTSPClientSink * sink, GstPad * pad,
1209     GstCaps * caps)
1210 {
1211   GstRTSPStreamContext *context;
1212   GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1213
1214   GstElement *payloader;
1215   GstPad *sinkpad, *srcpad, *ghostsink;
1216
1217   context = gst_pad_get_element_private (pad);
1218
1219   if (cspad->custom_payloader) {
1220     payloader = cspad->custom_payloader;
1221   } else {
1222     /* Find the payloader. */
1223     payloader = gst_rtsp_client_sink_make_payloader (caps);
1224   }
1225
1226   if (payloader == NULL)
1227     return FALSE;
1228
1229   GST_DEBUG_OBJECT (sink, "Configuring payloader %" GST_PTR_FORMAT
1230       " for pad %" GST_PTR_FORMAT, payloader, pad);
1231
1232   sinkpad = gst_element_get_static_pad (payloader, "sink");
1233   if (sinkpad == NULL)
1234     goto no_sinkpad;
1235
1236   srcpad = gst_element_get_static_pad (payloader, "src");
1237   if (srcpad == NULL)
1238     goto no_srcpad;
1239
1240   gst_bin_add (GST_BIN (sink->internal_bin), payloader);
1241   ghostsink = gst_ghost_pad_new (NULL, sinkpad);
1242   gst_pad_set_active (ghostsink, TRUE);
1243   gst_element_add_pad (GST_ELEMENT (sink->internal_bin), ghostsink);
1244
1245   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER], 0,
1246       payloader);
1247
1248   GST_RTSP_STATE_LOCK (sink);
1249   context->payloader_block_id =
1250       gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1251       (GstPadProbeCallback) handle_payloader_block, context, NULL);
1252   context->payloader = payloader;
1253
1254   payloader = gst_object_ref (payloader);
1255
1256   gst_ghost_pad_set_target (GST_GHOST_PAD (pad), ghostsink);
1257   gst_object_unref (GST_OBJECT (sinkpad));
1258   GST_RTSP_STATE_UNLOCK (sink);
1259
1260   context->ulpfec_percentage = cspad->ulpfec_percentage;
1261
1262   gst_element_sync_state_with_parent (payloader);
1263
1264   gst_object_unref (payloader);
1265   gst_object_unref (GST_OBJECT (srcpad));
1266
1267   return TRUE;
1268
1269 no_sinkpad:
1270   GST_ERROR_OBJECT (sink,
1271       "Could not find sink pad on payloader %" GST_PTR_FORMAT, payloader);
1272   if (!cspad->custom_payloader)
1273     gst_object_unref (payloader);
1274   return FALSE;
1275
1276 no_srcpad:
1277   GST_ERROR_OBJECT (sink,
1278       "Could not find src pad on payloader %" GST_PTR_FORMAT, payloader);
1279   gst_object_unref (GST_OBJECT (sinkpad));
1280   gst_object_unref (payloader);
1281   return TRUE;
1282 }
1283
1284 static gboolean
1285 gst_rtsp_client_sink_sinkpad_event (GstPad * pad, GstObject * parent,
1286     GstEvent * event)
1287 {
1288   if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
1289     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1290     if (target == NULL) {
1291       GstCaps *caps;
1292
1293       /* No target yet - choose a payloader and configure it */
1294       gst_event_parse_caps (event, &caps);
1295
1296       GST_DEBUG_OBJECT (parent,
1297           "Have set caps event on pad %" GST_PTR_FORMAT
1298           " caps %" GST_PTR_FORMAT, pad, caps);
1299
1300       if (!gst_rtsp_client_sink_setup_payloader (GST_RTSP_CLIENT_SINK (parent),
1301               pad, caps)) {
1302         GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1303         GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION,
1304             ("Could not create payloader"),
1305             ("Custom payloader: %p, caps: %" GST_PTR_FORMAT,
1306                 cspad->custom_payloader, caps));
1307         gst_event_unref (event);
1308         return FALSE;
1309       }
1310     } else {
1311       gst_object_unref (target);
1312     }
1313   }
1314
1315   return gst_pad_event_default (pad, parent, event);
1316 }
1317
1318 static gboolean
1319 gst_rtsp_client_sink_sinkpad_query (GstPad * pad, GstObject * parent,
1320     GstQuery * query)
1321 {
1322   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1323     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1324     if (target == NULL) {
1325       GstRtspClientSinkPad *cspad = GST_RTSP_CLIENT_SINK_PAD (pad);
1326       GstCaps *caps;
1327
1328       if (cspad->custom_payloader) {
1329         GstPad *sinkpad =
1330             gst_element_get_static_pad (cspad->custom_payloader, "sink");
1331
1332         if (sinkpad) {
1333           caps = gst_pad_query_caps (sinkpad, NULL);
1334           gst_object_unref (sinkpad);
1335         } else {
1336           GST_ELEMENT_ERROR (parent, CORE, NEGOTIATION, (NULL),
1337               ("Custom payloaders are expected to expose a sink pad named 'sink'"));
1338           return FALSE;
1339         }
1340       } else {
1341         /* No target yet - return the union of all payloader caps */
1342         caps = gst_rtsp_client_sink_get_all_payloaders_caps ();
1343       }
1344
1345       GST_TRACE_OBJECT (parent, "Returning payloader caps %" GST_PTR_FORMAT,
1346           caps);
1347
1348       gst_query_set_caps_result (query, caps);
1349       gst_caps_unref (caps);
1350
1351       return TRUE;
1352     }
1353     gst_object_unref (target);
1354   }
1355
1356   return gst_pad_query_default (pad, parent, query);
1357 }
1358
1359 static GstPad *
1360 gst_rtsp_client_sink_request_new_pad (GstElement * element,
1361     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1362 {
1363   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1364   GstPad *pad;
1365   GstRTSPStreamContext *context;
1366   guint idx = (guint) - 1;
1367   gchar *tmpname;
1368
1369   g_mutex_lock (&sink->preroll_lock);
1370   if (sink->streams_collected) {
1371     GST_WARNING_OBJECT (element, "Can't add streams to a running session");
1372     g_mutex_unlock (&sink->preroll_lock);
1373     return NULL;
1374   }
1375   g_mutex_unlock (&sink->preroll_lock);
1376
1377   GST_OBJECT_LOCK (sink);
1378   if (name) {
1379     if (!sscanf (name, "sink_%u", &idx)) {
1380       GST_OBJECT_UNLOCK (sink);
1381       GST_ERROR_OBJECT (element, "Invalid sink pad name %s", name);
1382       return NULL;
1383     }
1384
1385     if (idx >= sink->next_pad_id)
1386       sink->next_pad_id = idx + 1;
1387   }
1388   if (idx == (guint) - 1) {
1389     idx = sink->next_pad_id;
1390     sink->next_pad_id++;
1391   }
1392   GST_OBJECT_UNLOCK (sink);
1393
1394   tmpname = g_strdup_printf ("sink_%u", idx);
1395   pad = gst_rtsp_client_sink_pad_new (templ, tmpname);
1396   g_free (tmpname);
1397
1398   GST_DEBUG_OBJECT (element, "Creating request pad %" GST_PTR_FORMAT, pad);
1399
1400   gst_pad_set_event_function (pad,
1401       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_event));
1402   gst_pad_set_query_function (pad,
1403       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_query));
1404
1405   context = g_new0 (GstRTSPStreamContext, 1);
1406   context->parent = sink;
1407   context->index = idx;
1408
1409   gst_pad_set_element_private (pad, context);
1410
1411   /* The rest of the context is configured on a caps set */
1412   gst_pad_set_active (pad, TRUE);
1413   gst_element_add_pad (element, pad);
1414   gst_child_proxy_child_added (GST_CHILD_PROXY (element), G_OBJECT (pad),
1415       GST_PAD_NAME (pad));
1416
1417   (void) gst_rtsp_client_sink_get_factories ();
1418
1419   g_mutex_init (&context->conninfo.send_lock);
1420   g_mutex_init (&context->conninfo.recv_lock);
1421
1422   GST_RTSP_STATE_LOCK (sink);
1423   sink->contexts = g_list_prepend (sink->contexts, context);
1424   GST_RTSP_STATE_UNLOCK (sink);
1425
1426   return pad;
1427 }
1428
1429 static void
1430 gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
1431 {
1432   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1433   GstRTSPStreamContext *context;
1434
1435   context = gst_pad_get_element_private (pad);
1436
1437   GST_RTSP_STATE_LOCK (sink);
1438   sink->contexts = g_list_remove (sink->contexts, context);
1439   GST_RTSP_STATE_UNLOCK (sink);
1440
1441   /* FIXME: Shut down and clean up streaming on this pad,
1442    * do teardown if needed */
1443   GST_LOG_OBJECT (sink,
1444       "Cleaning up payloader and stream for released pad %" GST_PTR_FORMAT,
1445       pad);
1446
1447   if (context->stream_transport) {
1448     gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1449     gst_object_unref (context->stream_transport);
1450     context->stream_transport = NULL;
1451   }
1452   if (context->stream) {
1453     if (context->joined) {
1454       gst_rtsp_stream_leave_bin (context->stream,
1455           GST_BIN (sink->internal_bin), sink->rtpbin);
1456       context->joined = FALSE;
1457     }
1458     gst_object_unref (context->stream);
1459     context->stream = NULL;
1460   }
1461   if (context->srtcpparams)
1462     gst_caps_unref (context->srtcpparams);
1463
1464   g_free (context->conninfo.location);
1465   context->conninfo.location = NULL;
1466
1467   g_mutex_clear (&context->conninfo.send_lock);
1468   g_mutex_clear (&context->conninfo.recv_lock);
1469
1470   g_free (context);
1471
1472   gst_element_remove_pad (element, pad);
1473 }
1474
1475 static GstClock *
1476 gst_rtsp_client_sink_provide_clock (GstElement * element)
1477 {
1478   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1479   GstClock *clock;
1480
1481   if ((clock = sink->provided_clock) != NULL)
1482     gst_object_ref (clock);
1483
1484   return clock;
1485 }
1486
1487 /* a proxy string of the format [user:passwd@]host[:port] */
1488 static gboolean
1489 gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp, const gchar * proxy)
1490 {
1491   gchar *p, *at, *col;
1492
1493   g_free (rtsp->proxy_user);
1494   rtsp->proxy_user = NULL;
1495   g_free (rtsp->proxy_passwd);
1496   rtsp->proxy_passwd = NULL;
1497   g_free (rtsp->proxy_host);
1498   rtsp->proxy_host = NULL;
1499   rtsp->proxy_port = 0;
1500
1501   p = (gchar *) proxy;
1502
1503   if (p == NULL)
1504     return TRUE;
1505
1506   /* we allow http:// in front but ignore it */
1507   if (g_str_has_prefix (p, "http://"))
1508     p += 7;
1509
1510   at = strchr (p, '@');
1511   if (at) {
1512     /* look for user:passwd */
1513     col = strchr (proxy, ':');
1514     if (col == NULL || col > at)
1515       return FALSE;
1516
1517     rtsp->proxy_user = g_strndup (p, col - p);
1518     col++;
1519     rtsp->proxy_passwd = g_strndup (col, at - col);
1520
1521     /* move to host */
1522     p = at + 1;
1523   } else {
1524     if (rtsp->prop_proxy_id != NULL && *rtsp->prop_proxy_id != '\0')
1525       rtsp->proxy_user = g_strdup (rtsp->prop_proxy_id);
1526     if (rtsp->prop_proxy_pw != NULL && *rtsp->prop_proxy_pw != '\0')
1527       rtsp->proxy_passwd = g_strdup (rtsp->prop_proxy_pw);
1528     if (rtsp->proxy_user != NULL || rtsp->proxy_passwd != NULL) {
1529       GST_LOG_OBJECT (rtsp, "set proxy user/pw from properties: %s:%s",
1530           GST_STR_NULL (rtsp->proxy_user), GST_STR_NULL (rtsp->proxy_passwd));
1531     }
1532   }
1533   col = strchr (p, ':');
1534
1535   if (col) {
1536     /* everything before the colon is the hostname */
1537     rtsp->proxy_host = g_strndup (p, col - p);
1538     p = col + 1;
1539     rtsp->proxy_port = strtoul (p, (char **) &p, 10);
1540   } else {
1541     rtsp->proxy_host = g_strdup (p);
1542     rtsp->proxy_port = 8080;
1543   }
1544   return TRUE;
1545 }
1546
1547 static void
1548 gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink * rtsp_client_sink,
1549     guint64 timeout)
1550 {
1551   rtsp_client_sink->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC;
1552   rtsp_client_sink->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC;
1553
1554   if (timeout != 0)
1555     rtsp_client_sink->ptcp_timeout = &rtsp_client_sink->tcp_timeout;
1556   else
1557     rtsp_client_sink->ptcp_timeout = NULL;
1558 }
1559
1560 static void
1561 gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
1562     const GValue * value, GParamSpec * pspec)
1563 {
1564   GstRTSPClientSink *rtsp_client_sink;
1565
1566   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1567
1568   switch (prop_id) {
1569     case PROP_LOCATION:
1570       gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (rtsp_client_sink),
1571           g_value_get_string (value), NULL);
1572       break;
1573     case PROP_PROTOCOLS:
1574       rtsp_client_sink->protocols = g_value_get_flags (value);
1575       break;
1576     case PROP_PROFILES:
1577       rtsp_client_sink->profiles = g_value_get_flags (value);
1578       break;
1579     case PROP_DEBUG:
1580       rtsp_client_sink->debug = g_value_get_boolean (value);
1581       break;
1582     case PROP_RETRY:
1583       rtsp_client_sink->retry = g_value_get_uint (value);
1584       break;
1585     case PROP_TIMEOUT:
1586       rtsp_client_sink->udp_timeout = g_value_get_uint64 (value);
1587       break;
1588     case PROP_TCP_TIMEOUT:
1589       gst_rtsp_client_sink_set_tcp_timeout (rtsp_client_sink,
1590           g_value_get_uint64 (value));
1591       break;
1592     case PROP_LATENCY:
1593       rtsp_client_sink->latency = g_value_get_uint (value);
1594       break;
1595     case PROP_RTX_TIME:
1596       rtsp_client_sink->rtx_time = g_value_get_uint (value);
1597       break;
1598     case PROP_DO_RTSP_KEEP_ALIVE:
1599       rtsp_client_sink->do_rtsp_keep_alive = g_value_get_boolean (value);
1600       break;
1601     case PROP_PROXY:
1602       gst_rtsp_client_sink_set_proxy (rtsp_client_sink,
1603           g_value_get_string (value));
1604       break;
1605     case PROP_PROXY_ID:
1606       if (rtsp_client_sink->prop_proxy_id)
1607         g_free (rtsp_client_sink->prop_proxy_id);
1608       rtsp_client_sink->prop_proxy_id = g_value_dup_string (value);
1609       break;
1610     case PROP_PROXY_PW:
1611       if (rtsp_client_sink->prop_proxy_pw)
1612         g_free (rtsp_client_sink->prop_proxy_pw);
1613       rtsp_client_sink->prop_proxy_pw = g_value_dup_string (value);
1614       break;
1615     case PROP_RTP_BLOCKSIZE:
1616       rtsp_client_sink->rtp_blocksize = g_value_get_uint (value);
1617       break;
1618     case PROP_USER_ID:
1619       if (rtsp_client_sink->user_id)
1620         g_free (rtsp_client_sink->user_id);
1621       rtsp_client_sink->user_id = g_value_dup_string (value);
1622       break;
1623     case PROP_USER_PW:
1624       if (rtsp_client_sink->user_pw)
1625         g_free (rtsp_client_sink->user_pw);
1626       rtsp_client_sink->user_pw = g_value_dup_string (value);
1627       break;
1628     case PROP_PORT_RANGE:
1629     {
1630       const gchar *str;
1631
1632       str = g_value_get_string (value);
1633       if (!str || !sscanf (str, "%u-%u",
1634               &rtsp_client_sink->client_port_range.min,
1635               &rtsp_client_sink->client_port_range.max)) {
1636         rtsp_client_sink->client_port_range.min = 0;
1637         rtsp_client_sink->client_port_range.max = 0;
1638       }
1639       break;
1640     }
1641     case PROP_UDP_BUFFER_SIZE:
1642       rtsp_client_sink->udp_buffer_size = g_value_get_int (value);
1643       break;
1644     case PROP_UDP_RECONNECT:
1645       rtsp_client_sink->udp_reconnect = g_value_get_boolean (value);
1646       break;
1647     case PROP_MULTICAST_IFACE:
1648       g_free (rtsp_client_sink->multi_iface);
1649
1650       if (g_value_get_string (value) == NULL)
1651         rtsp_client_sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1652       else
1653         rtsp_client_sink->multi_iface = g_value_dup_string (value);
1654       break;
1655     case PROP_SDES:
1656       rtsp_client_sink->sdes = g_value_dup_boxed (value);
1657       break;
1658     case PROP_TLS_VALIDATION_FLAGS:
1659       rtsp_client_sink->tls_validation_flags = g_value_get_flags (value);
1660       break;
1661     case PROP_TLS_DATABASE:
1662       g_clear_object (&rtsp_client_sink->tls_database);
1663       rtsp_client_sink->tls_database = g_value_dup_object (value);
1664       break;
1665     case PROP_TLS_INTERACTION:
1666       g_clear_object (&rtsp_client_sink->tls_interaction);
1667       rtsp_client_sink->tls_interaction = g_value_dup_object (value);
1668       break;
1669     case PROP_NTP_TIME_SOURCE:
1670       rtsp_client_sink->ntp_time_source = g_value_get_enum (value);
1671       break;
1672     case PROP_USER_AGENT:
1673       g_free (rtsp_client_sink->user_agent);
1674       rtsp_client_sink->user_agent = g_value_dup_string (value);
1675       break;
1676     default:
1677       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1678       break;
1679   }
1680 }
1681
1682 static void
1683 gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
1684     GValue * value, GParamSpec * pspec)
1685 {
1686   GstRTSPClientSink *rtsp_client_sink;
1687
1688   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1689
1690   switch (prop_id) {
1691     case PROP_LOCATION:
1692       g_value_set_string (value, rtsp_client_sink->conninfo.location);
1693       break;
1694     case PROP_PROTOCOLS:
1695       g_value_set_flags (value, rtsp_client_sink->protocols);
1696       break;
1697     case PROP_PROFILES:
1698       g_value_set_flags (value, rtsp_client_sink->profiles);
1699       break;
1700     case PROP_DEBUG:
1701       g_value_set_boolean (value, rtsp_client_sink->debug);
1702       break;
1703     case PROP_RETRY:
1704       g_value_set_uint (value, rtsp_client_sink->retry);
1705       break;
1706     case PROP_TIMEOUT:
1707       g_value_set_uint64 (value, rtsp_client_sink->udp_timeout);
1708       break;
1709     case PROP_TCP_TIMEOUT:
1710     {
1711       guint64 timeout;
1712
1713       timeout = rtsp_client_sink->tcp_timeout.tv_sec * G_USEC_PER_SEC +
1714           rtsp_client_sink->tcp_timeout.tv_usec;
1715       g_value_set_uint64 (value, timeout);
1716       break;
1717     }
1718     case PROP_LATENCY:
1719       g_value_set_uint (value, rtsp_client_sink->latency);
1720       break;
1721     case PROP_RTX_TIME:
1722       g_value_set_uint (value, rtsp_client_sink->rtx_time);
1723       break;
1724     case PROP_DO_RTSP_KEEP_ALIVE:
1725       g_value_set_boolean (value, rtsp_client_sink->do_rtsp_keep_alive);
1726       break;
1727     case PROP_PROXY:
1728     {
1729       gchar *str;
1730
1731       if (rtsp_client_sink->proxy_host) {
1732         str =
1733             g_strdup_printf ("%s:%d", rtsp_client_sink->proxy_host,
1734             rtsp_client_sink->proxy_port);
1735       } else {
1736         str = NULL;
1737       }
1738       g_value_take_string (value, str);
1739       break;
1740     }
1741     case PROP_PROXY_ID:
1742       g_value_set_string (value, rtsp_client_sink->prop_proxy_id);
1743       break;
1744     case PROP_PROXY_PW:
1745       g_value_set_string (value, rtsp_client_sink->prop_proxy_pw);
1746       break;
1747     case PROP_RTP_BLOCKSIZE:
1748       g_value_set_uint (value, rtsp_client_sink->rtp_blocksize);
1749       break;
1750     case PROP_USER_ID:
1751       g_value_set_string (value, rtsp_client_sink->user_id);
1752       break;
1753     case PROP_USER_PW:
1754       g_value_set_string (value, rtsp_client_sink->user_pw);
1755       break;
1756     case PROP_PORT_RANGE:
1757     {
1758       gchar *str;
1759
1760       if (rtsp_client_sink->client_port_range.min != 0) {
1761         str = g_strdup_printf ("%u-%u", rtsp_client_sink->client_port_range.min,
1762             rtsp_client_sink->client_port_range.max);
1763       } else {
1764         str = NULL;
1765       }
1766       g_value_take_string (value, str);
1767       break;
1768     }
1769     case PROP_UDP_BUFFER_SIZE:
1770       g_value_set_int (value, rtsp_client_sink->udp_buffer_size);
1771       break;
1772     case PROP_UDP_RECONNECT:
1773       g_value_set_boolean (value, rtsp_client_sink->udp_reconnect);
1774       break;
1775     case PROP_MULTICAST_IFACE:
1776       g_value_set_string (value, rtsp_client_sink->multi_iface);
1777       break;
1778     case PROP_SDES:
1779       g_value_set_boxed (value, rtsp_client_sink->sdes);
1780       break;
1781     case PROP_TLS_VALIDATION_FLAGS:
1782       g_value_set_flags (value, rtsp_client_sink->tls_validation_flags);
1783       break;
1784     case PROP_TLS_DATABASE:
1785       g_value_set_object (value, rtsp_client_sink->tls_database);
1786       break;
1787     case PROP_TLS_INTERACTION:
1788       g_value_set_object (value, rtsp_client_sink->tls_interaction);
1789       break;
1790     case PROP_NTP_TIME_SOURCE:
1791       g_value_set_enum (value, rtsp_client_sink->ntp_time_source);
1792       break;
1793     case PROP_USER_AGENT:
1794       g_value_set_string (value, rtsp_client_sink->user_agent);
1795       break;
1796     default:
1797       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1798       break;
1799   }
1800 }
1801
1802 static const gchar *
1803 get_aggregate_control (GstRTSPClientSink * sink)
1804 {
1805   const gchar *base;
1806
1807   if (sink->control)
1808     base = sink->control;
1809   else if (sink->content_base)
1810     base = sink->content_base;
1811   else if (sink->conninfo.url_str)
1812     base = sink->conninfo.url_str;
1813   else
1814     base = "/";
1815
1816   return base;
1817 }
1818
1819 static void
1820 gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
1821 {
1822   GList *walk;
1823
1824   GST_DEBUG_OBJECT (sink, "cleanup");
1825
1826   gst_element_set_state (GST_ELEMENT (sink->internal_bin), GST_STATE_NULL);
1827
1828   /* Clean up any left over stream objects */
1829   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
1830     GstRTSPStreamContext *context = (GstRTSPStreamContext *) (walk->data);
1831     if (context->stream_transport) {
1832       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1833       gst_object_unref (context->stream_transport);
1834       context->stream_transport = NULL;
1835     }
1836
1837     if (context->stream) {
1838       if (context->joined) {
1839         gst_rtsp_stream_leave_bin (context->stream,
1840             GST_BIN (sink->internal_bin), sink->rtpbin);
1841         context->joined = FALSE;
1842       }
1843       gst_object_unref (context->stream);
1844       context->stream = NULL;
1845     }
1846
1847     if (context->srtcpparams) {
1848       gst_caps_unref (context->srtcpparams);
1849       context->srtcpparams = NULL;
1850     }
1851     g_free (context->conninfo.location);
1852     context->conninfo.location = NULL;
1853   }
1854
1855   if (sink->rtpbin) {
1856     gst_element_set_state (sink->rtpbin, GST_STATE_NULL);
1857     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), sink->rtpbin);
1858     sink->rtpbin = NULL;
1859   }
1860
1861   g_free (sink->content_base);
1862   sink->content_base = NULL;
1863
1864   g_free (sink->control);
1865   sink->control = NULL;
1866
1867   if (sink->range)
1868     gst_rtsp_range_free (sink->range);
1869   sink->range = NULL;
1870
1871   /* don't clear the SDP when it was used in the url */
1872   if (sink->uri_sdp && !sink->from_sdp) {
1873     gst_sdp_message_free (sink->uri_sdp);
1874     sink->uri_sdp = NULL;
1875   }
1876
1877   if (sink->provided_clock) {
1878     gst_object_unref (sink->provided_clock);
1879     sink->provided_clock = NULL;
1880   }
1881
1882   g_free (sink->server_ip);
1883   sink->server_ip = NULL;
1884
1885   sink->next_pad_id = 0;
1886   sink->next_dyn_pt = 96;
1887 }
1888
1889 static GstRTSPResult
1890 gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
1891     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1892 {
1893   GstRTSPResult ret;
1894
1895   if (conninfo->connection) {
1896     g_mutex_lock (&conninfo->send_lock);
1897     ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
1898     g_mutex_unlock (&conninfo->send_lock);
1899   } else {
1900     ret = GST_RTSP_ERROR;
1901   }
1902
1903   return ret;
1904 }
1905
1906 static GstRTSPResult
1907 gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
1908     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1909 {
1910   GstRTSPResult ret;
1911
1912   if (conninfo->connection) {
1913     g_mutex_lock (&conninfo->recv_lock);
1914     ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
1915     g_mutex_unlock (&conninfo->recv_lock);
1916   } else {
1917     ret = GST_RTSP_ERROR;
1918   }
1919
1920   return ret;
1921 }
1922
1923 static gboolean
1924 accept_certificate_cb (GTlsConnection * conn, GTlsCertificate * peer_cert,
1925     GTlsCertificateFlags errors, gpointer user_data)
1926 {
1927   GstRTSPClientSink *sink = user_data;
1928   gboolean accept = FALSE;
1929
1930   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_ACCEPT_CERTIFICATE],
1931       0, conn, peer_cert, errors, &accept);
1932
1933   return accept;
1934 }
1935
1936 static GstRTSPResult
1937 gst_rtsp_conninfo_connect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1938     gboolean async)
1939 {
1940   GstRTSPResult res;
1941
1942   if (info->connection == NULL) {
1943     if (info->url == NULL) {
1944       GST_DEBUG_OBJECT (sink, "parsing uri (%s)...", info->location);
1945       if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0)
1946         goto parse_error;
1947     }
1948
1949     /* create connection */
1950     GST_DEBUG_OBJECT (sink, "creating connection (%s)...", info->location);
1951     if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0)
1952       goto could_not_create;
1953
1954     if (info->url_str)
1955       g_free (info->url_str);
1956     info->url_str = gst_rtsp_url_get_request_uri (info->url);
1957
1958     GST_DEBUG_OBJECT (sink, "sanitized uri %s", info->url_str);
1959
1960     if (info->url->transports & GST_RTSP_LOWER_TRANS_TLS) {
1961       if (!gst_rtsp_connection_set_tls_validation_flags (info->connection,
1962               sink->tls_validation_flags))
1963         GST_WARNING_OBJECT (sink, "Unable to set TLS validation flags");
1964
1965       if (sink->tls_database)
1966         gst_rtsp_connection_set_tls_database (info->connection,
1967             sink->tls_database);
1968
1969       if (sink->tls_interaction)
1970         gst_rtsp_connection_set_tls_interaction (info->connection,
1971             sink->tls_interaction);
1972
1973       gst_rtsp_connection_set_accept_certificate_func (info->connection,
1974           accept_certificate_cb, sink, NULL);
1975     }
1976
1977     if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP)
1978       gst_rtsp_connection_set_tunneled (info->connection, TRUE);
1979
1980     if (sink->proxy_host) {
1981       GST_DEBUG_OBJECT (sink, "setting proxy %s:%d", sink->proxy_host,
1982           sink->proxy_port);
1983       gst_rtsp_connection_set_proxy (info->connection, sink->proxy_host,
1984           sink->proxy_port);
1985     }
1986   }
1987
1988   if (!info->connected) {
1989     /* connect */
1990     if (async)
1991       GST_ELEMENT_PROGRESS (sink, CONTINUE, "connect",
1992           ("Connecting to %s", info->location));
1993     GST_DEBUG_OBJECT (sink, "connecting (%s)...", info->location);
1994     if ((res =
1995             gst_rtsp_connection_connect (info->connection,
1996                 sink->ptcp_timeout)) < 0)
1997       goto could_not_connect;
1998
1999     info->connected = TRUE;
2000   }
2001   return GST_RTSP_OK;
2002
2003   /* ERRORS */
2004 parse_error:
2005   {
2006     GST_ERROR_OBJECT (sink, "No valid RTSP URL was provided");
2007     return res;
2008   }
2009 could_not_create:
2010   {
2011     gchar *str = gst_rtsp_strresult (res);
2012     GST_ERROR_OBJECT (sink, "Could not create connection. (%s)", str);
2013     g_free (str);
2014     return res;
2015   }
2016 could_not_connect:
2017   {
2018     gchar *str = gst_rtsp_strresult (res);
2019     GST_ERROR_OBJECT (sink, "Could not connect to server. (%s)", str);
2020     g_free (str);
2021     return res;
2022   }
2023 }
2024
2025 static GstRTSPResult
2026 gst_rtsp_conninfo_close (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
2027     gboolean free)
2028 {
2029   GST_RTSP_STATE_LOCK (sink);
2030   if (info->connected) {
2031     GST_DEBUG_OBJECT (sink, "closing connection...");
2032     gst_rtsp_connection_close (info->connection);
2033     info->connected = FALSE;
2034   }
2035   if (free && info->connection) {
2036     /* free connection */
2037     GST_DEBUG_OBJECT (sink, "freeing connection...");
2038     gst_rtsp_connection_free (info->connection);
2039     g_mutex_lock (&sink->preroll_lock);
2040     info->connection = NULL;
2041     g_cond_broadcast (&sink->preroll_cond);
2042     g_mutex_unlock (&sink->preroll_lock);
2043   }
2044   GST_RTSP_STATE_UNLOCK (sink);
2045   return GST_RTSP_OK;
2046 }
2047
2048 static GstRTSPResult
2049 gst_rtsp_conninfo_reconnect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
2050     gboolean async)
2051 {
2052   GstRTSPResult res;
2053
2054   GST_DEBUG_OBJECT (sink, "reconnecting connection...");
2055   gst_rtsp_conninfo_close (sink, info, FALSE);
2056   res = gst_rtsp_conninfo_connect (sink, info, async);
2057
2058   return res;
2059 }
2060
2061 static void
2062 gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink, gboolean flush)
2063 {
2064   GList *walk;
2065
2066   GST_DEBUG_OBJECT (sink, "set flushing %d", flush);
2067   g_mutex_lock (&sink->preroll_lock);
2068   if (sink->conninfo.connection && sink->conninfo.flushing != flush) {
2069     GST_DEBUG_OBJECT (sink, "connection flush");
2070     gst_rtsp_connection_flush (sink->conninfo.connection, flush);
2071     sink->conninfo.flushing = flush;
2072   }
2073   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
2074     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
2075     if (stream->conninfo.connection && stream->conninfo.flushing != flush) {
2076       GST_DEBUG_OBJECT (sink, "stream %p flush", stream);
2077       gst_rtsp_connection_flush (stream->conninfo.connection, flush);
2078       stream->conninfo.flushing = flush;
2079     }
2080   }
2081   g_cond_broadcast (&sink->preroll_cond);
2082   g_mutex_unlock (&sink->preroll_lock);
2083 }
2084
2085 static GstRTSPResult
2086 gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
2087     GstRTSPMessage * msg, GstRTSPMethod method, const gchar * uri)
2088 {
2089   GstRTSPResult res;
2090
2091   res = gst_rtsp_message_init_request (msg, method, uri);
2092   if (res < 0)
2093     return res;
2094
2095   /* set user-agent */
2096   if (sink->user_agent)
2097     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_USER_AGENT,
2098         sink->user_agent);
2099
2100   return res;
2101 }
2102
2103 /* FIXME, handle server request, reply with OK, for now */
2104 static GstRTSPResult
2105 gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
2106     GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
2107 {
2108   GstRTSPMessage response = { 0 };
2109   GstRTSPResult res;
2110
2111   GST_DEBUG_OBJECT (sink, "got server request message");
2112
2113   if (sink->debug)
2114     gst_rtsp_message_dump (request);
2115
2116   /* default implementation, send OK */
2117   GST_DEBUG_OBJECT (sink, "prepare OK reply");
2118   res =
2119       gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, "OK",
2120       request);
2121   if (res < 0)
2122     goto send_error;
2123
2124   /* let app parse and reply */
2125   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST],
2126       0, request, &response);
2127
2128   if (sink->debug)
2129     gst_rtsp_message_dump (&response);
2130
2131   res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
2132   if (res < 0)
2133     goto send_error;
2134
2135   gst_rtsp_message_unset (&response);
2136
2137   return GST_RTSP_OK;
2138
2139   /* ERRORS */
2140 send_error:
2141   {
2142     gst_rtsp_message_unset (&response);
2143     return res;
2144   }
2145 }
2146
2147 /* send server keep-alive */
2148 static GstRTSPResult
2149 gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
2150 {
2151   GstRTSPMessage request = { 0 };
2152   GstRTSPResult res;
2153   GstRTSPMethod method;
2154   const gchar *control;
2155
2156   if (sink->do_rtsp_keep_alive == FALSE) {
2157     GST_DEBUG_OBJECT (sink, "do-rtsp-keep-alive is FALSE, not sending.");
2158     gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2159     return GST_RTSP_OK;
2160   }
2161
2162   GST_DEBUG_OBJECT (sink, "creating server keep-alive");
2163
2164   /* find a method to use for keep-alive */
2165   if (sink->methods & GST_RTSP_GET_PARAMETER)
2166     method = GST_RTSP_GET_PARAMETER;
2167   else
2168     method = GST_RTSP_OPTIONS;
2169
2170   control = get_aggregate_control (sink);
2171   if (control == NULL)
2172     goto no_control;
2173
2174   res = gst_rtsp_client_sink_init_request (sink, &request, method, control);
2175   if (res < 0)
2176     goto send_error;
2177
2178   if (sink->debug)
2179     gst_rtsp_message_dump (&request);
2180
2181   res =
2182       gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
2183       &request, NULL);
2184   if (res < 0)
2185     goto send_error;
2186
2187   gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
2188   gst_rtsp_message_unset (&request);
2189
2190   return GST_RTSP_OK;
2191
2192   /* ERRORS */
2193 no_control:
2194   {
2195     GST_WARNING_OBJECT (sink, "no control url to send keepalive");
2196     return GST_RTSP_OK;
2197   }
2198 send_error:
2199   {
2200     gchar *str = gst_rtsp_strresult (res);
2201
2202     gst_rtsp_message_unset (&request);
2203     GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, (NULL),
2204         ("Could not send keep-alive. (%s)", str));
2205     g_free (str);
2206     return res;
2207   }
2208 }
2209
2210 static GstFlowReturn
2211 gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
2212 {
2213   GstRTSPResult res;
2214   GstRTSPMessage message = { 0 };
2215   gint retry = 0;
2216
2217   while (TRUE) {
2218     GTimeVal tv_timeout;
2219
2220     /* get the next timeout interval */
2221     gst_rtsp_connection_next_timeout (sink->conninfo.connection, &tv_timeout);
2222
2223     GST_DEBUG_OBJECT (sink, "doing receive with timeout %d seconds",
2224         (gint) tv_timeout.tv_sec);
2225
2226     gst_rtsp_message_unset (&message);
2227
2228     /* we should continue reading the TCP socket because the server might
2229      * send us requests. When the session timeout expires, we need to send a
2230      * keep-alive request to keep the session open. */
2231     res =
2232         gst_rtsp_client_sink_connection_receive (sink,
2233         &sink->conninfo, &message, &tv_timeout);
2234
2235     switch (res) {
2236       case GST_RTSP_OK:
2237         GST_DEBUG_OBJECT (sink, "we received a server message");
2238         break;
2239       case GST_RTSP_EINTR:
2240         /* we got interrupted, see what we have to do */
2241         goto interrupt;
2242       case GST_RTSP_ETIMEOUT:
2243         /* send keep-alive, ignore the result, a warning will be posted. */
2244         GST_DEBUG_OBJECT (sink, "timeout, sending keep-alive");
2245         if ((res =
2246                 gst_rtsp_client_sink_send_keep_alive (sink)) == GST_RTSP_EINTR)
2247           goto interrupt;
2248         continue;
2249       case GST_RTSP_EEOF:
2250         /* server closed the connection. not very fatal for UDP, reconnect and
2251          * see what happens. */
2252         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2253             ("The server closed the connection."));
2254         if (sink->udp_reconnect) {
2255           if ((res =
2256                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2257                       FALSE)) < 0)
2258             goto connect_error;
2259         } else {
2260           goto server_eof;
2261         }
2262         continue;
2263         break;
2264       case GST_RTSP_ENET:
2265         GST_DEBUG_OBJECT (sink, "An ethernet problem occured.");
2266       default:
2267         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2268             ("Unhandled return value %d.", res));
2269         goto receive_error;
2270     }
2271
2272     switch (message.type) {
2273       case GST_RTSP_MESSAGE_REQUEST:
2274         /* server sends us a request message, handle it */
2275         res =
2276             gst_rtsp_client_sink_handle_request (sink,
2277             &sink->conninfo, &message);
2278         if (res == GST_RTSP_EEOF)
2279           goto server_eof;
2280         else if (res < 0)
2281           goto handle_request_failed;
2282         break;
2283       case GST_RTSP_MESSAGE_RESPONSE:
2284         /* we ignore response and data messages */
2285         GST_DEBUG_OBJECT (sink, "ignoring response message");
2286         if (sink->debug)
2287           gst_rtsp_message_dump (&message);
2288         if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) {
2289           GST_DEBUG_OBJECT (sink, "but is Unauthorized response ...");
2290           if (gst_rtsp_client_sink_setup_auth (sink, &message) && !(retry++)) {
2291             GST_DEBUG_OBJECT (sink, "so retrying keep-alive");
2292             if ((res =
2293                     gst_rtsp_client_sink_send_keep_alive (sink)) ==
2294                 GST_RTSP_EINTR)
2295               goto interrupt;
2296           }
2297         } else {
2298           retry = 0;
2299         }
2300         break;
2301       case GST_RTSP_MESSAGE_DATA:
2302         /* we ignore response and data messages */
2303         GST_DEBUG_OBJECT (sink, "ignoring data message");
2304         break;
2305       default:
2306         GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2307             message.type);
2308         break;
2309     }
2310   }
2311   g_assert_not_reached ();
2312
2313   /* we get here when the connection got interrupted */
2314 interrupt:
2315   {
2316     gst_rtsp_message_unset (&message);
2317     GST_DEBUG_OBJECT (sink, "got interrupted");
2318     return GST_FLOW_FLUSHING;
2319   }
2320 connect_error:
2321   {
2322     gchar *str = gst_rtsp_strresult (res);
2323     GstFlowReturn ret;
2324
2325     sink->conninfo.connected = FALSE;
2326     if (res != GST_RTSP_EINTR) {
2327       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
2328           ("Could not connect to server. (%s)", str));
2329       g_free (str);
2330       ret = GST_FLOW_ERROR;
2331     } else {
2332       ret = GST_FLOW_FLUSHING;
2333     }
2334     return ret;
2335   }
2336 receive_error:
2337   {
2338     gchar *str = gst_rtsp_strresult (res);
2339
2340     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2341         ("Could not receive message. (%s)", str));
2342     g_free (str);
2343     return GST_FLOW_ERROR;
2344   }
2345 handle_request_failed:
2346   {
2347     gchar *str = gst_rtsp_strresult (res);
2348     GstFlowReturn ret;
2349
2350     gst_rtsp_message_unset (&message);
2351     if (res != GST_RTSP_EINTR) {
2352       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2353           ("Could not handle server message. (%s)", str));
2354       g_free (str);
2355       ret = GST_FLOW_ERROR;
2356     } else {
2357       ret = GST_FLOW_FLUSHING;
2358     }
2359     return ret;
2360   }
2361 server_eof:
2362   {
2363     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2364     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2365         ("The server closed the connection."));
2366     sink->conninfo.connected = FALSE;
2367     gst_rtsp_message_unset (&message);
2368     return GST_FLOW_EOS;
2369   }
2370 }
2371
2372 static GstRTSPResult
2373 gst_rtsp_client_sink_reconnect (GstRTSPClientSink * sink, gboolean async)
2374 {
2375   GstRTSPResult res = GST_RTSP_OK;
2376   gboolean restart = FALSE;
2377
2378   GST_DEBUG_OBJECT (sink, "doing reconnect");
2379
2380   GST_FIXME_OBJECT (sink, "Reconnection is not yet implemented");
2381
2382   /* no need to restart, we're done */
2383   if (!restart)
2384     goto done;
2385
2386   /* we can try only TCP now */
2387   sink->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
2388
2389   /* close and cleanup our state */
2390   if ((res = gst_rtsp_client_sink_close (sink, async, FALSE)) < 0)
2391     goto done;
2392
2393   /* see if we have TCP left to try. Also don't try TCP when we were configured
2394    * with an SDP. */
2395   if (!(sink->protocols & GST_RTSP_LOWER_TRANS_TCP) || sink->from_sdp)
2396     goto no_protocols;
2397
2398   /* We post a warning message now to inform the user
2399    * that nothing happened. It's most likely a firewall thing. */
2400   GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2401       ("Could not receive any UDP packets for %.4f seconds, maybe your "
2402           "firewall is blocking it. Retrying using a TCP connection.",
2403           gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2404
2405   /* open new connection using tcp */
2406   if (gst_rtsp_client_sink_open (sink, async) < 0)
2407     goto open_failed;
2408
2409   /* start recording */
2410   if (gst_rtsp_client_sink_record (sink, async) < 0)
2411     goto play_failed;
2412
2413 done:
2414   return res;
2415
2416   /* ERRORS */
2417 no_protocols:
2418   {
2419     sink->cur_protocols = 0;
2420     /* no transport possible, post an error and stop */
2421     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2422         ("Could not receive any UDP packets for %.4f seconds, maybe your "
2423             "firewall is blocking it. No other protocols to try.",
2424             gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2425     return GST_RTSP_ERROR;
2426   }
2427 open_failed:
2428   {
2429     GST_DEBUG_OBJECT (sink, "open failed");
2430     return GST_RTSP_OK;
2431   }
2432 play_failed:
2433   {
2434     GST_DEBUG_OBJECT (sink, "play failed");
2435     return GST_RTSP_OK;
2436   }
2437 }
2438
2439 static void
2440 gst_rtsp_client_sink_loop_start_cmd (GstRTSPClientSink * sink, gint cmd)
2441 {
2442   switch (cmd) {
2443     case CMD_OPEN:
2444       GST_ELEMENT_PROGRESS (sink, START, "open", ("Opening Stream"));
2445       break;
2446     case CMD_RECORD:
2447       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending RECORD request"));
2448       break;
2449     case CMD_PAUSE:
2450       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending PAUSE request"));
2451       break;
2452     case CMD_CLOSE:
2453       GST_ELEMENT_PROGRESS (sink, START, "close", ("Closing Stream"));
2454       break;
2455     default:
2456       break;
2457   }
2458 }
2459
2460 static void
2461 gst_rtsp_client_sink_loop_complete_cmd (GstRTSPClientSink * sink, gint cmd)
2462 {
2463   switch (cmd) {
2464     case CMD_OPEN:
2465       GST_ELEMENT_PROGRESS (sink, COMPLETE, "open", ("Opened Stream"));
2466       break;
2467     case CMD_RECORD:
2468       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent RECORD request"));
2469       break;
2470     case CMD_PAUSE:
2471       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent PAUSE request"));
2472       break;
2473     case CMD_CLOSE:
2474       GST_ELEMENT_PROGRESS (sink, COMPLETE, "close", ("Closed Stream"));
2475       break;
2476     default:
2477       break;
2478   }
2479 }
2480
2481 static void
2482 gst_rtsp_client_sink_loop_cancel_cmd (GstRTSPClientSink * sink, gint cmd)
2483 {
2484   switch (cmd) {
2485     case CMD_OPEN:
2486       GST_ELEMENT_PROGRESS (sink, CANCELED, "open", ("Open canceled"));
2487       break;
2488     case CMD_RECORD:
2489       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("RECORD canceled"));
2490       break;
2491     case CMD_PAUSE:
2492       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("PAUSE canceled"));
2493       break;
2494     case CMD_CLOSE:
2495       GST_ELEMENT_PROGRESS (sink, CANCELED, "close", ("Close canceled"));
2496       break;
2497     default:
2498       break;
2499   }
2500 }
2501
2502 static void
2503 gst_rtsp_client_sink_loop_error_cmd (GstRTSPClientSink * sink, gint cmd)
2504 {
2505   switch (cmd) {
2506     case CMD_OPEN:
2507       GST_ELEMENT_PROGRESS (sink, ERROR, "open", ("Open failed"));
2508       break;
2509     case CMD_RECORD:
2510       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("RECORD failed"));
2511       break;
2512     case CMD_PAUSE:
2513       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("PAUSE failed"));
2514       break;
2515     case CMD_CLOSE:
2516       GST_ELEMENT_PROGRESS (sink, ERROR, "close", ("Close failed"));
2517       break;
2518     default:
2519       break;
2520   }
2521 }
2522
2523 static void
2524 gst_rtsp_client_sink_loop_end_cmd (GstRTSPClientSink * sink, gint cmd,
2525     GstRTSPResult ret)
2526 {
2527   if (ret == GST_RTSP_OK)
2528     gst_rtsp_client_sink_loop_complete_cmd (sink, cmd);
2529   else if (ret == GST_RTSP_EINTR)
2530     gst_rtsp_client_sink_loop_cancel_cmd (sink, cmd);
2531   else
2532     gst_rtsp_client_sink_loop_error_cmd (sink, cmd);
2533 }
2534
2535 static gboolean
2536 gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink, gint cmd,
2537     gint mask)
2538 {
2539   gint old;
2540   gboolean flushed = FALSE;
2541
2542   /* start new request */
2543   gst_rtsp_client_sink_loop_start_cmd (sink, cmd);
2544
2545   GST_DEBUG_OBJECT (sink, "sending cmd %s", cmd_to_string (cmd));
2546
2547   GST_OBJECT_LOCK (sink);
2548   old = sink->pending_cmd;
2549   if (old == CMD_RECONNECT) {
2550     GST_DEBUG_OBJECT (sink, "ignore, we were reconnecting");
2551     cmd = CMD_RECONNECT;
2552   }
2553   if (old != CMD_WAIT) {
2554     sink->pending_cmd = CMD_WAIT;
2555     GST_OBJECT_UNLOCK (sink);
2556     /* cancel previous request */
2557     GST_DEBUG_OBJECT (sink, "cancel previous request %s", cmd_to_string (old));
2558     gst_rtsp_client_sink_loop_cancel_cmd (sink, old);
2559     GST_OBJECT_LOCK (sink);
2560   }
2561   sink->pending_cmd = cmd;
2562   /* interrupt if allowed */
2563   if (sink->busy_cmd & mask) {
2564     GST_DEBUG_OBJECT (sink, "connection flush busy %s",
2565         cmd_to_string (sink->busy_cmd));
2566     gst_rtsp_client_sink_connection_flush (sink, TRUE);
2567     flushed = TRUE;
2568   } else {
2569     GST_DEBUG_OBJECT (sink, "not interrupting busy cmd %s",
2570         cmd_to_string (sink->busy_cmd));
2571   }
2572   if (sink->task)
2573     gst_task_start (sink->task);
2574   GST_OBJECT_UNLOCK (sink);
2575
2576   return flushed;
2577 }
2578
2579 static gboolean
2580 gst_rtsp_client_sink_loop (GstRTSPClientSink * sink)
2581 {
2582   GstFlowReturn ret;
2583
2584   if (!sink->conninfo.connection || !sink->conninfo.connected)
2585     goto no_connection;
2586
2587   ret = gst_rtsp_client_sink_loop_rx (sink);
2588   if (ret != GST_FLOW_OK)
2589     goto pause;
2590
2591   return TRUE;
2592
2593   /* ERRORS */
2594 no_connection:
2595   {
2596     GST_WARNING_OBJECT (sink, "we are not connected");
2597     ret = GST_FLOW_FLUSHING;
2598     goto pause;
2599   }
2600 pause:
2601   {
2602     const gchar *reason = gst_flow_get_name (ret);
2603
2604     GST_DEBUG_OBJECT (sink, "pausing task, reason %s", reason);
2605     gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_LOOP);
2606     return FALSE;
2607   }
2608 }
2609
2610 #ifndef GST_DISABLE_GST_DEBUG
2611 static const gchar *
2612 gst_rtsp_auth_method_to_string (GstRTSPAuthMethod method)
2613 {
2614   gint index = 0;
2615
2616   while (method != 0) {
2617     index++;
2618     method >>= 1;
2619   }
2620   switch (index) {
2621     case 0:
2622       return "None";
2623     case 1:
2624       return "Basic";
2625     case 2:
2626       return "Digest";
2627   }
2628
2629   return "Unknown";
2630 }
2631 #endif
2632
2633 /* Parse a WWW-Authenticate Response header and determine the
2634  * available authentication methods
2635  *
2636  * This code should also cope with the fact that each WWW-Authenticate
2637  * header can contain multiple challenge methods + tokens
2638  *
2639  * At the moment, for Basic auth, we just do a minimal check and don't
2640  * even parse out the realm */
2641 static void
2642 gst_rtsp_client_sink_parse_auth_hdr (GstRTSPMessage * response,
2643     GstRTSPAuthMethod * methods, GstRTSPConnection * conn, gboolean * stale)
2644 {
2645   GstRTSPAuthCredential **credentials, **credential;
2646
2647   g_return_if_fail (response != NULL);
2648   g_return_if_fail (methods != NULL);
2649   g_return_if_fail (stale != NULL);
2650
2651   credentials =
2652       gst_rtsp_message_parse_auth_credentials (response,
2653       GST_RTSP_HDR_WWW_AUTHENTICATE);
2654   if (!credentials)
2655     return;
2656
2657   credential = credentials;
2658   while (*credential) {
2659     if ((*credential)->scheme == GST_RTSP_AUTH_BASIC) {
2660       *methods |= GST_RTSP_AUTH_BASIC;
2661     } else if ((*credential)->scheme == GST_RTSP_AUTH_DIGEST) {
2662       GstRTSPAuthParam **param = (*credential)->params;
2663
2664       *methods |= GST_RTSP_AUTH_DIGEST;
2665
2666       gst_rtsp_connection_clear_auth_params (conn);
2667       *stale = FALSE;
2668
2669       while (*param) {
2670         if (strcmp ((*param)->name, "stale") == 0
2671             && g_ascii_strcasecmp ((*param)->value, "TRUE") == 0)
2672           *stale = TRUE;
2673         gst_rtsp_connection_set_auth_param (conn, (*param)->name,
2674             (*param)->value);
2675         param++;
2676       }
2677     }
2678
2679     credential++;
2680   }
2681
2682   gst_rtsp_auth_credentials_free (credentials);
2683 }
2684
2685 /**
2686  * gst_rtsp_client_sink_setup_auth:
2687  * @src: the rtsp source
2688  *
2689  * Configure a username and password and auth method on the
2690  * connection object based on a response we received from the
2691  * peer.
2692  *
2693  * Currently, this requires that a username and password were supplied
2694  * in the uri. In the future, they may be requested on demand by sending
2695  * a message up the bus.
2696  *
2697  * Returns: TRUE if authentication information could be set up correctly.
2698  */
2699 static gboolean
2700 gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
2701     GstRTSPMessage * response)
2702 {
2703   gchar *user = NULL;
2704   gchar *pass = NULL;
2705   GstRTSPAuthMethod avail_methods = GST_RTSP_AUTH_NONE;
2706   GstRTSPAuthMethod method;
2707   GstRTSPResult auth_result;
2708   GstRTSPUrl *url;
2709   GstRTSPConnection *conn;
2710   gboolean stale = FALSE;
2711
2712   conn = sink->conninfo.connection;
2713
2714   /* Identify the available auth methods and see if any are supported */
2715   gst_rtsp_client_sink_parse_auth_hdr (response, &avail_methods, conn, &stale);
2716
2717   if (avail_methods == GST_RTSP_AUTH_NONE)
2718     goto no_auth_available;
2719
2720   /* For digest auth, if the response indicates that the session
2721    * data are stale, we just update them in the connection object and
2722    * return TRUE to retry the request */
2723   if (stale)
2724     sink->tried_url_auth = FALSE;
2725
2726   url = gst_rtsp_connection_get_url (conn);
2727
2728   /* Do we have username and password available? */
2729   if (url != NULL && !sink->tried_url_auth && url->user != NULL
2730       && url->passwd != NULL) {
2731     user = url->user;
2732     pass = url->passwd;
2733     sink->tried_url_auth = TRUE;
2734     GST_DEBUG_OBJECT (sink,
2735         "Attempting authentication using credentials from the URL");
2736   } else {
2737     user = sink->user_id;
2738     pass = sink->user_pw;
2739     GST_DEBUG_OBJECT (sink,
2740         "Attempting authentication using credentials from the properties");
2741   }
2742
2743   /* FIXME: If the url didn't contain username and password or we tried them
2744    * already, request a username and passwd from the application via some kind
2745    * of credentials request message */
2746
2747   /* If we don't have a username and passwd at this point, bail out. */
2748   if (user == NULL || pass == NULL)
2749     goto no_user_pass;
2750
2751   /* Try to configure for each available authentication method, strongest to
2752    * weakest */
2753   for (method = GST_RTSP_AUTH_MAX; method != GST_RTSP_AUTH_NONE; method >>= 1) {
2754     /* Check if this method is available on the server */
2755     if ((method & avail_methods) == 0)
2756       continue;
2757
2758     /* Pass the credentials to the connection to try on the next request */
2759     auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass);
2760     /* INVAL indicates an invalid username/passwd were supplied, so we'll just
2761      * ignore it and end up retrying later */
2762     if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) {
2763       GST_DEBUG_OBJECT (sink, "Attempting %s authentication",
2764           gst_rtsp_auth_method_to_string (method));
2765       break;
2766     }
2767   }
2768
2769   if (method == GST_RTSP_AUTH_NONE)
2770     goto no_auth_available;
2771
2772   return TRUE;
2773
2774 no_auth_available:
2775   {
2776     /* Output an error indicating that we couldn't connect because there were
2777      * no supported authentication protocols */
2778     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
2779         ("No supported authentication protocol was found"));
2780     return FALSE;
2781   }
2782 no_user_pass:
2783   {
2784     /* We don't fire an error message, we just return FALSE and let the
2785      * normal NOT_AUTHORIZED error be propagated */
2786     return FALSE;
2787   }
2788 }
2789
2790 static GstRTSPResult
2791 gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
2792     GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
2793     GstRTSPMessage * response, GstRTSPStatusCode * code)
2794 {
2795   GstRTSPResult res;
2796   GstRTSPStatusCode thecode;
2797   gchar *content_base = NULL;
2798   gint try = 0;
2799
2800 again:
2801   GST_DEBUG_OBJECT (sink, "sending message");
2802
2803   if (sink->debug)
2804     gst_rtsp_message_dump (request);
2805
2806   g_mutex_lock (&sink->send_lock);
2807
2808   res =
2809       gst_rtsp_client_sink_connection_send (sink, conninfo, request,
2810       sink->ptcp_timeout);
2811   if (res < 0) {
2812     g_mutex_unlock (&sink->send_lock);
2813     goto send_error;
2814   }
2815
2816   gst_rtsp_connection_reset_timeout (conninfo->connection);
2817
2818   /* See if we should handle the response */
2819   if (response == NULL) {
2820     g_mutex_unlock (&sink->send_lock);
2821     return GST_RTSP_OK;
2822   }
2823 next:
2824   res =
2825       gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
2826       sink->ptcp_timeout);
2827
2828   g_mutex_unlock (&sink->send_lock);
2829
2830   if (res < 0)
2831     goto receive_error;
2832
2833   if (sink->debug)
2834     gst_rtsp_message_dump (response);
2835
2836
2837   switch (response->type) {
2838     case GST_RTSP_MESSAGE_REQUEST:
2839       res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
2840       if (res == GST_RTSP_EEOF)
2841         goto server_eof;
2842       else if (res < 0)
2843         goto handle_request_failed;
2844       g_mutex_lock (&sink->send_lock);
2845       goto next;
2846     case GST_RTSP_MESSAGE_RESPONSE:
2847       /* ok, a response is good */
2848       GST_DEBUG_OBJECT (sink, "received response message");
2849       break;
2850     case GST_RTSP_MESSAGE_DATA:
2851       /* we ignore data messages */
2852       GST_DEBUG_OBJECT (sink, "ignoring data message");
2853       g_mutex_lock (&sink->send_lock);
2854       goto next;
2855     default:
2856       GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2857           response->type);
2858       g_mutex_lock (&sink->send_lock);
2859       goto next;
2860   }
2861
2862   thecode = response->type_data.response.code;
2863
2864   GST_DEBUG_OBJECT (sink, "got response message %d", thecode);
2865
2866   /* if the caller wanted the result code, we store it. */
2867   if (code)
2868     *code = thecode;
2869
2870   /* If the request didn't succeed, bail out before doing any more */
2871   if (thecode != GST_RTSP_STS_OK)
2872     return GST_RTSP_OK;
2873
2874   /* store new content base if any */
2875   gst_rtsp_message_get_header (response, GST_RTSP_HDR_CONTENT_BASE,
2876       &content_base, 0);
2877   if (content_base) {
2878     g_free (sink->content_base);
2879     sink->content_base = g_strdup (content_base);
2880   }
2881
2882   return GST_RTSP_OK;
2883
2884   /* ERRORS */
2885 send_error:
2886   {
2887     gchar *str = gst_rtsp_strresult (res);
2888
2889     if (res != GST_RTSP_EINTR) {
2890       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2891           ("Could not send message. (%s)", str));
2892     } else {
2893       GST_WARNING_OBJECT (sink, "send interrupted");
2894     }
2895     g_free (str);
2896     return res;
2897   }
2898 receive_error:
2899   {
2900     switch (res) {
2901       case GST_RTSP_EEOF:
2902         GST_WARNING_OBJECT (sink, "server closed connection");
2903         if ((try == 0) && !sink->interleaved && sink->udp_reconnect) {
2904           try++;
2905           /* if reconnect succeeds, try again */
2906           if ((res =
2907                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2908                       FALSE)) == 0)
2909             goto again;
2910         }
2911         /* only try once after reconnect, then fallthrough and error out */
2912       default:
2913       {
2914         gchar *str = gst_rtsp_strresult (res);
2915
2916         if (res != GST_RTSP_EINTR) {
2917           GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2918               ("Could not receive message. (%s)", str));
2919         } else {
2920           GST_WARNING_OBJECT (sink, "receive interrupted");
2921         }
2922         g_free (str);
2923         break;
2924       }
2925     }
2926     return res;
2927   }
2928 handle_request_failed:
2929   {
2930     /* ERROR was posted */
2931     gst_rtsp_message_unset (response);
2932     return res;
2933   }
2934 server_eof:
2935   {
2936     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2937     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2938         ("The server closed the connection."));
2939     gst_rtsp_message_unset (response);
2940     return res;
2941   }
2942 }
2943
2944 static void
2945 gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
2946 {
2947   GST_DEBUG_OBJECT (sink, "Setting internal state to %s",
2948       gst_element_state_get_name (state));
2949   gst_element_set_state (GST_ELEMENT (sink->internal_bin), state);
2950 }
2951
2952 /**
2953  * gst_rtsp_client_sink_send:
2954  * @src: the rtsp source
2955  * @conn: the connection to send on
2956  * @request: must point to a valid request
2957  * @response: must point to an empty #GstRTSPMessage
2958  * @code: an optional code result
2959  *
2960  * send @request and retrieve the response in @response. optionally @code can be
2961  * non-NULL in which case it will contain the status code of the response.
2962  *
2963  * If This function returns #GST_RTSP_OK, @response will contain a valid response
2964  * message that should be cleaned with gst_rtsp_message_unset() after usage.
2965  *
2966  * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid
2967  * @response message) if the response code was not 200 (OK).
2968  *
2969  * If the attempt results in an authentication failure, then this will attempt
2970  * to retrieve authentication credentials via gst_rtsp_client_sink_setup_auth and retry
2971  * the request.
2972  *
2973  * Returns: #GST_RTSP_OK if the processing was successful.
2974  */
2975 static GstRTSPResult
2976 gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
2977     GstRTSPMessage * request, GstRTSPMessage * response,
2978     GstRTSPStatusCode * code)
2979 {
2980   GstRTSPStatusCode int_code = GST_RTSP_STS_OK;
2981   GstRTSPResult res = GST_RTSP_ERROR;
2982   gint count;
2983   gboolean retry;
2984   GstRTSPMethod method = GST_RTSP_INVALID;
2985
2986   count = 0;
2987   do {
2988     retry = FALSE;
2989
2990     /* make sure we don't loop forever */
2991     if (count++ > 8)
2992       break;
2993
2994     /* save method so we can disable it when the server complains */
2995     method = request->type_data.request.method;
2996
2997     if ((res =
2998             gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
2999                 &int_code)) < 0)
3000       goto error;
3001
3002     switch (int_code) {
3003       case GST_RTSP_STS_UNAUTHORIZED:
3004         if (gst_rtsp_client_sink_setup_auth (sink, response)) {
3005           /* Try the request/response again after configuring the auth info
3006            * and loop again */
3007           retry = TRUE;
3008         }
3009         break;
3010       default:
3011         break;
3012     }
3013   } while (retry == TRUE);
3014
3015   /* If the user requested the code, let them handle errors, otherwise
3016    * post an error below */
3017   if (code != NULL)
3018     *code = int_code;
3019   else if (int_code != GST_RTSP_STS_OK)
3020     goto error_response;
3021
3022   return res;
3023
3024   /* ERRORS */
3025 error:
3026   {
3027     GST_DEBUG_OBJECT (sink, "got error %d", res);
3028     return res;
3029   }
3030 error_response:
3031   {
3032     res = GST_RTSP_ERROR;
3033
3034     switch (response->type_data.response.code) {
3035       case GST_RTSP_STS_NOT_FOUND:
3036         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL), ("%s",
3037                 response->type_data.response.reason));
3038         break;
3039       case GST_RTSP_STS_UNAUTHORIZED:
3040         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_AUTHORIZED, (NULL), ("%s",
3041                 response->type_data.response.reason));
3042         break;
3043       case GST_RTSP_STS_MOVED_PERMANENTLY:
3044       case GST_RTSP_STS_MOVE_TEMPORARILY:
3045       {
3046         gchar *new_location;
3047         GstRTSPLowerTrans transports;
3048
3049         GST_DEBUG_OBJECT (sink, "got redirection");
3050         /* if we don't have a Location Header, we must error */
3051         if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_LOCATION,
3052                 &new_location, 0) < 0)
3053           break;
3054
3055         /* When we receive a redirect result, we go back to the INIT state after
3056          * parsing the new URI. The caller should do the needed steps to issue
3057          * a new setup when it detects this state change. */
3058         GST_DEBUG_OBJECT (sink, "redirection to %s", new_location);
3059
3060         /* save current transports */
3061         if (sink->conninfo.url)
3062           transports = sink->conninfo.url->transports;
3063         else
3064           transports = GST_RTSP_LOWER_TRANS_UNKNOWN;
3065
3066         gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (sink), new_location,
3067             NULL);
3068
3069         /* set old transports */
3070         if (sink->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN)
3071           sink->conninfo.url->transports = transports;
3072
3073         sink->need_redirect = TRUE;
3074         sink->state = GST_RTSP_STATE_INIT;
3075         res = GST_RTSP_OK;
3076         break;
3077       }
3078       case GST_RTSP_STS_NOT_ACCEPTABLE:
3079       case GST_RTSP_STS_NOT_IMPLEMENTED:
3080       case GST_RTSP_STS_METHOD_NOT_ALLOWED:
3081         GST_WARNING_OBJECT (sink, "got NOT IMPLEMENTED, disable method %s",
3082             gst_rtsp_method_as_text (method));
3083         sink->methods &= ~method;
3084         res = GST_RTSP_OK;
3085         break;
3086       default:
3087         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3088             ("Got error response: %d (%s).", response->type_data.response.code,
3089                 response->type_data.response.reason));
3090         break;
3091     }
3092     /* if we return ERROR we should unset the response ourselves */
3093     if (res == GST_RTSP_ERROR)
3094       gst_rtsp_message_unset (response);
3095
3096     return res;
3097   }
3098 }
3099
3100 /* parse the response and collect all the supported methods. We need this
3101  * information so that we don't try to send an unsupported request to the
3102  * server.
3103  */
3104 static gboolean
3105 gst_rtsp_client_sink_parse_methods (GstRTSPClientSink * sink,
3106     GstRTSPMessage * response)
3107 {
3108   GstRTSPHeaderField field;
3109   gchar *respoptions;
3110   gint indx = 0;
3111
3112   /* reset supported methods */
3113   sink->methods = 0;
3114
3115   /* Try Allow Header first */
3116   field = GST_RTSP_HDR_ALLOW;
3117   while (TRUE) {
3118     respoptions = NULL;
3119     gst_rtsp_message_get_header (response, field, &respoptions, indx);
3120     if (indx == 0 && !respoptions) {
3121       /* if no Allow header was found then try the Public header... */
3122       field = GST_RTSP_HDR_PUBLIC;
3123       gst_rtsp_message_get_header (response, field, &respoptions, indx);
3124     }
3125     if (!respoptions)
3126       break;
3127
3128     sink->methods |= gst_rtsp_options_from_text (respoptions);
3129
3130     indx++;
3131   }
3132
3133   if (sink->methods == 0) {
3134     /* neither Allow nor Public are required, assume the server supports
3135      * at least SETUP. */
3136     GST_DEBUG_OBJECT (sink, "could not get OPTIONS");
3137     sink->methods = GST_RTSP_SETUP;
3138   }
3139
3140   /* Even if the server replied, and didn't say it supports
3141    * RECORD|ANNOUNCE, try anyway by assuming it does */
3142   sink->methods |= GST_RTSP_ANNOUNCE | GST_RTSP_RECORD;
3143
3144   if (!(sink->methods & GST_RTSP_SETUP))
3145     goto no_setup;
3146
3147   return TRUE;
3148
3149   /* ERRORS */
3150 no_setup:
3151   {
3152     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
3153         ("Server does not support SETUP."));
3154     return FALSE;
3155   }
3156 }
3157
3158 static GstRTSPResult
3159 gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
3160     gboolean async)
3161 {
3162   GstRTSPResult res;
3163   GstRTSPMessage request = { 0 };
3164   GstRTSPMessage response = { 0 };
3165   GSocket *conn_socket;
3166   GSocketAddress *sa;
3167   GInetAddress *ia;
3168
3169   sink->need_redirect = FALSE;
3170
3171   /* can't continue without a valid url */
3172   if (G_UNLIKELY (sink->conninfo.url == NULL)) {
3173     res = GST_RTSP_EINVAL;
3174     goto no_url;
3175   }
3176   sink->tried_url_auth = FALSE;
3177
3178   if ((res = gst_rtsp_conninfo_connect (sink, &sink->conninfo, async)) < 0)
3179     goto connect_failed;
3180
3181   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
3182   sa = g_socket_get_remote_address (conn_socket, NULL);
3183   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
3184
3185   sink->server_ip = g_inet_address_to_string (ia);
3186
3187   g_object_unref (sa);
3188
3189   /* create OPTIONS */
3190   GST_DEBUG_OBJECT (sink, "create options...");
3191   res =
3192       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_OPTIONS,
3193       sink->conninfo.url_str);
3194   if (res < 0)
3195     goto create_request_failed;
3196
3197   /* send OPTIONS */
3198   GST_DEBUG_OBJECT (sink, "send options...");
3199
3200   if (async)
3201     GST_ELEMENT_PROGRESS (sink, CONTINUE, "open",
3202         ("Retrieving server options"));
3203
3204   if ((res =
3205           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
3206               &response, NULL)) < 0)
3207     goto send_error;
3208
3209   /* parse OPTIONS */
3210   if (!gst_rtsp_client_sink_parse_methods (sink, &response))
3211     goto methods_error;
3212
3213   /* FIXME: Do we need to handle REDIRECT responses for OPTIONS? */
3214
3215   /* clean up any messages */
3216   gst_rtsp_message_unset (&request);
3217   gst_rtsp_message_unset (&response);
3218
3219   return res;
3220
3221   /* ERRORS */
3222 no_url:
3223   {
3224     GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
3225         ("No valid RTSP URL was provided"));
3226     goto cleanup_error;
3227   }
3228 connect_failed:
3229   {
3230     gchar *str = gst_rtsp_strresult (res);
3231
3232     if (res != GST_RTSP_EINTR) {
3233       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
3234           ("Failed to connect. (%s)", str));
3235     } else {
3236       GST_WARNING_OBJECT (sink, "connect interrupted");
3237     }
3238     g_free (str);
3239     goto cleanup_error;
3240   }
3241 create_request_failed:
3242   {
3243     gchar *str = gst_rtsp_strresult (res);
3244
3245     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3246         ("Could not create request. (%s)", str));
3247     g_free (str);
3248     goto cleanup_error;
3249   }
3250 send_error:
3251   {
3252     /* Don't post a message - the rtsp_send method will have
3253      * taken care of it because we passed NULL for the response code */
3254     goto cleanup_error;
3255   }
3256 methods_error:
3257   {
3258     /* error was posted */
3259     res = GST_RTSP_ERROR;
3260     goto cleanup_error;
3261   }
3262 cleanup_error:
3263   {
3264     if (sink->conninfo.connection) {
3265       GST_DEBUG_OBJECT (sink, "free connection");
3266       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3267     }
3268     gst_rtsp_message_unset (&request);
3269     gst_rtsp_message_unset (&response);
3270     return res;
3271   }
3272 }
3273
3274 static GstRTSPResult
3275 gst_rtsp_client_sink_open (GstRTSPClientSink * sink, gboolean async)
3276 {
3277   GstRTSPResult ret;
3278
3279   sink->methods =
3280       GST_RTSP_SETUP | GST_RTSP_RECORD | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN;
3281
3282   g_mutex_lock (&sink->open_conn_lock);
3283   sink->open_conn_start = TRUE;
3284   g_cond_broadcast (&sink->open_conn_cond);
3285   GST_DEBUG_OBJECT (sink, "connection to server started");
3286   g_mutex_unlock (&sink->open_conn_lock);
3287
3288   if ((ret = gst_rtsp_client_sink_connect_to_server (sink, async)) < 0)
3289     goto open_failed;
3290
3291   if (async)
3292     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3293
3294   return ret;
3295
3296   /* ERRORS */
3297 open_failed:
3298   {
3299     GST_WARNING_OBJECT (sink, "Failed to connect to server");
3300     sink->open_error = TRUE;
3301     if (async)
3302       gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3303     return ret;
3304   }
3305 }
3306
3307 static GstRTSPResult
3308 gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
3309     gboolean only_close)
3310 {
3311   GstRTSPMessage request = { 0 };
3312   GstRTSPMessage response = { 0 };
3313   GstRTSPResult res = GST_RTSP_OK;
3314   GList *walk;
3315   const gchar *control;
3316
3317   GST_DEBUG_OBJECT (sink, "TEARDOWN...");
3318
3319   gst_rtsp_client_sink_set_state (sink, GST_STATE_NULL);
3320
3321   if (sink->state < GST_RTSP_STATE_READY) {
3322     GST_DEBUG_OBJECT (sink, "not ready, doing cleanup");
3323     goto close;
3324   }
3325
3326   if (only_close)
3327     goto close;
3328
3329   /* construct a control url */
3330   control = get_aggregate_control (sink);
3331
3332   if (!(sink->methods & (GST_RTSP_RECORD | GST_RTSP_TEARDOWN)))
3333     goto not_supported;
3334
3335   /* stop streaming */
3336   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3337     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3338
3339     if (context->stream_transport)
3340       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
3341
3342     if (context->joined) {
3343       gst_rtsp_stream_leave_bin (context->stream, GST_BIN (sink->internal_bin),
3344           sink->rtpbin);
3345       context->joined = FALSE;
3346     }
3347   }
3348
3349   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3350     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3351     const gchar *setup_url;
3352     GstRTSPConnInfo *info;
3353
3354     GST_DEBUG_OBJECT (sink, "Looking at stream %p for teardown",
3355         context->stream);
3356
3357     /* try aggregate control first but do non-aggregate control otherwise */
3358     if (control)
3359       setup_url = control;
3360     else if ((setup_url = context->conninfo.location) == NULL) {
3361       GST_DEBUG_OBJECT (sink, "Skipping TEARDOWN stream %p - no setup URL",
3362           context->stream);
3363       continue;
3364     }
3365
3366     if (sink->conninfo.connection) {
3367       info = &sink->conninfo;
3368     } else if (context->conninfo.connection) {
3369       info = &context->conninfo;
3370     } else {
3371       continue;
3372     }
3373     if (!info->connected)
3374       goto next;
3375
3376     /* do TEARDOWN */
3377     GST_DEBUG_OBJECT (sink, "Sending teardown for stream %p at URL %s",
3378         context->stream, setup_url);
3379     res =
3380         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_TEARDOWN,
3381         setup_url);
3382     if (res < 0)
3383       goto create_request_failed;
3384
3385     if (async)
3386       GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
3387
3388     if ((res =
3389             gst_rtsp_client_sink_send (sink, info, &request,
3390                 &response, NULL)) < 0)
3391       goto send_error;
3392
3393     /* FIXME, parse result? */
3394     gst_rtsp_message_unset (&request);
3395     gst_rtsp_message_unset (&response);
3396
3397   next:
3398     /* early exit when we did aggregate control */
3399     if (control)
3400       break;
3401   }
3402
3403 close:
3404   /* close connections */
3405   GST_DEBUG_OBJECT (sink, "closing connection...");
3406   gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3407   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3408     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
3409     gst_rtsp_conninfo_close (sink, &stream->conninfo, TRUE);
3410   }
3411
3412   /* cleanup */
3413   gst_rtsp_client_sink_cleanup (sink);
3414
3415   sink->state = GST_RTSP_STATE_INVALID;
3416
3417   if (async)
3418     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_CLOSE, res);
3419
3420   return res;
3421
3422   /* ERRORS */
3423 create_request_failed:
3424   {
3425     gchar *str = gst_rtsp_strresult (res);
3426
3427     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3428         ("Could not create request. (%s)", str));
3429     g_free (str);
3430     goto close;
3431   }
3432 send_error:
3433   {
3434     gchar *str = gst_rtsp_strresult (res);
3435
3436     gst_rtsp_message_unset (&request);
3437     if (res != GST_RTSP_EINTR) {
3438       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3439           ("Could not send message. (%s)", str));
3440     } else {
3441       GST_WARNING_OBJECT (sink, "TEARDOWN interrupted");
3442     }
3443     g_free (str);
3444     goto close;
3445   }
3446 not_supported:
3447   {
3448     GST_DEBUG_OBJECT (sink,
3449         "TEARDOWN and PLAY not supported, can't do TEARDOWN");
3450     goto close;
3451   }
3452 }
3453
3454 static gboolean
3455 gst_rtsp_client_sink_configure_manager (GstRTSPClientSink * sink)
3456 {
3457   GstElement *rtpbin;
3458   GstStateChangeReturn ret;
3459
3460   rtpbin = sink->rtpbin;
3461
3462   if (rtpbin == NULL) {
3463     GObjectClass *klass;
3464
3465     rtpbin = gst_element_factory_make ("rtpbin", NULL);
3466     if (rtpbin == NULL)
3467       goto no_rtpbin;
3468
3469     gst_bin_add (GST_BIN_CAST (sink->internal_bin), rtpbin);
3470
3471     sink->rtpbin = rtpbin;
3472
3473     /* Any more settings we should configure on rtpbin here? */
3474     g_object_set (sink->rtpbin, "latency", sink->latency, NULL);
3475
3476     klass = G_OBJECT_GET_CLASS (G_OBJECT (rtpbin));
3477
3478     if (g_object_class_find_property (klass, "ntp-time-source")) {
3479       g_object_set (sink->rtpbin, "ntp-time-source", sink->ntp_time_source,
3480           NULL);
3481     }
3482
3483     if (sink->sdes && g_object_class_find_property (klass, "sdes")) {
3484       g_object_set (sink->rtpbin, "sdes", sink->sdes, NULL);
3485     }
3486
3487     g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER], 0,
3488         sink->rtpbin);
3489   }
3490
3491   ret = gst_element_set_state (rtpbin, GST_STATE_PAUSED);
3492   if (ret == GST_STATE_CHANGE_FAILURE)
3493     goto start_manager_failure;
3494
3495   return TRUE;
3496
3497 no_rtpbin:
3498   {
3499     GST_WARNING ("no rtpbin element");
3500     g_warning ("failed to create element 'rtpbin', check your installation");
3501     return FALSE;
3502   }
3503 start_manager_failure:
3504   {
3505     GST_DEBUG_OBJECT (sink, "could not start session manager");
3506     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), rtpbin);
3507     return FALSE;
3508   }
3509 }
3510
3511 static GstElement *
3512 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPClientSink * sink)
3513 {
3514   GstRTSPStream *stream = NULL;
3515   GstElement *ret = NULL;
3516   GList *walk;
3517
3518   GST_RTSP_STATE_LOCK (sink);
3519   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3520     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3521
3522     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3523       stream = context->stream;
3524       break;
3525     }
3526   }
3527
3528   if (stream != NULL) {
3529     GST_DEBUG_OBJECT (sink, "Creating aux sender for stream %u", sessid);
3530     ret = gst_rtsp_stream_request_aux_sender (stream, sessid);
3531   }
3532
3533   GST_RTSP_STATE_UNLOCK (sink);
3534
3535   return ret;
3536 }
3537
3538 static GstElement *
3539 request_fec_encoder (GstElement * rtpbin, guint sessid,
3540     GstRTSPClientSink * sink)
3541 {
3542   GstRTSPStream *stream = NULL;
3543   GstElement *ret = NULL;
3544   GList *walk;
3545
3546   GST_RTSP_STATE_LOCK (sink);
3547   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3548     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3549
3550     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3551       stream = context->stream;
3552       break;
3553     }
3554   }
3555
3556   if (stream != NULL) {
3557     ret = gst_rtsp_stream_request_ulpfec_encoder (stream, sessid);
3558   }
3559
3560   GST_RTSP_STATE_UNLOCK (sink);
3561
3562   return ret;
3563 }
3564
3565 static gboolean
3566 gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink)
3567 {
3568   GstRTSPStreamContext *context;
3569   GList *walk;
3570   const gchar *base;
3571   gboolean has_slash;
3572
3573   GST_DEBUG_OBJECT (sink, "Collecting stream information");
3574
3575   if (!gst_rtsp_client_sink_configure_manager (sink))
3576     return FALSE;
3577
3578   base = get_aggregate_control (sink);
3579   /* check if the base ends with / */
3580   has_slash = g_str_has_suffix (base, "/");
3581
3582   g_mutex_lock (&sink->preroll_lock);
3583   while (sink->contexts == NULL && !sink->conninfo.flushing) {
3584     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3585   }
3586   g_mutex_unlock (&sink->preroll_lock);
3587
3588   /* FIXME: Need different locking - need to protect against pad releases
3589    * and potential state changes ruining things here */
3590   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3591     GstPad *srcpad;
3592
3593     context = (GstRTSPStreamContext *) walk->data;
3594     if (context->stream)
3595       continue;
3596
3597     g_mutex_lock (&sink->preroll_lock);
3598     while (!context->prerolled && !sink->conninfo.flushing) {
3599       GST_DEBUG_OBJECT (sink, "Waiting for caps on stream %d", context->index);
3600       g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3601     }
3602     if (sink->conninfo.flushing) {
3603       g_mutex_unlock (&sink->preroll_lock);
3604       break;
3605     }
3606     g_mutex_unlock (&sink->preroll_lock);
3607
3608     if (context->payloader == NULL)
3609       continue;
3610
3611     srcpad = gst_element_get_static_pad (context->payloader, "src");
3612
3613     GST_DEBUG_OBJECT (sink, "Creating stream object for stream %d",
3614         context->index);
3615     context->stream =
3616         gst_rtsp_client_sink_create_stream (sink, context, context->payloader,
3617         srcpad);
3618
3619     /* concatenate the two strings, insert / when not present */
3620     g_free (context->conninfo.location);
3621     context->conninfo.location =
3622         g_strdup_printf ("%s%sstream=%d", base, has_slash ? "" : "/",
3623         context->index);
3624
3625     if (sink->rtx_time > 0) {
3626       /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
3627       g_signal_connect (sink->rtpbin, "request-aux-sender",
3628           (GCallback) request_aux_sender, sink);
3629     }
3630
3631     g_signal_connect (sink->rtpbin, "request-fec-encoder",
3632         (GCallback) request_fec_encoder, sink);
3633
3634     if (!gst_rtsp_stream_join_bin (context->stream,
3635             GST_BIN (sink->internal_bin), sink->rtpbin, GST_STATE_PAUSED)) {
3636       goto join_bin_failed;
3637     }
3638     context->joined = TRUE;
3639
3640     /* Block the stream, as it does not have any transport parts yet */
3641     gst_rtsp_stream_set_blocked (context->stream, TRUE);
3642
3643     /* Let the stream object receive data */
3644     gst_pad_remove_probe (srcpad, context->payloader_block_id);
3645
3646     gst_object_unref (srcpad);
3647   }
3648
3649   /* Now wait for the preroll of the rtp bin */
3650   g_mutex_lock (&sink->preroll_lock);
3651   while (!sink->prerolled && sink->conninfo.connection
3652       && !sink->conninfo.flushing) {
3653     GST_LOG_OBJECT (sink, "Waiting for preroll before continuing");
3654     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3655   }
3656   GST_LOG_OBJECT (sink, "Marking streams as collected");
3657   sink->streams_collected = TRUE;
3658   g_mutex_unlock (&sink->preroll_lock);
3659
3660   return TRUE;
3661
3662 join_bin_failed:
3663
3664   GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3665       ("Could not start stream %d", context->index));
3666   return FALSE;
3667 }
3668
3669 static GstRTSPResult
3670 gst_rtsp_client_sink_create_transports_string (GstRTSPClientSink * sink,
3671     GstRTSPStreamContext * context, GSocketFamily family,
3672     GstRTSPLowerTrans protocols, GstRTSPProfile profiles, gchar ** transports)
3673 {
3674   GString *result;
3675   GstRTSPStream *stream = context->stream;
3676   gboolean first = TRUE;
3677
3678   /* the default RTSP transports */
3679   result = g_string_new ("RTP");
3680
3681   while (profiles != 0) {
3682     if (!first)
3683       g_string_append (result, ",RTP");
3684
3685     if (profiles & GST_RTSP_PROFILE_SAVPF) {
3686       g_string_append (result, "/SAVPF");
3687       profiles &= ~GST_RTSP_PROFILE_SAVPF;
3688     } else if (profiles & GST_RTSP_PROFILE_SAVP) {
3689       g_string_append (result, "/SAVP");
3690       profiles &= ~GST_RTSP_PROFILE_SAVP;
3691     } else if (profiles & GST_RTSP_PROFILE_AVPF) {
3692       g_string_append (result, "/AVPF");
3693       profiles &= ~GST_RTSP_PROFILE_AVPF;
3694     } else if (profiles & GST_RTSP_PROFILE_AVP) {
3695       g_string_append (result, "/AVP");
3696       profiles &= ~GST_RTSP_PROFILE_AVP;
3697     } else {
3698       GST_WARNING_OBJECT (sink, "Unimplemented profile(s) 0x%x", profiles);
3699       break;
3700     }
3701
3702     if (protocols & GST_RTSP_LOWER_TRANS_UDP) {
3703       GstRTSPRange ports;
3704
3705       GST_DEBUG_OBJECT (sink, "adding UDP unicast");
3706       gst_rtsp_stream_get_server_port (stream, &ports, family);
3707
3708       g_string_append_printf (result, "/UDP;unicast;client_port=%d-%d",
3709           ports.min, ports.max);
3710     } else if (protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3711       GstRTSPAddress *addr =
3712           gst_rtsp_stream_get_multicast_address (stream, family);
3713       if (addr) {
3714         GST_DEBUG_OBJECT (sink, "adding UDP multicast");
3715         g_string_append_printf (result, "/UDP;multicast;client_port=%d-%d",
3716             addr->port, addr->port + addr->n_ports - 1);
3717         gst_rtsp_address_free (addr);
3718       }
3719     } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) {
3720       GST_DEBUG_OBJECT (sink, "adding TCP");
3721       g_string_append_printf (result, "/TCP;unicast;interleaved=%d-%d",
3722           sink->free_channel, sink->free_channel + 1);
3723     }
3724
3725     g_string_append (result, ";mode=RECORD");
3726     /* FIXME: Support appending too:
3727        if (sink->append)
3728        g_string_append (result, ";append");
3729      */
3730
3731     first = FALSE;
3732   }
3733
3734   if (first) {
3735     /* No valid transport could be constructed */
3736     GST_ERROR_OBJECT (sink, "No supported profiles configured");
3737     goto fail;
3738   }
3739
3740   *transports = g_string_free (result, FALSE);
3741
3742   GST_DEBUG_OBJECT (sink, "prepared transports %s", GST_STR_NULL (*transports));
3743
3744   return GST_RTSP_OK;
3745 fail:
3746   g_string_free (result, TRUE);
3747   return GST_RTSP_ERROR;
3748 }
3749
3750 static GstCaps *
3751 signal_get_srtcp_params (GstRTSPClientSink * sink,
3752     GstRTSPStreamContext * context)
3753 {
3754   GstCaps *caps = NULL;
3755
3756   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY], 0,
3757       context->index, &caps);
3758
3759   if (caps != NULL)
3760     GST_DEBUG_OBJECT (sink, "SRTP parameters received");
3761
3762   return caps;
3763 }
3764
3765 static gchar *
3766 gst_rtsp_client_sink_stream_make_keymgmt (GstRTSPClientSink * sink,
3767     GstRTSPStreamContext * context)
3768 {
3769   gchar *base64, *result = NULL;
3770   GstMIKEYMessage *mikey_msg;
3771
3772   context->srtcpparams = signal_get_srtcp_params (sink, context);
3773   if (context->srtcpparams == NULL)
3774     context->srtcpparams = gst_rtsp_stream_get_caps (context->stream);
3775
3776   mikey_msg = gst_mikey_message_new_from_caps (context->srtcpparams);
3777   if (mikey_msg) {
3778     guint send_ssrc, send_rtx_ssrc;
3779     const GstStructure *s = gst_caps_get_structure (context->srtcpparams, 0);
3780
3781     /* add policy '0' for our SSRC */
3782     gst_rtsp_stream_get_ssrc (context->stream, &send_ssrc);
3783     GST_LOG_OBJECT (sink, "Stream %p ssrc %x", context->stream, send_ssrc);
3784     gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_ssrc, 0);
3785
3786     if (gst_structure_get_uint (s, "rtx-ssrc", &send_rtx_ssrc))
3787       gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_rtx_ssrc, 0);
3788
3789     base64 = gst_mikey_message_base64_encode (mikey_msg);
3790     gst_mikey_message_unref (mikey_msg);
3791
3792     if (base64) {
3793       result = gst_sdp_make_keymgmt (context->conninfo.location, base64);
3794       g_free (base64);
3795     }
3796   }
3797
3798   return result;
3799 }
3800
3801 /* masks to be kept in sync with the hardcoded protocol order of preference
3802  * in code below */
3803 static const guint protocol_masks[] = {
3804   GST_RTSP_LOWER_TRANS_UDP,
3805   GST_RTSP_LOWER_TRANS_UDP_MCAST,
3806   GST_RTSP_LOWER_TRANS_TCP,
3807   0
3808 };
3809
3810 /* Same for profile_masks */
3811 static const guint profile_masks[] = {
3812   GST_RTSP_PROFILE_SAVPF,
3813   GST_RTSP_PROFILE_SAVP,
3814   GST_RTSP_PROFILE_AVPF,
3815   GST_RTSP_PROFILE_AVP,
3816   0
3817 };
3818
3819 static gboolean
3820 do_send_data (GstBuffer * buffer, guint8 channel,
3821     GstRTSPStreamContext * context)
3822 {
3823   GstRTSPClientSink *sink = context->parent;
3824   GstRTSPMessage message = { 0 };
3825   GstRTSPResult res = GST_RTSP_OK;
3826   GstMapInfo map_info;
3827   guint8 *data;
3828   guint usize;
3829
3830   gst_rtsp_message_init_data (&message, channel);
3831
3832   /* FIXME, need some sort of iovec RTSPMessage here */
3833   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
3834     return FALSE;
3835
3836   gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
3837
3838   res =
3839       gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
3840       NULL, NULL);
3841
3842   gst_rtsp_message_steal_body (&message, &data, &usize);
3843   gst_buffer_unmap (buffer, &map_info);
3844
3845   gst_rtsp_message_unset (&message);
3846
3847   return res == GST_RTSP_OK;
3848 }
3849
3850 static GstRTSPResult
3851 gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
3852 {
3853   GstRTSPResult res = GST_RTSP_ERROR;
3854   GstRTSPMessage request = { 0 };
3855   GstRTSPMessage response = { 0 };
3856   GstRTSPLowerTrans protocols;
3857   GstRTSPStatusCode code;
3858   GSocketFamily family;
3859   GSocketAddress *sa;
3860   GSocket *conn_socket;
3861   GstRTSPUrl *url;
3862   GList *walk;
3863   gchar *hval;
3864
3865   if (sink->conninfo.connection) {
3866     url = gst_rtsp_connection_get_url (sink->conninfo.connection);
3867     /* we initially allow all configured lower transports. based on the URL
3868      * transports and the replies from the server we narrow them down. */
3869     protocols = url->transports & sink->cur_protocols;
3870   } else {
3871     url = NULL;
3872     protocols = sink->cur_protocols;
3873   }
3874
3875   if (protocols == 0)
3876     goto no_protocols;
3877
3878   GST_RTSP_STATE_LOCK (sink);
3879
3880   if (G_UNLIKELY (sink->contexts == NULL))
3881     goto no_streams;
3882
3883   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3884     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3885     GstRTSPStream *stream;
3886
3887     GstRTSPConnInfo *info;
3888     GstRTSPProfile profiles;
3889     GstRTSPProfile cur_profile;
3890     gchar *transports;
3891     gint retry = 0;
3892     guint profile_mask = 0;
3893     guint mask = 0;
3894     GstCaps *caps;
3895     const GstSDPMedia *media;
3896
3897     stream = context->stream;
3898     profiles = gst_rtsp_stream_get_profiles (stream);
3899
3900     caps = gst_rtsp_stream_get_caps (stream);
3901     if (caps == NULL) {
3902       GST_DEBUG_OBJECT (sink, "skipping stream %p, no caps", stream);
3903       continue;
3904     }
3905     gst_caps_unref (caps);
3906     media = gst_sdp_message_get_media (&sink->cursdp, context->sdp_index);
3907     if (media == NULL) {
3908       GST_DEBUG_OBJECT (sink, "skipping stream %p, no SDP info", stream);
3909       continue;
3910     }
3911
3912     /* skip setup if we have no URL for it */
3913     if (context->conninfo.location == NULL) {
3914       GST_DEBUG_OBJECT (sink, "skipping stream %p, no setup", stream);
3915       continue;
3916     }
3917
3918     if (sink->conninfo.connection == NULL) {
3919       if (!gst_rtsp_conninfo_connect (sink, &context->conninfo, async)) {
3920         GST_DEBUG_OBJECT (sink, "skipping stream %p, failed to connect",
3921             stream);
3922         continue;
3923       }
3924       info = &context->conninfo;
3925     } else {
3926       info = &sink->conninfo;
3927     }
3928     GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
3929         context->conninfo.location);
3930
3931     conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
3932     sa = g_socket_get_local_address (conn_socket, NULL);
3933     family = g_socket_address_get_family (sa);
3934     g_object_unref (sa);
3935
3936   next_protocol:
3937     /* first selectable profile */
3938     while (profile_masks[profile_mask]
3939         && !(profiles & profile_masks[profile_mask]))
3940       profile_mask++;
3941     if (!profile_masks[profile_mask])
3942       goto no_profiles;
3943
3944     /* first selectable protocol */
3945     while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
3946       mask++;
3947     if (!protocol_masks[mask])
3948       goto no_protocols;
3949
3950   retry:
3951     GST_DEBUG_OBJECT (sink, "protocols = 0x%x, protocol mask = 0x%x", protocols,
3952         protocol_masks[mask]);
3953     /* create a string with first transport in line */
3954     transports = NULL;
3955     cur_profile = profiles & profile_masks[profile_mask];
3956     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
3957         protocols & protocol_masks[mask], cur_profile, &transports);
3958     if (res < 0 || transports == NULL)
3959       goto setup_transport_failed;
3960
3961     if (strlen (transports) == 0) {
3962       g_free (transports);
3963       GST_DEBUG_OBJECT (sink, "no transports found");
3964       mask++;
3965       profile_mask = 0;
3966       goto next_protocol;
3967     }
3968
3969     GST_DEBUG_OBJECT (sink, "transport is %s", GST_STR_NULL (transports));
3970
3971     /* create SETUP request */
3972     res =
3973         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_SETUP,
3974         context->conninfo.location);
3975     if (res < 0) {
3976       g_free (transports);
3977       goto create_request_failed;
3978     }
3979
3980     /* set up keys */
3981     if (cur_profile == GST_RTSP_PROFILE_SAVP ||
3982         cur_profile == GST_RTSP_PROFILE_SAVPF) {
3983       hval = gst_rtsp_client_sink_stream_make_keymgmt (sink, context);
3984       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_KEYMGMT, hval);
3985     }
3986
3987     /* if the user wants a non default RTP packet size we add the blocksize
3988      * parameter */
3989     if (sink->rtp_blocksize > 0) {
3990       hval = g_strdup_printf ("%d", sink->rtp_blocksize);
3991       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_BLOCKSIZE, hval);
3992     }
3993
3994     if (async)
3995       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request", ("SETUP stream %d",
3996               context->index));
3997
3998     {
3999       GstRTSPTransport *transport;
4000
4001       gst_rtsp_transport_new (&transport);
4002       if (gst_rtsp_transport_parse (transports, transport) != GST_RTSP_OK)
4003         goto parse_transport_failed;
4004       if (transport->lower_transport != GST_RTSP_LOWER_TRANS_TCP) {
4005         if (!gst_rtsp_stream_allocate_udp_sockets (stream, family, transport,
4006                 FALSE)) {
4007           gst_rtsp_transport_free (transport);
4008           goto allocate_udp_ports_failed;
4009         }
4010       }
4011       if (!gst_rtsp_stream_complete_stream (stream, transport)) {
4012         gst_rtsp_transport_free (transport);
4013         goto complete_stream_failed;
4014       }
4015
4016       gst_rtsp_transport_free (transport);
4017       gst_rtsp_stream_set_blocked (stream, FALSE);
4018     }
4019
4020     /* FIXME:
4021      * the creation of the transports string depends on
4022      * calling stream_get_server_port, which only starts returning
4023      * something meaningful after a call to stream_allocate_udp_sockets
4024      * has been made, this function expects a transport that we parse
4025      * from the transport string ...
4026      *
4027      * Significant refactoring is in order, but does not look entirely
4028      * trivial, for now we put a band aid on and create a second transport
4029      * string after the stream has been completed, to pass it in
4030      * the request headers instead of the previous, incomplete one.
4031      */
4032     g_free (transports);
4033     transports = NULL;
4034     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
4035         protocols & protocol_masks[mask], cur_profile, &transports);
4036
4037     if (res < 0 || transports == NULL)
4038       goto setup_transport_failed;
4039
4040     /* select transport */
4041     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_TRANSPORT, transports);
4042
4043     /* handle the code ourselves */
4044     res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
4045     if (res < 0)
4046       goto send_error;
4047
4048     switch (code) {
4049       case GST_RTSP_STS_OK:
4050         break;
4051       case GST_RTSP_STS_UNSUPPORTED_TRANSPORT:
4052         gst_rtsp_message_unset (&request);
4053         gst_rtsp_message_unset (&response);
4054
4055         /* Try another profile. If no more, move to the next protocol */
4056         profile_mask++;
4057         while (profile_masks[profile_mask]
4058             && !(profiles & profile_masks[profile_mask]))
4059           profile_mask++;
4060         if (profile_masks[profile_mask])
4061           goto retry;
4062
4063         /* select next available protocol, give up on this stream if none */
4064         /* Reset profiles to try: */
4065         profile_mask = 0;
4066
4067         mask++;
4068         while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
4069           mask++;
4070         if (!protocol_masks[mask])
4071           continue;
4072         else
4073           goto retry;
4074       default:
4075         goto response_error;
4076     }
4077
4078     /* parse response transport */
4079     {
4080       gchar *resptrans = NULL;
4081       GstRTSPTransport *transport;
4082
4083       gst_rtsp_message_get_header (&response, GST_RTSP_HDR_TRANSPORT,
4084           &resptrans, 0);
4085       if (!resptrans) {
4086         goto no_transport;
4087       }
4088
4089       gst_rtsp_transport_new (&transport);
4090
4091       /* parse transport, go to next stream on parse error */
4092       if (gst_rtsp_transport_parse (resptrans, transport) != GST_RTSP_OK) {
4093         GST_WARNING_OBJECT (sink, "failed to parse transport %s", resptrans);
4094         goto next;
4095       }
4096
4097       /* update allowed transports for other streams. once the transport of
4098        * one stream has been determined, we make sure that all other streams
4099        * are configured in the same way */
4100       switch (transport->lower_transport) {
4101         case GST_RTSP_LOWER_TRANS_TCP:
4102           GST_DEBUG_OBJECT (sink, "stream %p as TCP interleaved", stream);
4103           protocols = GST_RTSP_LOWER_TRANS_TCP;
4104           sink->interleaved = TRUE;
4105           /* update free channels */
4106           sink->free_channel =
4107               MAX (transport->interleaved.min, sink->free_channel);
4108           sink->free_channel =
4109               MAX (transport->interleaved.max, sink->free_channel);
4110           sink->free_channel++;
4111           break;
4112         case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4113           /* only allow multicast for other streams */
4114           GST_DEBUG_OBJECT (sink, "stream %p as UDP multicast", stream);
4115           protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST;
4116           break;
4117         case GST_RTSP_LOWER_TRANS_UDP:
4118           /* only allow unicast for other streams */
4119           GST_DEBUG_OBJECT (sink, "stream %p as UDP unicast", stream);
4120           protocols = GST_RTSP_LOWER_TRANS_UDP;
4121           /* Update transport with server destination if not provided by the server */
4122           if (transport->destination == NULL) {
4123             transport->destination = g_strdup (sink->server_ip);
4124           }
4125           break;
4126         default:
4127           GST_DEBUG_OBJECT (sink, "stream %p unknown transport %d", stream,
4128               transport->lower_transport);
4129           break;
4130       }
4131
4132       if (!retry) {
4133         GST_DEBUG ("Configuring the stream transport for stream %d",
4134             context->index);
4135         if (context->stream_transport == NULL)
4136           context->stream_transport =
4137               gst_rtsp_stream_transport_new (stream, transport);
4138         else
4139           gst_rtsp_stream_transport_set_transport (context->stream_transport,
4140               transport);
4141
4142         if (transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
4143           /* our callbacks to send data on this TCP connection */
4144           gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
4145               (GstRTSPSendFunc) do_send_data,
4146               (GstRTSPSendFunc) do_send_data, context, NULL);
4147         }
4148
4149         /* The stream_transport now owns the transport */
4150         transport = NULL;
4151
4152         gst_rtsp_stream_transport_set_active (context->stream_transport, TRUE);
4153       }
4154     next:
4155       if (transport)
4156         gst_rtsp_transport_free (transport);
4157       /* clean up used RTSP messages */
4158       gst_rtsp_message_unset (&request);
4159       gst_rtsp_message_unset (&response);
4160     }
4161   }
4162   GST_RTSP_STATE_UNLOCK (sink);
4163
4164   /* store the transport protocol that was configured */
4165   sink->cur_protocols = protocols;
4166
4167   return res;
4168
4169 no_streams:
4170   {
4171     GST_RTSP_STATE_UNLOCK (sink);
4172     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4173         ("SDP contains no streams"));
4174     return GST_RTSP_ERROR;
4175   }
4176 setup_transport_failed:
4177   {
4178     GST_RTSP_STATE_UNLOCK (sink);
4179     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4180         ("Could not setup transport."));
4181     res = GST_RTSP_ERROR;
4182     goto cleanup_error;
4183   }
4184 no_profiles:
4185   {
4186     GST_RTSP_STATE_UNLOCK (sink);
4187     /* no transport possible, post an error and stop */
4188     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4189         ("Could not connect to server, no profiles left"));
4190     return GST_RTSP_ERROR;
4191   }
4192 no_protocols:
4193   {
4194     GST_RTSP_STATE_UNLOCK (sink);
4195     /* no transport possible, post an error and stop */
4196     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
4197         ("Could not connect to server, no protocols left"));
4198     return GST_RTSP_ERROR;
4199   }
4200 no_transport:
4201   {
4202     GST_RTSP_STATE_UNLOCK (sink);
4203     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4204         ("Server did not select transport."));
4205     res = GST_RTSP_ERROR;
4206     goto cleanup_error;
4207   }
4208 create_request_failed:
4209   {
4210     gchar *str = gst_rtsp_strresult (res);
4211
4212     GST_RTSP_STATE_UNLOCK (sink);
4213     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4214         ("Could not create request. (%s)", str));
4215     g_free (str);
4216     goto cleanup_error;
4217   }
4218 parse_transport_failed:
4219   {
4220     GST_RTSP_STATE_UNLOCK (sink);
4221     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4222         ("Could not parse transport."));
4223     res = GST_RTSP_ERROR;
4224     goto cleanup_error;
4225   }
4226 allocate_udp_ports_failed:
4227   {
4228     GST_RTSP_STATE_UNLOCK (sink);
4229     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4230         ("Could not parse transport."));
4231     res = GST_RTSP_ERROR;
4232     goto cleanup_error;
4233   }
4234 complete_stream_failed:
4235   {
4236     GST_RTSP_STATE_UNLOCK (sink);
4237     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
4238         ("Could not parse transport."));
4239     res = GST_RTSP_ERROR;
4240     goto cleanup_error;
4241   }
4242 send_error:
4243   {
4244     gchar *str = gst_rtsp_strresult (res);
4245
4246     GST_RTSP_STATE_UNLOCK (sink);
4247     if (res != GST_RTSP_EINTR) {
4248       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4249           ("Could not send message. (%s)", str));
4250     } else {
4251       GST_WARNING_OBJECT (sink, "send interrupted");
4252     }
4253     g_free (str);
4254     goto cleanup_error;
4255   }
4256 response_error:
4257   {
4258     const gchar *str = gst_rtsp_status_as_text (code);
4259
4260     GST_RTSP_STATE_UNLOCK (sink);
4261     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4262         ("Error (%d): %s", code, GST_STR_NULL (str)));
4263     res = GST_RTSP_ERROR;
4264     goto cleanup_error;
4265   }
4266 cleanup_error:
4267   {
4268     gst_rtsp_message_unset (&request);
4269     gst_rtsp_message_unset (&response);
4270     return res;
4271   }
4272 }
4273
4274 static GstRTSPResult
4275 gst_rtsp_client_sink_ensure_open (GstRTSPClientSink * sink, gboolean async)
4276 {
4277   GstRTSPResult res = GST_RTSP_OK;
4278
4279   if (sink->state < GST_RTSP_STATE_READY) {
4280     res = GST_RTSP_ERROR;
4281     if (sink->open_error) {
4282       GST_DEBUG_OBJECT (sink, "the stream was in error");
4283       goto done;
4284     }
4285     if (async)
4286       gst_rtsp_client_sink_loop_start_cmd (sink, CMD_OPEN);
4287
4288     if ((res = gst_rtsp_client_sink_open (sink, async)) < 0) {
4289       GST_DEBUG_OBJECT (sink, "failed to open stream");
4290       goto done;
4291     }
4292   }
4293
4294 done:
4295   return res;
4296 }
4297
4298 static GstRTSPResult
4299 gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
4300 {
4301   GstRTSPMessage request = { 0 };
4302   GstRTSPMessage response = { 0 };
4303   GstRTSPResult res = GST_RTSP_OK;
4304   GstSDPMessage *sdp;
4305   guint sdp_index = 0;
4306   GstSDPInfo info = { 0, };
4307   gchar *keymgmt;
4308   guint i;
4309
4310   const gchar *proto;
4311   gchar *sess_id, *client_ip, *str;
4312   GSocketAddress *sa;
4313   GInetAddress *ia;
4314   GSocket *conn_socket;
4315   GList *walk;
4316
4317   g_mutex_lock (&sink->preroll_lock);
4318   if (sink->state == GST_RTSP_STATE_PLAYING) {
4319     /* Already recording, don't send another request */
4320     GST_LOG_OBJECT (sink, "Already in RECORD. Skipping duplicate request.");
4321     g_mutex_unlock (&sink->preroll_lock);
4322     goto done;
4323   }
4324   g_mutex_unlock (&sink->preroll_lock);
4325
4326   /* Collect all our input streams and create
4327    * stream objects before actually returning.
4328    * The streams are blocked at this point as we do not have any transport
4329    * parts yet. */
4330   gst_rtsp_client_sink_collect_streams (sink);
4331
4332   g_mutex_lock (&sink->block_streams_lock);
4333   /* Wait for streams to be blocked */
4334   while (!sink->streams_blocked) {
4335     GST_DEBUG_OBJECT (sink, "waiting for streams to be blocked");
4336     g_cond_wait (&sink->block_streams_cond, &sink->block_streams_lock);
4337   }
4338   g_mutex_unlock (&sink->block_streams_lock);
4339
4340   /* Send announce, then setup for all streams */
4341   gst_sdp_message_init (&sink->cursdp);
4342   sdp = &sink->cursdp;
4343
4344   /* some standard things first */
4345   gst_sdp_message_set_version (sdp, "0");
4346
4347   /* session ID doesn't have to be super-unique in this case */
4348   sess_id = g_strdup_printf ("%u", g_random_int ());
4349
4350   if (sink->conninfo.connection == NULL)
4351     return GST_RTSP_ERROR;
4352
4353   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
4354
4355   sa = g_socket_get_local_address (conn_socket, NULL);
4356   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
4357   client_ip = g_inet_address_to_string (ia);
4358   if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV6) {
4359     info.is_ipv6 = TRUE;
4360     proto = "IP6";
4361   } else if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV4)
4362     proto = "IP4";
4363   else
4364     g_assert_not_reached ();
4365   g_object_unref (sa);
4366
4367   /* FIXME: Should this actually be the server's IP or ours? */
4368   info.server_ip = sink->server_ip;
4369
4370   gst_sdp_message_set_origin (sdp, "-", sess_id, "1", "IN", proto, client_ip);
4371
4372   gst_sdp_message_set_session_name (sdp, "Session streamed with GStreamer");
4373   gst_sdp_message_set_information (sdp, "rtspclientsink");
4374   gst_sdp_message_add_time (sdp, "0", "0", NULL);
4375   gst_sdp_message_add_attribute (sdp, "tool", "GStreamer");
4376
4377   /* add stream */
4378   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4379     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4380
4381     gst_rtsp_sdp_from_stream (sdp, &info, context->stream);
4382     context->sdp_index = sdp_index++;
4383   }
4384
4385   g_free (sess_id);
4386   g_free (client_ip);
4387
4388   /* send ANNOUNCE request */
4389   GST_DEBUG_OBJECT (sink, "create ANNOUNCE request...");
4390   res =
4391       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_ANNOUNCE,
4392       sink->conninfo.url_str);
4393   if (res < 0)
4394     goto create_request_failed;
4395
4396   gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
4397       "application/sdp");
4398
4399   /* add SDP to the request body */
4400   str = gst_sdp_message_as_text (sdp);
4401   gst_rtsp_message_take_body (&request, (guint8 *) str, strlen (str));
4402
4403   /* send ANNOUNCE */
4404   GST_DEBUG_OBJECT (sink, "sending announce...");
4405
4406   if (async)
4407     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record",
4408         ("Sending server stream info"));
4409
4410   if ((res =
4411           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4412               &response, NULL)) < 0)
4413     goto send_error;
4414
4415   /* parse the keymgmt */
4416   i = 0;
4417   walk = sink->contexts;
4418   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_KEYMGMT,
4419           &keymgmt, i++) == GST_RTSP_OK) {
4420     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
4421     walk = g_list_next (walk);
4422     if (!gst_rtsp_stream_handle_keymgmt (context->stream, keymgmt))
4423       goto keymgmt_error;
4424   }
4425
4426   /* send setup for all streams */
4427   if ((res = gst_rtsp_client_sink_setup_streams (sink, async)) < 0)
4428     goto setup_failed;
4429
4430   res = gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_RECORD,
4431       sink->conninfo.url_str);
4432
4433   if (res < 0)
4434     goto create_request_failed;
4435
4436 #if 0                           /* FIXME: Configure a range based on input segments? */
4437   if (src->need_range) {
4438     hval = gen_range_header (src, segment);
4439
4440     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_RANGE, hval);
4441   }
4442
4443   if (segment->rate != 1.0) {
4444     gchar hval[G_ASCII_DTOSTR_BUF_SIZE];
4445
4446     g_ascii_dtostr (hval, sizeof (hval), segment->rate);
4447     if (src->skip)
4448       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SCALE, hval);
4449     else
4450       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval);
4451   }
4452 #endif
4453
4454   if (async)
4455     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
4456   if ((res =
4457           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4458               &response, NULL)) < 0)
4459     goto send_error;
4460
4461 #if 0                           /* FIXME: Check if servers return these for record: */
4462   /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
4463    * for the RTP packets. If this is not present, we assume all starts from 0...
4464    * This is info for the RTP session manager that we pass to it in caps. */
4465   hval_idx = 0;
4466   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTP_INFO,
4467           &hval, hval_idx++) == GST_RTSP_OK)
4468     gst_rtspsrc_parse_rtpinfo (src, hval);
4469
4470   /* some servers indicate RTCP parameters in PLAY response,
4471    * rather than properly in SDP */
4472   if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTCP_INTERVAL,
4473           &hval, 0) == GST_RTSP_OK)
4474     gst_rtspsrc_handle_rtcp_interval (src, hval);
4475 #endif
4476
4477   gst_rtsp_client_sink_set_state (sink, GST_STATE_PLAYING);
4478   sink->state = GST_RTSP_STATE_PLAYING;
4479
4480   /* clean up any messages */
4481   gst_rtsp_message_unset (&request);
4482   gst_rtsp_message_unset (&response);
4483
4484 done:
4485   return res;
4486
4487 create_request_failed:
4488   {
4489     gchar *str = gst_rtsp_strresult (res);
4490
4491     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4492         ("Could not create request. (%s)", str));
4493     g_free (str);
4494     goto cleanup_error;
4495   }
4496 send_error:
4497   {
4498     /* Don't post a message - the rtsp_send method will have
4499      * taken care of it because we passed NULL for the response code */
4500     goto cleanup_error;
4501   }
4502 keymgmt_error:
4503   {
4504     GST_ELEMENT_ERROR (sink, STREAM, DECRYPT_NOKEY, (NULL),
4505         ("Could not handle KeyMgmt"));
4506   }
4507 setup_failed:
4508   {
4509     GST_ERROR_OBJECT (sink, "setup failed");
4510     goto cleanup_error;
4511   }
4512 cleanup_error:
4513   {
4514     if (sink->conninfo.connection) {
4515       GST_DEBUG_OBJECT (sink, "free connection");
4516       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
4517     }
4518     gst_rtsp_message_unset (&request);
4519     gst_rtsp_message_unset (&response);
4520     return res;
4521   }
4522 }
4523
4524 static GstRTSPResult
4525 gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
4526 {
4527   GstRTSPResult res = GST_RTSP_OK;
4528   GstRTSPMessage request = { 0 };
4529   GstRTSPMessage response = { 0 };
4530   GList *walk;
4531   const gchar *control;
4532
4533   GST_DEBUG_OBJECT (sink, "PAUSE...");
4534
4535   if ((res = gst_rtsp_client_sink_ensure_open (sink, async)) < 0)
4536     goto open_failed;
4537
4538   if (!(sink->methods & GST_RTSP_PAUSE))
4539     goto not_supported;
4540
4541   if (sink->state == GST_RTSP_STATE_READY)
4542     goto was_paused;
4543
4544   if (!sink->conninfo.connection || !sink->conninfo.connected)
4545     goto no_connection;
4546
4547   /* construct a control url */
4548   control = get_aggregate_control (sink);
4549
4550   /* loop over the streams. We might exit the loop early when we could do an
4551    * aggregate control */
4552   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4553     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
4554     GstRTSPConnInfo *info;
4555     const gchar *setup_url;
4556
4557     /* try aggregate control first but do non-aggregate control otherwise */
4558     if (control)
4559       setup_url = control;
4560     else if ((setup_url = stream->conninfo.location) == NULL)
4561       continue;
4562
4563     if (sink->conninfo.connection) {
4564       info = &sink->conninfo;
4565     } else if (stream->conninfo.connection) {
4566       info = &stream->conninfo;
4567     } else {
4568       continue;
4569     }
4570
4571     if (async)
4572       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request",
4573           ("Sending PAUSE request"));
4574
4575     if ((res =
4576             gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_PAUSE,
4577                 setup_url)) < 0)
4578       goto create_request_failed;
4579
4580     if ((res =
4581             gst_rtsp_client_sink_send (sink, info, &request, &response,
4582                 NULL)) < 0)
4583       goto send_error;
4584
4585     gst_rtsp_message_unset (&request);
4586     gst_rtsp_message_unset (&response);
4587
4588     /* exit early when we did agregate control */
4589     if (control)
4590       break;
4591   }
4592
4593   /* change element states now */
4594   gst_rtsp_client_sink_set_state (sink, GST_STATE_PAUSED);
4595
4596 no_connection:
4597   sink->state = GST_RTSP_STATE_READY;
4598
4599 done:
4600   if (async)
4601     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_PAUSE, res);
4602
4603   return res;
4604
4605   /* ERRORS */
4606 open_failed:
4607   {
4608     GST_DEBUG_OBJECT (sink, "failed to open stream");
4609     goto done;
4610   }
4611 not_supported:
4612   {
4613     GST_DEBUG_OBJECT (sink, "PAUSE is not supported");
4614     goto done;
4615   }
4616 was_paused:
4617   {
4618     GST_DEBUG_OBJECT (sink, "we were already PAUSED");
4619     goto done;
4620   }
4621 create_request_failed:
4622   {
4623     gchar *str = gst_rtsp_strresult (res);
4624
4625     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4626         ("Could not create request. (%s)", str));
4627     g_free (str);
4628     goto done;
4629   }
4630 send_error:
4631   {
4632     gchar *str = gst_rtsp_strresult (res);
4633
4634     gst_rtsp_message_unset (&request);
4635     if (res != GST_RTSP_EINTR) {
4636       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4637           ("Could not send message. (%s)", str));
4638     } else {
4639       GST_WARNING_OBJECT (sink, "PAUSE interrupted");
4640     }
4641     g_free (str);
4642     goto done;
4643   }
4644 }
4645
4646 static void
4647 gst_rtsp_client_sink_handle_message (GstBin * bin, GstMessage * message)
4648 {
4649   GstRTSPClientSink *rtsp_client_sink;
4650
4651   rtsp_client_sink = GST_RTSP_CLIENT_SINK (bin);
4652
4653   switch (GST_MESSAGE_TYPE (message)) {
4654     case GST_MESSAGE_ELEMENT:
4655     {
4656       const GstStructure *s = gst_message_get_structure (message);
4657
4658       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
4659         gboolean ignore_timeout;
4660
4661         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
4662
4663         GST_OBJECT_LOCK (rtsp_client_sink);
4664         ignore_timeout = rtsp_client_sink->ignore_timeout;
4665         rtsp_client_sink->ignore_timeout = TRUE;
4666         GST_OBJECT_UNLOCK (rtsp_client_sink);
4667
4668         /* we only act on the first udp timeout message, others are irrelevant
4669          * and can be ignored. */
4670         if (!ignore_timeout)
4671           gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECONNECT,
4672               CMD_LOOP);
4673         /* eat and free */
4674         gst_message_unref (message);
4675         return;
4676       } else if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
4677         /* An RTSPStream has prerolled */
4678         GST_DEBUG_OBJECT (rtsp_client_sink, "received GstRTSPStreamBlocking");
4679         g_mutex_lock (&rtsp_client_sink->block_streams_lock);
4680         rtsp_client_sink->streams_blocked = TRUE;
4681         g_cond_broadcast (&rtsp_client_sink->block_streams_cond);
4682         g_mutex_unlock (&rtsp_client_sink->block_streams_lock);
4683       }
4684       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4685       break;
4686     }
4687     case GST_MESSAGE_ASYNC_START:{
4688       GstObject *sender;
4689
4690       sender = GST_MESSAGE_SRC (message);
4691
4692       GST_LOG_OBJECT (rtsp_client_sink,
4693           "Have async-start from %" GST_PTR_FORMAT, sender);
4694       if (sender == GST_OBJECT (rtsp_client_sink->internal_bin)) {
4695         GST_LOG_OBJECT (rtsp_client_sink, "child bin is now ASYNC");
4696       }
4697       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4698       break;
4699     }
4700     case GST_MESSAGE_ASYNC_DONE:
4701     {
4702       GstObject *sender;
4703       gboolean need_async_done;
4704
4705       sender = GST_MESSAGE_SRC (message);
4706       GST_LOG_OBJECT (rtsp_client_sink, "Have async-done from %" GST_PTR_FORMAT,
4707           sender);
4708
4709       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4710       if (sender == GST_OBJECT_CAST (rtsp_client_sink->internal_bin)) {
4711         GST_LOG_OBJECT (rtsp_client_sink, "child bin is no longer ASYNC");
4712       }
4713       need_async_done = rtsp_client_sink->in_async;
4714       if (rtsp_client_sink->in_async) {
4715         rtsp_client_sink->in_async = FALSE;
4716         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4717       }
4718       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4719
4720       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4721
4722       if (need_async_done) {
4723         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-DONE");
4724         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4725             gst_message_new_async_done (GST_OBJECT_CAST (rtsp_client_sink),
4726                 GST_CLOCK_TIME_NONE));
4727       }
4728       break;
4729     }
4730     case GST_MESSAGE_ERROR:
4731     {
4732       GstObject *sender;
4733
4734       sender = GST_MESSAGE_SRC (message);
4735
4736       GST_DEBUG_OBJECT (rtsp_client_sink, "got error from %s",
4737           GST_ELEMENT_NAME (sender));
4738
4739       /* FIXME: Ignore errors on RTCP? */
4740       /* fatal but not our message, forward */
4741       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4742       break;
4743     }
4744     case GST_MESSAGE_STATE_CHANGED:
4745     {
4746       if (GST_MESSAGE_SRC (message) ==
4747           (GstObject *) rtsp_client_sink->internal_bin) {
4748         GstState newstate, pending;
4749         gst_message_parse_state_changed (message, NULL, &newstate, &pending);
4750         g_mutex_lock (&rtsp_client_sink->preroll_lock);
4751         rtsp_client_sink->prerolled = (newstate >= GST_STATE_PAUSED)
4752             && pending == GST_STATE_VOID_PENDING;
4753         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4754         g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4755         GST_DEBUG_OBJECT (bin,
4756             "Internal bin changed state to %s (pending %s). Prerolled now %d",
4757             gst_element_state_get_name (newstate),
4758             gst_element_state_get_name (pending), rtsp_client_sink->prerolled);
4759       }
4760       /* fallthrough */
4761     }
4762     default:
4763     {
4764       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4765       break;
4766     }
4767   }
4768 }
4769
4770 /* the thread where everything happens */
4771 static void
4772 gst_rtsp_client_sink_thread (GstRTSPClientSink * sink)
4773 {
4774   gint cmd;
4775
4776   GST_OBJECT_LOCK (sink);
4777   cmd = sink->pending_cmd;
4778   if (cmd == CMD_RECONNECT || cmd == CMD_RECORD || cmd == CMD_PAUSE
4779       || cmd == CMD_LOOP || cmd == CMD_OPEN)
4780     sink->pending_cmd = CMD_LOOP;
4781   else
4782     sink->pending_cmd = CMD_WAIT;
4783   GST_DEBUG_OBJECT (sink, "got command %s", cmd_to_string (cmd));
4784
4785   /* we got the message command, so ensure communication is possible again */
4786   gst_rtsp_client_sink_connection_flush (sink, FALSE);
4787
4788   sink->busy_cmd = cmd;
4789   GST_OBJECT_UNLOCK (sink);
4790
4791   switch (cmd) {
4792     case CMD_OPEN:
4793       if (gst_rtsp_client_sink_open (sink, TRUE) == GST_RTSP_ERROR)
4794         gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT,
4795             CMD_ALL & ~CMD_CLOSE);
4796       break;
4797     case CMD_RECORD:
4798       gst_rtsp_client_sink_record (sink, TRUE);
4799       break;
4800     case CMD_PAUSE:
4801       gst_rtsp_client_sink_pause (sink, TRUE);
4802       break;
4803     case CMD_CLOSE:
4804       gst_rtsp_client_sink_close (sink, TRUE, FALSE);
4805       break;
4806     case CMD_LOOP:
4807       gst_rtsp_client_sink_loop (sink);
4808       break;
4809     case CMD_RECONNECT:
4810       gst_rtsp_client_sink_reconnect (sink, FALSE);
4811       break;
4812     default:
4813       break;
4814   }
4815
4816   GST_OBJECT_LOCK (sink);
4817   /* and go back to sleep */
4818   if (sink->pending_cmd == CMD_WAIT) {
4819     if (sink->task)
4820       gst_task_pause (sink->task);
4821   }
4822   /* reset waiting */
4823   sink->busy_cmd = CMD_WAIT;
4824   GST_OBJECT_UNLOCK (sink);
4825 }
4826
4827 static gboolean
4828 gst_rtsp_client_sink_start (GstRTSPClientSink * sink)
4829 {
4830   GST_DEBUG_OBJECT (sink, "starting");
4831
4832   sink->streams_collected = FALSE;
4833   gst_element_set_locked_state (GST_ELEMENT (sink->internal_bin), TRUE);
4834
4835   gst_rtsp_client_sink_set_state (sink, GST_STATE_READY);
4836
4837   GST_OBJECT_LOCK (sink);
4838   sink->pending_cmd = CMD_WAIT;
4839
4840   if (sink->task == NULL) {
4841     sink->task =
4842         gst_task_new ((GstTaskFunction) gst_rtsp_client_sink_thread, sink,
4843         NULL);
4844     if (sink->task == NULL)
4845       goto task_error;
4846
4847     gst_task_set_lock (sink->task, GST_RTSP_STREAM_GET_LOCK (sink));
4848   }
4849   GST_OBJECT_UNLOCK (sink);
4850
4851   return TRUE;
4852
4853   /* ERRORS */
4854 task_error:
4855   {
4856     GST_OBJECT_UNLOCK (sink);
4857     GST_ERROR_OBJECT (sink, "failed to create task");
4858     return FALSE;
4859   }
4860 }
4861
4862 static gboolean
4863 gst_rtsp_client_sink_stop (GstRTSPClientSink * sink)
4864 {
4865   GstTask *task;
4866
4867   GST_DEBUG_OBJECT (sink, "stopping");
4868
4869   /* also cancels pending task */
4870   gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_ALL & ~CMD_CLOSE);
4871
4872   GST_OBJECT_LOCK (sink);
4873   if ((task = sink->task)) {
4874     sink->task = NULL;
4875     GST_OBJECT_UNLOCK (sink);
4876
4877     gst_task_stop (task);
4878
4879     /* make sure it is not running */
4880     GST_RTSP_STREAM_LOCK (sink);
4881     GST_RTSP_STREAM_UNLOCK (sink);
4882
4883     /* now wait for the task to finish */
4884     gst_task_join (task);
4885
4886     /* and free the task */
4887     gst_object_unref (GST_OBJECT (task));
4888
4889     GST_OBJECT_LOCK (sink);
4890   }
4891   GST_OBJECT_UNLOCK (sink);
4892
4893   /* ensure synchronously all is closed and clean */
4894   gst_rtsp_client_sink_close (sink, FALSE, TRUE);
4895
4896   return TRUE;
4897 }
4898
4899 static GstStateChangeReturn
4900 gst_rtsp_client_sink_change_state (GstElement * element,
4901     GstStateChange transition)
4902 {
4903   GstRTSPClientSink *rtsp_client_sink;
4904   GstStateChangeReturn ret;
4905
4906   rtsp_client_sink = GST_RTSP_CLIENT_SINK (element);
4907
4908   switch (transition) {
4909     case GST_STATE_CHANGE_NULL_TO_READY:
4910       if (!gst_rtsp_client_sink_start (rtsp_client_sink))
4911         goto start_failed;
4912       break;
4913     case GST_STATE_CHANGE_READY_TO_PAUSED:
4914       /* init some state */
4915       rtsp_client_sink->cur_protocols = rtsp_client_sink->protocols;
4916       /* first attempt, don't ignore timeouts */
4917       rtsp_client_sink->ignore_timeout = FALSE;
4918       rtsp_client_sink->open_error = FALSE;
4919
4920       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_PAUSED);
4921
4922       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4923       if (rtsp_client_sink->in_async) {
4924         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-START");
4925         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4926             gst_message_new_async_start (GST_OBJECT_CAST (rtsp_client_sink)));
4927       }
4928       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4929
4930       break;
4931     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4932       /* fall-through */
4933     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4934       /* unblock the tcp tasks and make the loop waiting */
4935       if (gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_WAIT,
4936               CMD_LOOP)) {
4937         /* make sure it is waiting before we send PLAY below */
4938         GST_RTSP_STREAM_LOCK (rtsp_client_sink);
4939         GST_RTSP_STREAM_UNLOCK (rtsp_client_sink);
4940       }
4941       break;
4942     case GST_STATE_CHANGE_PAUSED_TO_READY:
4943       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_READY);
4944       break;
4945     default:
4946       break;
4947   }
4948
4949   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4950   if (ret == GST_STATE_CHANGE_FAILURE)
4951     goto done;
4952
4953   switch (transition) {
4954     case GST_STATE_CHANGE_NULL_TO_READY:
4955       ret = GST_STATE_CHANGE_SUCCESS;
4956       break;
4957     case GST_STATE_CHANGE_READY_TO_PAUSED:
4958       /* Return ASYNC and preroll input streams */
4959       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4960       if (rtsp_client_sink->in_async)
4961         ret = GST_STATE_CHANGE_ASYNC;
4962       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4963       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_OPEN, 0);
4964
4965       /* CMD_OPEN has been scheduled. Wait until the sink thread starts
4966        * opening connection to the server */
4967       g_mutex_lock (&rtsp_client_sink->open_conn_lock);
4968       while (!rtsp_client_sink->open_conn_start) {
4969         GST_DEBUG_OBJECT (rtsp_client_sink,
4970             "wait for connection to be started");
4971         g_cond_wait (&rtsp_client_sink->open_conn_cond,
4972             &rtsp_client_sink->open_conn_lock);
4973       }
4974       rtsp_client_sink->open_conn_start = FALSE;
4975       g_mutex_unlock (&rtsp_client_sink->open_conn_lock);
4976       break;
4977     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{
4978       GST_DEBUG_OBJECT (rtsp_client_sink,
4979           "Switching to playing -sending RECORD");
4980       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECORD, 0);
4981       ret = GST_STATE_CHANGE_SUCCESS;
4982       break;
4983     }
4984     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4985       /* send pause request and keep the idle task around */
4986       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_PAUSE,
4987           CMD_LOOP);
4988       ret = GST_STATE_CHANGE_NO_PREROLL;
4989       break;
4990     case GST_STATE_CHANGE_PAUSED_TO_READY:
4991       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_CLOSE,
4992           CMD_PAUSE);
4993       ret = GST_STATE_CHANGE_SUCCESS;
4994       break;
4995     case GST_STATE_CHANGE_READY_TO_NULL:
4996       gst_rtsp_client_sink_stop (rtsp_client_sink);
4997       ret = GST_STATE_CHANGE_SUCCESS;
4998       break;
4999     default:
5000       break;
5001   }
5002
5003 done:
5004   return ret;
5005
5006 start_failed:
5007   {
5008     GST_DEBUG_OBJECT (rtsp_client_sink, "start failed");
5009     return GST_STATE_CHANGE_FAILURE;
5010   }
5011 }
5012
5013 /*** GSTURIHANDLER INTERFACE *************************************************/
5014
5015 static GstURIType
5016 gst_rtsp_client_sink_uri_get_type (GType type)
5017 {
5018   return GST_URI_SINK;
5019 }
5020
5021 static const gchar *const *
5022 gst_rtsp_client_sink_uri_get_protocols (GType type)
5023 {
5024   static const gchar *protocols[] =
5025       { "rtsp", "rtspu", "rtspt", "rtsph", "rtsp-sdp",
5026     "rtsps", "rtspsu", "rtspst", "rtspsh", NULL
5027   };
5028
5029   return protocols;
5030 }
5031
5032 static gchar *
5033 gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler)
5034 {
5035   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (handler);
5036
5037   /* FIXME: make thread-safe */
5038   return g_strdup (sink->conninfo.location);
5039 }
5040
5041 static gboolean
5042 gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
5043     GError ** error)
5044 {
5045   GstRTSPClientSink *sink;
5046   GstRTSPResult res;
5047   GstSDPResult sres;
5048   GstRTSPUrl *newurl = NULL;
5049   GstSDPMessage *sdp = NULL;
5050
5051   sink = GST_RTSP_CLIENT_SINK (handler);
5052
5053   /* same URI, we're fine */
5054   if (sink->conninfo.location && uri && !strcmp (uri, sink->conninfo.location))
5055     goto was_ok;
5056
5057   if (g_str_has_prefix (uri, "rtsp-sdp://")) {
5058     sres = gst_sdp_message_new (&sdp);
5059     if (sres < 0)
5060       goto sdp_failed;
5061
5062     GST_DEBUG_OBJECT (sink, "parsing SDP message");
5063     sres = gst_sdp_message_parse_uri (uri, sdp);
5064     if (sres < 0)
5065       goto invalid_sdp;
5066   } else {
5067     /* try to parse */
5068     GST_DEBUG_OBJECT (sink, "parsing URI");
5069     if ((res = gst_rtsp_url_parse (uri, &newurl)) < 0)
5070       goto parse_error;
5071   }
5072
5073   /* if worked, free previous and store new url object along with the original
5074    * location. */
5075   GST_DEBUG_OBJECT (sink, "configuring URI");
5076   g_free (sink->conninfo.location);
5077   sink->conninfo.location = g_strdup (uri);
5078   gst_rtsp_url_free (sink->conninfo.url);
5079   sink->conninfo.url = newurl;
5080   g_free (sink->conninfo.url_str);
5081   if (newurl)
5082     sink->conninfo.url_str = gst_rtsp_url_get_request_uri (sink->conninfo.url);
5083   else
5084     sink->conninfo.url_str = NULL;
5085
5086   if (sink->uri_sdp)
5087     gst_sdp_message_free (sink->uri_sdp);
5088   sink->uri_sdp = sdp;
5089   sink->from_sdp = sdp != NULL;
5090
5091   GST_DEBUG_OBJECT (sink, "set uri: %s", GST_STR_NULL (uri));
5092   GST_DEBUG_OBJECT (sink, "request uri is: %s",
5093       GST_STR_NULL (sink->conninfo.url_str));
5094
5095   return TRUE;
5096
5097   /* Special cases */
5098 was_ok:
5099   {
5100     GST_DEBUG_OBJECT (sink, "URI was ok: '%s'", GST_STR_NULL (uri));
5101     return TRUE;
5102   }
5103 sdp_failed:
5104   {
5105     GST_ERROR_OBJECT (sink, "Could not create new SDP (%d)", sres);
5106     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5107         "Could not create SDP");
5108     return FALSE;
5109   }
5110 invalid_sdp:
5111   {
5112     GST_ERROR_OBJECT (sink, "Not a valid SDP (%d) '%s'", sres,
5113         GST_STR_NULL (uri));
5114     gst_sdp_message_free (sdp);
5115     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5116         "Invalid SDP");
5117     return FALSE;
5118   }
5119 parse_error:
5120   {
5121     GST_ERROR_OBJECT (sink, "Not a valid RTSP url '%s' (%d)",
5122         GST_STR_NULL (uri), res);
5123     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
5124         "Invalid RTSP URI");
5125     return FALSE;
5126   }
5127 }
5128
5129 static void
5130 gst_rtsp_client_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
5131 {
5132   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
5133
5134   iface->get_type = gst_rtsp_client_sink_uri_get_type;
5135   iface->get_protocols = gst_rtsp_client_sink_uri_get_protocols;
5136   iface->get_uri = gst_rtsp_client_sink_uri_get_uri;
5137   iface->set_uri = gst_rtsp_client_sink_uri_set_uri;
5138 }