rtspclientsink: Use a mutex for protecting against concurrent send/receives
[platform/upstream/gstreamer.git] / gst / rtsp-sink / gstrtspclientsink.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim at fluendo dot com>
3  *               <2006> Lutz Mueller <lutz at topfrose dot de>
4  *               <2015> Jan Schmidt <jan at centricular dot com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /*
22  * Unless otherwise indicated, Source Code is licensed under MIT license.
23  * See further explanation attached in License Statement (distributed in the file
24  * LICENSE).
25  *
26  * Permission is hereby granted, free of charge, to any person obtaining a copy of
27  * this software and associated documentation files (the "Software"), to deal in
28  * the Software without restriction, including without limitation the rights to
29  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
30  * of the Software, and to permit persons to whom the Software is furnished to do
31  * so, subject to the following conditions:
32  *
33  * The above copyright notice and this permission notice shall be included in all
34  * copies or substantial portions of the Software.
35  *
36  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
37  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
38  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
39  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
40  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
41  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
42  * SOFTWARE.
43  */
44 /**
45  * SECTION:element-rtspclientsink
46  *
47  * Makes a connection to an RTSP server and send data via RTSP RECORD.
48  * rtspclientsink strictly follows RFC 2326
49  *
50  * RTSP supports transport over TCP or UDP in unicast or multicast mode. By
51  * default rtspclientsink will negotiate a connection in the following order:
52  * UDP unicast/UDP multicast/TCP. The order cannot be changed but the allowed
53  * protocols can be controlled with the #GstRTSPClientSink:protocols property.
54  *
55  * rtspclientsink will internally instantiate an RTP session manager element
56  * that will handle the RTCP messages to and from the server, jitter removal,
57  * and packet reordering.
58  * This feature is implemented using the gstrtpbin element.
59  *
60  * rtspclientsink accepts any stream for which there is an installed payloader,
61  * creates the payloader and manages payload-types, as well as RTX setup.
62  * The new-payloader signal is fired when a payloader is created, in case
63  * an app wants to do custom configuration (such as for MTU).
64  *
65  * <refsect2>
66  * <title>Example launch line</title>
67  * |[
68  * gst-launch-1.0 videotestsrc ! jpegenc ! rtspclientsink location=rtsp://some.server/url
69  * ]| Establish a connection to an RTSP server and send JPEG encoded video packets
70  * </refsect2>
71  */
72
73 /* FIXMEs
74  * - Handle EOS properly and shutdown. The problem with EOS is we don't know
75  *   when the server has received all data, so we don't know when to do teardown.
76  *   At the moment, we forward EOS to the app as soon as we stop sending. Is there
77  *   a way to know from the receiver that it's got all data? Some session timeout?
78  * - Implement extension support for Real / WMS if they support RECORD?
79  * - Add support for network clock synchronised streaming?
80  * - Fix crypto key nego so SAVP/SAVPF profiles work.
81  * - Test (&fix?) HTTP tunnel support
82  * - Add an address pool object for GstRTSPStreams to use for multicast
83  * - Test multicast UDP transport
84  */
85
86 #ifdef HAVE_CONFIG_H
87 #include "config.h"
88 #endif
89
90 #ifdef HAVE_UNISTD_H
91 #include <unistd.h>
92 #endif /* HAVE_UNISTD_H */
93 #include <stdlib.h>
94 #include <string.h>
95 #include <stdio.h>
96 #include <stdarg.h>
97
98 #include <gst/net/gstnet.h>
99 #include <gst/sdp/gstsdpmessage.h>
100 #include <gst/sdp/gstmikey.h>
101 #include <gst/rtp/rtp.h>
102
103 #include "gstrtspclientsink.h"
104
105 GST_DEBUG_CATEGORY_STATIC (rtsp_client_sink_debug);
106 #define GST_CAT_DEFAULT (rtsp_client_sink_debug)
107
108 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream_%u",
109     GST_PAD_SINK,
110     GST_PAD_REQUEST,
111     GST_STATIC_CAPS_ANY);       /* Actual caps come from available set of payloaders */
112
113 enum
114 {
115   SIGNAL_HANDLE_REQUEST,
116   SIGNAL_NEW_MANAGER,
117   SIGNAL_NEW_PAYLOADER,
118   SIGNAL_REQUEST_RTCP_KEY,
119   LAST_SIGNAL
120 };
121
122 enum _GstRTSPClientSinkNtpTimeSource
123 {
124   NTP_TIME_SOURCE_NTP,
125   NTP_TIME_SOURCE_UNIX,
126   NTP_TIME_SOURCE_RUNNING_TIME,
127   NTP_TIME_SOURCE_CLOCK_TIME
128 };
129
130 #define GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE (gst_rtsp_client_sink_ntp_time_source_get_type())
131 static GType
132 gst_rtsp_client_sink_ntp_time_source_get_type (void)
133 {
134   static GType ntp_time_source_type = 0;
135   static const GEnumValue ntp_time_source_values[] = {
136     {NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
137     {NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
138     {NTP_TIME_SOURCE_RUNNING_TIME,
139           "Running time based on pipeline clock",
140         "running-time"},
141     {NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
142     {0, NULL, NULL},
143   };
144
145   if (!ntp_time_source_type) {
146     ntp_time_source_type =
147         g_enum_register_static ("GstRTSPClientSinkNtpTimeSource",
148         ntp_time_source_values);
149   }
150   return ntp_time_source_type;
151 }
152
153 #define DEFAULT_LOCATION         NULL
154 #define DEFAULT_PROTOCOLS        GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP
155 #define DEFAULT_DEBUG            FALSE
156 #define DEFAULT_RETRY            20
157 #define DEFAULT_TIMEOUT          5000000
158 #define DEFAULT_UDP_BUFFER_SIZE  0x80000
159 #define DEFAULT_TCP_TIMEOUT      20000000
160 #define DEFAULT_LATENCY_MS       2000
161 #define DEFAULT_DO_RTSP_KEEP_ALIVE       TRUE
162 #define DEFAULT_PROXY            NULL
163 #define DEFAULT_RTP_BLOCKSIZE    0
164 #define DEFAULT_USER_ID          NULL
165 #define DEFAULT_USER_PW          NULL
166 #define DEFAULT_PORT_RANGE       NULL
167 #define DEFAULT_UDP_RECONNECT    TRUE
168 #define DEFAULT_MULTICAST_IFACE  NULL
169 #define DEFAULT_TLS_VALIDATION_FLAGS     G_TLS_CERTIFICATE_VALIDATE_ALL
170 #define DEFAULT_TLS_DATABASE     NULL
171 #define DEFAULT_TLS_INTERACTION     NULL
172 #define DEFAULT_NTP_TIME_SOURCE  NTP_TIME_SOURCE_NTP
173 #define DEFAULT_USER_AGENT       "GStreamer/" PACKAGE_VERSION
174 #define DEFAULT_PROFILES         GST_RTSP_PROFILE_AVP
175 #define DEFAULT_RTX_TIME_MS      500
176
177 enum
178 {
179   PROP_0,
180   PROP_LOCATION,
181   PROP_PROTOCOLS,
182   PROP_DEBUG,
183   PROP_RETRY,
184   PROP_TIMEOUT,
185   PROP_TCP_TIMEOUT,
186   PROP_LATENCY,
187   PROP_RTX_TIME,
188   PROP_DO_RTSP_KEEP_ALIVE,
189   PROP_PROXY,
190   PROP_PROXY_ID,
191   PROP_PROXY_PW,
192   PROP_RTP_BLOCKSIZE,
193   PROP_USER_ID,
194   PROP_USER_PW,
195   PROP_PORT_RANGE,
196   PROP_UDP_BUFFER_SIZE,
197   PROP_UDP_RECONNECT,
198   PROP_MULTICAST_IFACE,
199   PROP_SDES,
200   PROP_TLS_VALIDATION_FLAGS,
201   PROP_TLS_DATABASE,
202   PROP_TLS_INTERACTION,
203   PROP_NTP_TIME_SOURCE,
204   PROP_USER_AGENT,
205   PROP_PROFILES
206 };
207
208 static void gst_rtsp_client_sink_finalize (GObject * object);
209
210 static void gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
211     const GValue * value, GParamSpec * pspec);
212 static void gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
213     GValue * value, GParamSpec * pspec);
214
215 static GstClock *gst_rtsp_client_sink_provide_clock (GstElement * element);
216
217 static void gst_rtsp_client_sink_uri_handler_init (gpointer g_iface,
218     gpointer iface_data);
219
220 static gboolean gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp,
221     const gchar * proxy);
222 static void gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink *
223     rtsp_client_sink, guint64 timeout);
224
225 static GstStateChangeReturn gst_rtsp_client_sink_change_state (GstElement *
226     element, GstStateChange transition);
227 static void gst_rtsp_client_sink_handle_message (GstBin * bin,
228     GstMessage * message);
229
230 static gboolean gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
231     GstRTSPMessage * response);
232
233 static gboolean gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink,
234     gint cmd, gint mask);
235
236 static GstRTSPResult gst_rtsp_client_sink_open (GstRTSPClientSink * sink,
237     gboolean async);
238 static GstRTSPResult gst_rtsp_client_sink_record (GstRTSPClientSink * sink,
239     gboolean async);
240 static GstRTSPResult gst_rtsp_client_sink_pause (GstRTSPClientSink * sink,
241     gboolean async);
242 static GstRTSPResult gst_rtsp_client_sink_close (GstRTSPClientSink * sink,
243     gboolean async, gboolean only_close);
244 static gboolean gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink);
245
246 static gboolean gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler,
247     const gchar * uri, GError ** error);
248 static gchar *gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler);
249
250 static gboolean gst_rtsp_client_sink_loop (GstRTSPClientSink * sink);
251 static void gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink,
252     gboolean flush);
253
254 static GstPad *gst_rtsp_client_sink_request_new_pad (GstElement * element,
255     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
256 static void gst_rtsp_client_sink_release_pad (GstElement * element,
257     GstPad * pad);
258
259 /* commands we send to out loop to notify it of events */
260 #define CMD_OPEN        (1 << 0)
261 #define CMD_RECORD      (1 << 1)
262 #define CMD_PAUSE       (1 << 2)
263 #define CMD_CLOSE       (1 << 3)
264 #define CMD_WAIT        (1 << 4)
265 #define CMD_RECONNECT   (1 << 5)
266 #define CMD_LOOP        (1 << 6)
267
268 /* mask for all commands */
269 #define CMD_ALL         ((CMD_LOOP << 1) - 1)
270
271 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
272 G_STMT_START {                                          \
273   gchar *__txt = _gst_element_error_printf text;        \
274   gst_element_post_message (GST_ELEMENT_CAST (el),      \
275       gst_message_new_progress (GST_OBJECT_CAST (el),   \
276           GST_PROGRESS_TYPE_ ##type, code, __txt));     \
277   g_free (__txt);                                       \
278 } G_STMT_END
279
280 static guint gst_rtsp_client_sink_signals[LAST_SIGNAL] = { 0 };
281
282 #define gst_rtsp_client_sink_parent_class parent_class
283 G_DEFINE_TYPE_WITH_CODE (GstRTSPClientSink, gst_rtsp_client_sink, GST_TYPE_BIN,
284     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
285         gst_rtsp_client_sink_uri_handler_init));
286
287 #ifndef GST_DISABLE_GST_DEBUG
288 static inline const gchar *
289 cmd_to_string (guint cmd)
290 {
291   switch (cmd) {
292     case CMD_OPEN:
293       return "OPEN";
294     case CMD_RECORD:
295       return "RECORD";
296     case CMD_PAUSE:
297       return "PAUSE";
298     case CMD_CLOSE:
299       return "CLOSE";
300     case CMD_WAIT:
301       return "WAIT";
302     case CMD_RECONNECT:
303       return "RECONNECT";
304     case CMD_LOOP:
305       return "LOOP";
306   }
307
308   return "unknown";
309 }
310 #endif
311
312 static void
313 gst_rtsp_client_sink_class_init (GstRTSPClientSinkClass * klass)
314 {
315   GObjectClass *gobject_class;
316   GstElementClass *gstelement_class;
317   GstBinClass *gstbin_class;
318
319   gobject_class = (GObjectClass *) klass;
320   gstelement_class = (GstElementClass *) klass;
321   gstbin_class = (GstBinClass *) klass;
322
323   GST_DEBUG_CATEGORY_INIT (rtsp_client_sink_debug, "rtspclientsink", 0,
324       "RTSP sink element");
325
326   gobject_class->set_property = gst_rtsp_client_sink_set_property;
327   gobject_class->get_property = gst_rtsp_client_sink_get_property;
328
329   gobject_class->finalize = gst_rtsp_client_sink_finalize;
330
331   g_object_class_install_property (gobject_class, PROP_LOCATION,
332       g_param_spec_string ("location", "RTSP Location",
333           "Location of the RTSP url to read",
334           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
337       g_param_spec_flags ("protocols", "Protocols",
338           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
339           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340
341   g_object_class_install_property (gobject_class, PROP_PROFILES,
342       g_param_spec_flags ("profiles", "Profiles",
343           "Allowed RTSP profiles", GST_TYPE_RTSP_PROFILE,
344           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_DEBUG,
347       g_param_spec_boolean ("debug", "Debug",
348           "Dump request and response messages to stdout",
349           DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_RETRY,
352       g_param_spec_uint ("retry", "Retry",
353           "Max number of retries when allocating RTP ports.",
354           0, G_MAXUINT16, DEFAULT_RETRY,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
358       g_param_spec_uint64 ("timeout", "Timeout",
359           "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
360           0, G_MAXUINT64, DEFAULT_TIMEOUT,
361           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362
363   g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
364       g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
365           "Fail after timeout microseconds on TCP connections (0 = disabled)",
366           0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
367           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368
369   g_object_class_install_property (gobject_class, PROP_LATENCY,
370       g_param_spec_uint ("latency", "Buffer latency in ms",
371           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
372           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373
374   g_object_class_install_property (gobject_class, PROP_RTX_TIME,
375       g_param_spec_uint ("rtx-time", "Retransmission buffer in ms",
376           "Amount of ms to buffer for retransmission. 0 disables retransmission",
377           0, G_MAXUINT, DEFAULT_RTX_TIME_MS,
378           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
379
380   /**
381    * GstRTSPClientSink:do-rtsp-keep-alive:
382    *
383    * Enable RTSP keep alive support. Some old server don't like RTSP
384    * keep alive and then this property needs to be set to FALSE.
385    */
386   g_object_class_install_property (gobject_class, PROP_DO_RTSP_KEEP_ALIVE,
387       g_param_spec_boolean ("do-rtsp-keep-alive", "Do RTSP Keep Alive",
388           "Send RTSP keep alive packets, disable for old incompatible server.",
389           DEFAULT_DO_RTSP_KEEP_ALIVE,
390           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
391
392   /**
393    * GstRTSPClientSink:proxy:
394    *
395    * Set the proxy parameters. This has to be a string of the format
396    * [http://][user:passwd@]host[:port].
397    */
398   g_object_class_install_property (gobject_class, PROP_PROXY,
399       g_param_spec_string ("proxy", "Proxy",
400           "Proxy settings for HTTP tunneling. Format: [http://][user:passwd@]host[:port]",
401           DEFAULT_PROXY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402   /**
403    * GstRTSPClientSink:proxy-id:
404    *
405    * Sets the proxy URI user id for authentication. If the URI set via the
406    * "proxy" property contains a user-id already, that will take precedence.
407    *
408    */
409   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
410       g_param_spec_string ("proxy-id", "proxy-id",
411           "HTTP proxy URI user id for authentication", "",
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413   /**
414    * GstRTSPClientSink:proxy-pw:
415    *
416    * Sets the proxy URI password for authentication. If the URI set via the
417    * "proxy" property contains a password already, that will take precedence.
418    *
419    */
420   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
421       g_param_spec_string ("proxy-pw", "proxy-pw",
422           "HTTP proxy URI user password for authentication", "",
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425   /**
426    * GstRTSPClientSink:rtp-blocksize:
427    *
428    * RTP package size to suggest to server.
429    */
430   g_object_class_install_property (gobject_class, PROP_RTP_BLOCKSIZE,
431       g_param_spec_uint ("rtp-blocksize", "RTP Blocksize",
432           "RTP package size to suggest to server (0 = disabled)",
433           0, 65536, DEFAULT_RTP_BLOCKSIZE,
434           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435
436   g_object_class_install_property (gobject_class,
437       PROP_USER_ID,
438       g_param_spec_string ("user-id", "user-id",
439           "RTSP location URI user id for authentication", DEFAULT_USER_ID,
440           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441   g_object_class_install_property (gobject_class, PROP_USER_PW,
442       g_param_spec_string ("user-pw", "user-pw",
443           "RTSP location URI user password for authentication", DEFAULT_USER_PW,
444           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
445
446   /**
447    * GstRTSPClientSink:port-range:
448    *
449    * Configure the client port numbers that can be used to receive
450    * RTCP.
451    */
452   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
453       g_param_spec_string ("port-range", "Port range",
454           "Client port range that can be used to receive RTCP data, "
455           "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstRTSPClientSink:udp-buffer-size:
460    *
461    * Size of the kernel UDP receive buffer in bytes.
462    */
463   g_object_class_install_property (gobject_class, PROP_UDP_BUFFER_SIZE,
464       g_param_spec_int ("udp-buffer-size", "UDP Buffer Size",
465           "Size of the kernel UDP receive buffer in bytes, 0=default",
466           0, G_MAXINT, DEFAULT_UDP_BUFFER_SIZE,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468
469   g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT,
470       g_param_spec_boolean ("udp-reconnect", "Reconnect to the server",
471           "Reconnect to the server if RTSP connection is closed when doing UDP",
472           DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
473
474   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
475       g_param_spec_string ("multicast-iface", "Multicast Interface",
476           "The network interface on which to join the multicast group",
477           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class, PROP_SDES,
480       g_param_spec_boxed ("sdes", "SDES",
481           "The SDES items of this session",
482           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
483
484   /**
485    * GstRTSPClientSink::tls-validation-flags:
486    *
487    * TLS certificate validation flags used to validate server
488    * certificate.
489    *
490    */
491   g_object_class_install_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
492       g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
493           "TLS certificate validation flags used to validate the server certificate",
494           G_TYPE_TLS_CERTIFICATE_FLAGS, DEFAULT_TLS_VALIDATION_FLAGS,
495           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496
497   /**
498    * GstRTSPClientSink::tls-database:
499    *
500    * TLS database with anchor certificate authorities used to validate
501    * the server certificate.
502    *
503    */
504   g_object_class_install_property (gobject_class, PROP_TLS_DATABASE,
505       g_param_spec_object ("tls-database", "TLS database",
506           "TLS database with anchor certificate authorities used to validate the server certificate",
507           G_TYPE_TLS_DATABASE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
508
509   /**
510    * GstRTSPClientSink::tls-interaction:
511    *
512    * A #GTlsInteraction object to be used when the connection or certificate
513    * database need to interact with the user. This will be used to prompt the
514    * user for passwords where necessary.
515    *
516    */
517   g_object_class_install_property (gobject_class, PROP_TLS_INTERACTION,
518       g_param_spec_object ("tls-interaction", "TLS interaction",
519           "A GTlsInteraction object to prompt the user for password or certificate",
520           G_TYPE_TLS_INTERACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
521
522   /**
523    * GstRTSPClientSink::ntp-time-source:
524    *
525    * allows to select the time source that should be used
526    * for the NTP time in outgoing packets
527    *
528    */
529   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
530       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
531           "NTP time source for RTCP packets",
532           GST_TYPE_RTSP_CLIENT_SINK_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
533           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
534
535   /**
536    * GstRTSPClientSink::user-agent:
537    *
538    * The string to set in the User-Agent header.
539    *
540    */
541   g_object_class_install_property (gobject_class, PROP_USER_AGENT,
542       g_param_spec_string ("user-agent", "User Agent",
543           "The User-Agent string to send to the server",
544           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
545
546   /**
547    * GstRTSPClientSink::handle-request:
548    * @rtsp_client_sink: a #GstRTSPClientSink
549    * @request: a #GstRTSPMessage
550    * @response: a #GstRTSPMessage
551    *
552    * Handle a server request in @request and prepare @response.
553    *
554    * This signal is called from the streaming thread, you should therefore not
555    * do any state changes on @rtsp_client_sink because this might deadlock. If you want
556    * to modify the state as a result of this signal, post a
557    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
558    * in some other way.
559    *
560    */
561   gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST] =
562       g_signal_new ("handle-request", G_TYPE_FROM_CLASS (klass), 0,
563       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
564       G_TYPE_POINTER, G_TYPE_POINTER);
565
566   /**
567    * GstRTSPClientSink::new-manager:
568    * @rtsp_client_sink: a #GstRTSPClientSink
569    * @manager: a #GstElement
570    *
571    * Emitted after a new manager (like rtpbin) was created and the default
572    * properties were configured.
573    *
574    */
575   gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER] =
576       g_signal_new_class_handler ("new-manager", G_TYPE_FROM_CLASS (klass),
577       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
578       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
579
580   /**
581    * GstRTSPClientSink::new-payloader:
582    * @rtsp_client_sink: a #GstRTSPClientSink
583    * @payloader: a #GstElement
584    *
585    * Emitted after a new RTP payloader was created and the default
586    * properties were configured.
587    *
588    */
589   gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER] =
590       g_signal_new_class_handler ("new-payloader", G_TYPE_FROM_CLASS (klass),
591       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP, 0, NULL, NULL,
592       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
593
594   /**
595    * GstRTSPClientSink::request-rtcp-key:
596    * @rtsp_client_sink: a #GstRTSPClientSink
597    * @num: the stream number
598    *
599    * Signal emitted to get the crypto parameters relevant to the RTCP
600    * stream. User should provide the key and the RTCP encryption ciphers
601    * and authentication, and return them wrapped in a GstCaps.
602    *
603    */
604   gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY] =
605       g_signal_new ("request-rtcp-key", G_TYPE_FROM_CLASS (klass),
606       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
607
608   gstelement_class->provide_clock = gst_rtsp_client_sink_provide_clock;
609   gstelement_class->change_state = gst_rtsp_client_sink_change_state;
610   gstelement_class->request_new_pad =
611       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_request_new_pad);
612   gstelement_class->release_pad =
613       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_release_pad);
614
615   gst_element_class_add_static_pad_template (gstelement_class, &rtptemplate);
616
617   gst_element_class_set_static_metadata (gstelement_class,
618       "RTSP RECORD client", "Sink/Network",
619       "Send data over the network via RTSP RECORD(RFC 2326)",
620       "Jan Schmidt <jan@centricular.com>");
621
622   gstbin_class->handle_message = gst_rtsp_client_sink_handle_message;
623 }
624
625 static void
626 gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
627 {
628   sink->conninfo.location = g_strdup (DEFAULT_LOCATION);
629   sink->protocols = DEFAULT_PROTOCOLS;
630   sink->debug = DEFAULT_DEBUG;
631   sink->retry = DEFAULT_RETRY;
632   sink->udp_timeout = DEFAULT_TIMEOUT;
633   gst_rtsp_client_sink_set_tcp_timeout (sink, DEFAULT_TCP_TIMEOUT);
634   sink->latency = DEFAULT_LATENCY_MS;
635   sink->rtx_time = DEFAULT_RTX_TIME_MS;
636   sink->do_rtsp_keep_alive = DEFAULT_DO_RTSP_KEEP_ALIVE;
637   gst_rtsp_client_sink_set_proxy (sink, DEFAULT_PROXY);
638   sink->rtp_blocksize = DEFAULT_RTP_BLOCKSIZE;
639   sink->user_id = g_strdup (DEFAULT_USER_ID);
640   sink->user_pw = g_strdup (DEFAULT_USER_PW);
641   sink->client_port_range.min = 0;
642   sink->client_port_range.max = 0;
643   sink->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
644   sink->udp_reconnect = DEFAULT_UDP_RECONNECT;
645   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
646   sink->sdes = NULL;
647   sink->tls_validation_flags = DEFAULT_TLS_VALIDATION_FLAGS;
648   sink->tls_database = DEFAULT_TLS_DATABASE;
649   sink->tls_interaction = DEFAULT_TLS_INTERACTION;
650   sink->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
651   sink->user_agent = g_strdup (DEFAULT_USER_AGENT);
652
653   sink->profiles = DEFAULT_PROFILES;
654
655   /* protects the streaming thread in interleaved mode or the polling
656    * thread in UDP mode. */
657   g_rec_mutex_init (&sink->stream_rec_lock);
658
659   /* protects our state changes from multiple invocations */
660   g_rec_mutex_init (&sink->state_rec_lock);
661
662   g_mutex_init (&sink->send_lock);
663
664   g_mutex_init (&sink->preroll_lock);
665   g_cond_init (&sink->preroll_cond);
666
667   sink->state = GST_RTSP_STATE_INVALID;
668
669   g_mutex_init (&sink->conninfo.send_lock);
670   g_mutex_init (&sink->conninfo.recv_lock);
671
672   sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
673   gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
674   gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
675
676   sink->next_dyn_pt = 96;
677
678   gst_sdp_message_init (&sink->cursdp);
679
680   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
681 }
682
683 static void
684 gst_rtsp_client_sink_finalize (GObject * object)
685 {
686   GstRTSPClientSink *rtsp_client_sink;
687
688   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
689
690   gst_sdp_message_uninit (&rtsp_client_sink->cursdp);
691
692   g_free (rtsp_client_sink->conninfo.location);
693   gst_rtsp_url_free (rtsp_client_sink->conninfo.url);
694   g_free (rtsp_client_sink->conninfo.url_str);
695   g_free (rtsp_client_sink->user_id);
696   g_free (rtsp_client_sink->user_pw);
697   g_free (rtsp_client_sink->multi_iface);
698   g_free (rtsp_client_sink->user_agent);
699
700   if (rtsp_client_sink->uri_sdp) {
701     gst_sdp_message_free (rtsp_client_sink->uri_sdp);
702     rtsp_client_sink->uri_sdp = NULL;
703   }
704   if (rtsp_client_sink->provided_clock)
705     gst_object_unref (rtsp_client_sink->provided_clock);
706
707   if (rtsp_client_sink->sdes)
708     gst_structure_free (rtsp_client_sink->sdes);
709
710   if (rtsp_client_sink->tls_database)
711     g_object_unref (rtsp_client_sink->tls_database);
712
713   if (rtsp_client_sink->tls_interaction)
714     g_object_unref (rtsp_client_sink->tls_interaction);
715
716   /* free locks */
717   g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
718   g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
719
720   g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
721   g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
722
723   g_mutex_clear (&rtsp_client_sink->send_lock);
724
725   g_mutex_clear (&rtsp_client_sink->preroll_lock);
726   g_cond_clear (&rtsp_client_sink->preroll_cond);
727
728   G_OBJECT_CLASS (parent_class)->finalize (object);
729 }
730
731 static gboolean
732 gst_rtp_payloader_filter_func (GstPluginFeature * feature, gpointer user_data)
733 {
734   GstElementFactory *factory = NULL;
735   const gchar *klass;
736
737   if (!GST_IS_ELEMENT_FACTORY (feature))
738     return FALSE;
739
740   factory = GST_ELEMENT_FACTORY (feature);
741
742   if (gst_plugin_feature_get_rank (feature) == GST_RANK_NONE)
743     return FALSE;
744
745   if (!gst_element_factory_list_is_type (factory,
746           GST_ELEMENT_FACTORY_TYPE_PAYLOADER))
747     return FALSE;
748
749   klass =
750       gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
751   if (strstr (klass, "Codec") == NULL)
752     return FALSE;
753   if (strstr (klass, "RTP") == NULL)
754     return FALSE;
755
756   return TRUE;
757 }
758
759 static gint
760 compare_ranks (GstPluginFeature * f1, GstPluginFeature * f2)
761 {
762   gint diff;
763   const gchar *rname1, *rname2;
764   GstRank rank1, rank2;
765
766   rname1 = gst_plugin_feature_get_name (f1);
767   rname2 = gst_plugin_feature_get_name (f2);
768
769   rank1 = gst_plugin_feature_get_rank (f1);
770   rank2 = gst_plugin_feature_get_rank (f2);
771
772   /* HACK: Prefer rtpmp4apay over rtpmp4gpay */
773   if (g_str_equal (rname1, "rtpmp4apay"))
774     rank1 = GST_RANK_SECONDARY + 1;
775   if (g_str_equal (rname2, "rtpmp4apay"))
776     rank2 = GST_RANK_SECONDARY + 1;
777
778   diff = rank2 - rank1;
779   if (diff != 0)
780     return diff;
781
782   diff = strcmp (rname2, rname1);
783
784   return diff;
785 }
786
787 static GList *
788 gst_rtsp_client_sink_get_factories (void)
789 {
790   static GList *payloader_factories = NULL;
791
792   if (g_once_init_enter (&payloader_factories)) {
793     GList *all_factories;
794
795     all_factories =
796         gst_registry_feature_filter (gst_registry_get (),
797         gst_rtp_payloader_filter_func, FALSE, NULL);
798
799     all_factories = g_list_sort (all_factories, (GCompareFunc) compare_ranks);
800
801     g_once_init_leave (&payloader_factories, all_factories);
802   }
803
804   return payloader_factories;
805 }
806
807 static GstCaps *
808 gst_rtsp_client_sink_get_payloader_caps (void)
809 {
810   /* Cached caps result */
811   static GstCaps *ret;
812
813   if (g_once_init_enter (&ret)) {
814     GList *factories, *cur;
815     GstCaps *caps = gst_caps_new_empty ();
816
817     factories = gst_rtsp_client_sink_get_factories ();
818     for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
819       GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
820       const GList *tmp;
821
822       for (tmp = gst_element_factory_get_static_pad_templates (factory);
823           tmp; tmp = g_list_next (tmp)) {
824         GstStaticPadTemplate *template = tmp->data;
825
826         if (template->direction == GST_PAD_SINK) {
827           GstCaps *static_caps = gst_static_pad_template_get_caps (template);
828
829           GST_LOG ("Found pad template %s on factory %s",
830               template->name_template, gst_plugin_feature_get_name (factory));
831
832           if (static_caps)
833             caps = gst_caps_merge (caps, static_caps);
834
835           /* Early out, any is absorbing */
836           if (gst_caps_is_any (caps))
837             goto out;
838         }
839       }
840     }
841   out:
842     g_once_init_leave (&ret, caps);
843   }
844
845   /* Return cached result */
846   return gst_caps_ref (ret);
847 }
848
849 static GstElement *
850 gst_rtsp_client_sink_make_payloader (GstCaps * caps)
851 {
852   GList *factories, *cur;
853
854   factories = gst_rtsp_client_sink_get_factories ();
855   for (cur = factories; cur != NULL; cur = g_list_next (cur)) {
856     GstElementFactory *factory = GST_ELEMENT_FACTORY (cur->data);
857     const GList *tmp;
858
859     for (tmp = gst_element_factory_get_static_pad_templates (factory);
860         tmp; tmp = g_list_next (tmp)) {
861       GstStaticPadTemplate *template = tmp->data;
862
863       if (template->direction == GST_PAD_SINK) {
864         GstCaps *static_caps = gst_static_pad_template_get_caps (template);
865         GstElement *payloader = NULL;
866
867         if (gst_caps_can_intersect (static_caps, caps)) {
868           GST_DEBUG ("caps %" GST_PTR_FORMAT " intersects with template %"
869               GST_PTR_FORMAT " for payloader %s", caps, static_caps,
870               gst_plugin_feature_get_name (factory));
871           payloader = gst_element_factory_create (factory, NULL);
872         }
873
874         gst_caps_unref (static_caps);
875
876         if (payloader)
877           return payloader;
878       }
879     }
880   }
881
882   return NULL;
883 }
884
885 static GstRTSPStream *
886 gst_rtsp_client_sink_create_stream (GstRTSPClientSink * sink,
887     GstRTSPStreamContext * context, GstElement * payloader, GstPad * pad)
888 {
889   GstRTSPStream *stream = NULL;
890   guint pt, aux_pt;
891
892   GST_OBJECT_LOCK (sink);
893
894   g_object_get (G_OBJECT (payloader), "pt", &pt, NULL);
895   if (pt >= 96 && pt <= sink->next_dyn_pt) {
896     /* Payloader has a dynamic PT, but one that's already used */
897     /* FIXME: Create a caps->ptmap instead? */
898     pt = sink->next_dyn_pt;
899
900     if (pt > 127)
901       goto no_free_pt;
902
903     GST_DEBUG_OBJECT (sink, "Assigning pt %u to stream %d", pt, context->index);
904
905     sink->next_dyn_pt++;
906   } else {
907     GST_DEBUG_OBJECT (sink, "Keeping existing pt %u for stream %d",
908         pt, context->index);
909   }
910
911   aux_pt = sink->next_dyn_pt;
912   if (aux_pt > 127)
913     goto no_free_pt;
914   sink->next_dyn_pt++;
915
916   GST_OBJECT_UNLOCK (sink);
917
918
919   g_object_set (G_OBJECT (payloader), "pt", pt, NULL);
920
921   stream = gst_rtsp_stream_new (context->index, payloader, pad);
922
923   gst_rtsp_stream_set_client_side (stream, TRUE);
924   gst_rtsp_stream_set_retransmission_time (stream,
925       (GstClockTime) (sink->rtx_time) * GST_MSECOND);
926   gst_rtsp_stream_set_protocols (stream, sink->protocols);
927   gst_rtsp_stream_set_profiles (stream, sink->profiles);
928   gst_rtsp_stream_set_retransmission_pt (stream, aux_pt);
929   gst_rtsp_stream_set_buffer_size (stream, sink->udp_buffer_size);
930   if (sink->rtp_blocksize > 0)
931     gst_rtsp_stream_set_mtu (stream, sink->rtp_blocksize);
932   gst_rtsp_stream_set_multicast_iface (stream, sink->multi_iface);
933
934 #if 0
935   if (priv->pool)
936     gst_rtsp_stream_set_address_pool (stream, priv->pool);
937 #endif
938
939   return stream;
940 no_free_pt:
941   GST_OBJECT_UNLOCK (sink);
942
943   GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL),
944       ("Ran out of dynamic payload types."));
945
946   return NULL;
947 }
948
949 static GstPadProbeReturn
950 handle_payloader_block (GstPad * pad, GstPadProbeInfo * info,
951     GstRTSPStreamContext * context)
952 {
953   GstRTSPClientSink *sink = context->parent;
954
955   GST_INFO_OBJECT (sink, "Block on pad %" GST_PTR_FORMAT, pad);
956
957   g_mutex_lock (&sink->preroll_lock);
958   context->prerolled = TRUE;
959   g_cond_broadcast (&sink->preroll_cond);
960   g_mutex_unlock (&sink->preroll_lock);
961
962   GST_INFO_OBJECT (sink, "Announced preroll on pad %" GST_PTR_FORMAT, pad);
963
964   return GST_PAD_PROBE_OK;
965 }
966
967 static gboolean
968 gst_rtsp_client_sink_setup_payloader (GstRTSPClientSink * sink, GstPad * pad,
969     GstCaps * caps)
970 {
971   GstRTSPStreamContext *context;
972
973   GstElement *payloader;
974   GstPad *sinkpad, *srcpad, *ghostsink;
975
976   context = gst_pad_get_element_private (pad);
977
978   /* Find the payloader. FIXME: Allow user to provide payloader via pad property */
979   payloader = gst_rtsp_client_sink_make_payloader (caps);
980   if (payloader == NULL)
981     return FALSE;
982
983   GST_DEBUG_OBJECT (sink, "Configuring payloader %" GST_PTR_FORMAT
984       " for pad %" GST_PTR_FORMAT, payloader, pad);
985
986   sinkpad = gst_element_get_static_pad (payloader, "sink");
987   if (sinkpad == NULL)
988     goto no_sinkpad;
989
990   srcpad = gst_element_get_static_pad (payloader, "src");
991   if (srcpad == NULL)
992     goto no_srcpad;
993
994   gst_bin_add (GST_BIN (sink->internal_bin), payloader);
995   ghostsink = gst_ghost_pad_new (NULL, sinkpad);
996   gst_pad_set_active (ghostsink, TRUE);
997   gst_element_add_pad (GST_ELEMENT (sink->internal_bin), ghostsink);
998
999   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_PAYLOADER], 0,
1000       payloader);
1001
1002   GST_RTSP_STATE_LOCK (sink);
1003   context->payloader_block_id =
1004       gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1005       (GstPadProbeCallback) handle_payloader_block, context, NULL);
1006   context->payloader = payloader;
1007
1008   payloader = gst_object_ref (payloader);
1009
1010   gst_ghost_pad_set_target (GST_GHOST_PAD (pad), ghostsink);
1011   gst_object_unref (GST_OBJECT (sinkpad));
1012   GST_RTSP_STATE_UNLOCK (sink);
1013
1014   gst_element_sync_state_with_parent (payloader);
1015
1016   gst_object_unref (payloader);
1017   gst_object_unref (GST_OBJECT (srcpad));
1018
1019   return TRUE;
1020
1021 no_sinkpad:
1022   GST_ERROR_OBJECT (sink,
1023       "Could not find sink pad on payloader %" GST_PTR_FORMAT, payloader);
1024   gst_object_unref (payloader);
1025   return FALSE;
1026
1027 no_srcpad:
1028   GST_ERROR_OBJECT (sink,
1029       "Could not find src pad on payloader %" GST_PTR_FORMAT, payloader);
1030   gst_object_unref (GST_OBJECT (sinkpad));
1031   gst_object_unref (payloader);
1032   return TRUE;
1033 }
1034
1035 static gboolean
1036 gst_rtsp_client_sink_sinkpad_event (GstPad * pad, GstObject * parent,
1037     GstEvent * event)
1038 {
1039   if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
1040     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1041     if (target == NULL) {
1042       GstCaps *caps;
1043
1044       /* No target yet - choose a payloader and configure it */
1045       gst_event_parse_caps (event, &caps);
1046
1047       GST_DEBUG_OBJECT (parent,
1048           "Have set caps event on pad %" GST_PTR_FORMAT
1049           " caps %" GST_PTR_FORMAT, pad, caps);
1050
1051       if (!gst_rtsp_client_sink_setup_payloader (GST_RTSP_CLIENT_SINK (parent),
1052               pad, caps)) {
1053         gst_event_unref (event);
1054         return FALSE;
1055       }
1056     } else {
1057       gst_object_unref (target);
1058     }
1059   }
1060
1061   return gst_pad_event_default (pad, parent, event);
1062 }
1063
1064 static gboolean
1065 gst_rtsp_client_sink_sinkpad_query (GstPad * pad, GstObject * parent,
1066     GstQuery * query)
1067 {
1068   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1069     GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1070     if (target == NULL) {
1071       /* No target yet - return the union of all payloader caps */
1072       GstCaps *caps = gst_rtsp_client_sink_get_payloader_caps ();
1073
1074       GST_TRACE_OBJECT (parent, "Returning payloader caps %" GST_PTR_FORMAT,
1075           caps);
1076
1077       gst_query_set_caps_result (query, caps);
1078       gst_caps_unref (caps);
1079
1080       return TRUE;
1081     }
1082     gst_object_unref (target);
1083   }
1084
1085   return gst_pad_query_default (pad, parent, query);
1086 }
1087
1088 static GstPad *
1089 gst_rtsp_client_sink_request_new_pad (GstElement * element,
1090     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1091 {
1092   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1093   GstPad *pad;
1094   GstRTSPStreamContext *context;
1095   guint idx = (guint) - 1;
1096   gchar *tmpname;
1097
1098   g_mutex_lock (&sink->preroll_lock);
1099   if (sink->streams_collected) {
1100     GST_WARNING_OBJECT (element, "Can't add streams to a running session");
1101     g_mutex_unlock (&sink->preroll_lock);
1102     return NULL;
1103   }
1104   g_mutex_unlock (&sink->preroll_lock);
1105
1106   GST_OBJECT_LOCK (sink);
1107   if (name) {
1108     if (!sscanf (name, "sink_%u", &idx)) {
1109       GST_OBJECT_UNLOCK (sink);
1110       GST_ERROR_OBJECT (element, "Invalid sink pad name %s", name);
1111       return NULL;
1112     }
1113
1114     if (idx >= sink->next_pad_id)
1115       sink->next_pad_id = idx + 1;
1116   }
1117   if (idx == (guint) - 1) {
1118     idx = sink->next_pad_id;
1119     sink->next_pad_id++;
1120   }
1121   GST_OBJECT_UNLOCK (sink);
1122
1123   tmpname = g_strdup_printf ("sink_%u", idx);
1124   pad = gst_ghost_pad_new_no_target_from_template (tmpname, templ);
1125   g_free (tmpname);
1126
1127   GST_DEBUG_OBJECT (element, "Creating request pad %" GST_PTR_FORMAT, pad);
1128
1129   gst_pad_set_event_function (pad,
1130       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_event));
1131   gst_pad_set_query_function (pad,
1132       GST_DEBUG_FUNCPTR (gst_rtsp_client_sink_sinkpad_query));
1133
1134   context = g_new0 (GstRTSPStreamContext, 1);
1135   context->parent = sink;
1136   context->index = idx;
1137
1138   gst_pad_set_element_private (pad, context);
1139
1140   /* The rest of the context is configured on a caps set */
1141   gst_pad_set_active (pad, TRUE);
1142   gst_element_add_pad (element, pad);
1143
1144   (void) gst_rtsp_client_sink_get_factories ();
1145
1146   g_mutex_init (&context->conninfo.send_lock);
1147   g_mutex_init (&context->conninfo.recv_lock);
1148
1149   GST_RTSP_STATE_LOCK (sink);
1150   sink->contexts = g_list_prepend (sink->contexts, context);
1151   GST_RTSP_STATE_UNLOCK (sink);
1152
1153   return pad;
1154 }
1155
1156 static void
1157 gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
1158 {
1159   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1160   GstRTSPStreamContext *context;
1161
1162   context = gst_pad_get_element_private (pad);
1163
1164   GST_RTSP_STATE_LOCK (sink);
1165   sink->contexts = g_list_remove (sink->contexts, context);
1166   GST_RTSP_STATE_UNLOCK (sink);
1167
1168   /* FIXME: Shut down and clean up streaming on this pad,
1169    * do teardown if needed */
1170   GST_LOG_OBJECT (sink,
1171       "Cleaning up payloader and stream for released pad %" GST_PTR_FORMAT,
1172       pad);
1173
1174   if (context->stream_transport) {
1175     gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1176     gst_object_unref (context->stream_transport);
1177     context->stream_transport = NULL;
1178   }
1179   if (context->stream) {
1180     if (context->joined) {
1181       gst_rtsp_stream_leave_bin (context->stream,
1182           GST_BIN (sink->internal_bin), sink->rtpbin);
1183       context->joined = FALSE;
1184     }
1185     gst_object_unref (context->stream);
1186     context->stream = NULL;
1187   }
1188   if (context->srtcpparams)
1189     gst_caps_unref (context->srtcpparams);
1190
1191   g_free (context->conninfo.location);
1192   context->conninfo.location = NULL;
1193
1194   g_mutex_clear (&context->conninfo.send_lock);
1195   g_mutex_clear (&context->conninfo.recv_lock);
1196
1197   g_free (context);
1198
1199   gst_element_remove_pad (element, pad);
1200 }
1201
1202 static GstClock *
1203 gst_rtsp_client_sink_provide_clock (GstElement * element)
1204 {
1205   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (element);
1206   GstClock *clock;
1207
1208   if ((clock = sink->provided_clock) != NULL)
1209     gst_object_ref (clock);
1210
1211   return clock;
1212 }
1213
1214 /* a proxy string of the format [user:passwd@]host[:port] */
1215 static gboolean
1216 gst_rtsp_client_sink_set_proxy (GstRTSPClientSink * rtsp, const gchar * proxy)
1217 {
1218   gchar *p, *at, *col;
1219
1220   g_free (rtsp->proxy_user);
1221   rtsp->proxy_user = NULL;
1222   g_free (rtsp->proxy_passwd);
1223   rtsp->proxy_passwd = NULL;
1224   g_free (rtsp->proxy_host);
1225   rtsp->proxy_host = NULL;
1226   rtsp->proxy_port = 0;
1227
1228   p = (gchar *) proxy;
1229
1230   if (p == NULL)
1231     return TRUE;
1232
1233   /* we allow http:// in front but ignore it */
1234   if (g_str_has_prefix (p, "http://"))
1235     p += 7;
1236
1237   at = strchr (p, '@');
1238   if (at) {
1239     /* look for user:passwd */
1240     col = strchr (proxy, ':');
1241     if (col == NULL || col > at)
1242       return FALSE;
1243
1244     rtsp->proxy_user = g_strndup (p, col - p);
1245     col++;
1246     rtsp->proxy_passwd = g_strndup (col, at - col);
1247
1248     /* move to host */
1249     p = at + 1;
1250   } else {
1251     if (rtsp->prop_proxy_id != NULL && *rtsp->prop_proxy_id != '\0')
1252       rtsp->proxy_user = g_strdup (rtsp->prop_proxy_id);
1253     if (rtsp->prop_proxy_pw != NULL && *rtsp->prop_proxy_pw != '\0')
1254       rtsp->proxy_passwd = g_strdup (rtsp->prop_proxy_pw);
1255     if (rtsp->proxy_user != NULL || rtsp->proxy_passwd != NULL) {
1256       GST_LOG_OBJECT (rtsp, "set proxy user/pw from properties: %s:%s",
1257           GST_STR_NULL (rtsp->proxy_user), GST_STR_NULL (rtsp->proxy_passwd));
1258     }
1259   }
1260   col = strchr (p, ':');
1261
1262   if (col) {
1263     /* everything before the colon is the hostname */
1264     rtsp->proxy_host = g_strndup (p, col - p);
1265     p = col + 1;
1266     rtsp->proxy_port = strtoul (p, (char **) &p, 10);
1267   } else {
1268     rtsp->proxy_host = g_strdup (p);
1269     rtsp->proxy_port = 8080;
1270   }
1271   return TRUE;
1272 }
1273
1274 static void
1275 gst_rtsp_client_sink_set_tcp_timeout (GstRTSPClientSink * rtsp_client_sink,
1276     guint64 timeout)
1277 {
1278   rtsp_client_sink->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC;
1279   rtsp_client_sink->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC;
1280
1281   if (timeout != 0)
1282     rtsp_client_sink->ptcp_timeout = &rtsp_client_sink->tcp_timeout;
1283   else
1284     rtsp_client_sink->ptcp_timeout = NULL;
1285 }
1286
1287 static void
1288 gst_rtsp_client_sink_set_property (GObject * object, guint prop_id,
1289     const GValue * value, GParamSpec * pspec)
1290 {
1291   GstRTSPClientSink *rtsp_client_sink;
1292
1293   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1294
1295   switch (prop_id) {
1296     case PROP_LOCATION:
1297       gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (rtsp_client_sink),
1298           g_value_get_string (value), NULL);
1299       break;
1300     case PROP_PROTOCOLS:
1301       rtsp_client_sink->protocols = g_value_get_flags (value);
1302       break;
1303     case PROP_PROFILES:
1304       rtsp_client_sink->profiles = g_value_get_flags (value);
1305       break;
1306     case PROP_DEBUG:
1307       rtsp_client_sink->debug = g_value_get_boolean (value);
1308       break;
1309     case PROP_RETRY:
1310       rtsp_client_sink->retry = g_value_get_uint (value);
1311       break;
1312     case PROP_TIMEOUT:
1313       rtsp_client_sink->udp_timeout = g_value_get_uint64 (value);
1314       break;
1315     case PROP_TCP_TIMEOUT:
1316       gst_rtsp_client_sink_set_tcp_timeout (rtsp_client_sink,
1317           g_value_get_uint64 (value));
1318       break;
1319     case PROP_LATENCY:
1320       rtsp_client_sink->latency = g_value_get_uint (value);
1321       break;
1322     case PROP_RTX_TIME:
1323       rtsp_client_sink->rtx_time = g_value_get_uint (value);
1324       break;
1325     case PROP_DO_RTSP_KEEP_ALIVE:
1326       rtsp_client_sink->do_rtsp_keep_alive = g_value_get_boolean (value);
1327       break;
1328     case PROP_PROXY:
1329       gst_rtsp_client_sink_set_proxy (rtsp_client_sink,
1330           g_value_get_string (value));
1331       break;
1332     case PROP_PROXY_ID:
1333       if (rtsp_client_sink->prop_proxy_id)
1334         g_free (rtsp_client_sink->prop_proxy_id);
1335       rtsp_client_sink->prop_proxy_id = g_value_dup_string (value);
1336       break;
1337     case PROP_PROXY_PW:
1338       if (rtsp_client_sink->prop_proxy_pw)
1339         g_free (rtsp_client_sink->prop_proxy_pw);
1340       rtsp_client_sink->prop_proxy_pw = g_value_dup_string (value);
1341       break;
1342     case PROP_RTP_BLOCKSIZE:
1343       rtsp_client_sink->rtp_blocksize = g_value_get_uint (value);
1344       break;
1345     case PROP_USER_ID:
1346       if (rtsp_client_sink->user_id)
1347         g_free (rtsp_client_sink->user_id);
1348       rtsp_client_sink->user_id = g_value_dup_string (value);
1349       break;
1350     case PROP_USER_PW:
1351       if (rtsp_client_sink->user_pw)
1352         g_free (rtsp_client_sink->user_pw);
1353       rtsp_client_sink->user_pw = g_value_dup_string (value);
1354       break;
1355     case PROP_PORT_RANGE:
1356     {
1357       const gchar *str;
1358
1359       str = g_value_get_string (value);
1360       if (!str || !sscanf (str, "%u-%u",
1361               &rtsp_client_sink->client_port_range.min,
1362               &rtsp_client_sink->client_port_range.max)) {
1363         rtsp_client_sink->client_port_range.min = 0;
1364         rtsp_client_sink->client_port_range.max = 0;
1365       }
1366       break;
1367     }
1368     case PROP_UDP_BUFFER_SIZE:
1369       rtsp_client_sink->udp_buffer_size = g_value_get_int (value);
1370       break;
1371     case PROP_UDP_RECONNECT:
1372       rtsp_client_sink->udp_reconnect = g_value_get_boolean (value);
1373       break;
1374     case PROP_MULTICAST_IFACE:
1375       g_free (rtsp_client_sink->multi_iface);
1376
1377       if (g_value_get_string (value) == NULL)
1378         rtsp_client_sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1379       else
1380         rtsp_client_sink->multi_iface = g_value_dup_string (value);
1381       break;
1382     case PROP_SDES:
1383       rtsp_client_sink->sdes = g_value_dup_boxed (value);
1384       break;
1385     case PROP_TLS_VALIDATION_FLAGS:
1386       rtsp_client_sink->tls_validation_flags = g_value_get_flags (value);
1387       break;
1388     case PROP_TLS_DATABASE:
1389       g_clear_object (&rtsp_client_sink->tls_database);
1390       rtsp_client_sink->tls_database = g_value_dup_object (value);
1391       break;
1392     case PROP_TLS_INTERACTION:
1393       g_clear_object (&rtsp_client_sink->tls_interaction);
1394       rtsp_client_sink->tls_interaction = g_value_dup_object (value);
1395       break;
1396     case PROP_NTP_TIME_SOURCE:
1397       rtsp_client_sink->ntp_time_source = g_value_get_enum (value);
1398       break;
1399     case PROP_USER_AGENT:
1400       g_free (rtsp_client_sink->user_agent);
1401       rtsp_client_sink->user_agent = g_value_dup_string (value);
1402       break;
1403     default:
1404       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1405       break;
1406   }
1407 }
1408
1409 static void
1410 gst_rtsp_client_sink_get_property (GObject * object, guint prop_id,
1411     GValue * value, GParamSpec * pspec)
1412 {
1413   GstRTSPClientSink *rtsp_client_sink;
1414
1415   rtsp_client_sink = GST_RTSP_CLIENT_SINK (object);
1416
1417   switch (prop_id) {
1418     case PROP_LOCATION:
1419       g_value_set_string (value, rtsp_client_sink->conninfo.location);
1420       break;
1421     case PROP_PROTOCOLS:
1422       g_value_set_flags (value, rtsp_client_sink->protocols);
1423       break;
1424     case PROP_PROFILES:
1425       g_value_set_flags (value, rtsp_client_sink->profiles);
1426       break;
1427     case PROP_DEBUG:
1428       g_value_set_boolean (value, rtsp_client_sink->debug);
1429       break;
1430     case PROP_RETRY:
1431       g_value_set_uint (value, rtsp_client_sink->retry);
1432       break;
1433     case PROP_TIMEOUT:
1434       g_value_set_uint64 (value, rtsp_client_sink->udp_timeout);
1435       break;
1436     case PROP_TCP_TIMEOUT:
1437     {
1438       guint64 timeout;
1439
1440       timeout = rtsp_client_sink->tcp_timeout.tv_sec * G_USEC_PER_SEC +
1441           rtsp_client_sink->tcp_timeout.tv_usec;
1442       g_value_set_uint64 (value, timeout);
1443       break;
1444     }
1445     case PROP_LATENCY:
1446       g_value_set_uint (value, rtsp_client_sink->latency);
1447       break;
1448     case PROP_RTX_TIME:
1449       g_value_set_uint (value, rtsp_client_sink->rtx_time);
1450       break;
1451     case PROP_DO_RTSP_KEEP_ALIVE:
1452       g_value_set_boolean (value, rtsp_client_sink->do_rtsp_keep_alive);
1453       break;
1454     case PROP_PROXY:
1455     {
1456       gchar *str;
1457
1458       if (rtsp_client_sink->proxy_host) {
1459         str =
1460             g_strdup_printf ("%s:%d", rtsp_client_sink->proxy_host,
1461             rtsp_client_sink->proxy_port);
1462       } else {
1463         str = NULL;
1464       }
1465       g_value_take_string (value, str);
1466       break;
1467     }
1468     case PROP_PROXY_ID:
1469       g_value_set_string (value, rtsp_client_sink->prop_proxy_id);
1470       break;
1471     case PROP_PROXY_PW:
1472       g_value_set_string (value, rtsp_client_sink->prop_proxy_pw);
1473       break;
1474     case PROP_RTP_BLOCKSIZE:
1475       g_value_set_uint (value, rtsp_client_sink->rtp_blocksize);
1476       break;
1477     case PROP_USER_ID:
1478       g_value_set_string (value, rtsp_client_sink->user_id);
1479       break;
1480     case PROP_USER_PW:
1481       g_value_set_string (value, rtsp_client_sink->user_pw);
1482       break;
1483     case PROP_PORT_RANGE:
1484     {
1485       gchar *str;
1486
1487       if (rtsp_client_sink->client_port_range.min != 0) {
1488         str = g_strdup_printf ("%u-%u", rtsp_client_sink->client_port_range.min,
1489             rtsp_client_sink->client_port_range.max);
1490       } else {
1491         str = NULL;
1492       }
1493       g_value_take_string (value, str);
1494       break;
1495     }
1496     case PROP_UDP_BUFFER_SIZE:
1497       g_value_set_int (value, rtsp_client_sink->udp_buffer_size);
1498       break;
1499     case PROP_UDP_RECONNECT:
1500       g_value_set_boolean (value, rtsp_client_sink->udp_reconnect);
1501       break;
1502     case PROP_MULTICAST_IFACE:
1503       g_value_set_string (value, rtsp_client_sink->multi_iface);
1504       break;
1505     case PROP_SDES:
1506       g_value_set_boxed (value, rtsp_client_sink->sdes);
1507       break;
1508     case PROP_TLS_VALIDATION_FLAGS:
1509       g_value_set_flags (value, rtsp_client_sink->tls_validation_flags);
1510       break;
1511     case PROP_TLS_DATABASE:
1512       g_value_set_object (value, rtsp_client_sink->tls_database);
1513       break;
1514     case PROP_TLS_INTERACTION:
1515       g_value_set_object (value, rtsp_client_sink->tls_interaction);
1516       break;
1517     case PROP_NTP_TIME_SOURCE:
1518       g_value_set_enum (value, rtsp_client_sink->ntp_time_source);
1519       break;
1520     case PROP_USER_AGENT:
1521       g_value_set_string (value, rtsp_client_sink->user_agent);
1522       break;
1523     default:
1524       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1525       break;
1526   }
1527 }
1528
1529 static const gchar *
1530 get_aggregate_control (GstRTSPClientSink * sink)
1531 {
1532   const gchar *base;
1533
1534   if (sink->control)
1535     base = sink->control;
1536   else if (sink->content_base)
1537     base = sink->content_base;
1538   else if (sink->conninfo.url_str)
1539     base = sink->conninfo.url_str;
1540   else
1541     base = "/";
1542
1543   return base;
1544 }
1545
1546 static void
1547 gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
1548 {
1549   GList *walk;
1550
1551   GST_DEBUG_OBJECT (sink, "cleanup");
1552
1553   gst_element_set_state (GST_ELEMENT (sink->internal_bin), GST_STATE_NULL);
1554
1555   /* Clean up any left over stream objects */
1556   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
1557     GstRTSPStreamContext *context = (GstRTSPStreamContext *) (walk->data);
1558     if (context->stream_transport) {
1559       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
1560       gst_object_unref (context->stream_transport);
1561       context->stream_transport = NULL;
1562     }
1563
1564     if (context->stream) {
1565       if (context->joined) {
1566         gst_rtsp_stream_leave_bin (context->stream,
1567             GST_BIN (sink->internal_bin), sink->rtpbin);
1568         context->joined = FALSE;
1569       }
1570       gst_object_unref (context->stream);
1571       context->stream = NULL;
1572     }
1573
1574     if (context->srtcpparams) {
1575       gst_caps_unref (context->srtcpparams);
1576       context->srtcpparams = NULL;
1577     }
1578     g_free (context->conninfo.location);
1579     context->conninfo.location = NULL;
1580   }
1581
1582   if (sink->rtpbin) {
1583     gst_element_set_state (sink->rtpbin, GST_STATE_NULL);
1584     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), sink->rtpbin);
1585     sink->rtpbin = NULL;
1586   }
1587
1588   g_free (sink->content_base);
1589   sink->content_base = NULL;
1590
1591   g_free (sink->control);
1592   sink->control = NULL;
1593
1594   if (sink->range)
1595     gst_rtsp_range_free (sink->range);
1596   sink->range = NULL;
1597
1598   /* don't clear the SDP when it was used in the url */
1599   if (sink->uri_sdp && !sink->from_sdp) {
1600     gst_sdp_message_free (sink->uri_sdp);
1601     sink->uri_sdp = NULL;
1602   }
1603
1604   if (sink->provided_clock) {
1605     gst_object_unref (sink->provided_clock);
1606     sink->provided_clock = NULL;
1607   }
1608
1609   g_free (sink->server_ip);
1610   sink->server_ip = NULL;
1611
1612   sink->next_pad_id = 0;
1613   sink->next_dyn_pt = 96;
1614 }
1615
1616 static GstRTSPResult
1617 gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
1618     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1619 {
1620   GstRTSPResult ret;
1621
1622   if (conninfo->connection) {
1623     g_mutex_lock (&conninfo->send_lock);
1624     ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
1625     g_mutex_unlock (&conninfo->send_lock);
1626   } else {
1627     ret = GST_RTSP_ERROR;
1628   }
1629
1630   return ret;
1631 }
1632
1633 static GstRTSPResult
1634 gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
1635     GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
1636 {
1637   GstRTSPResult ret;
1638
1639   if (conninfo->connection) {
1640     g_mutex_lock (&conninfo->recv_lock);
1641     ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
1642     g_mutex_unlock (&conninfo->recv_lock);
1643   } else {
1644     ret = GST_RTSP_ERROR;
1645   }
1646
1647   return ret;
1648 }
1649
1650 static GstRTSPResult
1651 gst_rtsp_conninfo_connect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1652     gboolean async)
1653 {
1654   GstRTSPResult res;
1655
1656   if (info->connection == NULL) {
1657     if (info->url == NULL) {
1658       GST_DEBUG_OBJECT (sink, "parsing uri (%s)...", info->location);
1659       if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0)
1660         goto parse_error;
1661     }
1662
1663     /* create connection */
1664     GST_DEBUG_OBJECT (sink, "creating connection (%s)...", info->location);
1665     if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0)
1666       goto could_not_create;
1667
1668     if (info->url_str)
1669       g_free (info->url_str);
1670     info->url_str = gst_rtsp_url_get_request_uri (info->url);
1671
1672     GST_DEBUG_OBJECT (sink, "sanitized uri %s", info->url_str);
1673
1674     if (info->url->transports & GST_RTSP_LOWER_TRANS_TLS) {
1675       if (!gst_rtsp_connection_set_tls_validation_flags (info->connection,
1676               sink->tls_validation_flags))
1677         GST_WARNING_OBJECT (sink, "Unable to set TLS validation flags");
1678
1679       if (sink->tls_database)
1680         gst_rtsp_connection_set_tls_database (info->connection,
1681             sink->tls_database);
1682
1683       if (sink->tls_interaction)
1684         gst_rtsp_connection_set_tls_interaction (info->connection,
1685             sink->tls_interaction);
1686     }
1687
1688     if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP)
1689       gst_rtsp_connection_set_tunneled (info->connection, TRUE);
1690
1691     if (sink->proxy_host) {
1692       GST_DEBUG_OBJECT (sink, "setting proxy %s:%d", sink->proxy_host,
1693           sink->proxy_port);
1694       gst_rtsp_connection_set_proxy (info->connection, sink->proxy_host,
1695           sink->proxy_port);
1696     }
1697   }
1698
1699   if (!info->connected) {
1700     /* connect */
1701     if (async)
1702       GST_ELEMENT_PROGRESS (sink, CONTINUE, "connect",
1703           ("Connecting to %s", info->location));
1704     GST_DEBUG_OBJECT (sink, "connecting (%s)...", info->location);
1705     if ((res =
1706             gst_rtsp_connection_connect (info->connection,
1707                 sink->ptcp_timeout)) < 0)
1708       goto could_not_connect;
1709
1710     info->connected = TRUE;
1711   }
1712   return GST_RTSP_OK;
1713
1714   /* ERRORS */
1715 parse_error:
1716   {
1717     GST_ERROR_OBJECT (sink, "No valid RTSP URL was provided");
1718     return res;
1719   }
1720 could_not_create:
1721   {
1722     gchar *str = gst_rtsp_strresult (res);
1723     GST_ERROR_OBJECT (sink, "Could not create connection. (%s)", str);
1724     g_free (str);
1725     return res;
1726   }
1727 could_not_connect:
1728   {
1729     gchar *str = gst_rtsp_strresult (res);
1730     GST_ERROR_OBJECT (sink, "Could not connect to server. (%s)", str);
1731     g_free (str);
1732     return res;
1733   }
1734 }
1735
1736 static GstRTSPResult
1737 gst_rtsp_conninfo_close (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1738     gboolean free)
1739 {
1740   GST_RTSP_STATE_LOCK (sink);
1741   if (info->connected) {
1742     GST_DEBUG_OBJECT (sink, "closing connection...");
1743     gst_rtsp_connection_close (info->connection);
1744     info->connected = FALSE;
1745   }
1746   if (free && info->connection) {
1747     /* free connection */
1748     GST_DEBUG_OBJECT (sink, "freeing connection...");
1749     gst_rtsp_connection_free (info->connection);
1750     info->connection = NULL;
1751   }
1752   GST_RTSP_STATE_UNLOCK (sink);
1753   return GST_RTSP_OK;
1754 }
1755
1756 static GstRTSPResult
1757 gst_rtsp_conninfo_reconnect (GstRTSPClientSink * sink, GstRTSPConnInfo * info,
1758     gboolean async)
1759 {
1760   GstRTSPResult res;
1761
1762   GST_DEBUG_OBJECT (sink, "reconnecting connection...");
1763   gst_rtsp_conninfo_close (sink, info, FALSE);
1764   res = gst_rtsp_conninfo_connect (sink, info, async);
1765
1766   return res;
1767 }
1768
1769 static void
1770 gst_rtsp_client_sink_connection_flush (GstRTSPClientSink * sink, gboolean flush)
1771 {
1772   GList *walk;
1773
1774   GST_DEBUG_OBJECT (sink, "set flushing %d", flush);
1775   g_mutex_lock (&sink->preroll_lock);
1776   if (sink->conninfo.connection && sink->conninfo.flushing != flush) {
1777     GST_DEBUG_OBJECT (sink, "connection flush");
1778     gst_rtsp_connection_flush (sink->conninfo.connection, flush);
1779     sink->conninfo.flushing = flush;
1780   }
1781   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
1782     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
1783     if (stream->conninfo.connection && stream->conninfo.flushing != flush) {
1784       GST_DEBUG_OBJECT (sink, "stream %p flush", stream);
1785       gst_rtsp_connection_flush (stream->conninfo.connection, flush);
1786       stream->conninfo.flushing = flush;
1787     }
1788   }
1789   g_cond_broadcast (&sink->preroll_cond);
1790   g_mutex_unlock (&sink->preroll_lock);
1791 }
1792
1793 static GstRTSPResult
1794 gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
1795     GstRTSPMessage * msg, GstRTSPMethod method, const gchar * uri)
1796 {
1797   GstRTSPResult res;
1798
1799   res = gst_rtsp_message_init_request (msg, method, uri);
1800   if (res < 0)
1801     return res;
1802
1803   /* set user-agent */
1804   if (sink->user_agent)
1805     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_USER_AGENT,
1806         sink->user_agent);
1807
1808   return res;
1809 }
1810
1811 /* FIXME, handle server request, reply with OK, for now */
1812 static GstRTSPResult
1813 gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
1814     GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
1815 {
1816   GstRTSPMessage response = { 0 };
1817   GstRTSPResult res;
1818
1819   GST_DEBUG_OBJECT (sink, "got server request message");
1820
1821   if (sink->debug)
1822     gst_rtsp_message_dump (request);
1823
1824   /* default implementation, send OK */
1825   GST_DEBUG_OBJECT (sink, "prepare OK reply");
1826   res =
1827       gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, "OK",
1828       request);
1829   if (res < 0)
1830     goto send_error;
1831
1832   /* let app parse and reply */
1833   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_HANDLE_REQUEST],
1834       0, request, &response);
1835
1836   if (sink->debug)
1837     gst_rtsp_message_dump (&response);
1838
1839   res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
1840   if (res < 0)
1841     goto send_error;
1842
1843   gst_rtsp_message_unset (&response);
1844
1845   return GST_RTSP_OK;
1846
1847   /* ERRORS */
1848 send_error:
1849   {
1850     gst_rtsp_message_unset (&response);
1851     return res;
1852   }
1853 }
1854
1855 /* send server keep-alive */
1856 static GstRTSPResult
1857 gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
1858 {
1859   GstRTSPMessage request = { 0 };
1860   GstRTSPResult res;
1861   GstRTSPMethod method;
1862   const gchar *control;
1863
1864   if (sink->do_rtsp_keep_alive == FALSE) {
1865     GST_DEBUG_OBJECT (sink, "do-rtsp-keep-alive is FALSE, not sending.");
1866     gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
1867     return GST_RTSP_OK;
1868   }
1869
1870   GST_DEBUG_OBJECT (sink, "creating server keep-alive");
1871
1872   /* find a method to use for keep-alive */
1873   if (sink->methods & GST_RTSP_GET_PARAMETER)
1874     method = GST_RTSP_GET_PARAMETER;
1875   else
1876     method = GST_RTSP_OPTIONS;
1877
1878   control = get_aggregate_control (sink);
1879   if (control == NULL)
1880     goto no_control;
1881
1882   res = gst_rtsp_client_sink_init_request (sink, &request, method, control);
1883   if (res < 0)
1884     goto send_error;
1885
1886   if (sink->debug)
1887     gst_rtsp_message_dump (&request);
1888
1889   res =
1890       gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
1891       &request, NULL);
1892   if (res < 0)
1893     goto send_error;
1894
1895   gst_rtsp_connection_reset_timeout (sink->conninfo.connection);
1896   gst_rtsp_message_unset (&request);
1897
1898   return GST_RTSP_OK;
1899
1900   /* ERRORS */
1901 no_control:
1902   {
1903     GST_WARNING_OBJECT (sink, "no control url to send keepalive");
1904     return GST_RTSP_OK;
1905   }
1906 send_error:
1907   {
1908     gchar *str = gst_rtsp_strresult (res);
1909
1910     gst_rtsp_message_unset (&request);
1911     GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, (NULL),
1912         ("Could not send keep-alive. (%s)", str));
1913     g_free (str);
1914     return res;
1915   }
1916 }
1917
1918 static GstFlowReturn
1919 gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
1920 {
1921   GstRTSPResult res;
1922   GstRTSPMessage message = { 0 };
1923   gint retry = 0;
1924
1925   while (TRUE) {
1926     GTimeVal tv_timeout;
1927
1928     /* get the next timeout interval */
1929     gst_rtsp_connection_next_timeout (sink->conninfo.connection, &tv_timeout);
1930
1931     GST_DEBUG_OBJECT (sink, "doing receive with timeout %d seconds",
1932         (gint) tv_timeout.tv_sec);
1933
1934     gst_rtsp_message_unset (&message);
1935
1936     /* we should continue reading the TCP socket because the server might
1937      * send us requests. When the session timeout expires, we need to send a
1938      * keep-alive request to keep the session open. */
1939     res =
1940         gst_rtsp_client_sink_connection_receive (sink,
1941         &sink->conninfo, &message, &tv_timeout);
1942
1943     switch (res) {
1944       case GST_RTSP_OK:
1945         GST_DEBUG_OBJECT (sink, "we received a server message");
1946         break;
1947       case GST_RTSP_EINTR:
1948         /* we got interrupted, see what we have to do */
1949         goto interrupt;
1950       case GST_RTSP_ETIMEOUT:
1951         /* send keep-alive, ignore the result, a warning will be posted. */
1952         GST_DEBUG_OBJECT (sink, "timeout, sending keep-alive");
1953         if ((res =
1954                 gst_rtsp_client_sink_send_keep_alive (sink)) == GST_RTSP_EINTR)
1955           goto interrupt;
1956         continue;
1957       case GST_RTSP_EEOF:
1958         /* server closed the connection. not very fatal for UDP, reconnect and
1959          * see what happens. */
1960         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
1961             ("The server closed the connection."));
1962         if (sink->udp_reconnect) {
1963           if ((res =
1964                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
1965                       FALSE)) < 0)
1966             goto connect_error;
1967         } else {
1968           goto server_eof;
1969         }
1970         continue;
1971         break;
1972       case GST_RTSP_ENET:
1973         GST_DEBUG_OBJECT (sink, "An ethernet problem occured.");
1974       default:
1975         GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
1976             ("Unhandled return value %d.", res));
1977         goto receive_error;
1978     }
1979
1980     switch (message.type) {
1981       case GST_RTSP_MESSAGE_REQUEST:
1982         /* server sends us a request message, handle it */
1983         res =
1984             gst_rtsp_client_sink_handle_request (sink,
1985             &sink->conninfo, &message);
1986         if (res == GST_RTSP_EEOF)
1987           goto server_eof;
1988         else if (res < 0)
1989           goto handle_request_failed;
1990         break;
1991       case GST_RTSP_MESSAGE_RESPONSE:
1992         /* we ignore response and data messages */
1993         GST_DEBUG_OBJECT (sink, "ignoring response message");
1994         if (sink->debug)
1995           gst_rtsp_message_dump (&message);
1996         if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) {
1997           GST_DEBUG_OBJECT (sink, "but is Unauthorized response ...");
1998           if (gst_rtsp_client_sink_setup_auth (sink, &message) && !(retry++)) {
1999             GST_DEBUG_OBJECT (sink, "so retrying keep-alive");
2000             if ((res =
2001                     gst_rtsp_client_sink_send_keep_alive (sink)) ==
2002                 GST_RTSP_EINTR)
2003               goto interrupt;
2004           }
2005         } else {
2006           retry = 0;
2007         }
2008         break;
2009       case GST_RTSP_MESSAGE_DATA:
2010         /* we ignore response and data messages */
2011         GST_DEBUG_OBJECT (sink, "ignoring data message");
2012         break;
2013       default:
2014         GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2015             message.type);
2016         break;
2017     }
2018   }
2019   g_assert_not_reached ();
2020
2021   /* we get here when the connection got interrupted */
2022 interrupt:
2023   {
2024     gst_rtsp_message_unset (&message);
2025     GST_DEBUG_OBJECT (sink, "got interrupted");
2026     return GST_FLOW_FLUSHING;
2027   }
2028 connect_error:
2029   {
2030     gchar *str = gst_rtsp_strresult (res);
2031     GstFlowReturn ret;
2032
2033     sink->conninfo.connected = FALSE;
2034     if (res != GST_RTSP_EINTR) {
2035       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
2036           ("Could not connect to server. (%s)", str));
2037       g_free (str);
2038       ret = GST_FLOW_ERROR;
2039     } else {
2040       ret = GST_FLOW_FLUSHING;
2041     }
2042     return ret;
2043   }
2044 receive_error:
2045   {
2046     gchar *str = gst_rtsp_strresult (res);
2047
2048     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2049         ("Could not receive message. (%s)", str));
2050     g_free (str);
2051     return GST_FLOW_ERROR;
2052   }
2053 handle_request_failed:
2054   {
2055     gchar *str = gst_rtsp_strresult (res);
2056     GstFlowReturn ret;
2057
2058     gst_rtsp_message_unset (&message);
2059     if (res != GST_RTSP_EINTR) {
2060       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2061           ("Could not handle server message. (%s)", str));
2062       g_free (str);
2063       ret = GST_FLOW_ERROR;
2064     } else {
2065       ret = GST_FLOW_FLUSHING;
2066     }
2067     return ret;
2068   }
2069 server_eof:
2070   {
2071     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2072     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2073         ("The server closed the connection."));
2074     sink->conninfo.connected = FALSE;
2075     gst_rtsp_message_unset (&message);
2076     return GST_FLOW_EOS;
2077   }
2078 }
2079
2080 static GstRTSPResult
2081 gst_rtsp_client_sink_reconnect (GstRTSPClientSink * sink, gboolean async)
2082 {
2083   GstRTSPResult res = GST_RTSP_OK;
2084   gboolean restart = FALSE;
2085
2086   GST_DEBUG_OBJECT (sink, "doing reconnect");
2087
2088   GST_FIXME_OBJECT (sink, "Reconnection is not yet implemented");
2089
2090   /* no need to restart, we're done */
2091   if (!restart)
2092     goto done;
2093
2094   /* we can try only TCP now */
2095   sink->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
2096
2097   /* close and cleanup our state */
2098   if ((res = gst_rtsp_client_sink_close (sink, async, FALSE)) < 0)
2099     goto done;
2100
2101   /* see if we have TCP left to try. Also don't try TCP when we were configured
2102    * with an SDP. */
2103   if (!(sink->protocols & GST_RTSP_LOWER_TRANS_TCP) || sink->from_sdp)
2104     goto no_protocols;
2105
2106   /* We post a warning message now to inform the user
2107    * that nothing happened. It's most likely a firewall thing. */
2108   GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2109       ("Could not receive any UDP packets for %.4f seconds, maybe your "
2110           "firewall is blocking it. Retrying using a TCP connection.",
2111           gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2112
2113   /* open new connection using tcp */
2114   if (gst_rtsp_client_sink_open (sink, async) < 0)
2115     goto open_failed;
2116
2117   /* start recording */
2118   if (gst_rtsp_client_sink_record (sink, async) < 0)
2119     goto play_failed;
2120
2121 done:
2122   return res;
2123
2124   /* ERRORS */
2125 no_protocols:
2126   {
2127     sink->cur_protocols = 0;
2128     /* no transport possible, post an error and stop */
2129     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2130         ("Could not receive any UDP packets for %.4f seconds, maybe your "
2131             "firewall is blocking it. No other protocols to try.",
2132             gst_guint64_to_gdouble (sink->udp_timeout / 1000000.0)));
2133     return GST_RTSP_ERROR;
2134   }
2135 open_failed:
2136   {
2137     GST_DEBUG_OBJECT (sink, "open failed");
2138     return GST_RTSP_OK;
2139   }
2140 play_failed:
2141   {
2142     GST_DEBUG_OBJECT (sink, "play failed");
2143     return GST_RTSP_OK;
2144   }
2145 }
2146
2147 static void
2148 gst_rtsp_client_sink_loop_start_cmd (GstRTSPClientSink * sink, gint cmd)
2149 {
2150   switch (cmd) {
2151     case CMD_OPEN:
2152       GST_ELEMENT_PROGRESS (sink, START, "open", ("Opening Stream"));
2153       break;
2154     case CMD_RECORD:
2155       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending RECORD request"));
2156       break;
2157     case CMD_PAUSE:
2158       GST_ELEMENT_PROGRESS (sink, START, "request", ("Sending PAUSE request"));
2159       break;
2160     case CMD_CLOSE:
2161       GST_ELEMENT_PROGRESS (sink, START, "close", ("Closing Stream"));
2162       break;
2163     default:
2164       break;
2165   }
2166 }
2167
2168 static void
2169 gst_rtsp_client_sink_loop_complete_cmd (GstRTSPClientSink * sink, gint cmd)
2170 {
2171   switch (cmd) {
2172     case CMD_OPEN:
2173       GST_ELEMENT_PROGRESS (sink, COMPLETE, "open", ("Opened Stream"));
2174       break;
2175     case CMD_RECORD:
2176       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent RECORD request"));
2177       break;
2178     case CMD_PAUSE:
2179       GST_ELEMENT_PROGRESS (sink, COMPLETE, "request", ("Sent PAUSE request"));
2180       break;
2181     case CMD_CLOSE:
2182       GST_ELEMENT_PROGRESS (sink, COMPLETE, "close", ("Closed Stream"));
2183       break;
2184     default:
2185       break;
2186   }
2187 }
2188
2189 static void
2190 gst_rtsp_client_sink_loop_cancel_cmd (GstRTSPClientSink * sink, gint cmd)
2191 {
2192   switch (cmd) {
2193     case CMD_OPEN:
2194       GST_ELEMENT_PROGRESS (sink, CANCELED, "open", ("Open canceled"));
2195       break;
2196     case CMD_RECORD:
2197       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("RECORD canceled"));
2198       break;
2199     case CMD_PAUSE:
2200       GST_ELEMENT_PROGRESS (sink, CANCELED, "request", ("PAUSE canceled"));
2201       break;
2202     case CMD_CLOSE:
2203       GST_ELEMENT_PROGRESS (sink, CANCELED, "close", ("Close canceled"));
2204       break;
2205     default:
2206       break;
2207   }
2208 }
2209
2210 static void
2211 gst_rtsp_client_sink_loop_error_cmd (GstRTSPClientSink * sink, gint cmd)
2212 {
2213   switch (cmd) {
2214     case CMD_OPEN:
2215       GST_ELEMENT_PROGRESS (sink, ERROR, "open", ("Open failed"));
2216       break;
2217     case CMD_RECORD:
2218       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("RECORD failed"));
2219       break;
2220     case CMD_PAUSE:
2221       GST_ELEMENT_PROGRESS (sink, ERROR, "request", ("PAUSE failed"));
2222       break;
2223     case CMD_CLOSE:
2224       GST_ELEMENT_PROGRESS (sink, ERROR, "close", ("Close failed"));
2225       break;
2226     default:
2227       break;
2228   }
2229 }
2230
2231 static void
2232 gst_rtsp_client_sink_loop_end_cmd (GstRTSPClientSink * sink, gint cmd,
2233     GstRTSPResult ret)
2234 {
2235   if (ret == GST_RTSP_OK)
2236     gst_rtsp_client_sink_loop_complete_cmd (sink, cmd);
2237   else if (ret == GST_RTSP_EINTR)
2238     gst_rtsp_client_sink_loop_cancel_cmd (sink, cmd);
2239   else
2240     gst_rtsp_client_sink_loop_error_cmd (sink, cmd);
2241 }
2242
2243 static gboolean
2244 gst_rtsp_client_sink_loop_send_cmd (GstRTSPClientSink * sink, gint cmd,
2245     gint mask)
2246 {
2247   gint old;
2248   gboolean flushed = FALSE;
2249
2250   /* start new request */
2251   gst_rtsp_client_sink_loop_start_cmd (sink, cmd);
2252
2253   GST_DEBUG_OBJECT (sink, "sending cmd %s", cmd_to_string (cmd));
2254
2255   GST_OBJECT_LOCK (sink);
2256   old = sink->pending_cmd;
2257   if (old == CMD_RECONNECT) {
2258     GST_DEBUG_OBJECT (sink, "ignore, we were reconnecting");
2259     cmd = CMD_RECONNECT;
2260   }
2261   if (old != CMD_WAIT) {
2262     sink->pending_cmd = CMD_WAIT;
2263     GST_OBJECT_UNLOCK (sink);
2264     /* cancel previous request */
2265     GST_DEBUG_OBJECT (sink, "cancel previous request %s", cmd_to_string (old));
2266     gst_rtsp_client_sink_loop_cancel_cmd (sink, old);
2267     GST_OBJECT_LOCK (sink);
2268   }
2269   sink->pending_cmd = cmd;
2270   /* interrupt if allowed */
2271   if (sink->busy_cmd & mask) {
2272     GST_DEBUG_OBJECT (sink, "connection flush busy %s",
2273         cmd_to_string (sink->busy_cmd));
2274     gst_rtsp_client_sink_connection_flush (sink, TRUE);
2275     flushed = TRUE;
2276   } else {
2277     GST_DEBUG_OBJECT (sink, "not interrupting busy cmd %s",
2278         cmd_to_string (sink->busy_cmd));
2279   }
2280   if (sink->task)
2281     gst_task_start (sink->task);
2282   GST_OBJECT_UNLOCK (sink);
2283
2284   return flushed;
2285 }
2286
2287 static gboolean
2288 gst_rtsp_client_sink_loop (GstRTSPClientSink * sink)
2289 {
2290   GstFlowReturn ret;
2291
2292   if (!sink->conninfo.connection || !sink->conninfo.connected)
2293     goto no_connection;
2294
2295   ret = gst_rtsp_client_sink_loop_rx (sink);
2296   if (ret != GST_FLOW_OK)
2297     goto pause;
2298
2299   return TRUE;
2300
2301   /* ERRORS */
2302 no_connection:
2303   {
2304     GST_WARNING_OBJECT (sink, "we are not connected");
2305     ret = GST_FLOW_FLUSHING;
2306     goto pause;
2307   }
2308 pause:
2309   {
2310     const gchar *reason = gst_flow_get_name (ret);
2311
2312     GST_DEBUG_OBJECT (sink, "pausing task, reason %s", reason);
2313     gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_LOOP);
2314     return FALSE;
2315   }
2316 }
2317
2318 #ifndef GST_DISABLE_GST_DEBUG
2319 static const gchar *
2320 gst_rtsp_auth_method_to_string (GstRTSPAuthMethod method)
2321 {
2322   gint index = 0;
2323
2324   while (method != 0) {
2325     index++;
2326     method >>= 1;
2327   }
2328   switch (index) {
2329     case 0:
2330       return "None";
2331     case 1:
2332       return "Basic";
2333     case 2:
2334       return "Digest";
2335   }
2336
2337   return "Unknown";
2338 }
2339 #endif
2340
2341 /* Parse a WWW-Authenticate Response header and determine the
2342  * available authentication methods
2343  *
2344  * This code should also cope with the fact that each WWW-Authenticate
2345  * header can contain multiple challenge methods + tokens
2346  *
2347  * At the moment, for Basic auth, we just do a minimal check and don't
2348  * even parse out the realm */
2349 static void
2350 gst_rtsp_client_sink_parse_auth_hdr (GstRTSPMessage * response,
2351     GstRTSPAuthMethod * methods, GstRTSPConnection * conn, gboolean * stale)
2352 {
2353   GstRTSPAuthCredential **credentials, **credential;
2354
2355   g_return_if_fail (response != NULL);
2356   g_return_if_fail (methods != NULL);
2357   g_return_if_fail (stale != NULL);
2358
2359   credentials =
2360       gst_rtsp_message_parse_auth_credentials (response,
2361       GST_RTSP_HDR_WWW_AUTHENTICATE);
2362   if (!credentials)
2363     return;
2364
2365   credential = credentials;
2366   while (*credential) {
2367     if ((*credential)->scheme == GST_RTSP_AUTH_BASIC) {
2368       *methods |= GST_RTSP_AUTH_BASIC;
2369     } else if ((*credential)->scheme == GST_RTSP_AUTH_DIGEST) {
2370       GstRTSPAuthParam **param = (*credential)->params;
2371
2372       *methods |= GST_RTSP_AUTH_DIGEST;
2373
2374       gst_rtsp_connection_clear_auth_params (conn);
2375       *stale = FALSE;
2376
2377       while (*param) {
2378         if (strcmp ((*param)->name, "stale") == 0
2379             && g_ascii_strcasecmp ((*param)->value, "TRUE") == 0)
2380           *stale = TRUE;
2381         gst_rtsp_connection_set_auth_param (conn, (*param)->name,
2382             (*param)->value);
2383         param++;
2384       }
2385     }
2386
2387     credential++;
2388   }
2389
2390   gst_rtsp_auth_credentials_free (credentials);
2391 }
2392
2393 /**
2394  * gst_rtsp_client_sink_setup_auth:
2395  * @src: the rtsp source
2396  *
2397  * Configure a username and password and auth method on the
2398  * connection object based on a response we received from the
2399  * peer.
2400  *
2401  * Currently, this requires that a username and password were supplied
2402  * in the uri. In the future, they may be requested on demand by sending
2403  * a message up the bus.
2404  *
2405  * Returns: TRUE if authentication information could be set up correctly.
2406  */
2407 static gboolean
2408 gst_rtsp_client_sink_setup_auth (GstRTSPClientSink * sink,
2409     GstRTSPMessage * response)
2410 {
2411   gchar *user = NULL;
2412   gchar *pass = NULL;
2413   GstRTSPAuthMethod avail_methods = GST_RTSP_AUTH_NONE;
2414   GstRTSPAuthMethod method;
2415   GstRTSPResult auth_result;
2416   GstRTSPUrl *url;
2417   GstRTSPConnection *conn;
2418   gboolean stale = FALSE;
2419
2420   conn = sink->conninfo.connection;
2421
2422   /* Identify the available auth methods and see if any are supported */
2423   gst_rtsp_client_sink_parse_auth_hdr (response, &avail_methods, conn, &stale);
2424
2425   if (avail_methods == GST_RTSP_AUTH_NONE)
2426     goto no_auth_available;
2427
2428   /* For digest auth, if the response indicates that the session
2429    * data are stale, we just update them in the connection object and
2430    * return TRUE to retry the request */
2431   if (stale)
2432     sink->tried_url_auth = FALSE;
2433
2434   url = gst_rtsp_connection_get_url (conn);
2435
2436   /* Do we have username and password available? */
2437   if (url != NULL && !sink->tried_url_auth && url->user != NULL
2438       && url->passwd != NULL) {
2439     user = url->user;
2440     pass = url->passwd;
2441     sink->tried_url_auth = TRUE;
2442     GST_DEBUG_OBJECT (sink,
2443         "Attempting authentication using credentials from the URL");
2444   } else {
2445     user = sink->user_id;
2446     pass = sink->user_pw;
2447     GST_DEBUG_OBJECT (sink,
2448         "Attempting authentication using credentials from the properties");
2449   }
2450
2451   /* FIXME: If the url didn't contain username and password or we tried them
2452    * already, request a username and passwd from the application via some kind
2453    * of credentials request message */
2454
2455   /* If we don't have a username and passwd at this point, bail out. */
2456   if (user == NULL || pass == NULL)
2457     goto no_user_pass;
2458
2459   /* Try to configure for each available authentication method, strongest to
2460    * weakest */
2461   for (method = GST_RTSP_AUTH_MAX; method != GST_RTSP_AUTH_NONE; method >>= 1) {
2462     /* Check if this method is available on the server */
2463     if ((method & avail_methods) == 0)
2464       continue;
2465
2466     /* Pass the credentials to the connection to try on the next request */
2467     auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass);
2468     /* INVAL indicates an invalid username/passwd were supplied, so we'll just
2469      * ignore it and end up retrying later */
2470     if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) {
2471       GST_DEBUG_OBJECT (sink, "Attempting %s authentication",
2472           gst_rtsp_auth_method_to_string (method));
2473       break;
2474     }
2475   }
2476
2477   if (method == GST_RTSP_AUTH_NONE)
2478     goto no_auth_available;
2479
2480   return TRUE;
2481
2482 no_auth_available:
2483   {
2484     /* Output an error indicating that we couldn't connect because there were
2485      * no supported authentication protocols */
2486     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
2487         ("No supported authentication protocol was found"));
2488     return FALSE;
2489   }
2490 no_user_pass:
2491   {
2492     /* We don't fire an error message, we just return FALSE and let the
2493      * normal NOT_AUTHORIZED error be propagated */
2494     return FALSE;
2495   }
2496 }
2497
2498 static GstRTSPResult
2499 gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
2500     GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
2501     GstRTSPMessage * response, GstRTSPStatusCode * code)
2502 {
2503   GstRTSPResult res;
2504   GstRTSPStatusCode thecode;
2505   gchar *content_base = NULL;
2506   gint try = 0;
2507
2508 again:
2509   GST_DEBUG_OBJECT (sink, "sending message");
2510
2511   if (sink->debug)
2512     gst_rtsp_message_dump (request);
2513
2514   g_mutex_lock (&sink->send_lock);
2515
2516   res =
2517       gst_rtsp_client_sink_connection_send (sink, conninfo, request,
2518       sink->ptcp_timeout);
2519   if (res < 0) {
2520     g_mutex_unlock (&sink->send_lock);
2521     goto send_error;
2522   }
2523
2524   gst_rtsp_connection_reset_timeout (conninfo->connection);
2525
2526   /* See if we should handle the response */
2527   if (response == NULL) {
2528     g_mutex_unlock (&sink->send_lock);
2529     return GST_RTSP_OK;
2530   }
2531 next:
2532   res =
2533       gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
2534       sink->ptcp_timeout);
2535
2536   g_mutex_unlock (&sink->send_lock);
2537
2538   if (res < 0)
2539     goto receive_error;
2540
2541   if (sink->debug)
2542     gst_rtsp_message_dump (response);
2543
2544
2545   switch (response->type) {
2546     case GST_RTSP_MESSAGE_REQUEST:
2547       res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
2548       if (res == GST_RTSP_EEOF)
2549         goto server_eof;
2550       else if (res < 0)
2551         goto handle_request_failed;
2552       g_mutex_lock (&sink->send_lock);
2553       goto next;
2554     case GST_RTSP_MESSAGE_RESPONSE:
2555       /* ok, a response is good */
2556       GST_DEBUG_OBJECT (sink, "received response message");
2557       break;
2558     case GST_RTSP_MESSAGE_DATA:
2559       /* we ignore data messages */
2560       GST_DEBUG_OBJECT (sink, "ignoring data message");
2561       g_mutex_lock (&sink->send_lock);
2562       goto next;
2563     default:
2564       GST_WARNING_OBJECT (sink, "ignoring unknown message type %d",
2565           response->type);
2566       g_mutex_lock (&sink->send_lock);
2567       goto next;
2568   }
2569
2570   thecode = response->type_data.response.code;
2571
2572   GST_DEBUG_OBJECT (sink, "got response message %d", thecode);
2573
2574   /* if the caller wanted the result code, we store it. */
2575   if (code)
2576     *code = thecode;
2577
2578   /* If the request didn't succeed, bail out before doing any more */
2579   if (thecode != GST_RTSP_STS_OK)
2580     return GST_RTSP_OK;
2581
2582   /* store new content base if any */
2583   gst_rtsp_message_get_header (response, GST_RTSP_HDR_CONTENT_BASE,
2584       &content_base, 0);
2585   if (content_base) {
2586     g_free (sink->content_base);
2587     sink->content_base = g_strdup (content_base);
2588   }
2589
2590   return GST_RTSP_OK;
2591
2592   /* ERRORS */
2593 send_error:
2594   {
2595     gchar *str = gst_rtsp_strresult (res);
2596
2597     if (res != GST_RTSP_EINTR) {
2598       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
2599           ("Could not send message. (%s)", str));
2600     } else {
2601       GST_WARNING_OBJECT (sink, "send interrupted");
2602     }
2603     g_free (str);
2604     return res;
2605   }
2606 receive_error:
2607   {
2608     switch (res) {
2609       case GST_RTSP_EEOF:
2610         GST_WARNING_OBJECT (sink, "server closed connection");
2611         if ((try == 0) && !sink->interleaved && sink->udp_reconnect) {
2612           try++;
2613           /* if reconnect succeeds, try again */
2614           if ((res =
2615                   gst_rtsp_conninfo_reconnect (sink, &sink->conninfo,
2616                       FALSE)) == 0)
2617             goto again;
2618         }
2619         /* only try once after reconnect, then fallthrough and error out */
2620       default:
2621       {
2622         gchar *str = gst_rtsp_strresult (res);
2623
2624         if (res != GST_RTSP_EINTR) {
2625           GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2626               ("Could not receive message. (%s)", str));
2627         } else {
2628           GST_WARNING_OBJECT (sink, "receive interrupted");
2629         }
2630         g_free (str);
2631         break;
2632       }
2633     }
2634     return res;
2635   }
2636 handle_request_failed:
2637   {
2638     /* ERROR was posted */
2639     gst_rtsp_message_unset (response);
2640     return res;
2641   }
2642 server_eof:
2643   {
2644     GST_DEBUG_OBJECT (sink, "we got an eof from the server");
2645     GST_ELEMENT_WARNING (sink, RESOURCE, READ, (NULL),
2646         ("The server closed the connection."));
2647     gst_rtsp_message_unset (response);
2648     return res;
2649   }
2650 }
2651
2652 static void
2653 gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
2654 {
2655   GST_DEBUG_OBJECT (sink, "Setting internal state to %s",
2656       gst_element_state_get_name (state));
2657   gst_element_set_state (GST_ELEMENT (sink->internal_bin), state);
2658 }
2659
2660 /**
2661  * gst_rtsp_client_sink_send:
2662  * @src: the rtsp source
2663  * @conn: the connection to send on
2664  * @request: must point to a valid request
2665  * @response: must point to an empty #GstRTSPMessage
2666  * @code: an optional code result
2667  *
2668  * send @request and retrieve the response in @response. optionally @code can be
2669  * non-NULL in which case it will contain the status code of the response.
2670  *
2671  * If This function returns #GST_RTSP_OK, @response will contain a valid response
2672  * message that should be cleaned with gst_rtsp_message_unset() after usage.
2673  *
2674  * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid
2675  * @response message) if the response code was not 200 (OK).
2676  *
2677  * If the attempt results in an authentication failure, then this will attempt
2678  * to retrieve authentication credentials via gst_rtsp_client_sink_setup_auth and retry
2679  * the request.
2680  *
2681  * Returns: #GST_RTSP_OK if the processing was successful.
2682  */
2683 static GstRTSPResult
2684 gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
2685     GstRTSPMessage * request, GstRTSPMessage * response,
2686     GstRTSPStatusCode * code)
2687 {
2688   GstRTSPStatusCode int_code = GST_RTSP_STS_OK;
2689   GstRTSPResult res = GST_RTSP_ERROR;
2690   gint count;
2691   gboolean retry;
2692   GstRTSPMethod method = GST_RTSP_INVALID;
2693
2694   count = 0;
2695   do {
2696     retry = FALSE;
2697
2698     /* make sure we don't loop forever */
2699     if (count++ > 8)
2700       break;
2701
2702     /* save method so we can disable it when the server complains */
2703     method = request->type_data.request.method;
2704
2705     if ((res =
2706             gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
2707                 &int_code)) < 0)
2708       goto error;
2709
2710     switch (int_code) {
2711       case GST_RTSP_STS_UNAUTHORIZED:
2712         if (gst_rtsp_client_sink_setup_auth (sink, response)) {
2713           /* Try the request/response again after configuring the auth info
2714            * and loop again */
2715           retry = TRUE;
2716         }
2717         break;
2718       default:
2719         break;
2720     }
2721   } while (retry == TRUE);
2722
2723   /* If the user requested the code, let them handle errors, otherwise
2724    * post an error below */
2725   if (code != NULL)
2726     *code = int_code;
2727   else if (int_code != GST_RTSP_STS_OK)
2728     goto error_response;
2729
2730   return res;
2731
2732   /* ERRORS */
2733 error:
2734   {
2735     GST_DEBUG_OBJECT (sink, "got error %d", res);
2736     return res;
2737   }
2738 error_response:
2739   {
2740     res = GST_RTSP_ERROR;
2741
2742     switch (response->type_data.response.code) {
2743       case GST_RTSP_STS_NOT_FOUND:
2744         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL), ("%s",
2745                 response->type_data.response.reason));
2746         break;
2747       case GST_RTSP_STS_UNAUTHORIZED:
2748         GST_ELEMENT_ERROR (sink, RESOURCE, NOT_AUTHORIZED, (NULL), ("%s",
2749                 response->type_data.response.reason));
2750         break;
2751       case GST_RTSP_STS_MOVED_PERMANENTLY:
2752       case GST_RTSP_STS_MOVE_TEMPORARILY:
2753       {
2754         gchar *new_location;
2755         GstRTSPLowerTrans transports;
2756
2757         GST_DEBUG_OBJECT (sink, "got redirection");
2758         /* if we don't have a Location Header, we must error */
2759         if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_LOCATION,
2760                 &new_location, 0) < 0)
2761           break;
2762
2763         /* When we receive a redirect result, we go back to the INIT state after
2764          * parsing the new URI. The caller should do the needed steps to issue
2765          * a new setup when it detects this state change. */
2766         GST_DEBUG_OBJECT (sink, "redirection to %s", new_location);
2767
2768         /* save current transports */
2769         if (sink->conninfo.url)
2770           transports = sink->conninfo.url->transports;
2771         else
2772           transports = GST_RTSP_LOWER_TRANS_UNKNOWN;
2773
2774         gst_rtsp_client_sink_uri_set_uri (GST_URI_HANDLER (sink), new_location,
2775             NULL);
2776
2777         /* set old transports */
2778         if (sink->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN)
2779           sink->conninfo.url->transports = transports;
2780
2781         sink->need_redirect = TRUE;
2782         sink->state = GST_RTSP_STATE_INIT;
2783         res = GST_RTSP_OK;
2784         break;
2785       }
2786       case GST_RTSP_STS_NOT_ACCEPTABLE:
2787       case GST_RTSP_STS_NOT_IMPLEMENTED:
2788       case GST_RTSP_STS_METHOD_NOT_ALLOWED:
2789         GST_WARNING_OBJECT (sink, "got NOT IMPLEMENTED, disable method %s",
2790             gst_rtsp_method_as_text (method));
2791         sink->methods &= ~method;
2792         res = GST_RTSP_OK;
2793         break;
2794       default:
2795         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
2796             ("Got error response: %d (%s).", response->type_data.response.code,
2797                 response->type_data.response.reason));
2798         break;
2799     }
2800     /* if we return ERROR we should unset the response ourselves */
2801     if (res == GST_RTSP_ERROR)
2802       gst_rtsp_message_unset (response);
2803
2804     return res;
2805   }
2806 }
2807
2808 /* parse the response and collect all the supported methods. We need this
2809  * information so that we don't try to send an unsupported request to the
2810  * server.
2811  */
2812 static gboolean
2813 gst_rtsp_client_sink_parse_methods (GstRTSPClientSink * sink,
2814     GstRTSPMessage * response)
2815 {
2816   GstRTSPHeaderField field;
2817   gchar *respoptions;
2818   gint indx = 0;
2819
2820   /* reset supported methods */
2821   sink->methods = 0;
2822
2823   /* Try Allow Header first */
2824   field = GST_RTSP_HDR_ALLOW;
2825   while (TRUE) {
2826     respoptions = NULL;
2827     gst_rtsp_message_get_header (response, field, &respoptions, indx);
2828     if (indx == 0 && !respoptions) {
2829       /* if no Allow header was found then try the Public header... */
2830       field = GST_RTSP_HDR_PUBLIC;
2831       gst_rtsp_message_get_header (response, field, &respoptions, indx);
2832     }
2833     if (!respoptions)
2834       break;
2835
2836     sink->methods |= gst_rtsp_options_from_text (respoptions);
2837
2838     indx++;
2839   }
2840
2841   if (sink->methods == 0) {
2842     /* neither Allow nor Public are required, assume the server supports
2843      * at least SETUP. */
2844     GST_DEBUG_OBJECT (sink, "could not get OPTIONS");
2845     sink->methods = GST_RTSP_SETUP;
2846   }
2847
2848   /* Even if the server replied, and didn't say it supports
2849    * RECORD|ANNOUNCE, try anyway by assuming it does */
2850   sink->methods |= GST_RTSP_ANNOUNCE | GST_RTSP_RECORD;
2851
2852   if (!(sink->methods & GST_RTSP_SETUP))
2853     goto no_setup;
2854
2855   return TRUE;
2856
2857   /* ERRORS */
2858 no_setup:
2859   {
2860     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ, (NULL),
2861         ("Server does not support SETUP."));
2862     return FALSE;
2863   }
2864 }
2865
2866 static GstRTSPResult
2867 gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
2868     gboolean async)
2869 {
2870   GstRTSPResult res;
2871   GstRTSPMessage request = { 0 };
2872   GstRTSPMessage response = { 0 };
2873   GSocket *conn_socket;
2874   GSocketAddress *sa;
2875   GInetAddress *ia;
2876
2877   sink->need_redirect = FALSE;
2878
2879   /* can't continue without a valid url */
2880   if (G_UNLIKELY (sink->conninfo.url == NULL)) {
2881     res = GST_RTSP_EINVAL;
2882     goto no_url;
2883   }
2884   sink->tried_url_auth = FALSE;
2885
2886   if ((res = gst_rtsp_conninfo_connect (sink, &sink->conninfo, async)) < 0)
2887     goto connect_failed;
2888
2889   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
2890   sa = g_socket_get_remote_address (conn_socket, NULL);
2891   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
2892
2893   sink->server_ip = g_inet_address_to_string (ia);
2894
2895   g_object_unref (sa);
2896
2897   /* create OPTIONS */
2898   GST_DEBUG_OBJECT (sink, "create options...");
2899   res =
2900       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_OPTIONS,
2901       sink->conninfo.url_str);
2902   if (res < 0)
2903     goto create_request_failed;
2904
2905   /* send OPTIONS */
2906   GST_DEBUG_OBJECT (sink, "send options...");
2907
2908   if (async)
2909     GST_ELEMENT_PROGRESS (sink, CONTINUE, "open",
2910         ("Retrieving server options"));
2911
2912   if ((res =
2913           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
2914               &response, NULL)) < 0)
2915     goto send_error;
2916
2917   /* parse OPTIONS */
2918   if (!gst_rtsp_client_sink_parse_methods (sink, &response))
2919     goto methods_error;
2920
2921   /* FIXME: Do we need to handle REDIRECT responses for OPTIONS? */
2922
2923   /* clean up any messages */
2924   gst_rtsp_message_unset (&request);
2925   gst_rtsp_message_unset (&response);
2926
2927   return res;
2928
2929   /* ERRORS */
2930 no_url:
2931   {
2932     GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
2933         ("No valid RTSP URL was provided"));
2934     goto cleanup_error;
2935   }
2936 connect_failed:
2937   {
2938     gchar *str = gst_rtsp_strresult (res);
2939
2940     if (res != GST_RTSP_EINTR) {
2941       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, (NULL),
2942           ("Failed to connect. (%s)", str));
2943     } else {
2944       GST_WARNING_OBJECT (sink, "connect interrupted");
2945     }
2946     g_free (str);
2947     goto cleanup_error;
2948   }
2949 create_request_failed:
2950   {
2951     gchar *str = gst_rtsp_strresult (res);
2952
2953     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
2954         ("Could not create request. (%s)", str));
2955     g_free (str);
2956     goto cleanup_error;
2957   }
2958 send_error:
2959   {
2960     /* Don't post a message - the rtsp_send method will have
2961      * taken care of it because we passed NULL for the response code */
2962     goto cleanup_error;
2963   }
2964 methods_error:
2965   {
2966     /* error was posted */
2967     res = GST_RTSP_ERROR;
2968     goto cleanup_error;
2969   }
2970 cleanup_error:
2971   {
2972     if (sink->conninfo.connection) {
2973       GST_DEBUG_OBJECT (sink, "free connection");
2974       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
2975     }
2976     gst_rtsp_message_unset (&request);
2977     gst_rtsp_message_unset (&response);
2978     return res;
2979   }
2980 }
2981
2982 static GstRTSPResult
2983 gst_rtsp_client_sink_open (GstRTSPClientSink * sink, gboolean async)
2984 {
2985   GstRTSPResult ret;
2986
2987   sink->methods =
2988       GST_RTSP_SETUP | GST_RTSP_RECORD | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN;
2989
2990   if ((ret = gst_rtsp_client_sink_connect_to_server (sink, async)) < 0)
2991     goto open_failed;
2992
2993   if (async)
2994     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
2995
2996   /* Collect all our input streams and create
2997    * stream objects before actually returning */
2998   gst_rtsp_client_sink_collect_streams (sink);
2999
3000   return ret;
3001
3002   /* ERRORS */
3003 open_failed:
3004   {
3005     GST_WARNING_OBJECT (sink, "Failed to connect to server");
3006     sink->open_error = TRUE;
3007     if (async)
3008       gst_rtsp_client_sink_loop_end_cmd (sink, CMD_OPEN, ret);
3009     return ret;
3010   }
3011 }
3012
3013 static GstRTSPResult
3014 gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
3015     gboolean only_close)
3016 {
3017   GstRTSPMessage request = { 0 };
3018   GstRTSPMessage response = { 0 };
3019   GstRTSPResult res = GST_RTSP_OK;
3020   GList *walk;
3021   const gchar *control;
3022
3023   GST_DEBUG_OBJECT (sink, "TEARDOWN...");
3024
3025   gst_rtsp_client_sink_set_state (sink, GST_STATE_NULL);
3026
3027   if (sink->state < GST_RTSP_STATE_READY) {
3028     GST_DEBUG_OBJECT (sink, "not ready, doing cleanup");
3029     goto close;
3030   }
3031
3032   if (only_close)
3033     goto close;
3034
3035   /* construct a control url */
3036   control = get_aggregate_control (sink);
3037
3038   if (!(sink->methods & (GST_RTSP_RECORD | GST_RTSP_TEARDOWN)))
3039     goto not_supported;
3040
3041   /* stop streaming */
3042   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3043     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3044
3045     if (context->stream_transport)
3046       gst_rtsp_stream_transport_set_active (context->stream_transport, FALSE);
3047
3048     if (context->joined) {
3049       gst_rtsp_stream_leave_bin (context->stream, GST_BIN (sink->internal_bin),
3050           sink->rtpbin);
3051       context->joined = FALSE;
3052     }
3053   }
3054
3055   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3056     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3057     const gchar *setup_url;
3058     GstRTSPConnInfo *info;
3059
3060     GST_DEBUG_OBJECT (sink, "Looking at stream %p for teardown",
3061         context->stream);
3062
3063     /* try aggregate control first but do non-aggregate control otherwise */
3064     if (control)
3065       setup_url = control;
3066     else if ((setup_url = context->conninfo.location) == NULL) {
3067       GST_DEBUG_OBJECT (sink, "Skipping TEARDOWN stream %p - no setup URL",
3068           context->stream);
3069       continue;
3070     }
3071
3072     if (sink->conninfo.connection) {
3073       info = &sink->conninfo;
3074     } else if (context->conninfo.connection) {
3075       info = &context->conninfo;
3076     } else {
3077       continue;
3078     }
3079     if (!info->connected)
3080       goto next;
3081
3082     /* do TEARDOWN */
3083     GST_DEBUG_OBJECT (sink, "Sending teardown for stream %p at URL %s",
3084         context->stream, setup_url);
3085     res =
3086         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_TEARDOWN,
3087         setup_url);
3088     if (res < 0)
3089       goto create_request_failed;
3090
3091     if (async)
3092       GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
3093
3094     if ((res =
3095             gst_rtsp_client_sink_send (sink, info, &request,
3096                 &response, NULL)) < 0)
3097       goto send_error;
3098
3099     /* FIXME, parse result? */
3100     gst_rtsp_message_unset (&request);
3101     gst_rtsp_message_unset (&response);
3102
3103   next:
3104     /* early exit when we did aggregate control */
3105     if (control)
3106       break;
3107   }
3108
3109 close:
3110   /* close connections */
3111   GST_DEBUG_OBJECT (sink, "closing connection...");
3112   gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
3113   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3114     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
3115     gst_rtsp_conninfo_close (sink, &stream->conninfo, TRUE);
3116   }
3117
3118   /* cleanup */
3119   gst_rtsp_client_sink_cleanup (sink);
3120
3121   sink->state = GST_RTSP_STATE_INVALID;
3122
3123   if (async)
3124     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_CLOSE, res);
3125
3126   return res;
3127
3128   /* ERRORS */
3129 create_request_failed:
3130   {
3131     gchar *str = gst_rtsp_strresult (res);
3132
3133     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3134         ("Could not create request. (%s)", str));
3135     g_free (str);
3136     goto close;
3137   }
3138 send_error:
3139   {
3140     gchar *str = gst_rtsp_strresult (res);
3141
3142     gst_rtsp_message_unset (&request);
3143     if (res != GST_RTSP_EINTR) {
3144       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3145           ("Could not send message. (%s)", str));
3146     } else {
3147       GST_WARNING_OBJECT (sink, "TEARDOWN interrupted");
3148     }
3149     g_free (str);
3150     goto close;
3151   }
3152 not_supported:
3153   {
3154     GST_DEBUG_OBJECT (sink,
3155         "TEARDOWN and PLAY not supported, can't do TEARDOWN");
3156     goto close;
3157   }
3158 }
3159
3160 static gboolean
3161 gst_rtsp_client_sink_configure_manager (GstRTSPClientSink * sink)
3162 {
3163   GstElement *rtpbin;
3164   GstStateChangeReturn ret;
3165
3166   rtpbin = sink->rtpbin;
3167
3168   if (rtpbin == NULL) {
3169     GObjectClass *klass;
3170
3171     rtpbin = gst_element_factory_make ("rtpbin", NULL);
3172     if (rtpbin == NULL)
3173       goto no_rtpbin;
3174
3175     gst_bin_add (GST_BIN_CAST (sink->internal_bin), rtpbin);
3176
3177     sink->rtpbin = rtpbin;
3178
3179     /* Any more settings we should configure on rtpbin here? */
3180     g_object_set (sink->rtpbin, "latency", sink->latency, NULL);
3181
3182     klass = G_OBJECT_GET_CLASS (G_OBJECT (rtpbin));
3183
3184     if (g_object_class_find_property (klass, "ntp-time-source")) {
3185       g_object_set (sink->rtpbin, "ntp-time-source", sink->ntp_time_source,
3186           NULL);
3187     }
3188
3189     if (sink->sdes && g_object_class_find_property (klass, "sdes")) {
3190       g_object_set (sink->rtpbin, "sdes", sink->sdes, NULL);
3191     }
3192
3193     g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_NEW_MANAGER], 0,
3194         sink->rtpbin);
3195   }
3196
3197   ret = gst_element_set_state (rtpbin, GST_STATE_PAUSED);
3198   if (ret == GST_STATE_CHANGE_FAILURE)
3199     goto start_manager_failure;
3200
3201   return TRUE;
3202
3203 no_rtpbin:
3204   {
3205     GST_WARNING ("no rtpbin element");
3206     g_warning ("failed to create element 'rtpbin', check your installation");
3207     return FALSE;
3208   }
3209 start_manager_failure:
3210   {
3211     GST_DEBUG_OBJECT (sink, "could not start session manager");
3212     gst_bin_remove (GST_BIN_CAST (sink->internal_bin), rtpbin);
3213     return FALSE;
3214   }
3215 }
3216
3217 static GstElement *
3218 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPClientSink * sink)
3219 {
3220   GstRTSPStream *stream = NULL;
3221   GstElement *ret = NULL;
3222   GList *walk;
3223
3224   GST_RTSP_STATE_LOCK (sink);
3225   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3226     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3227
3228     if (sessid == gst_rtsp_stream_get_index (context->stream)) {
3229       stream = context->stream;
3230       break;
3231     }
3232   }
3233
3234   if (stream != NULL) {
3235     GST_DEBUG_OBJECT (sink, "Creating aux sender for stream %u", sessid);
3236     ret = gst_rtsp_stream_request_aux_sender (stream, sessid);
3237   }
3238
3239   GST_RTSP_STATE_UNLOCK (sink);
3240
3241   return ret;
3242 }
3243
3244 static gboolean
3245 gst_rtsp_client_sink_collect_streams (GstRTSPClientSink * sink)
3246 {
3247   GstRTSPStreamContext *context;
3248   GList *walk;
3249   const gchar *base;
3250   gboolean has_slash;
3251
3252   GST_DEBUG_OBJECT (sink, "Collecting stream information");
3253
3254   if (!gst_rtsp_client_sink_configure_manager (sink))
3255     return FALSE;
3256
3257   base = get_aggregate_control (sink);
3258   /* check if the base ends with / */
3259   has_slash = g_str_has_suffix (base, "/");
3260
3261   g_mutex_lock (&sink->preroll_lock);
3262   while (sink->contexts == NULL && !sink->conninfo.flushing) {
3263     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3264   }
3265   g_mutex_unlock (&sink->preroll_lock);
3266
3267   /* FIXME: Need different locking - need to protect against pad releases
3268    * and potential state changes ruining things here */
3269   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3270     GstPad *srcpad;
3271
3272     context = (GstRTSPStreamContext *) walk->data;
3273     if (context->stream)
3274       continue;
3275
3276     g_mutex_lock (&sink->preroll_lock);
3277     while (!context->prerolled && !sink->conninfo.flushing) {
3278       GST_DEBUG_OBJECT (sink, "Waiting for caps on stream %d", context->index);
3279       g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3280     }
3281     if (sink->conninfo.flushing) {
3282       g_mutex_unlock (&sink->preroll_lock);
3283       break;
3284     }
3285     g_mutex_unlock (&sink->preroll_lock);
3286
3287     if (context->payloader == NULL)
3288       continue;
3289
3290     srcpad = gst_element_get_static_pad (context->payloader, "src");
3291
3292     GST_DEBUG_OBJECT (sink, "Creating stream object for stream %d",
3293         context->index);
3294     context->stream =
3295         gst_rtsp_client_sink_create_stream (sink, context, context->payloader,
3296         srcpad);
3297
3298     /* concatenate the two strings, insert / when not present */
3299     g_free (context->conninfo.location);
3300     context->conninfo.location =
3301         g_strdup_printf ("%s%sstream=%d", base, has_slash ? "" : "/",
3302         context->index);
3303
3304     if (sink->rtx_time > 0) {
3305       /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
3306       g_signal_connect (sink->rtpbin, "request-aux-sender",
3307           (GCallback) request_aux_sender, sink);
3308     }
3309
3310     if (!gst_rtsp_stream_join_bin (context->stream,
3311             GST_BIN (sink->internal_bin), sink->rtpbin, GST_STATE_PAUSED)) {
3312       goto join_bin_failed;
3313     }
3314     context->joined = TRUE;
3315
3316     /* Let the stream object receive data */
3317     gst_pad_remove_probe (srcpad, context->payloader_block_id);
3318
3319     gst_object_unref (srcpad);
3320   }
3321
3322   /* Now wait for the preroll of the rtp bin */
3323   g_mutex_lock (&sink->preroll_lock);
3324   while (!sink->prerolled && !sink->conninfo.flushing) {
3325     GST_LOG_OBJECT (sink, "Waiting for preroll before continuing");
3326     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3327   }
3328   GST_LOG_OBJECT (sink, "Marking streams as collected");
3329   sink->streams_collected = TRUE;
3330   g_mutex_unlock (&sink->preroll_lock);
3331
3332   return TRUE;
3333
3334 join_bin_failed:
3335
3336   GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3337       ("Could not start stream %d", context->index));
3338   return FALSE;
3339 }
3340
3341 static GstRTSPResult
3342 gst_rtsp_client_sink_create_transports_string (GstRTSPClientSink * sink,
3343     GstRTSPStreamContext * context, GSocketFamily family,
3344     GstRTSPLowerTrans protocols, GstRTSPProfile profiles, gchar ** transports)
3345 {
3346   GString *result;
3347   GstRTSPStream *stream = context->stream;
3348   gboolean first = TRUE;
3349
3350   /* the default RTSP transports */
3351   result = g_string_new ("RTP");
3352
3353   while (profiles != 0) {
3354     if (!first)
3355       g_string_append (result, ",RTP");
3356
3357     if (profiles & GST_RTSP_PROFILE_SAVPF) {
3358       g_string_append (result, "/SAVPF");
3359       profiles &= ~GST_RTSP_PROFILE_SAVPF;
3360     } else if (profiles & GST_RTSP_PROFILE_SAVP) {
3361       g_string_append (result, "/SAVP");
3362       profiles &= ~GST_RTSP_PROFILE_SAVP;
3363     } else if (profiles & GST_RTSP_PROFILE_AVPF) {
3364       g_string_append (result, "/AVPF");
3365       profiles &= ~GST_RTSP_PROFILE_AVPF;
3366     } else if (profiles & GST_RTSP_PROFILE_AVP) {
3367       g_string_append (result, "/AVP");
3368       profiles &= ~GST_RTSP_PROFILE_AVP;
3369     } else {
3370       GST_WARNING_OBJECT (sink, "Unimplemented profile(s) 0x%x", profiles);
3371       break;
3372     }
3373
3374     if (protocols & GST_RTSP_LOWER_TRANS_UDP) {
3375       GstRTSPRange ports;
3376
3377       GST_DEBUG_OBJECT (sink, "adding UDP unicast");
3378       gst_rtsp_stream_get_server_port (stream, &ports, family);
3379
3380       g_string_append_printf (result, "/UDP;unicast;client_port=%d-%d",
3381           ports.min, ports.max);
3382     } else if (protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3383       GstRTSPAddress *addr =
3384           gst_rtsp_stream_get_multicast_address (stream, family);
3385       if (addr) {
3386         GST_DEBUG_OBJECT (sink, "adding UDP multicast");
3387         g_string_append_printf (result, "/UDP;multicast;client_port=%d-%d",
3388             addr->port, addr->port + addr->n_ports - 1);
3389         gst_rtsp_address_free (addr);
3390       }
3391     } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) {
3392       GST_DEBUG_OBJECT (sink, "adding TCP");
3393       g_string_append_printf (result, "/TCP;unicast;interleaved=%d-%d",
3394           sink->free_channel, sink->free_channel + 1);
3395     }
3396
3397     g_string_append (result, ";mode=RECORD");
3398     /* FIXME: Support appending too:
3399        if (sink->append)
3400        g_string_append (result, ";append");
3401      */
3402
3403     first = FALSE;
3404   }
3405
3406   if (first) {
3407     /* No valid transport could be constructed */
3408     GST_ERROR_OBJECT (sink, "No supported profiles configured");
3409     goto fail;
3410   }
3411
3412   *transports = g_string_free (result, FALSE);
3413
3414   GST_DEBUG_OBJECT (sink, "prepared transports %s", GST_STR_NULL (*transports));
3415
3416   return GST_RTSP_OK;
3417 fail:
3418   g_string_free (result, TRUE);
3419   return GST_RTSP_ERROR;
3420 }
3421
3422 static GstCaps *
3423 signal_get_srtcp_params (GstRTSPClientSink * sink,
3424     GstRTSPStreamContext * context)
3425 {
3426   GstCaps *caps = NULL;
3427
3428   g_signal_emit (sink, gst_rtsp_client_sink_signals[SIGNAL_REQUEST_RTCP_KEY], 0,
3429       context->index, &caps);
3430
3431   if (caps != NULL)
3432     GST_DEBUG_OBJECT (sink, "SRTP parameters received");
3433
3434   return caps;
3435 }
3436
3437 static gchar *
3438 gst_rtsp_client_sink_stream_make_keymgmt (GstRTSPClientSink * sink,
3439     GstRTSPStreamContext * context)
3440 {
3441   gchar *base64, *result = NULL;
3442   GstMIKEYMessage *mikey_msg;
3443
3444   context->srtcpparams = signal_get_srtcp_params (sink, context);
3445   if (context->srtcpparams == NULL)
3446     context->srtcpparams = gst_rtsp_stream_get_caps (context->stream);
3447
3448   mikey_msg = gst_mikey_message_new_from_caps (context->srtcpparams);
3449   if (mikey_msg) {
3450     guint send_ssrc;
3451
3452     /* add policy '0' for our SSRC */
3453     gst_rtsp_stream_get_ssrc (context->stream, &send_ssrc);
3454     GST_LOG_OBJECT (sink, "Stream %p ssrc %x", context->stream, send_ssrc);
3455     gst_mikey_message_add_cs_srtp (mikey_msg, 0, send_ssrc, 0);
3456
3457     base64 = gst_mikey_message_base64_encode (mikey_msg);
3458     gst_mikey_message_unref (mikey_msg);
3459
3460     if (base64) {
3461       result = gst_sdp_make_keymgmt (context->conninfo.location, base64);
3462       g_free (base64);
3463     }
3464   }
3465
3466   return result;
3467 }
3468
3469 /* masks to be kept in sync with the hardcoded protocol order of preference
3470  * in code below */
3471 static const guint protocol_masks[] = {
3472   GST_RTSP_LOWER_TRANS_UDP,
3473   GST_RTSP_LOWER_TRANS_UDP_MCAST,
3474   GST_RTSP_LOWER_TRANS_TCP,
3475   0
3476 };
3477
3478 /* Same for profile_masks */
3479 static const guint profile_masks[] = {
3480   GST_RTSP_PROFILE_SAVPF,
3481   GST_RTSP_PROFILE_SAVP,
3482   GST_RTSP_PROFILE_AVPF,
3483   GST_RTSP_PROFILE_AVP,
3484   0
3485 };
3486
3487 static gboolean
3488 do_send_data (GstBuffer * buffer, guint8 channel,
3489     GstRTSPStreamContext * context)
3490 {
3491   GstRTSPClientSink *sink = context->parent;
3492   GstRTSPMessage message = { 0 };
3493   GstRTSPResult res = GST_RTSP_OK;
3494   GstMapInfo map_info;
3495   guint8 *data;
3496   guint usize;
3497
3498   gst_rtsp_message_init_data (&message, channel);
3499
3500   /* FIXME, need some sort of iovec RTSPMessage here */
3501   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
3502     return FALSE;
3503
3504   gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
3505
3506   res =
3507       gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
3508       NULL, NULL);
3509
3510   gst_rtsp_message_steal_body (&message, &data, &usize);
3511   gst_buffer_unmap (buffer, &map_info);
3512
3513   gst_rtsp_message_unset (&message);
3514
3515   return res == GST_RTSP_OK;
3516 }
3517
3518 static GstRTSPResult
3519 gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
3520 {
3521   GstRTSPResult res = GST_RTSP_ERROR;
3522   GstRTSPMessage request = { 0 };
3523   GstRTSPMessage response = { 0 };
3524   GstRTSPLowerTrans protocols;
3525   GstRTSPStatusCode code;
3526   GSocketFamily family;
3527   GSocketAddress *sa;
3528   GSocket *conn_socket;
3529   GstRTSPUrl *url;
3530   GList *walk;
3531   gchar *hval;
3532
3533   if (sink->conninfo.connection) {
3534     url = gst_rtsp_connection_get_url (sink->conninfo.connection);
3535     /* we initially allow all configured lower transports. based on the URL
3536      * transports and the replies from the server we narrow them down. */
3537     protocols = url->transports & sink->cur_protocols;
3538   } else {
3539     url = NULL;
3540     protocols = sink->cur_protocols;
3541   }
3542
3543   if (protocols == 0)
3544     goto no_protocols;
3545
3546   GST_RTSP_STATE_LOCK (sink);
3547
3548   if (G_UNLIKELY (sink->contexts == NULL))
3549     goto no_streams;
3550
3551   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3552     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3553     GstRTSPStream *stream;
3554
3555     GstRTSPConnInfo *info;
3556     GstRTSPProfile profiles;
3557     GstRTSPProfile cur_profile;
3558     gchar *transports;
3559     gint retry = 0;
3560     guint profile_mask = 0;
3561     guint mask = 0;
3562     GstCaps *caps;
3563     const GstSDPMedia *media;
3564
3565     stream = context->stream;
3566     profiles = gst_rtsp_stream_get_profiles (stream);
3567
3568     caps = gst_rtsp_stream_get_caps (stream);
3569     if (caps == NULL) {
3570       GST_DEBUG_OBJECT (sink, "skipping stream %p, no caps", stream);
3571       continue;
3572     }
3573     gst_caps_unref (caps);
3574     media = gst_sdp_message_get_media (&sink->cursdp, context->sdp_index);
3575     if (media == NULL) {
3576       GST_DEBUG_OBJECT (sink, "skipping stream %p, no SDP info", stream);
3577       continue;
3578     }
3579
3580     /* skip setup if we have no URL for it */
3581     if (context->conninfo.location == NULL) {
3582       GST_DEBUG_OBJECT (sink, "skipping stream %p, no setup", stream);
3583       continue;
3584     }
3585
3586     if (sink->conninfo.connection == NULL) {
3587       if (!gst_rtsp_conninfo_connect (sink, &context->conninfo, async)) {
3588         GST_DEBUG_OBJECT (sink, "skipping stream %p, failed to connect",
3589             stream);
3590         continue;
3591       }
3592       info = &context->conninfo;
3593     } else {
3594       info = &sink->conninfo;
3595     }
3596     GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
3597         context->conninfo.location);
3598
3599     conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
3600     sa = g_socket_get_local_address (conn_socket, NULL);
3601     family = g_socket_address_get_family (sa);
3602     g_object_unref (sa);
3603
3604   next_protocol:
3605     /* first selectable profile */
3606     while (profile_masks[profile_mask]
3607         && !(profiles & profile_masks[profile_mask]))
3608       profile_mask++;
3609     if (!profile_masks[profile_mask])
3610       goto no_profiles;
3611
3612     /* first selectable protocol */
3613     while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
3614       mask++;
3615     if (!protocol_masks[mask])
3616       goto no_protocols;
3617
3618   retry:
3619     GST_DEBUG_OBJECT (sink, "protocols = 0x%x, protocol mask = 0x%x", protocols,
3620         protocol_masks[mask]);
3621     /* create a string with first transport in line */
3622     transports = NULL;
3623     cur_profile = profiles & profile_masks[profile_mask];
3624     res = gst_rtsp_client_sink_create_transports_string (sink, context, family,
3625         protocols & protocol_masks[mask], cur_profile, &transports);
3626     if (res < 0 || transports == NULL)
3627       goto setup_transport_failed;
3628
3629     if (strlen (transports) == 0) {
3630       g_free (transports);
3631       GST_DEBUG_OBJECT (sink, "no transports found");
3632       mask++;
3633       profile_mask = 0;
3634       goto next_protocol;
3635     }
3636
3637     GST_DEBUG_OBJECT (sink, "transport is %s", GST_STR_NULL (transports));
3638
3639     /* create SETUP request */
3640     res =
3641         gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_SETUP,
3642         context->conninfo.location);
3643     if (res < 0) {
3644       g_free (transports);
3645       goto create_request_failed;
3646     }
3647
3648     /* select transport */
3649     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_TRANSPORT, transports);
3650
3651     /* set up keys */
3652     if (cur_profile == GST_RTSP_PROFILE_SAVP ||
3653         cur_profile == GST_RTSP_PROFILE_SAVPF) {
3654       hval = gst_rtsp_client_sink_stream_make_keymgmt (sink, context);
3655       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_KEYMGMT, hval);
3656     }
3657
3658     /* if the user wants a non default RTP packet size we add the blocksize
3659      * parameter */
3660     if (sink->rtp_blocksize > 0) {
3661       hval = g_strdup_printf ("%d", sink->rtp_blocksize);
3662       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_BLOCKSIZE, hval);
3663     }
3664
3665     if (async)
3666       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request", ("SETUP stream %d",
3667               context->index));
3668
3669     /* handle the code ourselves */
3670     res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
3671     if (res < 0)
3672       goto send_error;
3673
3674     switch (code) {
3675       case GST_RTSP_STS_OK:
3676         break;
3677       case GST_RTSP_STS_UNSUPPORTED_TRANSPORT:
3678         gst_rtsp_message_unset (&request);
3679         gst_rtsp_message_unset (&response);
3680
3681         /* Try another profile. If no more, move to the next protocol */
3682         profile_mask++;
3683         while (profile_masks[profile_mask]
3684             && !(profiles & profile_masks[profile_mask]))
3685           profile_mask++;
3686         if (profile_masks[profile_mask])
3687           goto retry;
3688
3689         /* select next available protocol, give up on this stream if none */
3690         /* Reset profiles to try: */
3691         profile_mask = 0;
3692
3693         mask++;
3694         while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
3695           mask++;
3696         if (!protocol_masks[mask])
3697           continue;
3698         else
3699           goto retry;
3700       default:
3701         goto response_error;
3702     }
3703
3704     /* parse response transport */
3705     {
3706       gchar *resptrans = NULL;
3707       GstRTSPTransport *transport;
3708
3709       gst_rtsp_message_get_header (&response, GST_RTSP_HDR_TRANSPORT,
3710           &resptrans, 0);
3711       if (!resptrans) {
3712         goto no_transport;
3713       }
3714
3715       gst_rtsp_transport_new (&transport);
3716
3717       /* parse transport, go to next stream on parse error */
3718       if (gst_rtsp_transport_parse (resptrans, transport) != GST_RTSP_OK) {
3719         GST_WARNING_OBJECT (sink, "failed to parse transport %s", resptrans);
3720         goto next;
3721       }
3722
3723       /* update allowed transports for other streams. once the transport of
3724        * one stream has been determined, we make sure that all other streams
3725        * are configured in the same way */
3726       switch (transport->lower_transport) {
3727         case GST_RTSP_LOWER_TRANS_TCP:
3728           GST_DEBUG_OBJECT (sink, "stream %p as TCP interleaved", stream);
3729           protocols = GST_RTSP_LOWER_TRANS_TCP;
3730           sink->interleaved = TRUE;
3731           /* update free channels */
3732           sink->free_channel =
3733               MAX (transport->interleaved.min, sink->free_channel);
3734           sink->free_channel =
3735               MAX (transport->interleaved.max, sink->free_channel);
3736           sink->free_channel++;
3737           break;
3738         case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3739           /* only allow multicast for other streams */
3740           GST_DEBUG_OBJECT (sink, "stream %p as UDP multicast", stream);
3741           protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST;
3742           break;
3743         case GST_RTSP_LOWER_TRANS_UDP:
3744           /* only allow unicast for other streams */
3745           GST_DEBUG_OBJECT (sink, "stream %p as UDP unicast", stream);
3746           protocols = GST_RTSP_LOWER_TRANS_UDP;
3747           /* Update transport with server destination if not provided by the server */
3748           if (transport->destination == NULL) {
3749             transport->destination = g_strdup (sink->server_ip);
3750           }
3751           break;
3752         default:
3753           GST_DEBUG_OBJECT (sink, "stream %p unknown transport %d", stream,
3754               transport->lower_transport);
3755           break;
3756       }
3757
3758       if (!retry) {
3759         GST_DEBUG ("Configuring the stream transport for stream %d",
3760             context->index);
3761         if (context->stream_transport == NULL)
3762           context->stream_transport =
3763               gst_rtsp_stream_transport_new (stream, transport);
3764         else
3765           gst_rtsp_stream_transport_set_transport (context->stream_transport,
3766               transport);
3767
3768         if (transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
3769           /* our callbacks to send data on this TCP connection */
3770           gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
3771               (GstRTSPSendFunc) do_send_data,
3772               (GstRTSPSendFunc) do_send_data, context, NULL);
3773         }
3774
3775         /* The stream_transport now owns the transport */
3776         transport = NULL;
3777
3778         gst_rtsp_stream_transport_set_active (context->stream_transport, TRUE);
3779       }
3780     next:
3781       if (transport)
3782         gst_rtsp_transport_free (transport);
3783       /* clean up used RTSP messages */
3784       gst_rtsp_message_unset (&request);
3785       gst_rtsp_message_unset (&response);
3786     }
3787   }
3788   GST_RTSP_STATE_UNLOCK (sink);
3789
3790   /* store the transport protocol that was configured */
3791   sink->cur_protocols = protocols;
3792
3793   return res;
3794
3795 no_streams:
3796   {
3797     GST_RTSP_STATE_UNLOCK (sink);
3798     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
3799         ("SDP contains no streams"));
3800     return GST_RTSP_ERROR;
3801   }
3802 setup_transport_failed:
3803   {
3804     GST_RTSP_STATE_UNLOCK (sink);
3805     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
3806         ("Could not setup transport."));
3807     res = GST_RTSP_ERROR;
3808     goto cleanup_error;
3809   }
3810 no_profiles:
3811   {
3812     GST_RTSP_STATE_UNLOCK (sink);
3813     /* no transport possible, post an error and stop */
3814     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3815         ("Could not connect to server, no profiles left"));
3816     return GST_RTSP_ERROR;
3817   }
3818 no_protocols:
3819   {
3820     GST_RTSP_STATE_UNLOCK (sink);
3821     /* no transport possible, post an error and stop */
3822     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
3823         ("Could not connect to server, no protocols left"));
3824     return GST_RTSP_ERROR;
3825   }
3826 no_transport:
3827   {
3828     GST_RTSP_STATE_UNLOCK (sink);
3829     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
3830         ("Server did not select transport."));
3831     res = GST_RTSP_ERROR;
3832     goto cleanup_error;
3833   }
3834 create_request_failed:
3835   {
3836     gchar *str = gst_rtsp_strresult (res);
3837
3838     GST_RTSP_STATE_UNLOCK (sink);
3839     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
3840         ("Could not create request. (%s)", str));
3841     g_free (str);
3842     goto cleanup_error;
3843   }
3844 send_error:
3845   {
3846     gchar *str = gst_rtsp_strresult (res);
3847
3848     GST_RTSP_STATE_UNLOCK (sink);
3849     if (res != GST_RTSP_EINTR) {
3850       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3851           ("Could not send message. (%s)", str));
3852     } else {
3853       GST_WARNING_OBJECT (sink, "send interrupted");
3854     }
3855     g_free (str);
3856     goto cleanup_error;
3857   }
3858 response_error:
3859   {
3860     const gchar *str = gst_rtsp_status_as_text (code);
3861
3862     GST_RTSP_STATE_UNLOCK (sink);
3863     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
3864         ("Error (%d): %s", code, GST_STR_NULL (str)));
3865     res = GST_RTSP_ERROR;
3866     goto cleanup_error;
3867   }
3868 cleanup_error:
3869   {
3870     gst_rtsp_message_unset (&request);
3871     gst_rtsp_message_unset (&response);
3872     return res;
3873   }
3874 }
3875
3876 static GstRTSPResult
3877 gst_rtsp_client_sink_ensure_open (GstRTSPClientSink * sink, gboolean async)
3878 {
3879   GstRTSPResult res = GST_RTSP_OK;
3880
3881   if (sink->state < GST_RTSP_STATE_READY) {
3882     res = GST_RTSP_ERROR;
3883     if (sink->open_error) {
3884       GST_DEBUG_OBJECT (sink, "the stream was in error");
3885       goto done;
3886     }
3887     if (async)
3888       gst_rtsp_client_sink_loop_start_cmd (sink, CMD_OPEN);
3889
3890     if ((res = gst_rtsp_client_sink_open (sink, async)) < 0) {
3891       GST_DEBUG_OBJECT (sink, "failed to open stream");
3892       goto done;
3893     }
3894   }
3895
3896 done:
3897   return res;
3898 }
3899
3900 static GstRTSPResult
3901 gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
3902 {
3903   GstRTSPMessage request = { 0 };
3904   GstRTSPMessage response = { 0 };
3905   GstRTSPResult res = GST_RTSP_OK;
3906   GstSDPMessage *sdp;
3907   guint sdp_index = 0;
3908   GstSDPInfo info = { 0, };
3909
3910   const gchar *proto;
3911   gchar *sess_id, *client_ip, *str;
3912   GSocketAddress *sa;
3913   GInetAddress *ia;
3914   GSocket *conn_socket;
3915   GList *walk;
3916
3917   /* Wait for streams to preroll */
3918   g_mutex_lock (&sink->preroll_lock);
3919   while (sink->in_async) {
3920     GST_LOG_OBJECT (sink, "Waiting for ASYNC_DONE preroll");
3921     g_cond_wait (&sink->preroll_cond, &sink->preroll_lock);
3922   }
3923   g_mutex_unlock (&sink->preroll_lock);
3924
3925   if (sink->state == GST_RTSP_STATE_PLAYING) {
3926     /* Already recording, don't send another request */
3927     GST_LOG_OBJECT (sink, "Already in RECORD. Skipping duplicate request.");
3928     goto done;
3929   }
3930
3931   /* Send announce, then setup for all streams */
3932   gst_sdp_message_init (&sink->cursdp);
3933   sdp = &sink->cursdp;
3934
3935   /* some standard things first */
3936   gst_sdp_message_set_version (sdp, "0");
3937
3938   /* session ID doesn't have to be super-unique in this case */
3939   sess_id = g_strdup_printf ("%u", g_random_int ());
3940
3941   if (sink->conninfo.connection == NULL)
3942     return GST_RTSP_ERROR;
3943
3944   conn_socket = gst_rtsp_connection_get_read_socket (sink->conninfo.connection);
3945
3946   sa = g_socket_get_local_address (conn_socket, NULL);
3947   ia = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sa));
3948   client_ip = g_inet_address_to_string (ia);
3949   if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV6) {
3950     info.is_ipv6 = TRUE;
3951     proto = "IP6";
3952   } else if (g_socket_address_get_family (sa) == G_SOCKET_FAMILY_IPV4)
3953     proto = "IP4";
3954   else
3955     g_assert_not_reached ();
3956   g_object_unref (sa);
3957
3958   /* FIXME: Should this actually be the server's IP or ours? */
3959   info.server_ip = sink->server_ip;
3960
3961   gst_sdp_message_set_origin (sdp, "-", sess_id, "1", "IN", proto, client_ip);
3962
3963   gst_sdp_message_set_session_name (sdp, "Session streamed with GStreamer");
3964   gst_sdp_message_set_information (sdp, "rtspclientsink");
3965   gst_sdp_message_add_time (sdp, "0", "0", NULL);
3966   gst_sdp_message_add_attribute (sdp, "tool", "GStreamer");
3967
3968   /* add stream */
3969   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
3970     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
3971
3972     gst_rtsp_sdp_from_stream (sdp, &info, context->stream);
3973     context->sdp_index = sdp_index++;
3974   }
3975
3976   g_free (sess_id);
3977   g_free (client_ip);
3978
3979   /* send ANNOUNCE request */
3980   GST_DEBUG_OBJECT (sink, "create ANNOUNCE request...");
3981   res =
3982       gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_ANNOUNCE,
3983       sink->conninfo.url_str);
3984   if (res < 0)
3985     goto create_request_failed;
3986
3987   gst_rtsp_message_add_header (&request, GST_RTSP_HDR_CONTENT_TYPE,
3988       "application/sdp");
3989
3990   /* add SDP to the request body */
3991   str = gst_sdp_message_as_text (sdp);
3992   gst_rtsp_message_take_body (&request, (guint8 *) str, strlen (str));
3993
3994   /* send ANNOUNCE */
3995   GST_DEBUG_OBJECT (sink, "sending announce...");
3996
3997   if (async)
3998     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record",
3999         ("Sending server stream info"));
4000
4001   if ((res =
4002           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4003               &response, NULL)) < 0)
4004     goto send_error;
4005
4006   /* send setup for all streams */
4007   if ((res = gst_rtsp_client_sink_setup_streams (sink, async)) < 0)
4008     goto setup_failed;
4009
4010   res = gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_RECORD,
4011       sink->conninfo.url_str);
4012
4013   if (res < 0)
4014     goto create_request_failed;
4015
4016 #if 0                           /* FIXME: Configure a range based on input segments? */
4017   if (src->need_range) {
4018     hval = gen_range_header (src, segment);
4019
4020     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_RANGE, hval);
4021   }
4022
4023   if (segment->rate != 1.0) {
4024     gchar hval[G_ASCII_DTOSTR_BUF_SIZE];
4025
4026     g_ascii_dtostr (hval, sizeof (hval), segment->rate);
4027     if (src->skip)
4028       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SCALE, hval);
4029     else
4030       gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval);
4031   }
4032 #endif
4033
4034   if (async)
4035     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
4036   if ((res =
4037           gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
4038               &response, NULL)) < 0)
4039     goto send_error;
4040
4041 #if 0                           /* FIXME: Check if servers return these for record: */
4042   /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
4043    * for the RTP packets. If this is not present, we assume all starts from 0...
4044    * This is info for the RTP session manager that we pass to it in caps. */
4045   hval_idx = 0;
4046   while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTP_INFO,
4047           &hval, hval_idx++) == GST_RTSP_OK)
4048     gst_rtspsrc_parse_rtpinfo (src, hval);
4049
4050   /* some servers indicate RTCP parameters in PLAY response,
4051    * rather than properly in SDP */
4052   if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTCP_INTERVAL,
4053           &hval, 0) == GST_RTSP_OK)
4054     gst_rtspsrc_handle_rtcp_interval (src, hval);
4055 #endif
4056
4057   gst_rtsp_client_sink_set_state (sink, GST_STATE_PLAYING);
4058   sink->state = GST_RTSP_STATE_PLAYING;
4059
4060   /* clean up any messages */
4061   gst_rtsp_message_unset (&request);
4062   gst_rtsp_message_unset (&response);
4063
4064 done:
4065   return res;
4066
4067 create_request_failed:
4068   {
4069     gchar *str = gst_rtsp_strresult (res);
4070
4071     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4072         ("Could not create request. (%s)", str));
4073     g_free (str);
4074     goto cleanup_error;
4075   }
4076 send_error:
4077   {
4078     /* Don't post a message - the rtsp_send method will have
4079      * taken care of it because we passed NULL for the response code */
4080     goto cleanup_error;
4081   }
4082 setup_failed:
4083   {
4084     GST_ERROR_OBJECT (sink, "setup failed");
4085     goto cleanup_error;
4086   }
4087 cleanup_error:
4088   {
4089     if (sink->conninfo.connection) {
4090       GST_DEBUG_OBJECT (sink, "free connection");
4091       gst_rtsp_conninfo_close (sink, &sink->conninfo, TRUE);
4092     }
4093     gst_rtsp_message_unset (&request);
4094     gst_rtsp_message_unset (&response);
4095     return res;
4096   }
4097 }
4098
4099 static GstRTSPResult
4100 gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
4101 {
4102   GstRTSPResult res = GST_RTSP_OK;
4103   GstRTSPMessage request = { 0 };
4104   GstRTSPMessage response = { 0 };
4105   GList *walk;
4106   const gchar *control;
4107
4108   GST_DEBUG_OBJECT (sink, "PAUSE...");
4109
4110   if ((res = gst_rtsp_client_sink_ensure_open (sink, async)) < 0)
4111     goto open_failed;
4112
4113   if (!(sink->methods & GST_RTSP_PAUSE))
4114     goto not_supported;
4115
4116   if (sink->state == GST_RTSP_STATE_READY)
4117     goto was_paused;
4118
4119   if (!sink->conninfo.connection || !sink->conninfo.connected)
4120     goto no_connection;
4121
4122   /* construct a control url */
4123   control = get_aggregate_control (sink);
4124
4125   /* loop over the streams. We might exit the loop early when we could do an
4126    * aggregate control */
4127   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
4128     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
4129     GstRTSPConnInfo *info;
4130     const gchar *setup_url;
4131
4132     /* try aggregate control first but do non-aggregate control otherwise */
4133     if (control)
4134       setup_url = control;
4135     else if ((setup_url = stream->conninfo.location) == NULL)
4136       continue;
4137
4138     if (sink->conninfo.connection) {
4139       info = &sink->conninfo;
4140     } else if (stream->conninfo.connection) {
4141       info = &stream->conninfo;
4142     } else {
4143       continue;
4144     }
4145
4146     if (async)
4147       GST_ELEMENT_PROGRESS (sink, CONTINUE, "request",
4148           ("Sending PAUSE request"));
4149
4150     if ((res =
4151             gst_rtsp_client_sink_init_request (sink, &request, GST_RTSP_PAUSE,
4152                 setup_url)) < 0)
4153       goto create_request_failed;
4154
4155     if ((res =
4156             gst_rtsp_client_sink_send (sink, info, &request, &response,
4157                 NULL)) < 0)
4158       goto send_error;
4159
4160     gst_rtsp_message_unset (&request);
4161     gst_rtsp_message_unset (&response);
4162
4163     /* exit early when we did agregate control */
4164     if (control)
4165       break;
4166   }
4167
4168   /* change element states now */
4169   gst_rtsp_client_sink_set_state (sink, GST_STATE_PAUSED);
4170
4171 no_connection:
4172   sink->state = GST_RTSP_STATE_READY;
4173
4174 done:
4175   if (async)
4176     gst_rtsp_client_sink_loop_end_cmd (sink, CMD_PAUSE, res);
4177
4178   return res;
4179
4180   /* ERRORS */
4181 open_failed:
4182   {
4183     GST_DEBUG_OBJECT (sink, "failed to open stream");
4184     goto done;
4185   }
4186 not_supported:
4187   {
4188     GST_DEBUG_OBJECT (sink, "PAUSE is not supported");
4189     goto done;
4190   }
4191 was_paused:
4192   {
4193     GST_DEBUG_OBJECT (sink, "we were already PAUSED");
4194     goto done;
4195   }
4196 create_request_failed:
4197   {
4198     gchar *str = gst_rtsp_strresult (res);
4199
4200     GST_ELEMENT_ERROR (sink, LIBRARY, INIT, (NULL),
4201         ("Could not create request. (%s)", str));
4202     g_free (str);
4203     goto done;
4204   }
4205 send_error:
4206   {
4207     gchar *str = gst_rtsp_strresult (res);
4208
4209     gst_rtsp_message_unset (&request);
4210     if (res != GST_RTSP_EINTR) {
4211       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
4212           ("Could not send message. (%s)", str));
4213     } else {
4214       GST_WARNING_OBJECT (sink, "PAUSE interrupted");
4215     }
4216     g_free (str);
4217     goto done;
4218   }
4219 }
4220
4221 static void
4222 gst_rtsp_client_sink_handle_message (GstBin * bin, GstMessage * message)
4223 {
4224   GstRTSPClientSink *rtsp_client_sink;
4225
4226   rtsp_client_sink = GST_RTSP_CLIENT_SINK (bin);
4227
4228   switch (GST_MESSAGE_TYPE (message)) {
4229     case GST_MESSAGE_ELEMENT:
4230     {
4231       const GstStructure *s = gst_message_get_structure (message);
4232
4233       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
4234         gboolean ignore_timeout;
4235
4236         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
4237
4238         GST_OBJECT_LOCK (rtsp_client_sink);
4239         ignore_timeout = rtsp_client_sink->ignore_timeout;
4240         rtsp_client_sink->ignore_timeout = TRUE;
4241         GST_OBJECT_UNLOCK (rtsp_client_sink);
4242
4243         /* we only act on the first udp timeout message, others are irrelevant
4244          * and can be ignored. */
4245         if (!ignore_timeout)
4246           gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECONNECT,
4247               CMD_LOOP);
4248         /* eat and free */
4249         gst_message_unref (message);
4250         return;
4251       } else if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
4252         /* An RTSPStream has prerolled */
4253         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4254       }
4255       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4256       break;
4257     }
4258     case GST_MESSAGE_ASYNC_START:{
4259       GstObject *sender;
4260
4261       sender = GST_MESSAGE_SRC (message);
4262
4263       GST_LOG_OBJECT (rtsp_client_sink,
4264           "Have async-start from %" GST_PTR_FORMAT, sender);
4265       if (sender == GST_OBJECT (rtsp_client_sink->internal_bin)) {
4266         GST_LOG_OBJECT (rtsp_client_sink, "child bin is now ASYNC");
4267       }
4268       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4269       break;
4270     }
4271     case GST_MESSAGE_ASYNC_DONE:
4272     {
4273       GstObject *sender;
4274       gboolean need_async_done;
4275
4276       sender = GST_MESSAGE_SRC (message);
4277       GST_LOG_OBJECT (rtsp_client_sink, "Have async-done from %" GST_PTR_FORMAT,
4278           sender);
4279
4280       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4281       if (sender == GST_OBJECT_CAST (rtsp_client_sink->internal_bin)) {
4282         GST_LOG_OBJECT (rtsp_client_sink, "child bin is no longer ASYNC");
4283       }
4284       need_async_done = rtsp_client_sink->in_async;
4285       if (rtsp_client_sink->in_async) {
4286         rtsp_client_sink->in_async = FALSE;
4287         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4288       }
4289       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4290
4291       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4292
4293       if (need_async_done) {
4294         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-DONE");
4295         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4296             gst_message_new_async_done (GST_OBJECT_CAST (rtsp_client_sink),
4297                 GST_CLOCK_TIME_NONE));
4298       }
4299       break;
4300     }
4301     case GST_MESSAGE_ERROR:
4302     {
4303       GstObject *sender;
4304
4305       sender = GST_MESSAGE_SRC (message);
4306
4307       GST_DEBUG_OBJECT (rtsp_client_sink, "got error from %s",
4308           GST_ELEMENT_NAME (sender));
4309
4310       /* FIXME: Ignore errors on RTCP? */
4311       /* fatal but not our message, forward */
4312       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4313       break;
4314     }
4315     case GST_MESSAGE_STATE_CHANGED:
4316     {
4317       if (GST_MESSAGE_SRC (message) ==
4318           (GstObject *) rtsp_client_sink->internal_bin) {
4319         GstState newstate, pending;
4320         gst_message_parse_state_changed (message, NULL, &newstate, &pending);
4321         g_mutex_lock (&rtsp_client_sink->preroll_lock);
4322         rtsp_client_sink->prerolled = (newstate >= GST_STATE_PAUSED)
4323             && pending == GST_STATE_VOID_PENDING;
4324         g_cond_broadcast (&rtsp_client_sink->preroll_cond);
4325         g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4326         GST_DEBUG_OBJECT (bin,
4327             "Internal bin changed state to %s (pending %s). Prerolled now %d",
4328             gst_element_state_get_name (newstate),
4329             gst_element_state_get_name (pending), rtsp_client_sink->prerolled);
4330       }
4331       /* fallthrough */
4332     }
4333     default:
4334     {
4335       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
4336       break;
4337     }
4338   }
4339 }
4340
4341 /* the thread where everything happens */
4342 static void
4343 gst_rtsp_client_sink_thread (GstRTSPClientSink * sink)
4344 {
4345   gint cmd;
4346
4347   GST_OBJECT_LOCK (sink);
4348   cmd = sink->pending_cmd;
4349   if (cmd == CMD_RECONNECT || cmd == CMD_RECORD || cmd == CMD_PAUSE
4350       || cmd == CMD_LOOP || cmd == CMD_OPEN)
4351     sink->pending_cmd = CMD_LOOP;
4352   else
4353     sink->pending_cmd = CMD_WAIT;
4354   GST_DEBUG_OBJECT (sink, "got command %s", cmd_to_string (cmd));
4355
4356   /* we got the message command, so ensure communication is possible again */
4357   gst_rtsp_client_sink_connection_flush (sink, FALSE);
4358
4359   sink->busy_cmd = cmd;
4360   GST_OBJECT_UNLOCK (sink);
4361
4362   switch (cmd) {
4363     case CMD_OPEN:
4364       gst_rtsp_client_sink_open (sink, TRUE);
4365       break;
4366     case CMD_RECORD:
4367       gst_rtsp_client_sink_record (sink, TRUE);
4368       break;
4369     case CMD_PAUSE:
4370       gst_rtsp_client_sink_pause (sink, TRUE);
4371       break;
4372     case CMD_CLOSE:
4373       gst_rtsp_client_sink_close (sink, TRUE, FALSE);
4374       break;
4375     case CMD_LOOP:
4376       gst_rtsp_client_sink_loop (sink);
4377       break;
4378     case CMD_RECONNECT:
4379       gst_rtsp_client_sink_reconnect (sink, FALSE);
4380       break;
4381     default:
4382       break;
4383   }
4384
4385   GST_OBJECT_LOCK (sink);
4386   /* and go back to sleep */
4387   if (sink->pending_cmd == CMD_WAIT) {
4388     if (sink->task)
4389       gst_task_pause (sink->task);
4390   }
4391   /* reset waiting */
4392   sink->busy_cmd = CMD_WAIT;
4393   GST_OBJECT_UNLOCK (sink);
4394 }
4395
4396 static gboolean
4397 gst_rtsp_client_sink_start (GstRTSPClientSink * sink)
4398 {
4399   GST_DEBUG_OBJECT (sink, "starting");
4400
4401   sink->streams_collected = FALSE;
4402   sink->in_async = TRUE;
4403   gst_element_set_locked_state (GST_ELEMENT (sink->internal_bin), TRUE);
4404
4405   gst_rtsp_client_sink_set_state (sink, GST_STATE_READY);
4406
4407   GST_OBJECT_LOCK (sink);
4408   sink->pending_cmd = CMD_WAIT;
4409
4410   if (sink->task == NULL) {
4411     sink->task =
4412         gst_task_new ((GstTaskFunction) gst_rtsp_client_sink_thread, sink,
4413         NULL);
4414     if (sink->task == NULL)
4415       goto task_error;
4416
4417     gst_task_set_lock (sink->task, GST_RTSP_STREAM_GET_LOCK (sink));
4418   }
4419   GST_OBJECT_UNLOCK (sink);
4420
4421   return TRUE;
4422
4423   /* ERRORS */
4424 task_error:
4425   {
4426     GST_OBJECT_UNLOCK (sink);
4427     GST_ERROR_OBJECT (sink, "failed to create task");
4428     return FALSE;
4429   }
4430 }
4431
4432 static gboolean
4433 gst_rtsp_client_sink_stop (GstRTSPClientSink * sink)
4434 {
4435   GstTask *task;
4436
4437   GST_DEBUG_OBJECT (sink, "stopping");
4438
4439   /* also cancels pending task */
4440   gst_rtsp_client_sink_loop_send_cmd (sink, CMD_WAIT, CMD_ALL & ~CMD_CLOSE);
4441
4442   GST_OBJECT_LOCK (sink);
4443   if ((task = sink->task)) {
4444     sink->task = NULL;
4445     GST_OBJECT_UNLOCK (sink);
4446
4447     gst_task_stop (task);
4448
4449     /* make sure it is not running */
4450     GST_RTSP_STREAM_LOCK (sink);
4451     GST_RTSP_STREAM_UNLOCK (sink);
4452
4453     /* now wait for the task to finish */
4454     gst_task_join (task);
4455
4456     /* and free the task */
4457     gst_object_unref (GST_OBJECT (task));
4458
4459     GST_OBJECT_LOCK (sink);
4460   }
4461   GST_OBJECT_UNLOCK (sink);
4462
4463   /* ensure synchronously all is closed and clean */
4464   gst_rtsp_client_sink_close (sink, FALSE, TRUE);
4465
4466   return TRUE;
4467 }
4468
4469 static GstStateChangeReturn
4470 gst_rtsp_client_sink_change_state (GstElement * element,
4471     GstStateChange transition)
4472 {
4473   GstRTSPClientSink *rtsp_client_sink;
4474   GstStateChangeReturn ret;
4475
4476   rtsp_client_sink = GST_RTSP_CLIENT_SINK (element);
4477
4478   switch (transition) {
4479     case GST_STATE_CHANGE_NULL_TO_READY:
4480       if (!gst_rtsp_client_sink_start (rtsp_client_sink))
4481         goto start_failed;
4482       break;
4483     case GST_STATE_CHANGE_READY_TO_PAUSED:
4484       /* init some state */
4485       rtsp_client_sink->cur_protocols = rtsp_client_sink->protocols;
4486       /* first attempt, don't ignore timeouts */
4487       rtsp_client_sink->ignore_timeout = FALSE;
4488       rtsp_client_sink->open_error = FALSE;
4489
4490       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_PAUSED);
4491
4492       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4493       if (rtsp_client_sink->in_async) {
4494         GST_DEBUG_OBJECT (rtsp_client_sink, "Posting ASYNC-START");
4495         gst_element_post_message (GST_ELEMENT_CAST (rtsp_client_sink),
4496             gst_message_new_async_start (GST_OBJECT_CAST (rtsp_client_sink)));
4497       }
4498       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4499
4500       break;
4501     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4502       /* fall-through */
4503     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4504       /* unblock the tcp tasks and make the loop waiting */
4505       if (gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_WAIT,
4506               CMD_LOOP)) {
4507         /* make sure it is waiting before we send PLAY below */
4508         GST_RTSP_STREAM_LOCK (rtsp_client_sink);
4509         GST_RTSP_STREAM_UNLOCK (rtsp_client_sink);
4510       }
4511       break;
4512     case GST_STATE_CHANGE_PAUSED_TO_READY:
4513       gst_rtsp_client_sink_set_state (rtsp_client_sink, GST_STATE_READY);
4514       break;
4515     default:
4516       break;
4517   }
4518
4519   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4520   if (ret == GST_STATE_CHANGE_FAILURE)
4521     goto done;
4522
4523   switch (transition) {
4524     case GST_STATE_CHANGE_NULL_TO_READY:
4525       ret = GST_STATE_CHANGE_SUCCESS;
4526       break;
4527     case GST_STATE_CHANGE_READY_TO_PAUSED:
4528       /* Return ASYNC and preroll input streams */
4529       g_mutex_lock (&rtsp_client_sink->preroll_lock);
4530       if (rtsp_client_sink->in_async)
4531         ret = GST_STATE_CHANGE_ASYNC;
4532       g_mutex_unlock (&rtsp_client_sink->preroll_lock);
4533       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_OPEN, 0);
4534       break;
4535     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{
4536       GST_DEBUG_OBJECT (rtsp_client_sink,
4537           "Switching to playing -sending RECORD");
4538       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_RECORD, 0);
4539       ret = GST_STATE_CHANGE_SUCCESS;
4540       break;
4541     }
4542     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4543       /* send pause request and keep the idle task around */
4544       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_PAUSE,
4545           CMD_LOOP);
4546       ret = GST_STATE_CHANGE_NO_PREROLL;
4547       break;
4548     case GST_STATE_CHANGE_PAUSED_TO_READY:
4549       gst_rtsp_client_sink_loop_send_cmd (rtsp_client_sink, CMD_CLOSE,
4550           CMD_PAUSE);
4551       ret = GST_STATE_CHANGE_SUCCESS;
4552       break;
4553     case GST_STATE_CHANGE_READY_TO_NULL:
4554       gst_rtsp_client_sink_stop (rtsp_client_sink);
4555       ret = GST_STATE_CHANGE_SUCCESS;
4556       break;
4557     default:
4558       break;
4559   }
4560
4561 done:
4562   return ret;
4563
4564 start_failed:
4565   {
4566     GST_DEBUG_OBJECT (rtsp_client_sink, "start failed");
4567     return GST_STATE_CHANGE_FAILURE;
4568   }
4569 }
4570
4571 /*** GSTURIHANDLER INTERFACE *************************************************/
4572
4573 static GstURIType
4574 gst_rtsp_client_sink_uri_get_type (GType type)
4575 {
4576   return GST_URI_SINK;
4577 }
4578
4579 static const gchar *const *
4580 gst_rtsp_client_sink_uri_get_protocols (GType type)
4581 {
4582   static const gchar *protocols[] =
4583       { "rtsp", "rtspu", "rtspt", "rtsph", "rtsp-sdp",
4584     "rtsps", "rtspsu", "rtspst", "rtspsh", NULL
4585   };
4586
4587   return protocols;
4588 }
4589
4590 static gchar *
4591 gst_rtsp_client_sink_uri_get_uri (GstURIHandler * handler)
4592 {
4593   GstRTSPClientSink *sink = GST_RTSP_CLIENT_SINK (handler);
4594
4595   /* FIXME: make thread-safe */
4596   return g_strdup (sink->conninfo.location);
4597 }
4598
4599 static gboolean
4600 gst_rtsp_client_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
4601     GError ** error)
4602 {
4603   GstRTSPClientSink *sink;
4604   GstRTSPResult res;
4605   GstSDPResult sres;
4606   GstRTSPUrl *newurl = NULL;
4607   GstSDPMessage *sdp = NULL;
4608
4609   sink = GST_RTSP_CLIENT_SINK (handler);
4610
4611   /* same URI, we're fine */
4612   if (sink->conninfo.location && uri && !strcmp (uri, sink->conninfo.location))
4613     goto was_ok;
4614
4615   if (g_str_has_prefix (uri, "rtsp-sdp://")) {
4616     sres = gst_sdp_message_new (&sdp);
4617     if (sres < 0)
4618       goto sdp_failed;
4619
4620     GST_DEBUG_OBJECT (sink, "parsing SDP message");
4621     sres = gst_sdp_message_parse_uri (uri, sdp);
4622     if (sres < 0)
4623       goto invalid_sdp;
4624   } else {
4625     /* try to parse */
4626     GST_DEBUG_OBJECT (sink, "parsing URI");
4627     if ((res = gst_rtsp_url_parse (uri, &newurl)) < 0)
4628       goto parse_error;
4629   }
4630
4631   /* if worked, free previous and store new url object along with the original
4632    * location. */
4633   GST_DEBUG_OBJECT (sink, "configuring URI");
4634   g_free (sink->conninfo.location);
4635   sink->conninfo.location = g_strdup (uri);
4636   gst_rtsp_url_free (sink->conninfo.url);
4637   sink->conninfo.url = newurl;
4638   g_free (sink->conninfo.url_str);
4639   if (newurl)
4640     sink->conninfo.url_str = gst_rtsp_url_get_request_uri (sink->conninfo.url);
4641   else
4642     sink->conninfo.url_str = NULL;
4643
4644   if (sink->uri_sdp)
4645     gst_sdp_message_free (sink->uri_sdp);
4646   sink->uri_sdp = sdp;
4647   sink->from_sdp = sdp != NULL;
4648
4649   GST_DEBUG_OBJECT (sink, "set uri: %s", GST_STR_NULL (uri));
4650   GST_DEBUG_OBJECT (sink, "request uri is: %s",
4651       GST_STR_NULL (sink->conninfo.url_str));
4652
4653   return TRUE;
4654
4655   /* Special cases */
4656 was_ok:
4657   {
4658     GST_DEBUG_OBJECT (sink, "URI was ok: '%s'", GST_STR_NULL (uri));
4659     return TRUE;
4660   }
4661 sdp_failed:
4662   {
4663     GST_ERROR_OBJECT (sink, "Could not create new SDP (%d)", sres);
4664     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
4665         "Could not create SDP");
4666     return FALSE;
4667   }
4668 invalid_sdp:
4669   {
4670     GST_ERROR_OBJECT (sink, "Not a valid SDP (%d) '%s'", sres,
4671         GST_STR_NULL (uri));
4672     gst_sdp_message_free (sdp);
4673     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
4674         "Invalid SDP");
4675     return FALSE;
4676   }
4677 parse_error:
4678   {
4679     GST_ERROR_OBJECT (sink, "Not a valid RTSP url '%s' (%d)",
4680         GST_STR_NULL (uri), res);
4681     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
4682         "Invalid RTSP URI");
4683     return FALSE;
4684   }
4685 }
4686
4687 static void
4688 gst_rtsp_client_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
4689 {
4690   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
4691
4692   iface->get_type = gst_rtsp_client_sink_uri_get_type;
4693   iface->get_protocols = gst_rtsp_client_sink_uri_get_protocols;
4694   iface->get_uri = gst_rtsp_client_sink_uri_get_uri;
4695   iface->set_uri = gst_rtsp_client_sink_uri_set_uri;
4696 }