rtspsrc: use aggregate control for PLAY/PAUSE/TEARDOWN
[platform/upstream/gst-plugins-good.git] / gst / rtsp / gstrtspsrc.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim at fluendo dot com>
3  *               <2006> Lutz Mueller <lutz at topfrose dot de>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /*
21  * Unless otherwise indicated, Source Code is licensed under MIT license.
22  * See further explanation attached in License Statement (distributed in the file
23  * LICENSE).
24  *
25  * Permission is hereby granted, free of charge, to any person obtaining a copy of
26  * this software and associated documentation files (the "Software"), to deal in
27  * the Software without restriction, including without limitation the rights to
28  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
29  * of the Software, and to permit persons to whom the Software is furnished to do
30  * so, subject to the following conditions:
31  *
32  * The above copyright notice and this permission notice shall be included in all
33  * copies or substantial portions of the Software.
34  *
35  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
41  * SOFTWARE.
42  */
43 /**
44  * SECTION:element-rtspsrc
45  *
46  * Makes a connection to an RTSP server and read the data.
47  * rtspsrc strictly follows RFC 2326 and therefore does not (yet) support
48  * RealMedia/Quicktime/Microsoft extensions.
49  *
50  * RTSP supports transport over TCP or UDP in unicast or multicast mode. By
51  * default rtspsrc will negotiate a connection in the following order:
52  * UDP unicast/UDP multicast/TCP. The order cannot be changed but the allowed
53  * protocols can be controlled with the #GstRTSPSrc:protocols property.
54  *
55  * rtspsrc currently understands SDP as the format of the session description.
56  * For each stream listed in the SDP a new rtp_stream%d pad will be created
57  * with caps derived from the SDP media description. This is a caps of mime type
58  * "application/x-rtp" that can be connected to any available RTP depayloader
59  * element.
60  *
61  * rtspsrc will internally instantiate an RTP session manager element
62  * that will handle the RTCP messages to and from the server, jitter removal,
63  * packet reordering along with providing a clock for the pipeline.
64  * This feature is implemented using the gstrtpbin element.
65  *
66  * rtspsrc acts like a live source and will therefore only generate data in the
67  * PLAYING state.
68  *
69  * <refsect2>
70  * <title>Example launch line</title>
71  * |[
72  * gst-launch-1.0 rtspsrc location=rtsp://some.server/url ! fakesink
73  * ]| Establish a connection to an RTSP server and send the raw RTP packets to a
74  * fakesink.
75  * </refsect2>
76  *
77  * Last reviewed on 2006-08-18 (0.10.5)
78  */
79
80 #ifdef HAVE_CONFIG_H
81 #include "config.h"
82 #endif
83
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif /* HAVE_UNISTD_H */
87 #include <stdlib.h>
88 #include <string.h>
89 #include <stdio.h>
90 #include <stdarg.h>
91
92 #include <gst/net/gstnet.h>
93 #include <gst/sdp/gstsdpmessage.h>
94 #include <gst/rtp/gstrtppayloads.h>
95
96 #include "gst/gst-i18n-plugin.h"
97
98 #include "gstrtspsrc.h"
99
100 GST_DEBUG_CATEGORY_STATIC (rtspsrc_debug);
101 #define GST_CAT_DEFAULT (rtspsrc_debug)
102
103 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream_%u",
104     GST_PAD_SRC,
105     GST_PAD_SOMETIMES,
106     GST_STATIC_CAPS ("application/x-rtp; application/x-rdt"));
107
108 /* templates used internally */
109 static GstStaticPadTemplate anysrctemplate =
110 GST_STATIC_PAD_TEMPLATE ("internalsrc_%u",
111     GST_PAD_SRC,
112     GST_PAD_SOMETIMES,
113     GST_STATIC_CAPS_ANY);
114
115 static GstStaticPadTemplate anysinktemplate =
116 GST_STATIC_PAD_TEMPLATE ("internalsink_%u",
117     GST_PAD_SINK,
118     GST_PAD_SOMETIMES,
119     GST_STATIC_CAPS_ANY);
120
121 enum
122 {
123   SIGNAL_HANDLE_REQUEST,
124   SIGNAL_ON_SDP,
125   SIGNAL_SELECT_STREAM,
126   LAST_SIGNAL
127 };
128
129 enum _GstRtspSrcRtcpSyncMode
130 {
131   RTCP_SYNC_ALWAYS,
132   RTCP_SYNC_INITIAL,
133   RTCP_SYNC_RTP
134 };
135
136 enum _GstRtspSrcBufferMode
137 {
138   BUFFER_MODE_NONE,
139   BUFFER_MODE_SLAVE,
140   BUFFER_MODE_BUFFER,
141   BUFFER_MODE_AUTO,
142   BUFFER_MODE_SYNCED
143 };
144
145 #define GST_TYPE_RTSP_SRC_BUFFER_MODE (gst_rtsp_src_buffer_mode_get_type())
146 static GType
147 gst_rtsp_src_buffer_mode_get_type (void)
148 {
149   static GType buffer_mode_type = 0;
150   static const GEnumValue buffer_modes[] = {
151     {BUFFER_MODE_NONE, "Only use RTP timestamps", "none"},
152     {BUFFER_MODE_SLAVE, "Slave receiver to sender clock", "slave"},
153     {BUFFER_MODE_BUFFER, "Do low/high watermark buffering", "buffer"},
154     {BUFFER_MODE_AUTO, "Choose mode depending on stream live", "auto"},
155     {BUFFER_MODE_SYNCED, "Synchronized sender and receiver clocks", "synced"},
156     {0, NULL, NULL},
157   };
158
159   if (!buffer_mode_type) {
160     buffer_mode_type =
161         g_enum_register_static ("GstRTSPSrcBufferMode", buffer_modes);
162   }
163   return buffer_mode_type;
164 }
165
166 #define DEFAULT_LOCATION         NULL
167 #define DEFAULT_PROTOCOLS        GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP
168 #define DEFAULT_DEBUG            FALSE
169 #define DEFAULT_RETRY            20
170 #define DEFAULT_TIMEOUT          5000000
171 #define DEFAULT_UDP_BUFFER_SIZE  0x80000
172 #define DEFAULT_TCP_TIMEOUT      20000000
173 #define DEFAULT_LATENCY_MS       2000
174 #define DEFAULT_DROP_ON_LATENCY  FALSE
175 #define DEFAULT_CONNECTION_SPEED 0
176 #define DEFAULT_NAT_METHOD       GST_RTSP_NAT_DUMMY
177 #define DEFAULT_DO_RTCP          TRUE
178 #define DEFAULT_DO_RTSP_KEEP_ALIVE       TRUE
179 #define DEFAULT_PROXY            NULL
180 #define DEFAULT_RTP_BLOCKSIZE    0
181 #define DEFAULT_USER_ID          NULL
182 #define DEFAULT_USER_PW          NULL
183 #define DEFAULT_BUFFER_MODE      BUFFER_MODE_AUTO
184 #define DEFAULT_PORT_RANGE       NULL
185 #define DEFAULT_SHORT_HEADER     FALSE
186 #define DEFAULT_PROBATION        2
187 #define DEFAULT_UDP_RECONNECT    TRUE
188 #define DEFAULT_MULTICAST_IFACE  NULL
189 #define DEFAULT_NTP_SYNC         FALSE
190 #define DEFAULT_USE_PIPELINE_CLOCK      FALSE
191 #define DEFAULT_TLS_VALIDATION_FLAGS G_TLS_CERTIFICATE_VALIDATE_ALL
192
193 enum
194 {
195   PROP_0,
196   PROP_LOCATION,
197   PROP_PROTOCOLS,
198   PROP_DEBUG,
199   PROP_RETRY,
200   PROP_TIMEOUT,
201   PROP_TCP_TIMEOUT,
202   PROP_LATENCY,
203   PROP_DROP_ON_LATENCY,
204   PROP_CONNECTION_SPEED,
205   PROP_NAT_METHOD,
206   PROP_DO_RTCP,
207   PROP_DO_RTSP_KEEP_ALIVE,
208   PROP_PROXY,
209   PROP_PROXY_ID,
210   PROP_PROXY_PW,
211   PROP_RTP_BLOCKSIZE,
212   PROP_USER_ID,
213   PROP_USER_PW,
214   PROP_BUFFER_MODE,
215   PROP_PORT_RANGE,
216   PROP_UDP_BUFFER_SIZE,
217   PROP_SHORT_HEADER,
218   PROP_PROBATION,
219   PROP_UDP_RECONNECT,
220   PROP_MULTICAST_IFACE,
221   PROP_NTP_SYNC,
222   PROP_USE_PIPELINE_CLOCK,
223   PROP_SDES,
224   PROP_TLS_VALIDATION_FLAGS,
225   PROP_LAST
226 };
227
228 #define GST_TYPE_RTSP_NAT_METHOD (gst_rtsp_nat_method_get_type())
229 static GType
230 gst_rtsp_nat_method_get_type (void)
231 {
232   static GType rtsp_nat_method_type = 0;
233   static const GEnumValue rtsp_nat_method[] = {
234     {GST_RTSP_NAT_NONE, "None", "none"},
235     {GST_RTSP_NAT_DUMMY, "Send Dummy packets", "dummy"},
236     {0, NULL, NULL},
237   };
238
239   if (!rtsp_nat_method_type) {
240     rtsp_nat_method_type =
241         g_enum_register_static ("GstRTSPNatMethod", rtsp_nat_method);
242   }
243   return rtsp_nat_method_type;
244 }
245
246 static void gst_rtspsrc_finalize (GObject * object);
247
248 static void gst_rtspsrc_set_property (GObject * object, guint prop_id,
249     const GValue * value, GParamSpec * pspec);
250 static void gst_rtspsrc_get_property (GObject * object, guint prop_id,
251     GValue * value, GParamSpec * pspec);
252
253 static GstClock *gst_rtspsrc_provide_clock (GstElement * element);
254
255 static void gst_rtspsrc_uri_handler_init (gpointer g_iface,
256     gpointer iface_data);
257
258 static void gst_rtspsrc_sdp_attributes_to_caps (GArray * attributes,
259     GstCaps * caps);
260
261 static gboolean gst_rtspsrc_set_proxy (GstRTSPSrc * rtsp, const gchar * proxy);
262 static void gst_rtspsrc_set_tcp_timeout (GstRTSPSrc * rtspsrc, guint64 timeout);
263
264 static GstCaps *gst_rtspsrc_media_to_caps (gint pt, const GstSDPMedia * media);
265
266 static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element,
267     GstStateChange transition);
268 static gboolean gst_rtspsrc_send_event (GstElement * element, GstEvent * event);
269 static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message);
270
271 static gboolean gst_rtspsrc_setup_auth (GstRTSPSrc * src,
272     GstRTSPMessage * response);
273
274 static gboolean gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd,
275     gint mask);
276 static GstRTSPResult gst_rtspsrc_send_cb (GstRTSPExtension * ext,
277     GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPSrc * src);
278
279 static GstRTSPResult gst_rtspsrc_open (GstRTSPSrc * src, gboolean async);
280 static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment,
281     gboolean async);
282 static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async);
283 static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async,
284     gboolean only_close);
285
286 static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler,
287     const gchar * uri, GError ** error);
288 static gchar *gst_rtspsrc_uri_get_uri (GstURIHandler * handler);
289
290 static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src);
291 static gboolean gst_rtspsrc_loop (GstRTSPSrc * src);
292 static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src,
293     GstRTSPStream * stream, GstEvent * event);
294 static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event);
295
296 /* commands we send to out loop to notify it of events */
297 #define CMD_OPEN        (1 << 0)
298 #define CMD_PLAY        (1 << 1)
299 #define CMD_PAUSE       (1 << 2)
300 #define CMD_CLOSE       (1 << 3)
301 #define CMD_WAIT        (1 << 4)
302 #define CMD_RECONNECT   (1 << 5)
303 #define CMD_LOOP        (1 << 6)
304
305 /* mask for all commands */
306 #define CMD_ALL         ((CMD_LOOP << 1) - 1)
307
308 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
309 G_STMT_START {                                          \
310   gchar *__txt = _gst_element_error_printf text;        \
311   gst_element_post_message (GST_ELEMENT_CAST (el),      \
312       gst_message_new_progress (GST_OBJECT_CAST (el),   \
313           GST_PROGRESS_TYPE_ ##type, code, __txt));     \
314   g_free (__txt);                                       \
315 } G_STMT_END
316
317 static guint gst_rtspsrc_signals[LAST_SIGNAL] = { 0 };
318
319 #define gst_rtspsrc_parent_class parent_class
320 G_DEFINE_TYPE_WITH_CODE (GstRTSPSrc, gst_rtspsrc, GST_TYPE_BIN,
321     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtspsrc_uri_handler_init));
322
323 static gboolean
324 default_select_stream (GstRTSPSrc * src, guint id, GstCaps * caps)
325 {
326   GST_DEBUG_OBJECT (src, "default handler");
327   return TRUE;
328 }
329
330 static gboolean
331 select_stream_accum (GSignalInvocationHint * ihint,
332     GValue * return_accu, const GValue * handler_return, gpointer data)
333 {
334   gboolean myboolean;
335
336   myboolean = g_value_get_boolean (handler_return);
337   GST_DEBUG ("accum %d", myboolean);
338   g_value_set_boolean (return_accu, myboolean);
339
340   /* stop emission if FALSE */
341   return myboolean;
342 }
343
344 static void
345 gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
346 {
347   GObjectClass *gobject_class;
348   GstElementClass *gstelement_class;
349   GstBinClass *gstbin_class;
350
351   gobject_class = (GObjectClass *) klass;
352   gstelement_class = (GstElementClass *) klass;
353   gstbin_class = (GstBinClass *) klass;
354
355   GST_DEBUG_CATEGORY_INIT (rtspsrc_debug, "rtspsrc", 0, "RTSP src");
356
357   gobject_class->set_property = gst_rtspsrc_set_property;
358   gobject_class->get_property = gst_rtspsrc_get_property;
359
360   gobject_class->finalize = gst_rtspsrc_finalize;
361
362   g_object_class_install_property (gobject_class, PROP_LOCATION,
363       g_param_spec_string ("location", "RTSP Location",
364           "Location of the RTSP url to read",
365           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
368       g_param_spec_flags ("protocols", "Protocols",
369           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
370           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371
372   g_object_class_install_property (gobject_class, PROP_DEBUG,
373       g_param_spec_boolean ("debug", "Debug",
374           "Dump request and response messages to stdout",
375           DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376
377   g_object_class_install_property (gobject_class, PROP_RETRY,
378       g_param_spec_uint ("retry", "Retry",
379           "Max number of retries when allocating RTP ports.",
380           0, G_MAXUINT16, DEFAULT_RETRY,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
384       g_param_spec_uint64 ("timeout", "Timeout",
385           "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
386           0, G_MAXUINT64, DEFAULT_TIMEOUT,
387           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
390       g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
391           "Fail after timeout microseconds on TCP connections (0 = disabled)",
392           0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394
395   g_object_class_install_property (gobject_class, PROP_LATENCY,
396       g_param_spec_uint ("latency", "Buffer latency in ms",
397           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
401       g_param_spec_boolean ("drop-on-latency",
402           "Drop buffers when maximum latency is reached",
403           "Tells the jitterbuffer to never exceed the given latency in size",
404           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405
406   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
407       g_param_spec_uint64 ("connection-speed", "Connection Speed",
408           "Network connection speed in kbps (0 = unknown)",
409           0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411
412   g_object_class_install_property (gobject_class, PROP_NAT_METHOD,
413       g_param_spec_enum ("nat-method", "NAT Method",
414           "Method to use for traversing firewalls and NAT",
415           GST_TYPE_RTSP_NAT_METHOD, DEFAULT_NAT_METHOD,
416           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417
418   /**
419    * GstRTSPSrc:do-rtcp:
420    *
421    * Enable RTCP support. Some old server don't like RTCP and then this property
422    * needs to be set to FALSE.
423    */
424   g_object_class_install_property (gobject_class, PROP_DO_RTCP,
425       g_param_spec_boolean ("do-rtcp", "Do RTCP",
426           "Send RTCP packets, disable for old incompatible server.",
427           DEFAULT_DO_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428
429   /**
430    * GstRTSPSrc:do-rtsp-keep-alive:
431    *
432    * Enable RTSP keep alive support. Some old server don't like RTSP
433    * keep alive and then this property needs to be set to FALSE.
434    */
435   g_object_class_install_property (gobject_class, PROP_DO_RTSP_KEEP_ALIVE,
436       g_param_spec_boolean ("do-rtsp-keep-alive", "Do RTSP Keep Alive",
437           "Send RTSP keep alive packets, disable for old incompatible server.",
438           DEFAULT_DO_RTSP_KEEP_ALIVE,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   /**
442    * GstRTSPSrc:proxy:
443    *
444    * Set the proxy parameters. This has to be a string of the format
445    * [http://][user:passwd@]host[:port].
446    */
447   g_object_class_install_property (gobject_class, PROP_PROXY,
448       g_param_spec_string ("proxy", "Proxy",
449           "Proxy settings for HTTP tunneling. Format: [http://][user:passwd@]host[:port]",
450           DEFAULT_PROXY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstRTSPSrc:proxy-id:
453    *
454    * Sets the proxy URI user id for authentication. If the URI set via the
455    * "proxy" property contains a user-id already, that will take precedence.
456    *
457    * Since: 1.2
458    */
459   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
460       g_param_spec_string ("proxy-id", "proxy-id",
461           "HTTP proxy URI user id for authentication", "",
462           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
463   /**
464    * GstRTSPSrc:proxy-pw:
465    *
466    * Sets the proxy URI password for authentication. If the URI set via the
467    * "proxy" property contains a password already, that will take precedence.
468    *
469    * Since: 1.2
470    */
471   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
472       g_param_spec_string ("proxy-pw", "proxy-pw",
473           "HTTP proxy URI user password for authentication", "",
474           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
475
476   /**
477    * GstRTSPSrc:rtp-blocksize:
478    *
479    * RTP package size to suggest to server.
480    */
481   g_object_class_install_property (gobject_class, PROP_RTP_BLOCKSIZE,
482       g_param_spec_uint ("rtp-blocksize", "RTP Blocksize",
483           "RTP package size to suggest to server (0 = disabled)",
484           0, 65536, DEFAULT_RTP_BLOCKSIZE,
485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486
487   g_object_class_install_property (gobject_class,
488       PROP_USER_ID,
489       g_param_spec_string ("user-id", "user-id",
490           "RTSP location URI user id for authentication", DEFAULT_USER_ID,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492   g_object_class_install_property (gobject_class, PROP_USER_PW,
493       g_param_spec_string ("user-pw", "user-pw",
494           "RTSP location URI user password for authentication", DEFAULT_USER_PW,
495           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496
497   /**
498    * GstRTSPSrc:buffer-mode:
499    *
500    * Control the buffering and timestamping mode used by the jitterbuffer.
501    */
502   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
503       g_param_spec_enum ("buffer-mode", "Buffer Mode",
504           "Control the buffering algorithm in use",
505           GST_TYPE_RTSP_SRC_BUFFER_MODE, DEFAULT_BUFFER_MODE,
506           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
507
508   /**
509    * GstRTSPSrc:port-range:
510    *
511    * Configure the client port numbers that can be used to recieve RTP and
512    * RTCP.
513    */
514   g_object_class_install_property (gobject_class, PROP_PORT_RANGE,
515       g_param_spec_string ("port-range", "Port range",
516           "Client port range that can be used to receive RTP and RTCP data, "
517           "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
518           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
519
520   /**
521    * GstRTSPSrc:udp-buffer-size:
522    *
523    * Size of the kernel UDP receive buffer in bytes.
524    */
525   g_object_class_install_property (gobject_class, PROP_UDP_BUFFER_SIZE,
526       g_param_spec_int ("udp-buffer-size", "UDP Buffer Size",
527           "Size of the kernel UDP receive buffer in bytes, 0=default",
528           0, G_MAXINT, DEFAULT_UDP_BUFFER_SIZE,
529           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
530
531   /**
532    * GstRTSPSrc:short-header:
533    *
534    * Only send the basic RTSP headers for broken encoders.
535    */
536   g_object_class_install_property (gobject_class, PROP_SHORT_HEADER,
537       g_param_spec_boolean ("short-header", "Short Header",
538           "Only send the basic RTSP headers for broken encoders",
539           DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
540
541   g_object_class_install_property (gobject_class, PROP_PROBATION,
542       g_param_spec_uint ("probation", "Number of probations",
543           "Consecutive packet sequence numbers to accept the source",
544           0, G_MAXUINT, DEFAULT_PROBATION,
545           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
546
547   g_object_class_install_property (gobject_class, PROP_UDP_RECONNECT,
548       g_param_spec_boolean ("udp-reconnect", "Reconnect to the server",
549           "Reconnect to the server if RTSP connection is closed when doing UDP",
550           DEFAULT_UDP_RECONNECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
553       g_param_spec_string ("multicast-iface", "Multicast Interface",
554           "The network interface on which to join the multicast group",
555           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
556
557   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
558       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
559           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
560           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
561
562   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
563       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
564           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
565           DEFAULT_USE_PIPELINE_CLOCK,
566           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
567
568   g_object_class_install_property (gobject_class, PROP_SDES,
569       g_param_spec_boxed ("sdes", "SDES",
570           "The SDES items of this session",
571           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
572
573   /**
574    * GstRTSPSrc::tls-validation-flags:
575    *
576    * TLS certificate validation flags used to validate server
577    * certificate.
578    *
579    * Since: 1.2.1
580    */
581   g_object_class_install_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
582       g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
583           "TLS certificate validation flags used to validate the server certificate",
584           G_TYPE_TLS_CERTIFICATE_FLAGS, DEFAULT_TLS_VALIDATION_FLAGS,
585           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
586
587   /**
588    * GstRTSPSrc::handle-request:
589    * @rtspsrc: a #GstRTSPSrc
590    * @request: a #GstRTSPMessage
591    * @response: a #GstRTSPMessage
592    *
593    * Handle a server request in @request and prepare @response.
594    *
595    * This signal is called from the streaming thread, you should therefore not
596    * do any state changes on @rtspsrc because this might deadlock. If you want
597    * to modify the state as a result of this signal, post a
598    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
599    * in some other way.
600    *
601    * Since: 1.2
602    */
603   gst_rtspsrc_signals[SIGNAL_HANDLE_REQUEST] =
604       g_signal_new ("handle-request", G_TYPE_FROM_CLASS (klass), 0,
605       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
606       G_TYPE_POINTER, G_TYPE_POINTER);
607
608   /**
609    * GstRTSPSrc::on-sdp:
610    * @rtspsrc: a #GstRTSPSrc
611    * @sdp: a #GstSDPMessage
612    *
613    * Emited when the client has retrieved the SDP and before it configures the
614    * streams in the SDP. @sdp can be inspected and modified.
615    *
616    * This signal is called from the streaming thread, you should therefore not
617    * do any state changes on @rtspsrc because this might deadlock. If you want
618    * to modify the state as a result of this signal, post a
619    * #GST_MESSAGE_REQUEST_STATE message on the bus or signal the main thread
620    * in some other way.
621    *
622    * Since: 1.2
623    */
624   gst_rtspsrc_signals[SIGNAL_ON_SDP] =
625       g_signal_new ("on-sdp", G_TYPE_FROM_CLASS (klass), 0,
626       0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
627       GST_TYPE_SDP_MESSAGE | G_SIGNAL_TYPE_STATIC_SCOPE);
628
629   /**
630    * GstRTSPSrc::select-stream:
631    * @rtspsrc: a #GstRTSPSrc
632    * @num: the stream number
633    * @caps: the stream caps
634    *
635    * Emited before the client decides to configure the stream @num with
636    * @caps.
637    *
638    * Returns: %TRUE when the stream should be selected, %FALSE when the stream
639    * is to be ignored.
640    *
641    * Since: 1.2
642    */
643   gst_rtspsrc_signals[SIGNAL_SELECT_STREAM] =
644       g_signal_new_class_handler ("select-stream", G_TYPE_FROM_CLASS (klass),
645       G_SIGNAL_RUN_FIRST | G_SIGNAL_RUN_CLEANUP,
646       (GCallback) default_select_stream, select_stream_accum, NULL,
647       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2, G_TYPE_UINT,
648       GST_TYPE_CAPS);
649
650   gstelement_class->send_event = gst_rtspsrc_send_event;
651   gstelement_class->provide_clock = gst_rtspsrc_provide_clock;
652   gstelement_class->change_state = gst_rtspsrc_change_state;
653
654   gst_element_class_add_pad_template (gstelement_class,
655       gst_static_pad_template_get (&rtptemplate));
656
657   gst_element_class_set_static_metadata (gstelement_class,
658       "RTSP packet receiver", "Source/Network",
659       "Receive data over the network via RTSP (RFC 2326)",
660       "Wim Taymans <wim@fluendo.com>, "
661       "Thijs Vermeir <thijs.vermeir@barco.com>, "
662       "Lutz Mueller <lutz@topfrose.de>");
663
664   gstbin_class->handle_message = gst_rtspsrc_handle_message;
665
666   gst_rtsp_ext_list_init ();
667 }
668
669 static void
670 gst_rtspsrc_init (GstRTSPSrc * src)
671 {
672   src->conninfo.location = g_strdup (DEFAULT_LOCATION);
673   src->protocols = DEFAULT_PROTOCOLS;
674   src->debug = DEFAULT_DEBUG;
675   src->retry = DEFAULT_RETRY;
676   src->udp_timeout = DEFAULT_TIMEOUT;
677   gst_rtspsrc_set_tcp_timeout (src, DEFAULT_TCP_TIMEOUT);
678   src->latency = DEFAULT_LATENCY_MS;
679   src->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
680   src->connection_speed = DEFAULT_CONNECTION_SPEED;
681   src->nat_method = DEFAULT_NAT_METHOD;
682   src->do_rtcp = DEFAULT_DO_RTCP;
683   src->do_rtsp_keep_alive = DEFAULT_DO_RTSP_KEEP_ALIVE;
684   gst_rtspsrc_set_proxy (src, DEFAULT_PROXY);
685   src->rtp_blocksize = DEFAULT_RTP_BLOCKSIZE;
686   src->user_id = g_strdup (DEFAULT_USER_ID);
687   src->user_pw = g_strdup (DEFAULT_USER_PW);
688   src->buffer_mode = DEFAULT_BUFFER_MODE;
689   src->client_port_range.min = 0;
690   src->client_port_range.max = 0;
691   src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
692   src->short_header = DEFAULT_SHORT_HEADER;
693   src->probation = DEFAULT_PROBATION;
694   src->udp_reconnect = DEFAULT_UDP_RECONNECT;
695   src->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
696   src->ntp_sync = DEFAULT_NTP_SYNC;
697   src->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
698   src->sdes = NULL;
699   src->tls_validation_flags = DEFAULT_TLS_VALIDATION_FLAGS;
700
701   /* get a list of all extensions */
702   src->extensions = gst_rtsp_ext_list_get ();
703
704   /* connect to send signal */
705   gst_rtsp_ext_list_connect (src->extensions, "send",
706       (GCallback) gst_rtspsrc_send_cb, src);
707
708   /* protects the streaming thread in interleaved mode or the polling
709    * thread in UDP mode. */
710   g_rec_mutex_init (&src->stream_rec_lock);
711
712   /* protects our state changes from multiple invocations */
713   g_rec_mutex_init (&src->state_rec_lock);
714
715   src->state = GST_RTSP_STATE_INVALID;
716
717   GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
718 }
719
720 static void
721 gst_rtspsrc_finalize (GObject * object)
722 {
723   GstRTSPSrc *rtspsrc;
724
725   rtspsrc = GST_RTSPSRC (object);
726
727   gst_rtsp_ext_list_free (rtspsrc->extensions);
728   g_free (rtspsrc->conninfo.location);
729   gst_rtsp_url_free (rtspsrc->conninfo.url);
730   g_free (rtspsrc->conninfo.url_str);
731   g_free (rtspsrc->user_id);
732   g_free (rtspsrc->user_pw);
733   g_free (rtspsrc->multi_iface);
734
735   if (rtspsrc->sdp) {
736     gst_sdp_message_free (rtspsrc->sdp);
737     rtspsrc->sdp = NULL;
738   }
739   if (rtspsrc->provided_clock)
740     gst_object_unref (rtspsrc->provided_clock);
741
742   if (rtspsrc->sdes)
743     gst_structure_free (rtspsrc->sdes);
744
745   /* free locks */
746   g_rec_mutex_clear (&rtspsrc->stream_rec_lock);
747   g_rec_mutex_clear (&rtspsrc->state_rec_lock);
748
749   G_OBJECT_CLASS (parent_class)->finalize (object);
750 }
751
752 static GstClock *
753 gst_rtspsrc_provide_clock (GstElement * element)
754 {
755   GstRTSPSrc *src = GST_RTSPSRC (element);
756   GstClock *clock;
757
758   if ((clock = src->provided_clock) != NULL)
759     gst_object_ref (clock);
760
761   return clock;
762 }
763
764 /* a proxy string of the format [user:passwd@]host[:port] */
765 static gboolean
766 gst_rtspsrc_set_proxy (GstRTSPSrc * rtsp, const gchar * proxy)
767 {
768   gchar *p, *at, *col;
769
770   g_free (rtsp->proxy_user);
771   rtsp->proxy_user = NULL;
772   g_free (rtsp->proxy_passwd);
773   rtsp->proxy_passwd = NULL;
774   g_free (rtsp->proxy_host);
775   rtsp->proxy_host = NULL;
776   rtsp->proxy_port = 0;
777
778   p = (gchar *) proxy;
779
780   if (p == NULL)
781     return TRUE;
782
783   /* we allow http:// in front but ignore it */
784   if (g_str_has_prefix (p, "http://"))
785     p += 7;
786
787   at = strchr (p, '@');
788   if (at) {
789     /* look for user:passwd */
790     col = strchr (proxy, ':');
791     if (col == NULL || col > at)
792       return FALSE;
793
794     rtsp->proxy_user = g_strndup (p, col - p);
795     col++;
796     rtsp->proxy_passwd = g_strndup (col, at - col);
797
798     /* move to host */
799     p = at + 1;
800   } else {
801     if (rtsp->prop_proxy_id != NULL && *rtsp->prop_proxy_id != '\0')
802       rtsp->proxy_user = g_strdup (rtsp->prop_proxy_id);
803     if (rtsp->prop_proxy_pw != NULL && *rtsp->prop_proxy_pw != '\0')
804       rtsp->proxy_passwd = g_strdup (rtsp->prop_proxy_pw);
805     if (rtsp->proxy_user != NULL || rtsp->proxy_passwd != NULL) {
806       GST_LOG_OBJECT (rtsp, "set proxy user/pw from properties: %s:%s",
807           GST_STR_NULL (rtsp->proxy_user), GST_STR_NULL (rtsp->proxy_passwd));
808     }
809   }
810   col = strchr (p, ':');
811
812   if (col) {
813     /* everything before the colon is the hostname */
814     rtsp->proxy_host = g_strndup (p, col - p);
815     p = col + 1;
816     rtsp->proxy_port = strtoul (p, (char **) &p, 10);
817   } else {
818     rtsp->proxy_host = g_strdup (p);
819     rtsp->proxy_port = 8080;
820   }
821   return TRUE;
822 }
823
824 static void
825 gst_rtspsrc_set_tcp_timeout (GstRTSPSrc * rtspsrc, guint64 timeout)
826 {
827   rtspsrc->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC;
828   rtspsrc->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC;
829
830   if (timeout != 0)
831     rtspsrc->ptcp_timeout = &rtspsrc->tcp_timeout;
832   else
833     rtspsrc->ptcp_timeout = NULL;
834 }
835
836 static void
837 gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
838     GParamSpec * pspec)
839 {
840   GstRTSPSrc *rtspsrc;
841
842   rtspsrc = GST_RTSPSRC (object);
843
844   switch (prop_id) {
845     case PROP_LOCATION:
846       gst_rtspsrc_uri_set_uri (GST_URI_HANDLER (rtspsrc),
847           g_value_get_string (value), NULL);
848       break;
849     case PROP_PROTOCOLS:
850       rtspsrc->protocols = g_value_get_flags (value);
851       break;
852     case PROP_DEBUG:
853       rtspsrc->debug = g_value_get_boolean (value);
854       break;
855     case PROP_RETRY:
856       rtspsrc->retry = g_value_get_uint (value);
857       break;
858     case PROP_TIMEOUT:
859       rtspsrc->udp_timeout = g_value_get_uint64 (value);
860       break;
861     case PROP_TCP_TIMEOUT:
862       gst_rtspsrc_set_tcp_timeout (rtspsrc, g_value_get_uint64 (value));
863       break;
864     case PROP_LATENCY:
865       rtspsrc->latency = g_value_get_uint (value);
866       break;
867     case PROP_DROP_ON_LATENCY:
868       rtspsrc->drop_on_latency = g_value_get_boolean (value);
869       break;
870     case PROP_CONNECTION_SPEED:
871       rtspsrc->connection_speed = g_value_get_uint64 (value);
872       break;
873     case PROP_NAT_METHOD:
874       rtspsrc->nat_method = g_value_get_enum (value);
875       break;
876     case PROP_DO_RTCP:
877       rtspsrc->do_rtcp = g_value_get_boolean (value);
878       break;
879     case PROP_DO_RTSP_KEEP_ALIVE:
880       rtspsrc->do_rtsp_keep_alive = g_value_get_boolean (value);
881       break;
882     case PROP_PROXY:
883       gst_rtspsrc_set_proxy (rtspsrc, g_value_get_string (value));
884       break;
885     case PROP_PROXY_ID:
886       if (rtspsrc->prop_proxy_id)
887         g_free (rtspsrc->prop_proxy_id);
888       rtspsrc->prop_proxy_id = g_value_dup_string (value);
889       break;
890     case PROP_PROXY_PW:
891       if (rtspsrc->prop_proxy_pw)
892         g_free (rtspsrc->prop_proxy_pw);
893       rtspsrc->prop_proxy_pw = g_value_dup_string (value);
894       break;
895     case PROP_RTP_BLOCKSIZE:
896       rtspsrc->rtp_blocksize = g_value_get_uint (value);
897       break;
898     case PROP_USER_ID:
899       if (rtspsrc->user_id)
900         g_free (rtspsrc->user_id);
901       rtspsrc->user_id = g_value_dup_string (value);
902       break;
903     case PROP_USER_PW:
904       if (rtspsrc->user_pw)
905         g_free (rtspsrc->user_pw);
906       rtspsrc->user_pw = g_value_dup_string (value);
907       break;
908     case PROP_BUFFER_MODE:
909       rtspsrc->buffer_mode = g_value_get_enum (value);
910       break;
911     case PROP_PORT_RANGE:
912     {
913       const gchar *str;
914
915       str = g_value_get_string (value);
916       if (str) {
917         sscanf (str, "%u-%u",
918             &rtspsrc->client_port_range.min, &rtspsrc->client_port_range.max);
919       } else {
920         rtspsrc->client_port_range.min = 0;
921         rtspsrc->client_port_range.max = 0;
922       }
923       break;
924     }
925     case PROP_UDP_BUFFER_SIZE:
926       rtspsrc->udp_buffer_size = g_value_get_int (value);
927       break;
928     case PROP_SHORT_HEADER:
929       rtspsrc->short_header = g_value_get_boolean (value);
930       break;
931     case PROP_PROBATION:
932       rtspsrc->probation = g_value_get_uint (value);
933       break;
934     case PROP_UDP_RECONNECT:
935       rtspsrc->udp_reconnect = g_value_get_boolean (value);
936       break;
937     case PROP_MULTICAST_IFACE:
938       g_free (rtspsrc->multi_iface);
939
940       if (g_value_get_string (value) == NULL)
941         rtspsrc->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
942       else
943         rtspsrc->multi_iface = g_value_dup_string (value);
944       break;
945     case PROP_NTP_SYNC:
946       rtspsrc->ntp_sync = g_value_get_boolean (value);
947       break;
948     case PROP_USE_PIPELINE_CLOCK:
949       rtspsrc->use_pipeline_clock = g_value_get_boolean (value);
950       break;
951     case PROP_SDES:
952       rtspsrc->sdes = g_value_dup_boxed (value);
953       break;
954     case PROP_TLS_VALIDATION_FLAGS:
955       rtspsrc->tls_validation_flags = g_value_get_flags (value);
956       break;
957     default:
958       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
959       break;
960   }
961 }
962
963 static void
964 gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
965     GParamSpec * pspec)
966 {
967   GstRTSPSrc *rtspsrc;
968
969   rtspsrc = GST_RTSPSRC (object);
970
971   switch (prop_id) {
972     case PROP_LOCATION:
973       g_value_set_string (value, rtspsrc->conninfo.location);
974       break;
975     case PROP_PROTOCOLS:
976       g_value_set_flags (value, rtspsrc->protocols);
977       break;
978     case PROP_DEBUG:
979       g_value_set_boolean (value, rtspsrc->debug);
980       break;
981     case PROP_RETRY:
982       g_value_set_uint (value, rtspsrc->retry);
983       break;
984     case PROP_TIMEOUT:
985       g_value_set_uint64 (value, rtspsrc->udp_timeout);
986       break;
987     case PROP_TCP_TIMEOUT:
988     {
989       guint64 timeout;
990
991       timeout = rtspsrc->tcp_timeout.tv_sec * G_USEC_PER_SEC +
992           rtspsrc->tcp_timeout.tv_usec;
993       g_value_set_uint64 (value, timeout);
994       break;
995     }
996     case PROP_LATENCY:
997       g_value_set_uint (value, rtspsrc->latency);
998       break;
999     case PROP_DROP_ON_LATENCY:
1000       g_value_set_boolean (value, rtspsrc->drop_on_latency);
1001       break;
1002     case PROP_CONNECTION_SPEED:
1003       g_value_set_uint64 (value, rtspsrc->connection_speed);
1004       break;
1005     case PROP_NAT_METHOD:
1006       g_value_set_enum (value, rtspsrc->nat_method);
1007       break;
1008     case PROP_DO_RTCP:
1009       g_value_set_boolean (value, rtspsrc->do_rtcp);
1010       break;
1011     case PROP_DO_RTSP_KEEP_ALIVE:
1012       g_value_set_boolean (value, rtspsrc->do_rtsp_keep_alive);
1013       break;
1014     case PROP_PROXY:
1015     {
1016       gchar *str;
1017
1018       if (rtspsrc->proxy_host) {
1019         str =
1020             g_strdup_printf ("%s:%d", rtspsrc->proxy_host, rtspsrc->proxy_port);
1021       } else {
1022         str = NULL;
1023       }
1024       g_value_take_string (value, str);
1025       break;
1026     }
1027     case PROP_PROXY_ID:
1028       g_value_set_string (value, rtspsrc->prop_proxy_id);
1029       break;
1030     case PROP_PROXY_PW:
1031       g_value_set_string (value, rtspsrc->prop_proxy_pw);
1032       break;
1033     case PROP_RTP_BLOCKSIZE:
1034       g_value_set_uint (value, rtspsrc->rtp_blocksize);
1035       break;
1036     case PROP_USER_ID:
1037       g_value_set_string (value, rtspsrc->user_id);
1038       break;
1039     case PROP_USER_PW:
1040       g_value_set_string (value, rtspsrc->user_pw);
1041       break;
1042     case PROP_BUFFER_MODE:
1043       g_value_set_enum (value, rtspsrc->buffer_mode);
1044       break;
1045     case PROP_PORT_RANGE:
1046     {
1047       gchar *str;
1048
1049       if (rtspsrc->client_port_range.min != 0) {
1050         str = g_strdup_printf ("%u-%u", rtspsrc->client_port_range.min,
1051             rtspsrc->client_port_range.max);
1052       } else {
1053         str = NULL;
1054       }
1055       g_value_take_string (value, str);
1056       break;
1057     }
1058     case PROP_UDP_BUFFER_SIZE:
1059       g_value_set_int (value, rtspsrc->udp_buffer_size);
1060       break;
1061     case PROP_SHORT_HEADER:
1062       g_value_set_boolean (value, rtspsrc->short_header);
1063       break;
1064     case PROP_PROBATION:
1065       g_value_set_uint (value, rtspsrc->probation);
1066       break;
1067     case PROP_UDP_RECONNECT:
1068       g_value_set_boolean (value, rtspsrc->udp_reconnect);
1069       break;
1070     case PROP_MULTICAST_IFACE:
1071       g_value_set_string (value, rtspsrc->multi_iface);
1072       break;
1073     case PROP_NTP_SYNC:
1074       g_value_set_boolean (value, rtspsrc->ntp_sync);
1075       break;
1076     case PROP_USE_PIPELINE_CLOCK:
1077       g_value_set_boolean (value, rtspsrc->use_pipeline_clock);
1078       break;
1079     case PROP_SDES:
1080       g_value_set_boxed (value, rtspsrc->sdes);
1081       break;
1082     case PROP_TLS_VALIDATION_FLAGS:
1083       g_value_set_flags (value, rtspsrc->tls_validation_flags);
1084       break;
1085     default:
1086       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1087       break;
1088   }
1089 }
1090
1091 static gint
1092 find_stream_by_id (GstRTSPStream * stream, gint * id)
1093 {
1094   if (stream->id == *id)
1095     return 0;
1096
1097   return -1;
1098 }
1099
1100 static gint
1101 find_stream_by_channel (GstRTSPStream * stream, gint * channel)
1102 {
1103   if (stream->channel[0] == *channel || stream->channel[1] == *channel)
1104     return 0;
1105
1106   return -1;
1107 }
1108
1109 static gint
1110 find_stream_by_pt (GstRTSPStream * stream, gint * pt)
1111 {
1112   if (stream->pt == *pt)
1113     return 0;
1114
1115   return -1;
1116 }
1117
1118 static gint
1119 find_stream_by_udpsrc (GstRTSPStream * stream, gconstpointer a)
1120 {
1121   GstElement *src = (GstElement *) a;
1122
1123   if (stream->udpsrc[0] == src)
1124     return 0;
1125   if (stream->udpsrc[1] == src)
1126     return 0;
1127
1128   return -1;
1129 }
1130
1131 static gint
1132 find_stream_by_setup (GstRTSPStream * stream, gconstpointer a)
1133 {
1134   /* check qualified setup_url */
1135   if (!strcmp (stream->conninfo.location, (gchar *) a))
1136     return 0;
1137   /* check original control_url */
1138   if (!strcmp (stream->control_url, (gchar *) a))
1139     return 0;
1140
1141   /* check if qualified setup_url ends with string */
1142   if (g_str_has_suffix (stream->control_url, (gchar *) a))
1143     return 0;
1144
1145   return -1;
1146 }
1147
1148 static GstRTSPStream *
1149 find_stream (GstRTSPSrc * src, gconstpointer data, gconstpointer func)
1150 {
1151   GList *lstream;
1152
1153   /* find and get stream */
1154   if ((lstream = g_list_find_custom (src->streams, data, (GCompareFunc) func)))
1155     return (GstRTSPStream *) lstream->data;
1156
1157   return NULL;
1158 }
1159
1160 static const GstSDPBandwidth *
1161 gst_rtspsrc_get_bandwidth (GstRTSPSrc * src, const GstSDPMessage * sdp,
1162     const GstSDPMedia * media, const gchar * type)
1163 {
1164   guint i, len;
1165
1166   /* first look in the media specific section */
1167   len = gst_sdp_media_bandwidths_len (media);
1168   for (i = 0; i < len; i++) {
1169     const GstSDPBandwidth *bw = gst_sdp_media_get_bandwidth (media, i);
1170
1171     if (strcmp (bw->bwtype, type) == 0)
1172       return bw;
1173   }
1174   /* then look in the message specific section */
1175   len = gst_sdp_message_bandwidths_len (sdp);
1176   for (i = 0; i < len; i++) {
1177     const GstSDPBandwidth *bw = gst_sdp_message_get_bandwidth (sdp, i);
1178
1179     if (strcmp (bw->bwtype, type) == 0)
1180       return bw;
1181   }
1182   return NULL;
1183 }
1184
1185 static void
1186 gst_rtspsrc_collect_bandwidth (GstRTSPSrc * src, const GstSDPMessage * sdp,
1187     const GstSDPMedia * media, GstRTSPStream * stream)
1188 {
1189   const GstSDPBandwidth *bw;
1190
1191   if ((bw = gst_rtspsrc_get_bandwidth (src, sdp, media, GST_SDP_BWTYPE_AS)))
1192     stream->as_bandwidth = bw->bandwidth;
1193   else
1194     stream->as_bandwidth = -1;
1195
1196   if ((bw = gst_rtspsrc_get_bandwidth (src, sdp, media, GST_SDP_BWTYPE_RR)))
1197     stream->rr_bandwidth = bw->bandwidth;
1198   else
1199     stream->rr_bandwidth = -1;
1200
1201   if ((bw = gst_rtspsrc_get_bandwidth (src, sdp, media, GST_SDP_BWTYPE_RS)))
1202     stream->rs_bandwidth = bw->bandwidth;
1203   else
1204     stream->rs_bandwidth = -1;
1205 }
1206
1207 static void
1208 gst_rtspsrc_do_stream_connection (GstRTSPSrc * src, GstRTSPStream * stream,
1209     const GstSDPConnection * conn)
1210 {
1211   if (conn->nettype == NULL || strcmp (conn->nettype, "IN") != 0)
1212     return;
1213
1214   if (conn->addrtype == NULL)
1215     return;
1216
1217   /* check for IPV6 */
1218   if (strcmp (conn->addrtype, "IP4") == 0)
1219     stream->is_ipv6 = FALSE;
1220   else if (strcmp (conn->addrtype, "IP6") == 0)
1221     stream->is_ipv6 = TRUE;
1222   else
1223     return;
1224
1225   /* save address */
1226   g_free (stream->destination);
1227   stream->destination = g_strdup (conn->address);
1228
1229   /* check for multicast */
1230   stream->is_multicast =
1231       gst_sdp_address_is_multicast (conn->nettype, conn->addrtype,
1232       conn->address);
1233   stream->ttl = conn->ttl;
1234 }
1235
1236 /* Go over the connections for a stream.
1237  * - If we are dealing with IPV6, we will setup IPV6 sockets for sending and
1238  *   receiving.
1239  * - If we are dealing with a localhost address, we disable multicast
1240  */
1241 static void
1242 gst_rtspsrc_collect_connections (GstRTSPSrc * src, const GstSDPMessage * sdp,
1243     const GstSDPMedia * media, GstRTSPStream * stream)
1244 {
1245   const GstSDPConnection *conn;
1246   guint i, len;
1247
1248   /* first look in the media specific section */
1249   len = gst_sdp_media_connections_len (media);
1250   for (i = 0; i < len; i++) {
1251     conn = gst_sdp_media_get_connection (media, i);
1252
1253     gst_rtspsrc_do_stream_connection (src, stream, conn);
1254   }
1255   /* then look in the message specific section */
1256   if ((conn = gst_sdp_message_get_connection (sdp))) {
1257     gst_rtspsrc_do_stream_connection (src, stream, conn);
1258   }
1259 }
1260
1261 static const gchar *
1262 get_aggregate_control (GstRTSPSrc * src)
1263 {
1264   const gchar *base;
1265
1266   if (src->control)
1267     base = src->control;
1268   else if (src->content_base)
1269     base = src->content_base;
1270   else if (src->conninfo.url_str)
1271     base = src->conninfo.url_str;
1272   else
1273     base = "/";
1274
1275   return base;
1276 }
1277
1278 static GstRTSPStream *
1279 gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx)
1280 {
1281   GstRTSPStream *stream;
1282   const gchar *control_url;
1283   const gchar *payload;
1284   const GstSDPMedia *media;
1285
1286   /* get media, should not return NULL */
1287   media = gst_sdp_message_get_media (sdp, idx);
1288   if (media == NULL)
1289     return NULL;
1290
1291   stream = g_new0 (GstRTSPStream, 1);
1292   stream->parent = src;
1293   /* we mark the pad as not linked, we will mark it as OK when we add the pad to
1294    * the element. */
1295   stream->last_ret = GST_FLOW_NOT_LINKED;
1296   stream->added = FALSE;
1297   stream->disabled = FALSE;
1298   stream->id = src->numstreams++;
1299   stream->eos = FALSE;
1300   stream->discont = TRUE;
1301   stream->seqbase = -1;
1302   stream->timebase = -1;
1303
1304   /* collect bandwidth information for this steam. FIXME, configure in the RTP
1305    * session manager to scale RTCP. */
1306   gst_rtspsrc_collect_bandwidth (src, sdp, media, stream);
1307
1308   /* collect connection info */
1309   gst_rtspsrc_collect_connections (src, sdp, media, stream);
1310
1311   /* we must have a payload. No payload means we cannot create caps */
1312   /* FIXME, handle multiple formats. The problem here is that we just want to
1313    * take the first available format that we can handle but in order to do that
1314    * we need to scan for depayloader plugins. Scanning for payloader plugins is
1315    * also suboptimal because the user maybe just wants to save the raw stream
1316    * and then we don't care. */
1317   if ((payload = gst_sdp_media_get_format (media, 0))) {
1318     stream->pt = atoi (payload);
1319     /* convert caps */
1320     stream->caps = gst_rtspsrc_media_to_caps (stream->pt, media);
1321
1322     GST_DEBUG ("mapping sdp session level attributes to caps");
1323     gst_rtspsrc_sdp_attributes_to_caps (sdp->attributes, stream->caps);
1324     GST_DEBUG ("mapping sdp media level attributes to caps");
1325     gst_rtspsrc_sdp_attributes_to_caps (media->attributes, stream->caps);
1326
1327     if (stream->pt >= 96) {
1328       /* If we have a dynamic payload type, see if we have a stream with the
1329        * same payload number. If there is one, they are part of the same
1330        * container and we only need to add one pad. */
1331       if (find_stream (src, &stream->pt, (gpointer) find_stream_by_pt)) {
1332         stream->container = TRUE;
1333         GST_DEBUG ("found another stream with pt %d, marking as container",
1334             stream->pt);
1335       }
1336     }
1337   }
1338   /* collect port number */
1339   stream->port = gst_sdp_media_get_port (media);
1340
1341   /* get control url to construct the setup url. The setup url is used to
1342    * configure the transport of the stream and is used to identity the stream in
1343    * the RTP-Info header field returned from PLAY. */
1344   control_url = gst_sdp_media_get_attribute_val (media, "control");
1345   if (control_url == NULL)
1346     control_url = gst_sdp_message_get_attribute_val_n (sdp, "control", 0);
1347
1348   GST_DEBUG_OBJECT (src, "stream %d, (%p)", stream->id, stream);
1349   GST_DEBUG_OBJECT (src, " pt: %d", stream->pt);
1350   GST_DEBUG_OBJECT (src, " port: %d", stream->port);
1351   GST_DEBUG_OBJECT (src, " container: %d", stream->container);
1352   GST_DEBUG_OBJECT (src, " caps: %" GST_PTR_FORMAT, stream->caps);
1353   GST_DEBUG_OBJECT (src, " control: %s", GST_STR_NULL (control_url));
1354
1355   if (control_url != NULL) {
1356     stream->control_url = g_strdup (control_url);
1357     /* Build a fully qualified url using the content_base if any or by prefixing
1358      * the original request.
1359      * If the control_url starts with a '/' or a non rtsp: protocol we will most
1360      * likely build a URL that the server will fail to understand, this is ok,
1361      * we will fail then. */
1362     if (g_str_has_prefix (control_url, "rtsp://"))
1363       stream->conninfo.location = g_strdup (control_url);
1364     else {
1365       const gchar *base;
1366       gboolean has_slash;
1367
1368       if (g_strcmp0 (control_url, "*") == 0)
1369         control_url = "";
1370
1371       base = get_aggregate_control (src);
1372
1373       /* check if the base ends or control starts with / */
1374       has_slash = g_str_has_prefix (control_url, "/");
1375       has_slash = has_slash || g_str_has_suffix (base, "/");
1376
1377       /* concatenate the two strings, insert / when not present */
1378       stream->conninfo.location =
1379           g_strdup_printf ("%s%s%s", base, has_slash ? "" : "/", control_url);
1380     }
1381   }
1382   GST_DEBUG_OBJECT (src, " setup: %s",
1383       GST_STR_NULL (stream->conninfo.location));
1384
1385   /* we keep track of all streams */
1386   src->streams = g_list_append (src->streams, stream);
1387
1388   return stream;
1389
1390   /* ERRORS */
1391 }
1392
1393 static void
1394 gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream)
1395 {
1396   gint i;
1397
1398   GST_DEBUG_OBJECT (src, "free stream %p", stream);
1399
1400   if (stream->caps)
1401     gst_caps_unref (stream->caps);
1402
1403   g_free (stream->destination);
1404   g_free (stream->control_url);
1405   g_free (stream->conninfo.location);
1406
1407   for (i = 0; i < 2; i++) {
1408     if (stream->udpsrc[i]) {
1409       gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
1410       gst_bin_remove (GST_BIN_CAST (src), stream->udpsrc[i]);
1411       gst_object_unref (stream->udpsrc[i]);
1412       stream->udpsrc[i] = NULL;
1413     }
1414     if (stream->channelpad[i]) {
1415       gst_object_unref (stream->channelpad[i]);
1416       stream->channelpad[i] = NULL;
1417     }
1418     if (stream->udpsink[i]) {
1419       gst_element_set_state (stream->udpsink[i], GST_STATE_NULL);
1420       gst_bin_remove (GST_BIN_CAST (src), stream->udpsink[i]);
1421       gst_object_unref (stream->udpsink[i]);
1422       stream->udpsink[i] = NULL;
1423     }
1424   }
1425   if (stream->fakesrc) {
1426     gst_element_set_state (stream->fakesrc, GST_STATE_NULL);
1427     gst_bin_remove (GST_BIN_CAST (src), stream->fakesrc);
1428     gst_object_unref (stream->fakesrc);
1429     stream->fakesrc = NULL;
1430   }
1431   if (stream->srcpad) {
1432     gst_pad_set_active (stream->srcpad, FALSE);
1433     if (stream->added) {
1434       gst_element_remove_pad (GST_ELEMENT_CAST (src), stream->srcpad);
1435       stream->added = FALSE;
1436     }
1437     stream->srcpad = NULL;
1438   }
1439   if (stream->rtcppad) {
1440     gst_object_unref (stream->rtcppad);
1441     stream->rtcppad = NULL;
1442   }
1443   if (stream->session) {
1444     g_object_unref (stream->session);
1445     stream->session = NULL;
1446   }
1447   g_free (stream);
1448 }
1449
1450 static void
1451 gst_rtspsrc_cleanup (GstRTSPSrc * src)
1452 {
1453   GList *walk;
1454
1455   GST_DEBUG_OBJECT (src, "cleanup");
1456
1457   for (walk = src->streams; walk; walk = g_list_next (walk)) {
1458     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
1459
1460     gst_rtspsrc_stream_free (src, stream);
1461   }
1462   g_list_free (src->streams);
1463   src->streams = NULL;
1464   if (src->manager) {
1465     if (src->manager_sig_id) {
1466       g_signal_handler_disconnect (src->manager, src->manager_sig_id);
1467       src->manager_sig_id = 0;
1468     }
1469     gst_element_set_state (src->manager, GST_STATE_NULL);
1470     gst_bin_remove (GST_BIN_CAST (src), src->manager);
1471     src->manager = NULL;
1472   }
1473   src->numstreams = 0;
1474   if (src->props)
1475     gst_structure_free (src->props);
1476   src->props = NULL;
1477
1478   g_free (src->content_base);
1479   src->content_base = NULL;
1480
1481   g_free (src->control);
1482   src->control = NULL;
1483
1484   if (src->range)
1485     gst_rtsp_range_free (src->range);
1486   src->range = NULL;
1487
1488   /* don't clear the SDP when it was used in the url */
1489   if (src->sdp && !src->from_sdp) {
1490     gst_sdp_message_free (src->sdp);
1491     src->sdp = NULL;
1492   }
1493   if (src->start_segment) {
1494     gst_event_unref (src->start_segment);
1495     src->start_segment = NULL;
1496   }
1497   if (src->provided_clock) {
1498     gst_object_unref (src->provided_clock);
1499     src->provided_clock = NULL;
1500   }
1501 }
1502
1503 #define PARSE_INT(p, del, res)          \
1504 G_STMT_START {                          \
1505   gchar *t = p;                         \
1506   p = strstr (p, del);                  \
1507   if (p == NULL)                        \
1508     res = -1;                           \
1509   else {                                \
1510     *p = '\0';                          \
1511     p++;                                \
1512     res = atoi (t);                     \
1513   }                                     \
1514 } G_STMT_END
1515
1516 #define PARSE_STRING(p, del, res)       \
1517 G_STMT_START {                          \
1518   gchar *t = p;                         \
1519   p = strstr (p, del);                  \
1520   if (p == NULL) {                      \
1521     res = NULL;                         \
1522     p = t;                              \
1523   }                                     \
1524   else {                                \
1525     *p = '\0';                          \
1526     p++;                                \
1527     res = t;                            \
1528   }                                     \
1529 } G_STMT_END
1530
1531 #define SKIP_SPACES(p)                  \
1532   while (*p && g_ascii_isspace (*p))    \
1533     p++;
1534
1535 /* rtpmap contains:
1536  *
1537  *  <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
1538  */
1539 static gboolean
1540 gst_rtspsrc_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
1541     gint * rate, gchar ** params)
1542 {
1543   gchar *p, *t;
1544
1545   p = (gchar *) rtpmap;
1546
1547   PARSE_INT (p, " ", *payload);
1548   if (*payload == -1)
1549     return FALSE;
1550
1551   SKIP_SPACES (p);
1552   if (*p == '\0')
1553     return FALSE;
1554
1555   PARSE_STRING (p, "/", *name);
1556   if (*name == NULL) {
1557     GST_DEBUG ("no rate, name %s", p);
1558     /* no rate, assume -1 then, this is not supposed to happen but RealMedia
1559      * streams seem to omit the rate. */
1560     *name = p;
1561     *rate = -1;
1562     return TRUE;
1563   }
1564
1565   t = p;
1566   p = strstr (p, "/");
1567   if (p == NULL) {
1568     *rate = atoi (t);
1569     return TRUE;
1570   }
1571   *p = '\0';
1572   p++;
1573   *rate = atoi (t);
1574
1575   t = p;
1576   if (*p == '\0')
1577     return TRUE;
1578   *params = t;
1579
1580   return TRUE;
1581 }
1582
1583 /*
1584  * Mapping SDP attributes to caps
1585  *
1586  * prepend 'a-' to IANA registered sdp attributes names
1587  * (ie: not prefixed with 'x-') in order to avoid
1588  * collision with gstreamer standard caps properties names
1589  */
1590 static void
1591 gst_rtspsrc_sdp_attributes_to_caps (GArray * attributes, GstCaps * caps)
1592 {
1593   if (attributes->len > 0) {
1594     GstStructure *s;
1595     guint i;
1596
1597     s = gst_caps_get_structure (caps, 0);
1598
1599     for (i = 0; i < attributes->len; i++) {
1600       GstSDPAttribute *attr = &g_array_index (attributes, GstSDPAttribute, i);
1601       gchar *tofree, *key;
1602
1603       key = attr->key;
1604
1605       /* skip some of the attribute we already handle */
1606       if (!strcmp (key, "fmtp"))
1607         continue;
1608       if (!strcmp (key, "rtpmap"))
1609         continue;
1610       if (!strcmp (key, "control"))
1611         continue;
1612       if (!strcmp (key, "range"))
1613         continue;
1614
1615       /* string must be valid UTF8 */
1616       if (!g_utf8_validate (attr->value, -1, NULL))
1617         continue;
1618
1619       if (!g_str_has_prefix (key, "x-"))
1620         tofree = key = g_strdup_printf ("a-%s", key);
1621       else
1622         tofree = NULL;
1623
1624       GST_DEBUG ("adding caps: %s=%s", key, attr->value);
1625       gst_structure_set (s, key, G_TYPE_STRING, attr->value, NULL);
1626       g_free (tofree);
1627     }
1628   }
1629 }
1630
1631 /*
1632  *  Mapping of caps to and from SDP fields:
1633  *
1634  *   m=<media> <UDP port> RTP/AVP <payload>
1635  *   a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
1636  *   a=fmtp:<payload> <param>[=<value>];...
1637  */
1638 static GstCaps *
1639 gst_rtspsrc_media_to_caps (gint pt, const GstSDPMedia * media)
1640 {
1641   GstCaps *caps;
1642   const gchar *rtpmap;
1643   const gchar *fmtp;
1644   gchar *name = NULL;
1645   gint rate = -1;
1646   gchar *params = NULL;
1647   gchar *tmp;
1648   GstStructure *s;
1649   gint payload = 0;
1650   gboolean ret;
1651
1652   /* get and parse rtpmap */
1653   if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
1654     ret = gst_rtspsrc_parse_rtpmap (rtpmap, &payload, &name, &rate, &params);
1655     if (ret) {
1656       if (payload != pt) {
1657         /* we ignore the rtpmap if the payload type is different. */
1658         g_warning ("rtpmap of wrong payload type, ignoring");
1659         name = NULL;
1660         rate = -1;
1661         params = NULL;
1662       }
1663     } else {
1664       /* if we failed to parse the rtpmap for a dynamic payload type, we have an
1665        * error */
1666       if (pt >= 96)
1667         goto no_rtpmap;
1668       /* else we can ignore */
1669       g_warning ("error parsing rtpmap, ignoring");
1670     }
1671   } else {
1672     /* dynamic payloads need rtpmap or we fail */
1673     if (pt >= 96)
1674       goto no_rtpmap;
1675   }
1676   /* check if we have a rate, if not, we need to look up the rate from the
1677    * default rates based on the payload types. */
1678   if (rate == -1) {
1679     const GstRTPPayloadInfo *info;
1680
1681     if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
1682       /* dynamic types, use media and encoding_name */
1683       tmp = g_ascii_strdown (media->media, -1);
1684       info = gst_rtp_payload_info_for_name (tmp, name);
1685       g_free (tmp);
1686     } else {
1687       /* static types, use payload type */
1688       info = gst_rtp_payload_info_for_pt (pt);
1689     }
1690
1691     if (info) {
1692       if ((rate = info->clock_rate) == 0)
1693         rate = -1;
1694     }
1695     /* we fail if we cannot find one */
1696     if (rate == -1)
1697       goto no_rate;
1698   }
1699
1700   tmp = g_ascii_strdown (media->media, -1);
1701   caps = gst_caps_new_simple ("application/x-unknown",
1702       "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
1703   g_free (tmp);
1704   s = gst_caps_get_structure (caps, 0);
1705
1706   gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);
1707
1708   /* encoding name must be upper case */
1709   if (name != NULL) {
1710     tmp = g_ascii_strup (name, -1);
1711     gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
1712     g_free (tmp);
1713   }
1714
1715   /* params must be lower case */
1716   if (params != NULL) {
1717     tmp = g_ascii_strdown (params, -1);
1718     gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
1719     g_free (tmp);
1720   }
1721
1722   /* parse optional fmtp: field */
1723   if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
1724     gchar *p;
1725     gint payload = 0;
1726
1727     p = (gchar *) fmtp;
1728
1729     /* p is now of the format <payload> <param>[=<value>];... */
1730     PARSE_INT (p, " ", payload);
1731     if (payload != -1 && payload == pt) {
1732       gchar **pairs;
1733       gint i;
1734
1735       /* <param>[=<value>] are separated with ';' */
1736       pairs = g_strsplit (p, ";", 0);
1737       for (i = 0; pairs[i]; i++) {
1738         gchar *valpos;
1739         const gchar *val, *key;
1740
1741         /* the key may not have a '=', the value can have other '='s */
1742         valpos = strstr (pairs[i], "=");
1743         if (valpos) {
1744           /* we have a '=' and thus a value, remove the '=' with \0 */
1745           *valpos = '\0';
1746           /* value is everything between '=' and ';'. We split the pairs at ;
1747            * boundaries so we can take the remainder of the value. Some servers
1748            * put spaces around the value which we strip off here. Alternatively
1749            * we could strip those spaces in the depayloaders should these spaces
1750            * actually carry any meaning in the future. */
1751           val = g_strstrip (valpos + 1);
1752         } else {
1753           /* simple <param>;.. is translated into <param>=1;... */
1754           val = "1";
1755         }
1756         /* strip the key of spaces, convert key to lowercase but not the value. */
1757         key = g_strstrip (pairs[i]);
1758         if (strlen (key) > 1) {
1759           tmp = g_ascii_strdown (key, -1);
1760           gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
1761           g_free (tmp);
1762         }
1763       }
1764       g_strfreev (pairs);
1765     }
1766   }
1767   return caps;
1768
1769   /* ERRORS */
1770 no_rtpmap:
1771   {
1772     g_warning ("rtpmap type not given for dynamic payload %d", pt);
1773     return NULL;
1774   }
1775 no_rate:
1776   {
1777     g_warning ("rate unknown for payload type %d", pt);
1778     return NULL;
1779   }
1780 }
1781
1782 static gboolean
1783 gst_rtspsrc_alloc_udp_ports (GstRTSPStream * stream,
1784     gint * rtpport, gint * rtcpport)
1785 {
1786   GstRTSPSrc *src;
1787   GstStateChangeReturn ret;
1788   GstElement *udpsrc0, *udpsrc1;
1789   gint tmp_rtp, tmp_rtcp;
1790   guint count;
1791   const gchar *host;
1792
1793   src = stream->parent;
1794
1795   udpsrc0 = NULL;
1796   udpsrc1 = NULL;
1797   count = 0;
1798
1799   /* Start at next port */
1800   tmp_rtp = src->next_port_num;
1801
1802   if (stream->is_ipv6)
1803     host = "udp://[::0]";
1804   else
1805     host = "udp://0.0.0.0";
1806
1807   /* try to allocate 2 UDP ports, the RTP port should be an even
1808    * number and the RTCP port should be the next (uneven) port */
1809 again:
1810
1811   if (tmp_rtp != 0 && src->client_port_range.max > 0 &&
1812       tmp_rtp >= src->client_port_range.max)
1813     goto no_ports;
1814
1815   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
1816   if (udpsrc0 == NULL)
1817     goto no_udp_protocol;
1818   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, "reuse", FALSE, NULL);
1819
1820   if (src->udp_buffer_size != 0)
1821     g_object_set (G_OBJECT (udpsrc0), "buffer-size", src->udp_buffer_size,
1822         NULL);
1823
1824   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1825   if (ret == GST_STATE_CHANGE_FAILURE) {
1826     if (tmp_rtp != 0) {
1827       GST_DEBUG_OBJECT (src, "Unable to make udpsrc from RTP port %d", tmp_rtp);
1828
1829       tmp_rtp += 2;
1830       if (++count > src->retry)
1831         goto no_ports;
1832
1833       GST_DEBUG_OBJECT (src, "free RTP udpsrc");
1834       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1835       gst_object_unref (udpsrc0);
1836       udpsrc0 = NULL;
1837
1838       GST_DEBUG_OBJECT (src, "retry %d", count);
1839       goto again;
1840     }
1841     goto no_udp_protocol;
1842   }
1843
1844   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
1845   GST_DEBUG_OBJECT (src, "got RTP port %d", tmp_rtp);
1846
1847   /* check if port is even */
1848   if ((tmp_rtp & 0x01) != 0) {
1849     /* port not even, close and allocate another */
1850     if (++count > src->retry)
1851       goto no_ports;
1852
1853     GST_DEBUG_OBJECT (src, "RTP port not even");
1854
1855     GST_DEBUG_OBJECT (src, "free RTP udpsrc");
1856     gst_element_set_state (udpsrc0, GST_STATE_NULL);
1857     gst_object_unref (udpsrc0);
1858     udpsrc0 = NULL;
1859
1860     GST_DEBUG_OBJECT (src, "retry %d", count);
1861     tmp_rtp++;
1862     goto again;
1863   }
1864
1865   /* allocate port+1 for RTCP now */
1866   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
1867   if (udpsrc1 == NULL)
1868     goto no_udp_rtcp_protocol;
1869
1870   /* set port */
1871   tmp_rtcp = tmp_rtp + 1;
1872   if (src->client_port_range.max > 0 && tmp_rtcp > src->client_port_range.max)
1873     goto no_ports;
1874
1875   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, "reuse", FALSE, NULL);
1876
1877   GST_DEBUG_OBJECT (src, "starting RTCP on port %d", tmp_rtcp);
1878   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1879   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
1880   if (ret == GST_STATE_CHANGE_FAILURE) {
1881     GST_DEBUG_OBJECT (src, "Unable to make udpsrc from RTCP port %d", tmp_rtcp);
1882
1883     if (++count > src->retry)
1884       goto no_ports;
1885
1886     GST_DEBUG_OBJECT (src, "free RTP udpsrc");
1887     gst_element_set_state (udpsrc0, GST_STATE_NULL);
1888     gst_object_unref (udpsrc0);
1889     udpsrc0 = NULL;
1890
1891     GST_DEBUG_OBJECT (src, "free RTCP udpsrc");
1892     gst_element_set_state (udpsrc1, GST_STATE_NULL);
1893     gst_object_unref (udpsrc1);
1894     udpsrc1 = NULL;
1895
1896     tmp_rtp += 2;
1897     GST_DEBUG_OBJECT (src, "retry %d", count);
1898     goto again;
1899   }
1900
1901   /* all fine, do port check */
1902   g_object_get (G_OBJECT (udpsrc0), "port", rtpport, NULL);
1903   g_object_get (G_OBJECT (udpsrc1), "port", rtcpport, NULL);
1904
1905   /* this should not happen... */
1906   if (*rtpport != tmp_rtp || *rtcpport != tmp_rtcp)
1907     goto port_error;
1908
1909   /* we keep these elements, we configure all in configure_transport when the
1910    * server told us to really use the UDP ports. */
1911   stream->udpsrc[0] = gst_object_ref_sink (udpsrc0);
1912   stream->udpsrc[1] = gst_object_ref_sink (udpsrc1);
1913   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1914   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1915
1916   /* keep track of next available port number when we have a range
1917    * configured */
1918   if (src->next_port_num != 0)
1919     src->next_port_num = tmp_rtcp + 1;
1920
1921   return TRUE;
1922
1923   /* ERRORS */
1924 no_udp_protocol:
1925   {
1926     GST_DEBUG_OBJECT (src, "could not get UDP source");
1927     goto cleanup;
1928   }
1929 no_ports:
1930   {
1931     GST_DEBUG_OBJECT (src, "could not allocate UDP port pair after %d retries",
1932         count);
1933     goto cleanup;
1934   }
1935 no_udp_rtcp_protocol:
1936   {
1937     GST_DEBUG_OBJECT (src, "could not get UDP source for RTCP");
1938     goto cleanup;
1939   }
1940 port_error:
1941   {
1942     GST_DEBUG_OBJECT (src, "ports don't match rtp: %d<->%d, rtcp: %d<->%d",
1943         tmp_rtp, *rtpport, tmp_rtcp, *rtcpport);
1944     goto cleanup;
1945   }
1946 cleanup:
1947   {
1948     if (udpsrc0) {
1949       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1950       gst_object_unref (udpsrc0);
1951     }
1952     if (udpsrc1) {
1953       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1954       gst_object_unref (udpsrc1);
1955     }
1956     return FALSE;
1957   }
1958 }
1959
1960 static void
1961 gst_rtspsrc_set_state (GstRTSPSrc * src, GstState state)
1962 {
1963   GList *walk;
1964
1965   if (src->manager)
1966     gst_element_set_state (GST_ELEMENT_CAST (src->manager), state);
1967
1968   for (walk = src->streams; walk; walk = g_list_next (walk)) {
1969     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
1970     gint i;
1971
1972     for (i = 0; i < 2; i++) {
1973       if (stream->udpsrc[i])
1974         gst_element_set_state (stream->udpsrc[i], state);
1975     }
1976   }
1977 }
1978
1979 static void
1980 gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
1981 {
1982   GstEvent *event;
1983   gint cmd;
1984   GstState state;
1985
1986   if (flush) {
1987     event = gst_event_new_flush_start ();
1988     GST_DEBUG_OBJECT (src, "start flush");
1989     cmd = CMD_WAIT;
1990     state = GST_STATE_PAUSED;
1991   } else {
1992     event = gst_event_new_flush_stop (FALSE);
1993     GST_DEBUG_OBJECT (src, "stop flush; playing %d", playing);
1994     cmd = CMD_LOOP;
1995     if (playing)
1996       state = GST_STATE_PLAYING;
1997     else
1998       state = GST_STATE_PAUSED;
1999   }
2000   gst_rtspsrc_push_event (src, event);
2001   gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP);
2002   gst_rtspsrc_set_state (src, state);
2003 }
2004
2005 static GstRTSPResult
2006 gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn,
2007     GstRTSPMessage * message, GTimeVal * timeout)
2008 {
2009   GstRTSPResult ret;
2010
2011   if (conn)
2012     ret = gst_rtsp_connection_send (conn, message, timeout);
2013   else
2014     ret = GST_RTSP_ERROR;
2015
2016   return ret;
2017 }
2018
2019 static GstRTSPResult
2020 gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn,
2021     GstRTSPMessage * message, GTimeVal * timeout)
2022 {
2023   GstRTSPResult ret;
2024
2025   if (conn)
2026     ret = gst_rtsp_connection_receive (conn, message, timeout);
2027   else
2028     ret = GST_RTSP_ERROR;
2029
2030   return ret;
2031 }
2032
2033 static void
2034 gst_rtspsrc_get_position (GstRTSPSrc * src)
2035 {
2036   GstQuery *query;
2037   GList *walk;
2038
2039   query = gst_query_new_position (GST_FORMAT_TIME);
2040   /*  should be known somewhere down the stream (e.g. jitterbuffer) */
2041   for (walk = src->streams; walk; walk = g_list_next (walk)) {
2042     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
2043     GstFormat fmt;
2044     gint64 pos;
2045
2046     if (stream->srcpad) {
2047       if (gst_pad_query (stream->srcpad, query)) {
2048         gst_query_parse_position (query, &fmt, &pos);
2049         GST_DEBUG_OBJECT (src, "retaining position %" GST_TIME_FORMAT,
2050             GST_TIME_ARGS (pos));
2051         src->last_pos = pos;
2052         return;
2053       }
2054     }
2055   }
2056
2057   src->last_pos = 0;
2058 }
2059
2060 static gboolean
2061 gst_rtspsrc_do_seek (GstRTSPSrc * src, GstSegment * segment)
2062 {
2063   src->state = GST_RTSP_STATE_SEEKING;
2064   /* PLAY will add the range header now. */
2065   src->need_range = TRUE;
2066
2067   return TRUE;
2068 }
2069
2070 static gboolean
2071 gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
2072 {
2073   gdouble rate;
2074   GstFormat format;
2075   GstSeekFlags flags;
2076   GstSeekType cur_type = GST_SEEK_TYPE_NONE, stop_type;
2077   gint64 cur, stop;
2078   gboolean flush, skip;
2079   gboolean update;
2080   gboolean playing;
2081   GstSegment seeksegment = { 0, };
2082   GList *walk;
2083
2084   if (event) {
2085     GST_DEBUG_OBJECT (src, "doing seek with event");
2086
2087     gst_event_parse_seek (event, &rate, &format, &flags,
2088         &cur_type, &cur, &stop_type, &stop);
2089
2090     /* no negative rates yet */
2091     if (rate < 0.0)
2092       goto negative_rate;
2093
2094     /* we need TIME format */
2095     if (format != src->segment.format)
2096       goto no_format;
2097   } else {
2098     GST_DEBUG_OBJECT (src, "doing seek without event");
2099     flags = 0;
2100     cur_type = GST_SEEK_TYPE_SET;
2101     stop_type = GST_SEEK_TYPE_SET;
2102   }
2103
2104   /* get flush flag */
2105   flush = flags & GST_SEEK_FLAG_FLUSH;
2106   skip = flags & GST_SEEK_FLAG_SKIP;
2107
2108   /* now we need to make sure the streaming thread is stopped. We do this by
2109    * either sending a FLUSH_START event downstream which will cause the
2110    * streaming thread to stop with a WRONG_STATE.
2111    * For a non-flushing seek we simply pause the task, which will happen as soon
2112    * as it completes one iteration (and thus might block when the sink is
2113    * blocking in preroll). */
2114   if (flush) {
2115     GST_DEBUG_OBJECT (src, "starting flush");
2116     gst_rtspsrc_flush (src, TRUE, FALSE);
2117   } else {
2118     if (src->task) {
2119       gst_task_pause (src->task);
2120     }
2121   }
2122
2123   /* we should now be able to grab the streaming thread because we stopped it
2124    * with the above flush/pause code */
2125   GST_RTSP_STREAM_LOCK (src);
2126
2127   GST_DEBUG_OBJECT (src, "stopped streaming");
2128
2129   /* copy segment, we need this because we still need the old
2130    * segment when we close the current segment. */
2131   memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
2132
2133   /* configure the seek parameters in the seeksegment. We will then have the
2134    * right values in the segment to perform the seek */
2135   if (event) {
2136     GST_DEBUG_OBJECT (src, "configuring seek");
2137     gst_segment_do_seek (&seeksegment, rate, format, flags,
2138         cur_type, cur, stop_type, stop, &update);
2139   }
2140
2141   /* figure out the last position we need to play. If it's configured (stop !=
2142    * -1), use that, else we play until the total duration of the file */
2143   if ((stop = seeksegment.stop) == -1)
2144     stop = seeksegment.duration;
2145
2146   playing = (src->state == GST_RTSP_STATE_PLAYING);
2147
2148   /* if we were playing, pause first */
2149   if (playing) {
2150     /* obtain current position in case seek fails */
2151     gst_rtspsrc_get_position (src);
2152     gst_rtspsrc_pause (src, FALSE);
2153   }
2154   src->skip = skip;
2155
2156   gst_rtspsrc_do_seek (src, &seeksegment);
2157
2158   /* and continue playing */
2159   if (playing)
2160     gst_rtspsrc_play (src, &seeksegment, FALSE);
2161
2162   /* prepare for streaming again */
2163   if (flush) {
2164     /* if we started flush, we stop now */
2165     GST_DEBUG_OBJECT (src, "stopping flush");
2166     gst_rtspsrc_flush (src, FALSE, playing);
2167   }
2168
2169   /* now we did the seek and can activate the new segment values */
2170   memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
2171
2172   /* if we're doing a segment seek, post a SEGMENT_START message */
2173   if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2174     gst_element_post_message (GST_ELEMENT_CAST (src),
2175         gst_message_new_segment_start (GST_OBJECT_CAST (src),
2176             src->segment.format, src->segment.position));
2177   }
2178
2179   /* now create the newsegment */
2180   GST_DEBUG_OBJECT (src, "Creating newsegment from %" G_GINT64_FORMAT
2181       " to %" G_GINT64_FORMAT, src->segment.position, stop);
2182
2183   /* mark discont */
2184   GST_DEBUG_OBJECT (src, "mark DISCONT, we did a seek to another position");
2185   for (walk = src->streams; walk; walk = g_list_next (walk)) {
2186     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
2187     stream->discont = TRUE;
2188   }
2189
2190   GST_RTSP_STREAM_UNLOCK (src);
2191
2192   return TRUE;
2193
2194   /* ERRORS */
2195 negative_rate:
2196   {
2197     GST_DEBUG_OBJECT (src, "negative playback rates are not supported yet.");
2198     return FALSE;
2199   }
2200 no_format:
2201   {
2202     GST_DEBUG_OBJECT (src, "unsupported format given, seek aborted.");
2203     return FALSE;
2204   }
2205 }
2206
2207 static gboolean
2208 gst_rtspsrc_handle_src_event (GstPad * pad, GstObject * parent,
2209     GstEvent * event)
2210 {
2211   GstRTSPSrc *src;
2212   gboolean res = TRUE;
2213   gboolean forward;
2214
2215   src = GST_RTSPSRC_CAST (parent);
2216
2217   GST_DEBUG_OBJECT (src, "pad %s:%s received event %s",
2218       GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event));
2219
2220   switch (GST_EVENT_TYPE (event)) {
2221     case GST_EVENT_SEEK:
2222       res = gst_rtspsrc_perform_seek (src, event);
2223       forward = FALSE;
2224       break;
2225     case GST_EVENT_QOS:
2226     case GST_EVENT_NAVIGATION:
2227     case GST_EVENT_LATENCY:
2228     default:
2229       forward = TRUE;
2230       break;
2231   }
2232   if (forward) {
2233     GstPad *target;
2234
2235     if ((target = gst_ghost_pad_get_target (GST_GHOST_PAD_CAST (pad)))) {
2236       res = gst_pad_send_event (target, event);
2237       gst_object_unref (target);
2238     } else {
2239       gst_event_unref (event);
2240     }
2241   } else {
2242     gst_event_unref (event);
2243   }
2244
2245   return res;
2246 }
2247
2248 /* this is the final event function we receive on the internal source pad when
2249  * we deal with TCP connections */
2250 static gboolean
2251 gst_rtspsrc_handle_internal_src_event (GstPad * pad, GstObject * parent,
2252     GstEvent * event)
2253 {
2254   gboolean res;
2255
2256   GST_DEBUG_OBJECT (pad, "received event %s", GST_EVENT_TYPE_NAME (event));
2257
2258   switch (GST_EVENT_TYPE (event)) {
2259     case GST_EVENT_SEEK:
2260     case GST_EVENT_QOS:
2261     case GST_EVENT_NAVIGATION:
2262     case GST_EVENT_LATENCY:
2263     default:
2264       gst_event_unref (event);
2265       res = TRUE;
2266       break;
2267   }
2268   return res;
2269 }
2270
2271 /* this is the final query function we receive on the internal source pad when
2272  * we deal with TCP connections */
2273 static gboolean
2274 gst_rtspsrc_handle_internal_src_query (GstPad * pad, GstObject * parent,
2275     GstQuery * query)
2276 {
2277   GstRTSPSrc *src;
2278   gboolean res = TRUE;
2279
2280   src = GST_RTSPSRC_CAST (gst_pad_get_element_private (pad));
2281
2282   GST_DEBUG_OBJECT (src, "pad %s:%s received query %s",
2283       GST_DEBUG_PAD_NAME (pad), GST_QUERY_TYPE_NAME (query));
2284
2285   switch (GST_QUERY_TYPE (query)) {
2286     case GST_QUERY_POSITION:
2287     {
2288       /* no idea */
2289       break;
2290     }
2291     case GST_QUERY_DURATION:
2292     {
2293       GstFormat format;
2294
2295       gst_query_parse_duration (query, &format, NULL);
2296
2297       switch (format) {
2298         case GST_FORMAT_TIME:
2299           gst_query_set_duration (query, format, src->segment.duration);
2300           break;
2301         default:
2302           res = FALSE;
2303           break;
2304       }
2305       break;
2306     }
2307     case GST_QUERY_LATENCY:
2308     {
2309       /* we are live with a min latency of 0 and unlimited max latency, this
2310        * result will be updated by the session manager if there is any. */
2311       gst_query_set_latency (query, TRUE, 0, -1);
2312       break;
2313     }
2314     default:
2315       break;
2316   }
2317
2318   return res;
2319 }
2320
2321 /* this query is executed on the ghost source pad exposed on rtspsrc. */
2322 static gboolean
2323 gst_rtspsrc_handle_src_query (GstPad * pad, GstObject * parent,
2324     GstQuery * query)
2325 {
2326   GstRTSPSrc *src;
2327   gboolean res = FALSE;
2328
2329   src = GST_RTSPSRC_CAST (parent);
2330
2331   GST_DEBUG_OBJECT (src, "pad %s:%s received query %s",
2332       GST_DEBUG_PAD_NAME (pad), GST_QUERY_TYPE_NAME (query));
2333
2334   switch (GST_QUERY_TYPE (query)) {
2335     case GST_QUERY_DURATION:
2336     {
2337       GstFormat format;
2338
2339       gst_query_parse_duration (query, &format, NULL);
2340
2341       switch (format) {
2342         case GST_FORMAT_TIME:
2343           gst_query_set_duration (query, format, src->segment.duration);
2344           res = TRUE;
2345           break;
2346         default:
2347           break;
2348       }
2349       break;
2350     }
2351     case GST_QUERY_SEEKING:
2352     {
2353       GstFormat format;
2354
2355       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2356       if (format == GST_FORMAT_TIME) {
2357         gboolean seekable =
2358             src->cur_protocols != GST_RTSP_LOWER_TRANS_UDP_MCAST;
2359
2360         /* seeking without duration is unlikely */
2361         seekable = seekable && src->seekable && src->segment.duration &&
2362             GST_CLOCK_TIME_IS_VALID (src->segment.duration);
2363
2364         /* FIXME ?? should we have 0 and segment.duration here; see demuxers */
2365         gst_query_set_seeking (query, GST_FORMAT_TIME, seekable,
2366             src->segment.start, src->segment.stop);
2367         res = TRUE;
2368       }
2369       break;
2370     }
2371     case GST_QUERY_URI:
2372     {
2373       gchar *uri;
2374
2375       uri = gst_rtspsrc_uri_get_uri (GST_URI_HANDLER (src));
2376       if (uri != NULL) {
2377         gst_query_set_uri (query, uri);
2378         g_free (uri);
2379         res = TRUE;
2380       }
2381       break;
2382     }
2383     default:
2384     {
2385       GstPad *target = gst_ghost_pad_get_target (GST_GHOST_PAD_CAST (pad));
2386
2387       /* forward the query to the proxy target pad */
2388       if (target) {
2389         res = gst_pad_query (target, query);
2390         gst_object_unref (target);
2391       }
2392       break;
2393     }
2394   }
2395
2396   return res;
2397 }
2398
2399 /* callback for RTCP messages to be sent to the server when operating in TCP
2400  * mode. */
2401 static GstFlowReturn
2402 gst_rtspsrc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2403 {
2404   GstRTSPSrc *src;
2405   GstRTSPStream *stream;
2406   GstFlowReturn res = GST_FLOW_OK;
2407   GstMapInfo map;
2408   guint8 *data;
2409   guint size;
2410   GstRTSPResult ret;
2411   GstRTSPMessage message = { 0 };
2412   GstRTSPConnection *conn;
2413
2414   stream = (GstRTSPStream *) gst_pad_get_element_private (pad);
2415   src = stream->parent;
2416
2417   gst_buffer_map (buffer, &map, GST_MAP_READ);
2418   size = map.size;
2419   data = map.data;
2420
2421   gst_rtsp_message_init_data (&message, stream->channel[1]);
2422
2423   /* lend the body data to the message */
2424   gst_rtsp_message_take_body (&message, data, size);
2425
2426   if (stream->conninfo.connection)
2427     conn = stream->conninfo.connection;
2428   else
2429     conn = src->conninfo.connection;
2430
2431   GST_DEBUG_OBJECT (src, "sending %u bytes RTCP", size);
2432   ret = gst_rtspsrc_connection_send (src, conn, &message, NULL);
2433   GST_DEBUG_OBJECT (src, "sent RTCP, %d", ret);
2434
2435   /* and steal it away again because we will free it when unreffing the
2436    * buffer */
2437   gst_rtsp_message_steal_body (&message, &data, &size);
2438   gst_rtsp_message_unset (&message);
2439
2440   gst_buffer_unmap (buffer, &map);
2441   gst_buffer_unref (buffer);
2442
2443   return res;
2444 }
2445
2446 static GstPadProbeReturn
2447 pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2448 {
2449   GstRTSPSrc *src = user_data;
2450
2451   GST_DEBUG_OBJECT (src, "pad %s:%s blocked, activating streams",
2452       GST_DEBUG_PAD_NAME (pad));
2453
2454   /* activate the streams */
2455   GST_OBJECT_LOCK (src);
2456   if (!src->need_activate)
2457     goto was_ok;
2458
2459   src->need_activate = FALSE;
2460   GST_OBJECT_UNLOCK (src);
2461
2462   gst_rtspsrc_activate_streams (src);
2463
2464   return GST_PAD_PROBE_OK;
2465
2466 was_ok:
2467   {
2468     GST_OBJECT_UNLOCK (src);
2469     return GST_PAD_PROBE_OK;
2470   }
2471 }
2472
2473 /* this callback is called when the session manager generated a new src pad with
2474  * payloaded RTP packets. We simply ghost the pad here. */
2475 static void
2476 new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src)
2477 {
2478   gchar *name;
2479   GstPadTemplate *template;
2480   gint id, ssrc, pt;
2481   GList *ostreams;
2482   GstRTSPStream *stream;
2483   gboolean all_added;
2484
2485   GST_DEBUG_OBJECT (src, "got new manager pad %" GST_PTR_FORMAT, pad);
2486
2487   GST_RTSP_STATE_LOCK (src);
2488   /* find stream */
2489   name = gst_object_get_name (GST_OBJECT_CAST (pad));
2490   if (sscanf (name, "recv_rtp_src_%u_%u_%u", &id, &ssrc, &pt) != 3)
2491     goto unknown_stream;
2492
2493   GST_DEBUG_OBJECT (src, "stream: %u, SSRC %08x, PT %d", id, ssrc, pt);
2494
2495   stream = find_stream (src, &id, (gpointer) find_stream_by_id);
2496   if (stream == NULL)
2497     goto unknown_stream;
2498
2499   /* save SSRC */
2500   stream->ssrc = ssrc;
2501
2502   /* we'll add it later see below */
2503   stream->added = TRUE;
2504
2505   /* check if we added all streams */
2506   all_added = TRUE;
2507   for (ostreams = src->streams; ostreams; ostreams = g_list_next (ostreams)) {
2508     GstRTSPStream *ostream = (GstRTSPStream *) ostreams->data;
2509
2510     GST_DEBUG_OBJECT (src, "stream %p, container %d, disabled %d, added %d",
2511         ostream, ostream->container, ostream->disabled, ostream->added);
2512
2513     /* a container stream only needs one pad added. Also disabled streams don't
2514      * count */
2515     if (!ostream->container && !ostream->disabled && !ostream->added) {
2516       all_added = FALSE;
2517       break;
2518     }
2519   }
2520   GST_RTSP_STATE_UNLOCK (src);
2521
2522   /* create a new pad we will use to stream to */
2523   template = gst_static_pad_template_get (&rtptemplate);
2524   stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
2525   gst_object_unref (template);
2526   g_free (name);
2527
2528   gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event);
2529   gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query);
2530   gst_pad_set_active (stream->srcpad, TRUE);
2531   gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad);
2532
2533   if (all_added) {
2534     GST_DEBUG_OBJECT (src, "We added all streams");
2535     /* when we get here, all stream are added and we can fire the no-more-pads
2536      * signal. */
2537     gst_element_no_more_pads (GST_ELEMENT_CAST (src));
2538   }
2539
2540   return;
2541
2542   /* ERRORS */
2543 unknown_stream:
2544   {
2545     GST_DEBUG_OBJECT (src, "ignoring unknown stream");
2546     GST_RTSP_STATE_UNLOCK (src);
2547     g_free (name);
2548     return;
2549   }
2550 }
2551
2552 static GstCaps *
2553 request_pt_map (GstElement * manager, guint session, guint pt, GstRTSPSrc * src)
2554 {
2555   GstRTSPStream *stream;
2556   GstCaps *caps;
2557
2558   GST_DEBUG_OBJECT (src, "getting pt map for pt %d in session %d", pt, session);
2559
2560   GST_RTSP_STATE_LOCK (src);
2561   stream = find_stream (src, &session, (gpointer) find_stream_by_id);
2562   if (!stream)
2563     goto unknown_stream;
2564
2565   caps = stream->caps;
2566   if (caps)
2567     gst_caps_ref (caps);
2568   GST_RTSP_STATE_UNLOCK (src);
2569
2570   return caps;
2571
2572 unknown_stream:
2573   {
2574     GST_DEBUG_OBJECT (src, "unknown stream %d", session);
2575     GST_RTSP_STATE_UNLOCK (src);
2576     return NULL;
2577   }
2578 }
2579
2580 static void
2581 gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, GstRTSPStream * stream)
2582 {
2583   GST_DEBUG_OBJECT (src, "setting stream for session %u to EOS", stream->id);
2584
2585   if (stream->eos)
2586     goto was_eos;
2587
2588   stream->eos = TRUE;
2589   gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ());
2590   return;
2591
2592   /* ERRORS */
2593 was_eos:
2594   {
2595     GST_DEBUG_OBJECT (src, "stream for session %u was already EOS", stream->id);
2596     return;
2597   }
2598 }
2599
2600 static void
2601 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2602 {
2603   GstRTSPSrc *src = stream->parent;
2604   guint ssrc;
2605
2606   g_object_get (source, "ssrc", &ssrc, NULL);
2607
2608   GST_DEBUG_OBJECT (src, "source %08x, stream %08x, session %u received BYE",
2609       ssrc, stream->ssrc, stream->id);
2610
2611   if (ssrc == stream->ssrc)
2612     gst_rtspsrc_do_stream_eos (src, stream);
2613 }
2614
2615 static void
2616 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2617 {
2618   GstRTSPSrc *src = stream->parent;
2619   guint ssrc;
2620
2621   g_object_get (source, "ssrc", &ssrc, NULL);
2622
2623   GST_WARNING_OBJECT (src, "source %08x, stream %08x in session %u timed out",
2624       ssrc, stream->ssrc, stream->id);
2625
2626   if (ssrc == stream->ssrc)
2627     gst_rtspsrc_do_stream_eos (src, stream);
2628 }
2629
2630 static void
2631 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, GstRTSPSrc * src)
2632 {
2633   GstRTSPStream *stream;
2634
2635   GST_DEBUG_OBJECT (src, "source in session %u reached NPT stop", session);
2636
2637   /* get stream for session */
2638   stream = find_stream (src, &session, (gpointer) find_stream_by_id);
2639   if (stream) {
2640     gst_rtspsrc_do_stream_eos (src, stream);
2641   }
2642 }
2643
2644 static void
2645 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2646 {
2647   GST_DEBUG_OBJECT (stream->parent, "source in session %u is active",
2648       stream->id);
2649 }
2650
2651 static void
2652 set_manager_buffer_mode (GstRTSPSrc * src)
2653 {
2654   GObjectClass *klass;
2655
2656   if (src->manager == NULL)
2657     return;
2658
2659   klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager));
2660
2661   if (!g_object_class_find_property (klass, "buffer-mode"))
2662     return;
2663
2664   if (src->buffer_mode != BUFFER_MODE_AUTO) {
2665     g_object_set (src->manager, "buffer-mode", src->buffer_mode, NULL);
2666
2667     return;
2668   }
2669
2670   GST_DEBUG_OBJECT (src,
2671       "auto buffering mode, have clock %" GST_PTR_FORMAT, src->provided_clock);
2672
2673   if (src->provided_clock) {
2674     GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (src));
2675
2676     if (clock == src->provided_clock) {
2677       GST_DEBUG_OBJECT (src, "selected synced");
2678       g_object_set (src->manager, "buffer-mode", BUFFER_MODE_SYNCED, NULL);
2679
2680       if (clock)
2681         gst_object_unref (clock);
2682
2683       return;
2684     }
2685
2686     /* Otherwise fall-through and use another buffer mode */
2687     if (clock)
2688       gst_object_unref (clock);
2689   }
2690
2691   GST_DEBUG_OBJECT (src, "auto buffering mode");
2692   if (src->use_buffering) {
2693     GST_DEBUG_OBJECT (src, "selected buffer");
2694     g_object_set (src->manager, "buffer-mode", BUFFER_MODE_BUFFER, NULL);
2695   } else {
2696     GST_DEBUG_OBJECT (src, "selected slave");
2697     g_object_set (src->manager, "buffer-mode", BUFFER_MODE_SLAVE, NULL);
2698   }
2699 }
2700
2701 /* try to get and configure a manager */
2702 static gboolean
2703 gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
2704     GstRTSPTransport * transport)
2705 {
2706   const gchar *manager;
2707   gchar *name;
2708   GstStateChangeReturn ret;
2709
2710   /* find a manager */
2711   if (gst_rtsp_transport_get_manager (transport->trans, &manager, 0) < 0)
2712     goto no_manager;
2713
2714   if (manager) {
2715     GST_DEBUG_OBJECT (src, "using manager %s", manager);
2716
2717     /* configure the manager */
2718     if (src->manager == NULL) {
2719       GObjectClass *klass;
2720       GstStructure *s;
2721       const gchar *encoding;
2722       gboolean need_slave;
2723
2724       if (!(src->manager = gst_element_factory_make (manager, "manager"))) {
2725         /* fallback */
2726         if (gst_rtsp_transport_get_manager (transport->trans, &manager, 1) < 0)
2727           goto no_manager;
2728
2729         if (!manager)
2730           goto use_no_manager;
2731
2732         if (!(src->manager = gst_element_factory_make (manager, "manager")))
2733           goto manager_failed;
2734       }
2735
2736       /* we manage this element */
2737       gst_element_set_locked_state (src->manager, TRUE);
2738       gst_bin_add (GST_BIN_CAST (src), src->manager);
2739
2740       ret = gst_element_set_state (src->manager, GST_STATE_PAUSED);
2741       if (ret == GST_STATE_CHANGE_FAILURE)
2742         goto start_manager_failure;
2743
2744       g_object_set (src->manager, "latency", src->latency, NULL);
2745
2746       klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager));
2747
2748       if (g_object_class_find_property (klass, "ntp-sync")) {
2749         g_object_set (src->manager, "ntp-sync", src->ntp_sync, NULL);
2750       }
2751
2752       if (g_object_class_find_property (klass, "use-pipeline-clock")) {
2753         g_object_set (src->manager, "use-pipeline-clock",
2754             src->use_pipeline_clock, NULL);
2755       }
2756
2757       if (src->sdes && g_object_class_find_property (klass, "sdes")) {
2758         g_object_set (src->manager, "sdes", src->sdes, NULL);
2759       }
2760
2761       if (g_object_class_find_property (klass, "drop-on-latency")) {
2762         g_object_set (src->manager, "drop-on-latency", src->drop_on_latency,
2763             NULL);
2764       }
2765
2766       /* buffer mode pauses are handled by adding offsets to buffer times,
2767        * but some depayloaders may have a hard time syncing output times
2768        * with such input times, e.g. container ones, most notably ASF */
2769       /* TODO alternatives are having an event that indicates these shifts,
2770        * or having rtsp extensions provide suggestion on buffer mode */
2771       need_slave = stream->container;
2772       if (stream->caps && (s = gst_caps_get_structure (stream->caps, 0)) &&
2773           (encoding = gst_structure_get_string (s, "encoding-name")))
2774         need_slave = need_slave || (strcmp (encoding, "X-ASF-PF") == 0);
2775       /* valid duration implies not likely live pipeline,
2776        * so slaving in jitterbuffer does not make much sense
2777        * (and might mess things up due to bursts) */
2778       if (GST_CLOCK_TIME_IS_VALID (src->segment.duration) &&
2779           src->segment.duration && !need_slave) {
2780         src->use_buffering = TRUE;
2781       } else {
2782         src->use_buffering = FALSE;
2783       }
2784
2785       set_manager_buffer_mode (src);
2786
2787       /* connect to signals if we did not already do so */
2788       GST_DEBUG_OBJECT (src, "connect to signals on session manager, stream %p",
2789           stream);
2790       src->manager_sig_id =
2791           g_signal_connect (src->manager, "pad-added",
2792           (GCallback) new_manager_pad, src);
2793       src->manager_ptmap_id =
2794           g_signal_connect (src->manager, "request-pt-map",
2795           (GCallback) request_pt_map, src);
2796
2797       g_signal_connect (src->manager, "on-npt-stop", (GCallback) on_npt_stop,
2798           src);
2799     }
2800
2801     /* we stream directly to the manager, get some pads. Each RTSP stream goes
2802      * into a separate RTP session. */
2803     name = g_strdup_printf ("recv_rtp_sink_%u", stream->id);
2804     stream->channelpad[0] = gst_element_get_request_pad (src->manager, name);
2805     g_free (name);
2806     name = g_strdup_printf ("recv_rtcp_sink_%u", stream->id);
2807     stream->channelpad[1] = gst_element_get_request_pad (src->manager, name);
2808     g_free (name);
2809
2810     /* now configure the bandwidth in the manager */
2811     if (g_signal_lookup ("get-internal-session",
2812             G_OBJECT_TYPE (src->manager)) != 0) {
2813       GObject *rtpsession;
2814
2815       g_signal_emit_by_name (src->manager, "get-internal-session", stream->id,
2816           &rtpsession);
2817       if (rtpsession) {
2818         GST_INFO_OBJECT (src, "configure bandwidth in session %p", rtpsession);
2819
2820         stream->session = rtpsession;
2821
2822         if (stream->as_bandwidth != -1) {
2823           GST_INFO_OBJECT (src, "setting AS: %f",
2824               (gdouble) (stream->as_bandwidth * 1000));
2825           g_object_set (rtpsession, "bandwidth",
2826               (gdouble) (stream->as_bandwidth * 1000), NULL);
2827         }
2828         if (stream->rr_bandwidth != -1) {
2829           GST_INFO_OBJECT (src, "setting RR: %u", stream->rr_bandwidth);
2830           g_object_set (rtpsession, "rtcp-rr-bandwidth", stream->rr_bandwidth,
2831               NULL);
2832         }
2833         if (stream->rs_bandwidth != -1) {
2834           GST_INFO_OBJECT (src, "setting RS: %u", stream->rs_bandwidth);
2835           g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth,
2836               NULL);
2837         }
2838
2839         g_object_set (rtpsession, "probation", src->probation, NULL);
2840
2841         g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2842             stream);
2843         g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,
2844             stream);
2845         g_signal_connect (rtpsession, "on-timeout", (GCallback) on_timeout,
2846             stream);
2847         g_signal_connect (rtpsession, "on-ssrc-active",
2848             (GCallback) on_ssrc_active, stream);
2849       }
2850     }
2851   }
2852
2853 use_no_manager:
2854   return TRUE;
2855
2856   /* ERRORS */
2857 no_manager:
2858   {
2859     GST_DEBUG_OBJECT (src, "cannot get a session manager");
2860     return FALSE;
2861   }
2862 manager_failed:
2863   {
2864     GST_DEBUG_OBJECT (src, "no session manager element %s found", manager);
2865     return FALSE;
2866   }
2867 start_manager_failure:
2868   {
2869     GST_DEBUG_OBJECT (src, "could not start session manager");
2870     return FALSE;
2871   }
2872 }
2873
2874 /* free the UDP sources allocated when negotiating a transport.
2875  * This function is called when the server negotiated to a transport where the
2876  * UDP sources are not needed anymore, such as TCP or multicast. */
2877 static void
2878 gst_rtspsrc_stream_free_udp (GstRTSPStream * stream)
2879 {
2880   gint i;
2881
2882   for (i = 0; i < 2; i++) {
2883     if (stream->udpsrc[i]) {
2884       GST_DEBUG ("free UDP source %d for stream %p", i, stream);
2885       gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
2886       gst_object_unref (stream->udpsrc[i]);
2887       stream->udpsrc[i] = NULL;
2888     }
2889   }
2890 }
2891
2892 /* for TCP, create pads to send and receive data to and from the manager and to
2893  * intercept various events and queries
2894  */
2895 static gboolean
2896 gst_rtspsrc_stream_configure_tcp (GstRTSPSrc * src, GstRTSPStream * stream,
2897     GstRTSPTransport * transport, GstPad ** outpad)
2898 {
2899   gchar *name;
2900   GstPadTemplate *template;
2901   GstPad *pad0, *pad1;
2902
2903   /* configure for interleaved delivery, nothing needs to be done
2904    * here, the loop function will call the chain functions of the
2905    * session manager. */
2906   stream->channel[0] = transport->interleaved.min;
2907   stream->channel[1] = transport->interleaved.max;
2908   GST_DEBUG_OBJECT (src, "stream %p on channels %d-%d", stream,
2909       stream->channel[0], stream->channel[1]);
2910
2911   /* we can remove the allocated UDP ports now */
2912   gst_rtspsrc_stream_free_udp (stream);
2913
2914   /* no session manager, send data to srcpad directly */
2915   if (!stream->channelpad[0]) {
2916     GST_DEBUG_OBJECT (src, "no manager, creating pad");
2917
2918     /* create a new pad we will use to stream to */
2919     name = g_strdup_printf ("stream_%u", stream->id);
2920     template = gst_static_pad_template_get (&rtptemplate);
2921     stream->channelpad[0] = gst_pad_new_from_template (template, name);
2922     gst_object_unref (template);
2923     g_free (name);
2924
2925     /* set caps and activate */
2926     gst_pad_use_fixed_caps (stream->channelpad[0]);
2927     gst_pad_set_active (stream->channelpad[0], TRUE);
2928
2929     *outpad = gst_object_ref (stream->channelpad[0]);
2930   } else {
2931     GST_DEBUG_OBJECT (src, "using manager source pad");
2932
2933     template = gst_static_pad_template_get (&anysrctemplate);
2934
2935     /* allocate pads for sending the channel data into the manager */
2936     pad0 = gst_pad_new_from_template (template, "internalsrc_0");
2937     gst_pad_link_full (pad0, stream->channelpad[0], GST_PAD_LINK_CHECK_NOTHING);
2938     gst_object_unref (stream->channelpad[0]);
2939     stream->channelpad[0] = pad0;
2940     gst_pad_set_event_function (pad0, gst_rtspsrc_handle_internal_src_event);
2941     gst_pad_set_query_function (pad0, gst_rtspsrc_handle_internal_src_query);
2942     gst_pad_set_element_private (pad0, src);
2943     gst_pad_set_active (pad0, TRUE);
2944
2945     if (stream->channelpad[1]) {
2946       /* if we have a sinkpad for the other channel, create a pad and link to the
2947        * manager. */
2948       pad1 = gst_pad_new_from_template (template, "internalsrc_1");
2949       gst_pad_set_event_function (pad1, gst_rtspsrc_handle_internal_src_event);
2950       gst_pad_link_full (pad1, stream->channelpad[1],
2951           GST_PAD_LINK_CHECK_NOTHING);
2952       gst_object_unref (stream->channelpad[1]);
2953       stream->channelpad[1] = pad1;
2954       gst_pad_set_active (pad1, TRUE);
2955     }
2956     gst_object_unref (template);
2957   }
2958   /* setup RTCP transport back to the server if we have to. */
2959   if (src->manager && src->do_rtcp) {
2960     GstPad *pad;
2961
2962     template = gst_static_pad_template_get (&anysinktemplate);
2963
2964     stream->rtcppad = gst_pad_new_from_template (template, "internalsink_0");
2965     gst_pad_set_chain_function (stream->rtcppad, gst_rtspsrc_sink_chain);
2966     gst_pad_set_element_private (stream->rtcppad, stream);
2967     gst_pad_set_active (stream->rtcppad, TRUE);
2968
2969     /* get session RTCP pad */
2970     name = g_strdup_printf ("send_rtcp_src_%u", stream->id);
2971     pad = gst_element_get_request_pad (src->manager, name);
2972     g_free (name);
2973
2974     /* and link */
2975     if (pad) {
2976       gst_pad_link_full (pad, stream->rtcppad, GST_PAD_LINK_CHECK_NOTHING);
2977       gst_object_unref (pad);
2978     }
2979
2980     gst_object_unref (template);
2981   }
2982   return TRUE;
2983 }
2984
2985 static void
2986 gst_rtspsrc_get_transport_info (GstRTSPSrc * src, GstRTSPStream * stream,
2987     GstRTSPTransport * transport, const gchar ** destination, gint * min,
2988     gint * max, guint * ttl)
2989 {
2990   if (transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2991     if (destination) {
2992       if (!(*destination = transport->destination))
2993         *destination = stream->destination;
2994     }
2995     if (min && max) {
2996       /* transport first */
2997       *min = transport->port.min;
2998       *max = transport->port.max;
2999       if (*min == -1 && *max == -1) {
3000         /* then try from SDP */
3001         if (stream->port != 0) {
3002           *min = stream->port;
3003           *max = stream->port + 1;
3004         }
3005       }
3006     }
3007
3008     if (ttl) {
3009       if (!(*ttl = transport->ttl))
3010         *ttl = stream->ttl;
3011     }
3012   } else {
3013     if (destination) {
3014       /* first take the source, then the endpoint to figure out where to send
3015        * the RTCP. */
3016       if (!(*destination = transport->source)) {
3017         if (src->conninfo.connection)
3018           *destination = gst_rtsp_connection_get_ip (src->conninfo.connection);
3019         else if (stream->conninfo.connection)
3020           *destination =
3021               gst_rtsp_connection_get_ip (stream->conninfo.connection);
3022       }
3023     }
3024     if (min && max) {
3025       /* for unicast we only expect the ports here */
3026       *min = transport->server_port.min;
3027       *max = transport->server_port.max;
3028     }
3029   }
3030 }
3031
3032 /* For multicast create UDP sources and join the multicast group. */
3033 static gboolean
3034 gst_rtspsrc_stream_configure_mcast (GstRTSPSrc * src, GstRTSPStream * stream,
3035     GstRTSPTransport * transport, GstPad ** outpad)
3036 {
3037   gchar *uri;
3038   const gchar *destination;
3039   gint min, max;
3040
3041   GST_DEBUG_OBJECT (src, "creating UDP sources for multicast");
3042
3043   /* we can remove the allocated UDP ports now */
3044   gst_rtspsrc_stream_free_udp (stream);
3045
3046   gst_rtspsrc_get_transport_info (src, stream, transport, &destination, &min,
3047       &max, NULL);
3048
3049   /* we need a destination now */
3050   if (destination == NULL)
3051     goto no_destination;
3052
3053   /* we really need ports now or we won't be able to receive anything at all */
3054   if (min == -1 && max == -1)
3055     goto no_ports;
3056
3057   GST_DEBUG_OBJECT (src, "have destination '%s' and ports (%d)-(%d)",
3058       destination, min, max);
3059
3060   /* creating UDP source for RTP */
3061   if (min != -1) {
3062     uri = g_strdup_printf ("udp://%s:%d", destination, min);
3063     stream->udpsrc[0] =
3064         gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3065     g_free (uri);
3066     if (stream->udpsrc[0] == NULL)
3067       goto no_element;
3068
3069     /* take ownership */
3070     gst_object_ref_sink (stream->udpsrc[0]);
3071
3072     if (src->udp_buffer_size != 0)
3073       g_object_set (G_OBJECT (stream->udpsrc[0]), "buffer-size",
3074           src->udp_buffer_size, NULL);
3075
3076     if (src->multi_iface != NULL)
3077       g_object_set (G_OBJECT (stream->udpsrc[0]), "multicast-iface",
3078           src->multi_iface, NULL);
3079
3080     /* change state */
3081     gst_element_set_locked_state (stream->udpsrc[0], TRUE);
3082     gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
3083   }
3084
3085   /* creating another UDP source for RTCP */
3086   if (max != -1) {
3087     GstCaps *caps;
3088
3089     uri = g_strdup_printf ("udp://%s:%d", destination, max);
3090     stream->udpsrc[1] =
3091         gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3092     g_free (uri);
3093     if (stream->udpsrc[1] == NULL)
3094       goto no_element;
3095
3096     caps = gst_caps_new_empty_simple ("application/x-rtcp");
3097     g_object_set (stream->udpsrc[1], "caps", caps, NULL);
3098     gst_caps_unref (caps);
3099
3100     /* take ownership */
3101     gst_object_ref_sink (stream->udpsrc[1]);
3102
3103     if (src->multi_iface != NULL)
3104       g_object_set (G_OBJECT (stream->udpsrc[0]), "multicast-iface",
3105           src->multi_iface, NULL);
3106
3107     gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
3108   }
3109   return TRUE;
3110
3111   /* ERRORS */
3112 no_element:
3113   {
3114     GST_DEBUG_OBJECT (src, "no UDP source element found");
3115     return FALSE;
3116   }
3117 no_destination:
3118   {
3119     GST_DEBUG_OBJECT (src, "no destination found");
3120     return FALSE;
3121   }
3122 no_ports:
3123   {
3124     GST_DEBUG_OBJECT (src, "no ports found");
3125     return FALSE;
3126   }
3127 }
3128
3129 /* configure the remainder of the UDP ports */
3130 static gboolean
3131 gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream,
3132     GstRTSPTransport * transport, GstPad ** outpad)
3133 {
3134   /* we manage the UDP elements now. For unicast, the UDP sources where
3135    * allocated in the stream when we suggested a transport. */
3136   if (stream->udpsrc[0]) {
3137     gst_element_set_locked_state (stream->udpsrc[0], TRUE);
3138     gst_bin_add (GST_BIN_CAST (src), stream->udpsrc[0]);
3139
3140     GST_DEBUG_OBJECT (src, "setting up UDP source");
3141
3142     /* configure a timeout on the UDP port. When the timeout message is
3143      * posted, we assume UDP transport is not possible. We reconnect using TCP
3144      * if we can. */
3145     g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout",
3146         src->udp_timeout * 1000, NULL);
3147
3148     /* get output pad of the UDP source. */
3149     *outpad = gst_element_get_static_pad (stream->udpsrc[0], "src");
3150
3151     /* save it so we can unblock */
3152     stream->blockedpad = *outpad;
3153
3154     /* configure pad block on the pad. As soon as there is dataflow on the
3155      * UDP source, we know that UDP is not blocked by a firewall and we can
3156      * configure all the streams to let the application autoplug decoders. */
3157     stream->blockid =
3158         gst_pad_add_probe (stream->blockedpad,
3159         GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, pad_blocked, src, NULL);
3160
3161     if (stream->channelpad[0]) {
3162       GST_DEBUG_OBJECT (src, "connecting UDP source 0 to manager");
3163       /* configure for UDP delivery, we need to connect the UDP pads to
3164        * the session plugin. */
3165       gst_pad_link_full (*outpad, stream->channelpad[0],
3166           GST_PAD_LINK_CHECK_NOTHING);
3167       gst_object_unref (*outpad);
3168       *outpad = NULL;
3169       /* we connected to pad-added signal to get pads from the manager */
3170     } else {
3171       GST_DEBUG_OBJECT (src, "using UDP src pad as output");
3172     }
3173   }
3174
3175   /* RTCP port */
3176   if (stream->udpsrc[1]) {
3177     GstCaps *caps;
3178
3179     gst_element_set_locked_state (stream->udpsrc[1], TRUE);
3180     gst_bin_add (GST_BIN_CAST (src), stream->udpsrc[1]);
3181
3182     caps = gst_caps_new_empty_simple ("application/x-rtcp");
3183     g_object_set (stream->udpsrc[1], "caps", caps, NULL);
3184     gst_caps_unref (caps);
3185
3186     if (stream->channelpad[1]) {
3187       GstPad *pad;
3188
3189       GST_DEBUG_OBJECT (src, "connecting UDP source 1 to manager");
3190
3191       pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
3192       gst_pad_link_full (pad, stream->channelpad[1],
3193           GST_PAD_LINK_CHECK_NOTHING);
3194       gst_object_unref (pad);
3195     } else {
3196       /* leave unlinked */
3197     }
3198   }
3199   return TRUE;
3200 }
3201
3202 /* configure the UDP sink back to the server for status reports */
3203 static gboolean
3204 gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src,
3205     GstRTSPStream * stream, GstRTSPTransport * transport)
3206 {
3207   GstPad *pad;
3208   gint rtp_port, rtcp_port;
3209   gboolean do_rtp, do_rtcp;
3210   const gchar *destination;
3211   gchar *uri, *name;
3212   guint ttl = 0;
3213   GSocket *socket;
3214
3215   /* get transport info */
3216   gst_rtspsrc_get_transport_info (src, stream, transport, &destination,
3217       &rtp_port, &rtcp_port, &ttl);
3218
3219   /* see what we need to do */
3220   do_rtp = (rtp_port != -1);
3221   /* it's possible that the server does not want us to send RTCP in which case
3222    * the port is -1 */
3223   do_rtcp = (rtcp_port != -1 && src->manager != NULL && src->do_rtcp);
3224
3225   /* we need a destination when we have RTP or RTCP ports */
3226   if (destination == NULL && (do_rtp || do_rtcp))
3227     goto no_destination;
3228
3229   /* try to construct the fakesrc to the RTP port of the server to open up any
3230    * NAT firewalls */
3231   if (do_rtp) {
3232     GST_DEBUG_OBJECT (src, "configure RTP UDP sink for %s:%d", destination,
3233         rtp_port);
3234
3235     uri = g_strdup_printf ("udp://%s:%d", destination, rtp_port);
3236     stream->udpsink[0] =
3237         gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL);
3238     g_free (uri);
3239     if (stream->udpsink[0] == NULL)
3240       goto no_sink_element;
3241
3242     /* don't join multicast group, we will have the source socket do that */
3243     /* no sync or async state changes needed */
3244     g_object_set (G_OBJECT (stream->udpsink[0]), "auto-multicast", FALSE,
3245         "loop", FALSE, "sync", FALSE, "async", FALSE, NULL);
3246     if (ttl > 0)
3247       g_object_set (G_OBJECT (stream->udpsink[0]), "ttl", ttl, NULL);
3248
3249     if (stream->udpsrc[0]) {
3250       /* configure socket, we give it the same UDP socket as the udpsrc for RTP
3251        * so that NAT firewalls will open a hole for us */
3252       g_object_get (G_OBJECT (stream->udpsrc[0]), "used-socket", &socket, NULL);
3253       GST_DEBUG_OBJECT (src, "RTP UDP src has sock %p", socket);
3254       /* configure socket and make sure udpsink does not close it when shutting
3255        * down, it belongs to udpsrc after all. */
3256       g_object_set (G_OBJECT (stream->udpsink[0]), "socket", socket,
3257           "close-socket", FALSE, NULL);
3258       g_object_unref (socket);
3259     }
3260
3261     /* the source for the dummy packets to open up NAT */
3262     stream->fakesrc = gst_element_factory_make ("fakesrc", NULL);
3263     if (stream->fakesrc == NULL)
3264       goto no_fakesrc_element;
3265
3266     /* random data in 5 buffers, a size of 200 bytes should be fine */
3267     g_object_set (G_OBJECT (stream->fakesrc), "filltype", 3, "num-buffers", 5,
3268         "sizetype", 2, "sizemax", 200, "silent", TRUE, NULL);
3269
3270     /* we don't want to consider this a sink */
3271     GST_OBJECT_FLAG_UNSET (stream->udpsink[0], GST_ELEMENT_FLAG_SINK);
3272
3273     /* keep everything locked */
3274     gst_element_set_locked_state (stream->udpsink[0], TRUE);
3275     gst_element_set_locked_state (stream->fakesrc, TRUE);
3276
3277     gst_object_ref (stream->udpsink[0]);
3278     gst_bin_add (GST_BIN_CAST (src), stream->udpsink[0]);
3279     gst_object_ref (stream->fakesrc);
3280     gst_bin_add (GST_BIN_CAST (src), stream->fakesrc);
3281
3282     gst_element_link_pads_full (stream->fakesrc, "src", stream->udpsink[0],
3283         "sink", GST_PAD_LINK_CHECK_NOTHING);
3284   }
3285   if (do_rtcp) {
3286     GST_DEBUG_OBJECT (src, "configure RTCP UDP sink for %s:%d", destination,
3287         rtcp_port);
3288
3289     uri = g_strdup_printf ("udp://%s:%d", destination, rtcp_port);
3290     stream->udpsink[1] =
3291         gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL);
3292     g_free (uri);
3293     if (stream->udpsink[1] == NULL)
3294       goto no_sink_element;
3295
3296     /* don't join multicast group, we will have the source socket do that */
3297     /* no sync or async state changes needed */
3298     g_object_set (G_OBJECT (stream->udpsink[1]), "auto-multicast", FALSE,
3299         "loop", FALSE, "sync", FALSE, "async", FALSE, NULL);
3300     if (ttl > 0)
3301       g_object_set (G_OBJECT (stream->udpsink[0]), "ttl", ttl, NULL);
3302
3303     if (stream->udpsrc[1]) {
3304       /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
3305        * because some servers check the port number of where it sends RTCP to identify
3306        * the RTCP packets it receives */
3307       g_object_get (G_OBJECT (stream->udpsrc[1]), "used-socket", &socket, NULL);
3308       GST_DEBUG_OBJECT (src, "RTCP UDP src has sock %p", socket);
3309       /* configure socket and make sure udpsink does not close it when shutting
3310        * down, it belongs to udpsrc after all. */
3311       g_object_set (G_OBJECT (stream->udpsink[1]), "socket", socket,
3312           "close-socket", FALSE, NULL);
3313       g_object_unref (socket);
3314     }
3315
3316     /* we don't want to consider this a sink */
3317     GST_OBJECT_FLAG_UNSET (stream->udpsink[1], GST_ELEMENT_FLAG_SINK);
3318
3319     /* we keep this playing always */
3320     gst_element_set_locked_state (stream->udpsink[1], TRUE);
3321     gst_element_set_state (stream->udpsink[1], GST_STATE_PLAYING);
3322
3323     gst_object_ref (stream->udpsink[1]);
3324     gst_bin_add (GST_BIN_CAST (src), stream->udpsink[1]);
3325
3326     stream->rtcppad = gst_element_get_static_pad (stream->udpsink[1], "sink");
3327
3328     /* get session RTCP pad */
3329     name = g_strdup_printf ("send_rtcp_src_%u", stream->id);
3330     pad = gst_element_get_request_pad (src->manager, name);
3331     g_free (name);
3332
3333     /* and link */
3334     if (pad) {
3335       gst_pad_link_full (pad, stream->rtcppad, GST_PAD_LINK_CHECK_NOTHING);
3336       gst_object_unref (pad);
3337     }
3338   }
3339
3340   return TRUE;
3341
3342   /* ERRORS */
3343 no_destination:
3344   {
3345     GST_DEBUG_OBJECT (src, "no destination address specified");
3346     return FALSE;
3347   }
3348 no_sink_element:
3349   {
3350     GST_DEBUG_OBJECT (src, "no UDP sink element found");
3351     return FALSE;
3352   }
3353 no_fakesrc_element:
3354   {
3355     GST_DEBUG_OBJECT (src, "no fakesrc element found");
3356     return FALSE;
3357   }
3358 }
3359
3360 /* sets up all elements needed for streaming over the specified transport.
3361  * Does not yet expose the element pads, this will be done when there is actuall
3362  * dataflow detected, which might never happen when UDP is blocked in a
3363  * firewall, for example.
3364  */
3365 static gboolean
3366 gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
3367     GstRTSPTransport * transport)
3368 {
3369   GstRTSPSrc *src;
3370   GstPad *outpad = NULL;
3371   GstPadTemplate *template;
3372   gchar *name;
3373   GstStructure *s;
3374   const gchar *mime;
3375
3376   src = stream->parent;
3377
3378   GST_DEBUG_OBJECT (src, "configuring transport for stream %p", stream);
3379
3380   s = gst_caps_get_structure (stream->caps, 0);
3381
3382   /* get the proper mime type for this stream now */
3383   if (gst_rtsp_transport_get_mime (transport->trans, &mime) < 0)
3384     goto unknown_transport;
3385   if (!mime)
3386     goto unknown_transport;
3387
3388   /* configure the final mime type */
3389   GST_DEBUG_OBJECT (src, "setting mime to %s", mime);
3390   gst_structure_set_name (s, mime);
3391
3392   /* try to get and configure a manager, channelpad[0-1] will be configured with
3393    * the pads for the manager, or NULL when no manager is needed. */
3394   if (!gst_rtspsrc_stream_configure_manager (src, stream, transport))
3395     goto no_manager;
3396
3397   switch (transport->lower_transport) {
3398     case GST_RTSP_LOWER_TRANS_TCP:
3399       if (!gst_rtspsrc_stream_configure_tcp (src, stream, transport, &outpad))
3400         goto transport_failed;
3401       break;
3402     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3403       if (!gst_rtspsrc_stream_configure_mcast (src, stream, transport, &outpad))
3404         goto transport_failed;
3405       /* fallthrough, the rest is the same for UDP and MCAST */
3406     case GST_RTSP_LOWER_TRANS_UDP:
3407       if (!gst_rtspsrc_stream_configure_udp (src, stream, transport, &outpad))
3408         goto transport_failed;
3409       /* configure udpsinks back to the server for RTCP messages and for the
3410        * dummy RTP messages to open NAT. */
3411       if (!gst_rtspsrc_stream_configure_udp_sinks (src, stream, transport))
3412         goto transport_failed;
3413       break;
3414     default:
3415       goto unknown_transport;
3416   }
3417
3418   if (outpad) {
3419     GST_DEBUG_OBJECT (src, "creating ghostpad");
3420
3421     gst_pad_use_fixed_caps (outpad);
3422
3423     /* create ghostpad, don't add just yet, this will be done when we activate
3424      * the stream. */
3425     name = g_strdup_printf ("stream_%u", stream->id);
3426     template = gst_static_pad_template_get (&rtptemplate);
3427     stream->srcpad = gst_ghost_pad_new_from_template (name, outpad, template);
3428     gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event);
3429     gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query);
3430     gst_object_unref (template);
3431     g_free (name);
3432
3433     gst_object_unref (outpad);
3434   }
3435   /* mark pad as ok */
3436   stream->last_ret = GST_FLOW_OK;
3437
3438   return TRUE;
3439
3440   /* ERRORS */
3441 transport_failed:
3442   {
3443     GST_DEBUG_OBJECT (src, "failed to configure transport");
3444     return FALSE;
3445   }
3446 unknown_transport:
3447   {
3448     GST_DEBUG_OBJECT (src, "unknown transport");
3449     return FALSE;
3450   }
3451 no_manager:
3452   {
3453     GST_DEBUG_OBJECT (src, "cannot get a session manager");
3454     return FALSE;
3455   }
3456 }
3457
3458 /* send a couple of dummy random packets on the receiver RTP port to the server,
3459  * this should make a firewall think we initiated the data transfer and
3460  * hopefully allow packets to go from the sender port to our RTP receiver port */
3461 static gboolean
3462 gst_rtspsrc_send_dummy_packets (GstRTSPSrc * src)
3463 {
3464   GList *walk;
3465
3466   if (src->nat_method != GST_RTSP_NAT_DUMMY)
3467     return TRUE;
3468
3469   for (walk = src->streams; walk; walk = g_list_next (walk)) {
3470     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
3471
3472     if (stream->fakesrc && stream->udpsink[0]) {
3473       GST_DEBUG_OBJECT (src, "sending dummy packet to stream %p", stream);
3474       gst_element_set_state (stream->udpsink[0], GST_STATE_NULL);
3475       gst_element_set_state (stream->fakesrc, GST_STATE_NULL);
3476       gst_element_set_state (stream->udpsink[0], GST_STATE_PLAYING);
3477       gst_element_set_state (stream->fakesrc, GST_STATE_PLAYING);
3478     }
3479   }
3480   return TRUE;
3481 }
3482
3483 /* Adds the source pads of all configured streams to the element.
3484  * This code is performed when we detected dataflow.
3485  *
3486  * We detect dataflow from either the _loop function or with pad probes on the
3487  * udp sources.
3488  */
3489 static gboolean
3490 gst_rtspsrc_activate_streams (GstRTSPSrc * src)
3491 {
3492   GList *walk;
3493
3494   GST_DEBUG_OBJECT (src, "activating streams");
3495
3496   for (walk = src->streams; walk; walk = g_list_next (walk)) {
3497     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
3498
3499     if (stream->udpsrc[0]) {
3500       /* remove timeout, we are streaming now and timeouts will be handled by
3501        * the session manager and jitter buffer */
3502       g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
3503     }
3504     if (stream->srcpad) {
3505       GST_DEBUG_OBJECT (src, "activating stream pad %p", stream);
3506       gst_pad_set_active (stream->srcpad, TRUE);
3507
3508       /* if we don't have a session manager, set the caps now. If we have a
3509        * session, we will get a notification of the pad and the caps. */
3510       if (!src->manager) {
3511         GST_DEBUG_OBJECT (src, "setting pad caps for stream %p", stream);
3512         gst_pad_set_caps (stream->srcpad, stream->caps);
3513       }
3514       /* add the pad */
3515       if (!stream->added) {
3516         GST_DEBUG_OBJECT (src, "adding stream pad %p", stream);
3517         gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad);
3518         stream->added = TRUE;
3519       }
3520     }
3521   }
3522
3523   /* unblock all pads */
3524   for (walk = src->streams; walk; walk = g_list_next (walk)) {
3525     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
3526
3527     if (stream->blockid) {
3528       GST_DEBUG_OBJECT (src, "unblocking stream pad %p", stream);
3529       gst_pad_remove_probe (stream->blockedpad, stream->blockid);
3530       stream->blockid = 0;
3531     }
3532   }
3533
3534   return TRUE;
3535 }
3536
3537 static void
3538 gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment,
3539     gboolean reset_manager)
3540 {
3541   GList *walk;
3542   guint64 start, stop;
3543   gdouble play_speed, play_scale;
3544
3545   GST_DEBUG_OBJECT (src, "configuring stream caps");
3546
3547   start = segment->position;
3548   stop = segment->duration;
3549   play_speed = segment->rate;
3550   play_scale = segment->applied_rate;
3551
3552   for (walk = src->streams; walk; walk = g_list_next (walk)) {
3553     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
3554     GstCaps *caps;
3555
3556     if ((caps = stream->caps)) {
3557       caps = gst_caps_make_writable (caps);
3558       /* update caps */
3559       if (stream->timebase != -1)
3560         gst_caps_set_simple (caps, "clock-base", G_TYPE_UINT,
3561             (guint) stream->timebase, NULL);
3562       if (stream->seqbase != -1)
3563         gst_caps_set_simple (caps, "seqnum-base", G_TYPE_UINT,
3564             (guint) stream->seqbase, NULL);
3565       gst_caps_set_simple (caps, "npt-start", G_TYPE_UINT64, start, NULL);
3566       if (stop != -1)
3567         gst_caps_set_simple (caps, "npt-stop", G_TYPE_UINT64, stop, NULL);
3568       gst_caps_set_simple (caps, "play-speed", G_TYPE_DOUBLE, play_speed, NULL);
3569       gst_caps_set_simple (caps, "play-scale", G_TYPE_DOUBLE, play_scale, NULL);
3570
3571       stream->caps = caps;
3572     }
3573     GST_DEBUG_OBJECT (src, "stream %p, caps %" GST_PTR_FORMAT, stream, caps);
3574   }
3575   if (reset_manager && src->manager) {
3576     GST_DEBUG_OBJECT (src, "clear session");
3577     g_signal_emit_by_name (src->manager, "clear-pt-map", NULL);
3578   }
3579 }
3580
3581 static GstFlowReturn
3582 gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream,
3583     GstFlowReturn ret)
3584 {
3585   GList *streams;
3586
3587   /* store the value */
3588   stream->last_ret = ret;
3589
3590   /* if it's success we can return the value right away */
3591   if (ret == GST_FLOW_OK)
3592     goto done;
3593
3594   /* any other error that is not-linked can be returned right
3595    * away */
3596   if (ret != GST_FLOW_NOT_LINKED)
3597     goto done;
3598
3599   /* only return NOT_LINKED if all other pads returned NOT_LINKED */
3600   for (streams = src->streams; streams; streams = g_list_next (streams)) {
3601     GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
3602
3603     ret = ostream->last_ret;
3604     /* some other return value (must be SUCCESS but we can return
3605      * other values as well) */
3606     if (ret != GST_FLOW_NOT_LINKED)
3607       goto done;
3608   }
3609   /* if we get here, all other pads were unlinked and we return
3610    * NOT_LINKED then */
3611 done:
3612   return ret;
3613 }
3614
3615 static gboolean
3616 gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
3617     GstEvent * event)
3618 {
3619   gboolean res = TRUE;
3620
3621   /* only streams that have a connection to the outside world */
3622   if (stream->container || stream->disabled)
3623     goto done;
3624
3625   if (stream->udpsrc[0]) {
3626     gst_event_ref (event);
3627     res = gst_element_send_event (stream->udpsrc[0], event);
3628   } else if (stream->channelpad[0]) {
3629     gst_event_ref (event);
3630     if (GST_PAD_IS_SRC (stream->channelpad[0]))
3631       res = gst_pad_push_event (stream->channelpad[0], event);
3632     else
3633       res = gst_pad_send_event (stream->channelpad[0], event);
3634   }
3635
3636   if (stream->udpsrc[1]) {
3637     gst_event_ref (event);
3638     res &= gst_element_send_event (stream->udpsrc[1], event);
3639   } else if (stream->channelpad[1]) {
3640     gst_event_ref (event);
3641     if (GST_PAD_IS_SRC (stream->channelpad[1]))
3642       res &= gst_pad_push_event (stream->channelpad[1], event);
3643     else
3644       res &= gst_pad_send_event (stream->channelpad[1], event);
3645   }
3646
3647 done:
3648   gst_event_unref (event);
3649
3650   return res;
3651 }
3652
3653 static gboolean
3654 gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
3655 {
3656   GList *streams;
3657   gboolean res = TRUE;
3658
3659   for (streams = src->streams; streams; streams = g_list_next (streams)) {
3660     GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
3661
3662     gst_event_ref (event);
3663     res &= gst_rtspsrc_stream_push_event (src, ostream, event);
3664   }
3665   gst_event_unref (event);
3666
3667   return res;
3668 }
3669
3670 static GstRTSPResult
3671 gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info,
3672     gboolean async)
3673 {
3674   GstRTSPResult res;
3675
3676   if (info->connection == NULL) {
3677     if (info->url == NULL) {
3678       GST_DEBUG_OBJECT (src, "parsing uri (%s)...", info->location);
3679       if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0)
3680         goto parse_error;
3681     }
3682
3683     /* create connection */
3684     GST_DEBUG_OBJECT (src, "creating connection (%s)...", info->location);
3685     if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0)
3686       goto could_not_create;
3687
3688     if (info->url_str)
3689       g_free (info->url_str);
3690     info->url_str = gst_rtsp_url_get_request_uri (info->url);
3691
3692     GST_DEBUG_OBJECT (src, "sanitized uri %s", info->url_str);
3693
3694     if (info->url->transports & GST_RTSP_LOWER_TRANS_TLS) {
3695       if (!gst_rtsp_connection_set_tls_validation_flags (info->connection,
3696               src->tls_validation_flags))
3697         GST_WARNING_OBJECT (src, "Unable to set TLS validation flags");
3698     }
3699
3700     if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP)
3701       gst_rtsp_connection_set_tunneled (info->connection, TRUE);
3702
3703     if (src->proxy_host) {
3704       GST_DEBUG_OBJECT (src, "setting proxy %s:%d", src->proxy_host,
3705           src->proxy_port);
3706       gst_rtsp_connection_set_proxy (info->connection, src->proxy_host,
3707           src->proxy_port);
3708     }
3709   }
3710
3711   if (!info->connected) {
3712     /* connect */
3713     if (async)
3714       GST_ELEMENT_PROGRESS (src, CONTINUE, "connect",
3715           ("Connecting to %s", info->location));
3716     GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location);
3717     if ((res =
3718             gst_rtsp_connection_connect (info->connection,
3719                 src->ptcp_timeout)) < 0)
3720       goto could_not_connect;
3721
3722     info->connected = TRUE;
3723   }
3724   return GST_RTSP_OK;
3725
3726   /* ERRORS */
3727 parse_error:
3728   {
3729     GST_ERROR_OBJECT (src, "No valid RTSP URL was provided");
3730     return res;
3731   }
3732 could_not_create:
3733   {
3734     gchar *str = gst_rtsp_strresult (res);
3735     GST_ERROR_OBJECT (src, "Could not create connection. (%s)", str);
3736     g_free (str);
3737     return res;
3738   }
3739 could_not_connect:
3740   {
3741     gchar *str = gst_rtsp_strresult (res);
3742     GST_ERROR_OBJECT (src, "Could not connect to server. (%s)", str);
3743     g_free (str);
3744     return res;
3745   }
3746 }
3747
3748 static GstRTSPResult
3749 gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info,
3750     gboolean free)
3751 {
3752   GST_RTSP_STATE_LOCK (src);
3753   if (info->connected) {
3754     GST_DEBUG_OBJECT (src, "closing connection...");
3755     gst_rtsp_connection_close (info->connection);
3756     info->connected = FALSE;
3757   }
3758   if (free && info->connection) {
3759     /* free connection */
3760     GST_DEBUG_OBJECT (src, "freeing connection...");
3761     gst_rtsp_connection_free (info->connection);
3762     info->connection = NULL;
3763   }
3764   GST_RTSP_STATE_UNLOCK (src);
3765   return GST_RTSP_OK;
3766 }
3767
3768 static GstRTSPResult
3769 gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info,
3770     gboolean async)
3771 {
3772   GstRTSPResult res;
3773
3774   GST_DEBUG_OBJECT (src, "reconnecting connection...");
3775   gst_rtsp_conninfo_close (src, info, FALSE);
3776   res = gst_rtsp_conninfo_connect (src, info, async);
3777
3778   return res;
3779 }
3780
3781 static void
3782 gst_rtspsrc_connection_flush (GstRTSPSrc * src, gboolean flush)
3783 {
3784   GList *walk;
3785
3786   GST_DEBUG_OBJECT (src, "set flushing %d", flush);
3787   GST_RTSP_STATE_LOCK (src);
3788   if (src->conninfo.connection && src->conninfo.flushing != flush) {
3789     GST_DEBUG_OBJECT (src, "connection flush");
3790     gst_rtsp_connection_flush (src->conninfo.connection, flush);
3791     src->conninfo.flushing = flush;
3792   }
3793   for (walk = src->streams; walk; walk = g_list_next (walk)) {
3794     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
3795     if (stream->conninfo.connection && stream->conninfo.flushing != flush) {
3796       GST_DEBUG_OBJECT (src, "stream %p flush", stream);
3797       gst_rtsp_connection_flush (stream->conninfo.connection, flush);
3798       stream->conninfo.flushing = flush;
3799     }
3800   }
3801   GST_RTSP_STATE_UNLOCK (src);
3802 }
3803
3804 /* FIXME, handle server request, reply with OK, for now */
3805 static GstRTSPResult
3806 gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn,
3807     GstRTSPMessage * request)
3808 {
3809   GstRTSPMessage response = { 0 };
3810   GstRTSPResult res;
3811
3812   GST_DEBUG_OBJECT (src, "got server request message");
3813
3814   if (src->debug)
3815     gst_rtsp_message_dump (request);
3816
3817   res = gst_rtsp_ext_list_receive_request (src->extensions, request);
3818
3819   if (res == GST_RTSP_ENOTIMPL) {
3820     /* default implementation, send OK */
3821     GST_DEBUG_OBJECT (src, "prepare OK reply");
3822     res =
3823         gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK, "OK",
3824         request);
3825     if (res < 0)
3826       goto send_error;
3827
3828     /* let app parse and reply */
3829     g_signal_emit (src, gst_rtspsrc_signals[SIGNAL_HANDLE_REQUEST],
3830         0, request, &response);
3831
3832     if (src->debug)
3833       gst_rtsp_message_dump (&response);
3834
3835     res = gst_rtspsrc_connection_send (src, conn, &response, NULL);
3836     if (res < 0)
3837       goto send_error;
3838
3839     gst_rtsp_message_unset (&response);
3840   } else if (res == GST_RTSP_EEOF)
3841     return res;
3842
3843   return GST_RTSP_OK;
3844
3845   /* ERRORS */
3846 send_error:
3847   {
3848     gst_rtsp_message_unset (&response);
3849     return res;
3850   }
3851 }
3852
3853 /* send server keep-alive */
3854 static GstRTSPResult
3855 gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
3856 {
3857   GstRTSPMessage request = { 0 };
3858   GstRTSPResult res;
3859   GstRTSPMethod method;
3860   const gchar *control;
3861
3862   if (src->do_rtsp_keep_alive == FALSE) {
3863     GST_DEBUG_OBJECT (src, "do-rtsp-keep-alive is FALSE, not sending.");
3864     gst_rtsp_connection_reset_timeout (src->conninfo.connection);
3865     return GST_RTSP_OK;
3866   }
3867
3868   GST_DEBUG_OBJECT (src, "creating server keep-alive");
3869
3870   /* find a method to use for keep-alive */
3871   if (src->methods & GST_RTSP_GET_PARAMETER)
3872     method = GST_RTSP_GET_PARAMETER;
3873   else
3874     method = GST_RTSP_OPTIONS;
3875
3876   control = get_aggregate_control (src);
3877   if (control == NULL)
3878     goto no_control;
3879
3880   res = gst_rtsp_message_init_request (&request, method, control);
3881   if (res < 0)
3882     goto send_error;
3883
3884   if (src->debug)
3885     gst_rtsp_message_dump (&request);
3886
3887   res =
3888       gst_rtspsrc_connection_send (src, src->conninfo.connection, &request,
3889       NULL);
3890   if (res < 0)
3891     goto send_error;
3892
3893   gst_rtsp_connection_reset_timeout (src->conninfo.connection);
3894   gst_rtsp_message_unset (&request);
3895
3896   return GST_RTSP_OK;
3897
3898   /* ERRORS */
3899 no_control:
3900   {
3901     GST_WARNING_OBJECT (src, "no control url to send keepalive");
3902     return GST_RTSP_OK;
3903   }
3904 send_error:
3905   {
3906     gchar *str = gst_rtsp_strresult (res);
3907
3908     gst_rtsp_message_unset (&request);
3909     GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
3910         ("Could not send keep-alive. (%s)", str));
3911     g_free (str);
3912     return res;
3913   }
3914 }
3915
3916 static GstFlowReturn
3917 gst_rtspsrc_handle_data (GstRTSPSrc * src, GstRTSPMessage * message)
3918 {
3919   GstFlowReturn ret = GST_FLOW_OK;
3920   gint channel;
3921   GstRTSPStream *stream;
3922   GstPad *outpad = NULL;
3923   guint8 *data;
3924   guint size;
3925   GstBuffer *buf;
3926   gboolean is_rtcp;
3927   GstEvent *event;
3928
3929   channel = message->type_data.data.channel;
3930
3931   stream = find_stream (src, &channel, (gpointer) find_stream_by_channel);
3932   if (!stream)
3933     goto unknown_stream;
3934
3935   if (channel == stream->channel[0]) {
3936     outpad = stream->channelpad[0];
3937     is_rtcp = FALSE;
3938   } else if (channel == stream->channel[1]) {
3939     outpad = stream->channelpad[1];
3940     is_rtcp = TRUE;
3941   } else {
3942     is_rtcp = FALSE;
3943   }
3944
3945   /* take a look at the body to figure out what we have */
3946   gst_rtsp_message_get_body (message, &data, &size);
3947   if (size < 2)
3948     goto invalid_length;
3949
3950   /* channels are not correct on some servers, do extra check */
3951   if (data[1] >= 200 && data[1] <= 204) {
3952     /* hmm RTCP message switch to the RTCP pad of the same stream. */
3953     outpad = stream->channelpad[1];
3954     is_rtcp = TRUE;
3955   }
3956
3957   /* we have no clue what this is, just ignore then. */
3958   if (outpad == NULL)
3959     goto unknown_stream;
3960
3961   /* take the message body for further processing */
3962   gst_rtsp_message_steal_body (message, &data, &size);
3963
3964   /* strip the trailing \0 */
3965   size -= 1;
3966
3967   buf = gst_buffer_new ();
3968   gst_buffer_append_memory (buf,
3969       gst_memory_new_wrapped (0, data, size, 0, size, data, g_free));
3970
3971   /* don't need message anymore */
3972   gst_rtsp_message_unset (message);
3973
3974   GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
3975       channel);
3976
3977   if (src->need_activate) {
3978     gchar *stream_id;
3979     GstEvent *event;
3980     GChecksum *cs;
3981     gchar *uri;
3982     GList *streams;
3983     guint group_id = gst_util_group_id_next ();
3984
3985     /* generate an SHA256 sum of the URI */
3986     cs = g_checksum_new (G_CHECKSUM_SHA256);
3987     uri = src->conninfo.location;
3988     g_checksum_update (cs, (const guchar *) uri, strlen (uri));
3989
3990     for (streams = src->streams; streams; streams = g_list_next (streams)) {
3991       GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
3992
3993       stream_id =
3994           g_strdup_printf ("%s/%d", g_checksum_get_string (cs), ostream->id);
3995       event = gst_event_new_stream_start (stream_id);
3996       gst_event_set_group_id (event, group_id);
3997
3998       g_free (stream_id);
3999       gst_rtspsrc_stream_push_event (src, ostream, event);
4000     }
4001     g_checksum_free (cs);
4002
4003     gst_rtspsrc_activate_streams (src);
4004     src->need_activate = FALSE;
4005   }
4006   if ((event = src->start_segment) != NULL) {
4007     src->start_segment = NULL;
4008     gst_rtspsrc_push_event (src, event);
4009   }
4010
4011   if (src->base_time == -1) {
4012     /* Take current running_time. This timestamp will be put on
4013      * the first buffer of each stream because we are a live source and so we
4014      * timestamp with the running_time. When we are dealing with TCP, we also
4015      * only timestamp the first buffer (using the DISCONT flag) because a server
4016      * typically bursts data, for which we don't want to compensate by speeding
4017      * up the media. The other timestamps will be interpollated from this one
4018      * using the RTP timestamps. */
4019     GST_OBJECT_LOCK (src);
4020     if (GST_ELEMENT_CLOCK (src)) {
4021       GstClockTime now;
4022       GstClockTime base_time;
4023
4024       now = gst_clock_get_time (GST_ELEMENT_CLOCK (src));
4025       base_time = GST_ELEMENT_CAST (src)->base_time;
4026
4027       src->base_time = now - base_time;
4028
4029       GST_DEBUG_OBJECT (src, "first buffer at time %" GST_TIME_FORMAT ", base %"
4030           GST_TIME_FORMAT, GST_TIME_ARGS (now), GST_TIME_ARGS (base_time));
4031     }
4032     GST_OBJECT_UNLOCK (src);
4033   }
4034
4035   if (stream->discont && !is_rtcp) {
4036     /* mark first RTP buffer as discont */
4037     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
4038     stream->discont = FALSE;
4039     /* first buffer gets the timestamp, other buffers are not timestamped and
4040      * their presentation time will be interpollated from the rtp timestamps. */
4041     GST_DEBUG_OBJECT (src, "setting timestamp %" GST_TIME_FORMAT,
4042         GST_TIME_ARGS (src->base_time));
4043
4044     GST_BUFFER_TIMESTAMP (buf) = src->base_time;
4045   }
4046
4047   /* chain to the peer pad */
4048   if (GST_PAD_IS_SINK (outpad))
4049     ret = gst_pad_chain (outpad, buf);
4050   else
4051     ret = gst_pad_push (outpad, buf);
4052
4053   if (!is_rtcp) {
4054     /* combine all stream flows for the data transport */
4055     ret = gst_rtspsrc_combine_flows (src, stream, ret);
4056   }
4057   return ret;
4058
4059   /* ERRORS */
4060 unknown_stream:
4061   {
4062     GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
4063     gst_rtsp_message_unset (message);
4064     return GST_FLOW_OK;
4065   }
4066 invalid_length:
4067   {
4068     GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4069         ("Short message received, ignoring."));
4070     gst_rtsp_message_unset (message);
4071     return GST_FLOW_OK;
4072   }
4073 }
4074
4075 static GstFlowReturn
4076 gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
4077 {
4078   GstRTSPMessage message = { 0 };
4079   GstRTSPResult res;
4080   GstFlowReturn ret = GST_FLOW_OK;
4081   GTimeVal tv_timeout;
4082
4083   while (TRUE) {
4084     /* get the next timeout interval */
4085     gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
4086
4087     /* see if the timeout period expired */
4088     if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
4089       GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
4090       /* send keep-alive, only act on interrupt, a warning will be posted for
4091        * other errors. */
4092       if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
4093         goto interrupt;
4094       /* get new timeout */
4095       gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
4096     }
4097
4098     GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
4099         tv_timeout.tv_sec, tv_timeout.tv_usec);
4100
4101     /* protect the connection with the connection lock so that we can see when
4102      * we are finished doing server communication */
4103     res =
4104         gst_rtspsrc_connection_receive (src, src->conninfo.connection,
4105         &message, src->ptcp_timeout);
4106
4107     switch (res) {
4108       case GST_RTSP_OK:
4109         GST_DEBUG_OBJECT (src, "we received a server message");
4110         break;
4111       case GST_RTSP_EINTR:
4112         /* we got interrupted this means we need to stop */
4113         goto interrupt;
4114       case GST_RTSP_ETIMEOUT:
4115         /* no reply, send keep alive */
4116         GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
4117         if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
4118           goto interrupt;
4119         continue;
4120       case GST_RTSP_EEOF:
4121         /* go EOS when the server closed the connection */
4122         goto server_eof;
4123       default:
4124         goto receive_error;
4125     }
4126
4127     switch (message.type) {
4128       case GST_RTSP_MESSAGE_REQUEST:
4129         /* server sends us a request message, handle it */
4130         res =
4131             gst_rtspsrc_handle_request (src, src->conninfo.connection,
4132             &message);
4133         if (res == GST_RTSP_EEOF)
4134           goto server_eof;
4135         else if (res < 0)
4136           goto handle_request_failed;
4137         break;
4138       case GST_RTSP_MESSAGE_RESPONSE:
4139         /* we ignore response messages */
4140         GST_DEBUG_OBJECT (src, "ignoring response message");
4141         if (src->debug)
4142           gst_rtsp_message_dump (&message);
4143         break;
4144       case GST_RTSP_MESSAGE_DATA:
4145         GST_DEBUG_OBJECT (src, "got data message");
4146         ret = gst_rtspsrc_handle_data (src, &message);
4147         if (ret != GST_FLOW_OK)
4148           goto handle_data_failed;
4149         break;
4150       default:
4151         GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
4152             message.type);
4153         break;
4154     }
4155   }
4156   g_assert_not_reached ();
4157
4158   /* ERRORS */
4159 server_eof:
4160   {
4161     GST_DEBUG_OBJECT (src, "we got an eof from the server");
4162     GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4163         ("The server closed the connection."));
4164     src->conninfo.connected = FALSE;
4165     gst_rtsp_message_unset (&message);
4166     return GST_FLOW_EOS;
4167   }
4168 interrupt:
4169   {
4170     gst_rtsp_message_unset (&message);
4171     GST_DEBUG_OBJECT (src, "got interrupted");
4172     return GST_FLOW_FLUSHING;
4173   }
4174 receive_error:
4175   {
4176     gchar *str = gst_rtsp_strresult (res);
4177
4178     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
4179         ("Could not receive message. (%s)", str));
4180     g_free (str);
4181
4182     gst_rtsp_message_unset (&message);
4183     return GST_FLOW_ERROR;
4184   }
4185 handle_request_failed:
4186   {
4187     gchar *str = gst_rtsp_strresult (res);
4188
4189     GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
4190         ("Could not handle server message. (%s)", str));
4191     g_free (str);
4192     gst_rtsp_message_unset (&message);
4193     return GST_FLOW_ERROR;
4194   }
4195 handle_data_failed:
4196   {
4197     GST_DEBUG_OBJECT (src, "could no handle data message");
4198     return ret;
4199   }
4200 }
4201
4202 static GstFlowReturn
4203 gst_rtspsrc_loop_udp (GstRTSPSrc * src)
4204 {
4205   GstRTSPResult res;
4206   GstRTSPMessage message = { 0 };
4207   gint retry = 0;
4208
4209   while (TRUE) {
4210     GTimeVal tv_timeout;
4211
4212     /* get the next timeout interval */
4213     gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
4214
4215     GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds",
4216         (gint) tv_timeout.tv_sec);
4217
4218     gst_rtsp_message_unset (&message);
4219
4220     /* we should continue reading the TCP socket because the server might
4221      * send us requests. When the session timeout expires, we need to send a
4222      * keep-alive request to keep the session open. */
4223     res = gst_rtspsrc_connection_receive (src, src->conninfo.connection,
4224         &message, &tv_timeout);
4225
4226     switch (res) {
4227       case GST_RTSP_OK:
4228         GST_DEBUG_OBJECT (src, "we received a server message");
4229         break;
4230       case GST_RTSP_EINTR:
4231         /* we got interrupted, see what we have to do */
4232         goto interrupt;
4233       case GST_RTSP_ETIMEOUT:
4234         /* send keep-alive, ignore the result, a warning will be posted. */
4235         GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
4236         if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
4237           goto interrupt;
4238         continue;
4239       case GST_RTSP_EEOF:
4240         /* server closed the connection. not very fatal for UDP, reconnect and
4241          * see what happens. */
4242         GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4243             ("The server closed the connection."));
4244         if (src->udp_reconnect) {
4245           if ((res =
4246                   gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0)
4247             goto connect_error;
4248         } else {
4249           goto server_eof;
4250         }
4251         continue;
4252       case GST_RTSP_ENET:
4253         GST_DEBUG_OBJECT (src, "An ethernet problem occured.");
4254       default:
4255         GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4256             ("Unhandled return value %d.", res));
4257         goto receive_error;
4258     }
4259
4260     switch (message.type) {
4261       case GST_RTSP_MESSAGE_REQUEST:
4262         /* server sends us a request message, handle it */
4263         res =
4264             gst_rtspsrc_handle_request (src, src->conninfo.connection,
4265             &message);
4266         if (res == GST_RTSP_EEOF)
4267           goto server_eof;
4268         else if (res < 0)
4269           goto handle_request_failed;
4270         break;
4271       case GST_RTSP_MESSAGE_RESPONSE:
4272         /* we ignore response and data messages */
4273         GST_DEBUG_OBJECT (src, "ignoring response message");
4274         if (src->debug)
4275           gst_rtsp_message_dump (&message);
4276         if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) {
4277           GST_DEBUG_OBJECT (src, "but is Unauthorized response ...");
4278           if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) {
4279             GST_DEBUG_OBJECT (src, "so retrying keep-alive");
4280             if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
4281               goto interrupt;
4282           }
4283         } else {
4284           retry = 0;
4285         }
4286         break;
4287       case GST_RTSP_MESSAGE_DATA:
4288         /* we ignore response and data messages */
4289         GST_DEBUG_OBJECT (src, "ignoring data message");
4290         break;
4291       default:
4292         GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
4293             message.type);
4294         break;
4295     }
4296   }
4297   g_assert_not_reached ();
4298
4299   /* we get here when the connection got interrupted */
4300 interrupt:
4301   {
4302     gst_rtsp_message_unset (&message);
4303     GST_DEBUG_OBJECT (src, "got interrupted");
4304     return GST_FLOW_FLUSHING;
4305   }
4306 connect_error:
4307   {
4308     gchar *str = gst_rtsp_strresult (res);
4309     GstFlowReturn ret;
4310
4311     src->conninfo.connected = FALSE;
4312     if (res != GST_RTSP_EINTR) {
4313       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
4314           ("Could not connect to server. (%s)", str));
4315       g_free (str);
4316       ret = GST_FLOW_ERROR;
4317     } else {
4318       ret = GST_FLOW_FLUSHING;
4319     }
4320     return ret;
4321   }
4322 receive_error:
4323   {
4324     gchar *str = gst_rtsp_strresult (res);
4325
4326     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
4327         ("Could not receive message. (%s)", str));
4328     g_free (str);
4329     return GST_FLOW_ERROR;
4330   }
4331 handle_request_failed:
4332   {
4333     gchar *str = gst_rtsp_strresult (res);
4334     GstFlowReturn ret;
4335
4336     gst_rtsp_message_unset (&message);
4337     if (res != GST_RTSP_EINTR) {
4338       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
4339           ("Could not handle server message. (%s)", str));
4340       g_free (str);
4341       ret = GST_FLOW_ERROR;
4342     } else {
4343       ret = GST_FLOW_FLUSHING;
4344     }
4345     return ret;
4346   }
4347 server_eof:
4348   {
4349     GST_DEBUG_OBJECT (src, "we got an eof from the server");
4350     GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4351         ("The server closed the connection."));
4352     src->conninfo.connected = FALSE;
4353     gst_rtsp_message_unset (&message);
4354     return GST_FLOW_EOS;
4355   }
4356 }
4357
4358 static GstRTSPResult
4359 gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async)
4360 {
4361   GstRTSPResult res = GST_RTSP_OK;
4362   gboolean restart;
4363
4364   GST_DEBUG_OBJECT (src, "doing reconnect");
4365
4366   GST_OBJECT_LOCK (src);
4367   /* only restart when the pads were not yet activated, else we were
4368    * streaming over UDP */
4369   restart = src->need_activate;
4370   GST_OBJECT_UNLOCK (src);
4371
4372   /* no need to restart, we're done */
4373   if (!restart)
4374     goto done;
4375
4376   /* we can try only TCP now */
4377   src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
4378
4379   /* close and cleanup our state */
4380   if ((res = gst_rtspsrc_close (src, async, FALSE)) < 0)
4381     goto done;
4382
4383   /* see if we have TCP left to try. Also don't try TCP when we were configured
4384    * with an SDP. */
4385   if (!(src->protocols & GST_RTSP_LOWER_TRANS_TCP) || src->from_sdp)
4386     goto no_protocols;
4387
4388   /* We post a warning message now to inform the user
4389    * that nothing happened. It's most likely a firewall thing. */
4390   GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
4391       ("Could not receive any UDP packets for %.4f seconds, maybe your "
4392           "firewall is blocking it. Retrying using a TCP connection.",
4393           gst_guint64_to_gdouble (src->udp_timeout / 1000000.0)));
4394
4395   /* open new connection using tcp */
4396   if (gst_rtspsrc_open (src, async) < 0)
4397     goto open_failed;
4398
4399   /* start playback */
4400   if (gst_rtspsrc_play (src, &src->segment, async) < 0)
4401     goto play_failed;
4402
4403 done:
4404   return res;
4405
4406   /* ERRORS */
4407 no_protocols:
4408   {
4409     src->cur_protocols = 0;
4410     /* no transport possible, post an error and stop */
4411     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
4412         ("Could not receive any UDP packets for %.4f seconds, maybe your "
4413             "firewall is blocking it. No other protocols to try.",
4414             gst_guint64_to_gdouble (src->udp_timeout / 1000000.0)));
4415     return GST_RTSP_ERROR;
4416   }
4417 open_failed:
4418   {
4419     GST_DEBUG_OBJECT (src, "open failed");
4420     return GST_RTSP_OK;
4421   }
4422 play_failed:
4423   {
4424     GST_DEBUG_OBJECT (src, "play failed");
4425     return GST_RTSP_OK;
4426   }
4427 }
4428
4429 static void
4430 gst_rtspsrc_loop_start_cmd (GstRTSPSrc * src, gint cmd)
4431 {
4432   switch (cmd) {
4433     case CMD_OPEN:
4434       GST_ELEMENT_PROGRESS (src, START, "open", ("Opening Stream"));
4435       break;
4436     case CMD_PLAY:
4437       GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PLAY request"));
4438       break;
4439     case CMD_PAUSE:
4440       GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PAUSE request"));
4441       break;
4442     case CMD_CLOSE:
4443       GST_ELEMENT_PROGRESS (src, START, "close", ("Closing Stream"));
4444       break;
4445     default:
4446       break;
4447   }
4448 }
4449
4450 static void
4451 gst_rtspsrc_loop_complete_cmd (GstRTSPSrc * src, gint cmd)
4452 {
4453   switch (cmd) {
4454     case CMD_OPEN:
4455       GST_ELEMENT_PROGRESS (src, COMPLETE, "open", ("Opened Stream"));
4456       break;
4457     case CMD_PLAY:
4458       GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("Sent PLAY request"));
4459       break;
4460     case CMD_PAUSE:
4461       GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("Sent PAUSE request"));
4462       break;
4463     case CMD_CLOSE:
4464       GST_ELEMENT_PROGRESS (src, COMPLETE, "close", ("Closed Stream"));
4465       break;
4466     default:
4467       break;
4468   }
4469 }
4470
4471 static void
4472 gst_rtspsrc_loop_cancel_cmd (GstRTSPSrc * src, gint cmd)
4473 {
4474   switch (cmd) {
4475     case CMD_OPEN:
4476       GST_ELEMENT_PROGRESS (src, CANCELED, "open", ("Open canceled"));
4477       break;
4478     case CMD_PLAY:
4479       GST_ELEMENT_PROGRESS (src, CANCELED, "request", ("PLAY canceled"));
4480       break;
4481     case CMD_PAUSE:
4482       GST_ELEMENT_PROGRESS (src, CANCELED, "request", ("PAUSE canceled"));
4483       break;
4484     case CMD_CLOSE:
4485       GST_ELEMENT_PROGRESS (src, CANCELED, "close", ("Close canceled"));
4486       break;
4487     default:
4488       break;
4489   }
4490 }
4491
4492 static void
4493 gst_rtspsrc_loop_error_cmd (GstRTSPSrc * src, gint cmd)
4494 {
4495   switch (cmd) {
4496     case CMD_OPEN:
4497       GST_ELEMENT_PROGRESS (src, ERROR, "open", ("Open failed"));
4498       break;
4499     case CMD_PLAY:
4500       GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PLAY failed"));
4501       break;
4502     case CMD_PAUSE:
4503       GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PAUSE failed"));
4504       break;
4505     case CMD_CLOSE:
4506       GST_ELEMENT_PROGRESS (src, ERROR, "close", ("Close failed"));
4507       break;
4508     default:
4509       break;
4510   }
4511 }
4512
4513 static void
4514 gst_rtspsrc_loop_end_cmd (GstRTSPSrc * src, gint cmd, GstRTSPResult ret)
4515 {
4516   if (ret == GST_RTSP_OK)
4517     gst_rtspsrc_loop_complete_cmd (src, cmd);
4518   else if (ret == GST_RTSP_EINTR)
4519     gst_rtspsrc_loop_cancel_cmd (src, cmd);
4520   else
4521     gst_rtspsrc_loop_error_cmd (src, cmd);
4522 }
4523
4524 static gboolean
4525 gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gint mask)
4526 {
4527   gint old;
4528   gboolean flushed = FALSE;
4529
4530   /* start new request */
4531   gst_rtspsrc_loop_start_cmd (src, cmd);
4532
4533   GST_DEBUG_OBJECT (src, "sending cmd %d", cmd);
4534
4535   GST_OBJECT_LOCK (src);
4536   old = src->pending_cmd;
4537   if (old == CMD_RECONNECT) {
4538     GST_DEBUG_OBJECT (src, "ignore, we were reconnecting");
4539     cmd = CMD_RECONNECT;
4540   }
4541   if (old != CMD_WAIT) {
4542     src->pending_cmd = CMD_WAIT;
4543     GST_OBJECT_UNLOCK (src);
4544     /* cancel previous request */
4545     GST_DEBUG_OBJECT (src, "cancel previous request %d", old);
4546     gst_rtspsrc_loop_cancel_cmd (src, old);
4547     GST_OBJECT_LOCK (src);
4548   }
4549   src->pending_cmd = cmd;
4550   /* interrupt if allowed */
4551   if (src->busy_cmd & mask) {
4552     GST_DEBUG_OBJECT (src, "connection flush busy %d", src->busy_cmd);
4553     gst_rtspsrc_connection_flush (src, TRUE);
4554     flushed = TRUE;
4555   } else {
4556     GST_DEBUG_OBJECT (src, "not interrupting busy cmd %d", src->busy_cmd);
4557   }
4558   if (src->task)
4559     gst_task_start (src->task);
4560   GST_OBJECT_UNLOCK (src);
4561
4562   return flushed;
4563 }
4564
4565 static gboolean
4566 gst_rtspsrc_loop (GstRTSPSrc * src)
4567 {
4568   GstFlowReturn ret;
4569
4570   if (!src->conninfo.connection || !src->conninfo.connected)
4571     goto no_connection;
4572
4573   if (src->interleaved)
4574     ret = gst_rtspsrc_loop_interleaved (src);
4575   else
4576     ret = gst_rtspsrc_loop_udp (src);
4577
4578   if (ret != GST_FLOW_OK)
4579     goto pause;
4580
4581   return TRUE;
4582
4583   /* ERRORS */
4584 no_connection:
4585   {
4586     GST_WARNING_OBJECT (src, "we are not connected");
4587     ret = GST_FLOW_FLUSHING;
4588     goto pause;
4589   }
4590 pause:
4591   {
4592     const gchar *reason = gst_flow_get_name (ret);
4593
4594     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
4595     src->running = FALSE;
4596     if (ret == GST_FLOW_EOS) {
4597       /* perform EOS logic */
4598       if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4599         gst_element_post_message (GST_ELEMENT_CAST (src),
4600             gst_message_new_segment_done (GST_OBJECT_CAST (src),
4601                 src->segment.format, src->segment.position));
4602         gst_rtspsrc_push_event (src,
4603             gst_event_new_segment_done (src->segment.format,
4604                 src->segment.position));
4605       } else {
4606         gst_rtspsrc_push_event (src, gst_event_new_eos ());
4607       }
4608     } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
4609       /* for fatal errors we post an error message, post the error before the
4610        * EOS so the app knows about the error first. */
4611       GST_ELEMENT_ERROR (src, STREAM, FAILED,
4612           ("Internal data flow error."),
4613           ("streaming task paused, reason %s (%d)", reason, ret));
4614       gst_rtspsrc_push_event (src, gst_event_new_eos ());
4615     }
4616     gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, CMD_LOOP);
4617     return FALSE;
4618   }
4619 }
4620
4621 #ifndef GST_DISABLE_GST_DEBUG
4622 static const gchar *
4623 gst_rtsp_auth_method_to_string (GstRTSPAuthMethod method)
4624 {
4625   gint index = 0;
4626
4627   while (method != 0) {
4628     index++;
4629     method >>= 1;
4630   }
4631   switch (index) {
4632     case 0:
4633       return "None";
4634     case 1:
4635       return "Basic";
4636     case 2:
4637       return "Digest";
4638   }
4639
4640   return "Unknown";
4641 }
4642 #endif
4643
4644 static const gchar *
4645 gst_rtspsrc_skip_lws (const gchar * s)
4646 {
4647   while (g_ascii_isspace (*s))
4648     s++;
4649   return s;
4650 }
4651
4652 static const gchar *
4653 gst_rtspsrc_unskip_lws (const gchar * s, const gchar * start)
4654 {
4655   while (s > start && g_ascii_isspace (*(s - 1)))
4656     s--;
4657   return s;
4658 }
4659
4660 static const gchar *
4661 gst_rtspsrc_skip_commas (const gchar * s)
4662 {
4663   /* The grammar allows for multiple commas */
4664   while (g_ascii_isspace (*s) || *s == ',')
4665     s++;
4666   return s;
4667 }
4668
4669 static const gchar *
4670 gst_rtspsrc_skip_item (const gchar * s)
4671 {
4672   gboolean quoted = FALSE;
4673   const gchar *start = s;
4674
4675   /* A list item ends at the last non-whitespace character
4676    * before a comma which is not inside a quoted-string. Or at
4677    * the end of the string.
4678    */
4679   while (*s) {
4680     if (*s == '"')
4681       quoted = !quoted;
4682     else if (quoted) {
4683       if (*s == '\\' && *(s + 1))
4684         s++;
4685     } else {
4686       if (*s == ',')
4687         break;
4688     }
4689     s++;
4690   }
4691
4692   return gst_rtspsrc_unskip_lws (s, start);
4693 }
4694
4695 static void
4696 gst_rtsp_decode_quoted_string (gchar * quoted_string)
4697 {
4698   gchar *src, *dst;
4699
4700   src = quoted_string + 1;
4701   dst = quoted_string;
4702   while (*src && *src != '"') {
4703     if (*src == '\\' && *(src + 1))
4704       src++;
4705     *dst++ = *src++;
4706   }
4707   *dst = '\0';
4708 }
4709
4710 /* Extract the authentication tokens that the server provided for each method
4711  * into an array of structures and give those to the connection object.
4712  */
4713 static void
4714 gst_rtspsrc_parse_digest_challenge (GstRTSPConnection * conn,
4715     const gchar * header, gboolean * stale)
4716 {
4717   GSList *list = NULL, *iter;
4718   const gchar *end;
4719   gchar *item, *eq, *name_end, *value;
4720
4721   g_return_if_fail (stale != NULL);
4722
4723   gst_rtsp_connection_clear_auth_params (conn);
4724   *stale = FALSE;
4725
4726   /* Parse a header whose content is described by RFC2616 as
4727    * "#something", where "something" does not itself contain commas,
4728    * except as part of quoted-strings, into a list of allocated strings.
4729    */
4730   header = gst_rtspsrc_skip_commas (header);
4731   while (*header) {
4732     end = gst_rtspsrc_skip_item (header);
4733     list = g_slist_prepend (list, g_strndup (header, end - header));
4734     header = gst_rtspsrc_skip_commas (end);
4735   }
4736   if (!list)
4737     return;
4738
4739   list = g_slist_reverse (list);
4740   for (iter = list; iter; iter = iter->next) {
4741     item = iter->data;
4742
4743     eq = strchr (item, '=');
4744     if (eq) {
4745       name_end = (gchar *) gst_rtspsrc_unskip_lws (eq, item);
4746       if (name_end == item) {
4747         /* That's no good... */
4748         g_free (item);
4749         continue;
4750       }
4751
4752       *name_end = '\0';
4753
4754       value = (gchar *) gst_rtspsrc_skip_lws (eq + 1);
4755       if (*value == '"')
4756         gst_rtsp_decode_quoted_string (value);
4757     } else
4758       value = NULL;
4759
4760     if (item && (strcmp (item, "stale") == 0) &&
4761         value && (strcmp (value, "TRUE") == 0))
4762       *stale = TRUE;
4763     gst_rtsp_connection_set_auth_param (conn, item, value);
4764     g_free (item);
4765   }
4766
4767   g_slist_free (list);
4768 }
4769
4770 /* Parse a WWW-Authenticate Response header and determine the
4771  * available authentication methods
4772  *
4773  * This code should also cope with the fact that each WWW-Authenticate
4774  * header can contain multiple challenge methods + tokens
4775  *
4776  * At the moment, for Basic auth, we just do a minimal check and don't
4777  * even parse out the realm */
4778 static void
4779 gst_rtspsrc_parse_auth_hdr (gchar * hdr, GstRTSPAuthMethod * methods,
4780     GstRTSPConnection * conn, gboolean * stale)
4781 {
4782   gchar *start;
4783
4784   g_return_if_fail (hdr != NULL);
4785   g_return_if_fail (methods != NULL);
4786   g_return_if_fail (stale != NULL);
4787
4788   /* Skip whitespace at the start of the string */
4789   for (start = hdr; start[0] != '\0' && g_ascii_isspace (start[0]); start++);
4790
4791   if (g_ascii_strncasecmp (start, "basic", 5) == 0)
4792     *methods |= GST_RTSP_AUTH_BASIC;
4793   else if (g_ascii_strncasecmp (start, "digest ", 7) == 0) {
4794     *methods |= GST_RTSP_AUTH_DIGEST;
4795     gst_rtspsrc_parse_digest_challenge (conn, &start[7], stale);
4796   }
4797 }
4798
4799 /**
4800  * gst_rtspsrc_setup_auth:
4801  * @src: the rtsp source
4802  *
4803  * Configure a username and password and auth method on the
4804  * connection object based on a response we received from the
4805  * peer.
4806  *
4807  * Currently, this requires that a username and password were supplied
4808  * in the uri. In the future, they may be requested on demand by sending
4809  * a message up the bus.
4810  *
4811  * Returns: TRUE if authentication information could be set up correctly.
4812  */
4813 static gboolean
4814 gst_rtspsrc_setup_auth (GstRTSPSrc * src, GstRTSPMessage * response)
4815 {
4816   gchar *user = NULL;
4817   gchar *pass = NULL;
4818   GstRTSPAuthMethod avail_methods = GST_RTSP_AUTH_NONE;
4819   GstRTSPAuthMethod method;
4820   GstRTSPResult auth_result;
4821   GstRTSPUrl *url;
4822   GstRTSPConnection *conn;
4823   gchar *hdr;
4824   gboolean stale = FALSE;
4825
4826   conn = src->conninfo.connection;
4827
4828   /* Identify the available auth methods and see if any are supported */
4829   if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_WWW_AUTHENTICATE,
4830           &hdr, 0) == GST_RTSP_OK) {
4831     gst_rtspsrc_parse_auth_hdr (hdr, &avail_methods, conn, &stale);
4832   }
4833
4834   if (avail_methods == GST_RTSP_AUTH_NONE)
4835     goto no_auth_available;
4836
4837   /* For digest auth, if the response indicates that the session
4838    * data are stale, we just update them in the connection object and
4839    * return TRUE to retry the request */
4840   if (stale)
4841     src->tried_url_auth = FALSE;
4842
4843   url = gst_rtsp_connection_get_url (conn);
4844
4845   /* Do we have username and password available? */
4846   if (url != NULL && !src->tried_url_auth && url->user != NULL
4847       && url->passwd != NULL) {
4848     user = url->user;
4849     pass = url->passwd;
4850     src->tried_url_auth = TRUE;
4851     GST_DEBUG_OBJECT (src,
4852         "Attempting authentication using credentials from the URL");
4853   } else {
4854     user = src->user_id;
4855     pass = src->user_pw;
4856     GST_DEBUG_OBJECT (src,
4857         "Attempting authentication using credentials from the properties");
4858   }
4859
4860   /* FIXME: If the url didn't contain username and password or we tried them
4861    * already, request a username and passwd from the application via some kind
4862    * of credentials request message */
4863
4864   /* If we don't have a username and passwd at this point, bail out. */
4865   if (user == NULL || pass == NULL)
4866     goto no_user_pass;
4867
4868   /* Try to configure for each available authentication method, strongest to
4869    * weakest */
4870   for (method = GST_RTSP_AUTH_MAX; method != GST_RTSP_AUTH_NONE; method >>= 1) {
4871     /* Check if this method is available on the server */
4872     if ((method & avail_methods) == 0)
4873       continue;
4874
4875     /* Pass the credentials to the connection to try on the next request */
4876     auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass);
4877     /* INVAL indicates an invalid username/passwd were supplied, so we'll just
4878      * ignore it and end up retrying later */
4879     if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) {
4880       GST_DEBUG_OBJECT (src, "Attempting %s authentication",
4881           gst_rtsp_auth_method_to_string (method));
4882       break;
4883     }
4884   }
4885
4886   if (method == GST_RTSP_AUTH_NONE)
4887     goto no_auth_available;
4888
4889   return TRUE;
4890
4891 no_auth_available:
4892   {
4893     /* Output an error indicating that we couldn't connect because there were
4894      * no supported authentication protocols */
4895     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
4896         ("No supported authentication protocol was found"));
4897     return FALSE;
4898   }
4899 no_user_pass:
4900   {
4901     /* We don't fire an error message, we just return FALSE and let the
4902      * normal NOT_AUTHORIZED error be propagated */
4903     return FALSE;
4904   }
4905 }
4906
4907 static GstRTSPResult
4908 gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnection * conn,
4909     GstRTSPMessage * request, GstRTSPMessage * response,
4910     GstRTSPStatusCode * code)
4911 {
4912   GstRTSPResult res;
4913   GstRTSPStatusCode thecode;
4914   gchar *content_base = NULL;
4915   gint try = 0;
4916
4917 again:
4918   if (!src->short_header)
4919     gst_rtsp_ext_list_before_send (src->extensions, request);
4920
4921   GST_DEBUG_OBJECT (src, "sending message");
4922
4923   if (src->debug)
4924     gst_rtsp_message_dump (request);
4925
4926   res = gst_rtspsrc_connection_send (src, conn, request, src->ptcp_timeout);
4927   if (res < 0)
4928     goto send_error;
4929
4930   gst_rtsp_connection_reset_timeout (conn);
4931
4932 next:
4933   res = gst_rtspsrc_connection_receive (src, conn, response, src->ptcp_timeout);
4934   if (res < 0)
4935     goto receive_error;
4936
4937   if (src->debug)
4938     gst_rtsp_message_dump (response);
4939
4940   switch (response->type) {
4941     case GST_RTSP_MESSAGE_REQUEST:
4942       res = gst_rtspsrc_handle_request (src, conn, response);
4943       if (res == GST_RTSP_EEOF)
4944         goto server_eof;
4945       else if (res < 0)
4946         goto handle_request_failed;
4947       goto next;
4948     case GST_RTSP_MESSAGE_RESPONSE:
4949       /* ok, a response is good */
4950       GST_DEBUG_OBJECT (src, "received response message");
4951       break;
4952     case GST_RTSP_MESSAGE_DATA:
4953       /* get next response */
4954       GST_DEBUG_OBJECT (src, "handle data response message");
4955       gst_rtspsrc_handle_data (src, response);
4956       goto next;
4957     default:
4958       GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
4959           response->type);
4960       goto next;
4961   }
4962
4963   thecode = response->type_data.response.code;
4964
4965   GST_DEBUG_OBJECT (src, "got response message %d", thecode);
4966
4967   /* if the caller wanted the result code, we store it. */
4968   if (code)
4969     *code = thecode;
4970
4971   /* If the request didn't succeed, bail out before doing any more */
4972   if (thecode != GST_RTSP_STS_OK)
4973     return GST_RTSP_OK;
4974
4975   /* store new content base if any */
4976   gst_rtsp_message_get_header (response, GST_RTSP_HDR_CONTENT_BASE,
4977       &content_base, 0);
4978   if (content_base) {
4979     g_free (src->content_base);
4980     src->content_base = g_strdup (content_base);
4981   }
4982   gst_rtsp_ext_list_after_send (src->extensions, request, response);
4983
4984   return GST_RTSP_OK;
4985
4986   /* ERRORS */
4987 send_error:
4988   {
4989     gchar *str = gst_rtsp_strresult (res);
4990
4991     if (res != GST_RTSP_EINTR) {
4992       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
4993           ("Could not send message. (%s)", str));
4994     } else {
4995       GST_WARNING_OBJECT (src, "send interrupted");
4996     }
4997     g_free (str);
4998     return res;
4999   }
5000 receive_error:
5001   {
5002     switch (res) {
5003       case GST_RTSP_EEOF:
5004         GST_WARNING_OBJECT (src, "server closed connection");
5005         if ((try == 0) && !src->interleaved && src->udp_reconnect) {
5006           try++;
5007           /* if reconnect succeeds, try again */
5008           if ((res =
5009                   gst_rtsp_conninfo_reconnect (src, &src->conninfo,
5010                       FALSE)) == 0)
5011             goto again;
5012         }
5013         /* only try once after reconnect, then fallthrough and error out */
5014       default:
5015       {
5016         gchar *str = gst_rtsp_strresult (res);
5017
5018         if (res != GST_RTSP_EINTR) {
5019           GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
5020               ("Could not receive message. (%s)", str));
5021         } else {
5022           GST_WARNING_OBJECT (src, "receive interrupted");
5023         }
5024         g_free (str);
5025         break;
5026       }
5027     }
5028     return res;
5029   }
5030 handle_request_failed:
5031   {
5032     /* ERROR was posted */
5033     gst_rtsp_message_unset (response);
5034     return res;
5035   }
5036 server_eof:
5037   {
5038     GST_DEBUG_OBJECT (src, "we got an eof from the server");
5039     GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
5040         ("The server closed the connection."));
5041     gst_rtsp_message_unset (response);
5042     return res;
5043   }
5044 }
5045
5046 /**
5047  * gst_rtspsrc_send:
5048  * @src: the rtsp source
5049  * @conn: the connection to send on
5050  * @request: must point to a valid request
5051  * @response: must point to an empty #GstRTSPMessage
5052  * @code: an optional code result
5053  *
5054  * send @request and retrieve the response in @response. optionally @code can be
5055  * non-NULL in which case it will contain the status code of the response.
5056  *
5057  * If This function returns #GST_RTSP_OK, @response will contain a valid response
5058  * message that should be cleaned with gst_rtsp_message_unset() after usage.
5059  *
5060  * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid
5061  * @response message) if the response code was not 200 (OK).
5062  *
5063  * If the attempt results in an authentication failure, then this will attempt
5064  * to retrieve authentication credentials via gst_rtspsrc_setup_auth and retry
5065  * the request.
5066  *
5067  * Returns: #GST_RTSP_OK if the processing was successful.
5068  */
5069 static GstRTSPResult
5070 gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn,
5071     GstRTSPMessage * request, GstRTSPMessage * response,
5072     GstRTSPStatusCode * code)
5073 {
5074   GstRTSPStatusCode int_code = GST_RTSP_STS_OK;
5075   GstRTSPResult res = GST_RTSP_ERROR;
5076   gint count;
5077   gboolean retry;
5078   GstRTSPMethod method = GST_RTSP_INVALID;
5079
5080   count = 0;
5081   do {
5082     retry = FALSE;
5083
5084     /* make sure we don't loop forever */
5085     if (count++ > 8)
5086       break;
5087
5088     /* save method so we can disable it when the server complains */
5089     method = request->type_data.request.method;
5090
5091     if ((res =
5092             gst_rtspsrc_try_send (src, conn, request, response, &int_code)) < 0)
5093       goto error;
5094
5095     switch (int_code) {
5096       case GST_RTSP_STS_UNAUTHORIZED:
5097         if (gst_rtspsrc_setup_auth (src, response)) {
5098           /* Try the request/response again after configuring the auth info
5099            * and loop again */
5100           retry = TRUE;
5101         }
5102         break;
5103       default:
5104         break;
5105     }
5106   } while (retry == TRUE);
5107
5108   /* If the user requested the code, let them handle errors, otherwise
5109    * post an error below */
5110   if (code != NULL)
5111     *code = int_code;
5112   else if (int_code != GST_RTSP_STS_OK)
5113     goto error_response;
5114
5115   return res;
5116
5117   /* ERRORS */
5118 error:
5119   {
5120     GST_DEBUG_OBJECT (src, "got error %d", res);
5121     return res;
5122   }
5123 error_response:
5124   {
5125     res = GST_RTSP_ERROR;
5126
5127     switch (response->type_data.response.code) {
5128       case GST_RTSP_STS_NOT_FOUND:
5129         GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), ("%s",
5130                 response->type_data.response.reason));
5131         break;
5132       case GST_RTSP_STS_MOVED_PERMANENTLY:
5133       case GST_RTSP_STS_MOVE_TEMPORARILY:
5134       {
5135         gchar *new_location;
5136         GstRTSPLowerTrans transports;
5137
5138         GST_DEBUG_OBJECT (src, "got redirection");
5139         /* if we don't have a Location Header, we must error */
5140         if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_LOCATION,
5141                 &new_location, 0) < 0)
5142           break;
5143
5144         /* When we receive a redirect result, we go back to the INIT state after
5145          * parsing the new URI. The caller should do the needed steps to issue
5146          * a new setup when it detects this state change. */
5147         GST_DEBUG_OBJECT (src, "redirection to %s", new_location);
5148
5149         /* save current transports */
5150         if (src->conninfo.url)
5151           transports = src->conninfo.url->transports;
5152         else
5153           transports = GST_RTSP_LOWER_TRANS_UNKNOWN;
5154
5155         gst_rtspsrc_uri_set_uri (GST_URI_HANDLER (src), new_location, NULL);
5156
5157         /* set old transports */
5158         if (src->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN)
5159           src->conninfo.url->transports = transports;
5160
5161         src->need_redirect = TRUE;
5162         src->state = GST_RTSP_STATE_INIT;
5163         res = GST_RTSP_OK;
5164         break;
5165       }
5166       case GST_RTSP_STS_NOT_ACCEPTABLE:
5167       case GST_RTSP_STS_NOT_IMPLEMENTED:
5168       case GST_RTSP_STS_METHOD_NOT_ALLOWED:
5169         GST_WARNING_OBJECT (src, "got NOT IMPLEMENTED, disable method %s",
5170             gst_rtsp_method_as_text (method));
5171         src->methods &= ~method;
5172         res = GST_RTSP_OK;
5173         break;
5174       default:
5175         GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
5176             ("Got error response: %d (%s).", response->type_data.response.code,
5177                 response->type_data.response.reason));
5178         break;
5179     }
5180     /* if we return ERROR we should unset the response ourselves */
5181     if (res == GST_RTSP_ERROR)
5182       gst_rtsp_message_unset (response);
5183
5184     return res;
5185   }
5186 }
5187
5188 static GstRTSPResult
5189 gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request,
5190     GstRTSPMessage * response, GstRTSPSrc * src)
5191 {
5192   return gst_rtspsrc_send (src, src->conninfo.connection, request, response,
5193       NULL);
5194 }
5195
5196
5197 /* parse the response and collect all the supported methods. We need this
5198  * information so that we don't try to send an unsupported request to the
5199  * server.
5200  */
5201 static gboolean
5202 gst_rtspsrc_parse_methods (GstRTSPSrc * src, GstRTSPMessage * response)
5203 {
5204   GstRTSPHeaderField field;
5205   gchar *respoptions;
5206   gint indx = 0;
5207
5208   /* reset supported methods */
5209   src->methods = 0;
5210
5211   /* Try Allow Header first */
5212   field = GST_RTSP_HDR_ALLOW;
5213   while (TRUE) {
5214     respoptions = NULL;
5215     gst_rtsp_message_get_header (response, field, &respoptions, indx);
5216     if (indx == 0 && !respoptions) {
5217       /* if no Allow header was found then try the Public header... */
5218       field = GST_RTSP_HDR_PUBLIC;
5219       gst_rtsp_message_get_header (response, field, &respoptions, indx);
5220     }
5221     if (!respoptions)
5222       break;
5223
5224     src->methods |= gst_rtsp_options_from_text (respoptions);
5225
5226     indx++;
5227   }
5228
5229   if (src->methods == 0) {
5230     /* neither Allow nor Public are required, assume the server supports
5231      * at least DESCRIBE, SETUP, we always assume it supports PLAY as
5232      * well. */
5233     GST_DEBUG_OBJECT (src, "could not get OPTIONS");
5234     src->methods = GST_RTSP_DESCRIBE | GST_RTSP_SETUP;
5235   }
5236   /* always assume PLAY, FIXME, extensions should be able to override
5237    * this */
5238   src->methods |= GST_RTSP_PLAY;
5239   /* also assume it will support Range */
5240   src->seekable = TRUE;
5241
5242   /* we need describe and setup */
5243   if (!(src->methods & GST_RTSP_DESCRIBE))
5244     goto no_describe;
5245   if (!(src->methods & GST_RTSP_SETUP))
5246     goto no_setup;
5247
5248   return TRUE;
5249
5250   /* ERRORS */
5251 no_describe:
5252   {
5253     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
5254         ("Server does not support DESCRIBE."));
5255     return FALSE;
5256   }
5257 no_setup:
5258   {
5259     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
5260         ("Server does not support SETUP."));
5261     return FALSE;
5262   }
5263 }
5264
5265 /* masks to be kept in sync with the hardcoded protocol order of preference
5266  * in code below */
5267 static guint protocol_masks[] = {
5268   GST_RTSP_LOWER_TRANS_UDP,
5269   GST_RTSP_LOWER_TRANS_UDP_MCAST,
5270   GST_RTSP_LOWER_TRANS_TCP,
5271   0
5272 };
5273
5274 static GstRTSPResult
5275 gst_rtspsrc_create_transports_string (GstRTSPSrc * src,
5276     GstRTSPLowerTrans protocols, gchar ** transports)
5277 {
5278   GstRTSPResult res;
5279   GString *result;
5280   gboolean add_udp_str;
5281
5282   *transports = NULL;
5283
5284   res =
5285       gst_rtsp_ext_list_get_transports (src->extensions, protocols, transports);
5286
5287   if (res < 0)
5288     goto failed;
5289
5290   GST_DEBUG_OBJECT (src, "got transports %s", GST_STR_NULL (*transports));
5291
5292   /* extension listed transports, use those */
5293   if (*transports != NULL)
5294     return GST_RTSP_OK;
5295
5296   /* it's the default */
5297   add_udp_str = FALSE;
5298
5299   /* the default RTSP transports */
5300   result = g_string_new ("");
5301   if (protocols & GST_RTSP_LOWER_TRANS_UDP) {
5302     GST_DEBUG_OBJECT (src, "adding UDP unicast");
5303
5304     g_string_append (result, "RTP/AVP");
5305     if (add_udp_str)
5306       g_string_append (result, "/UDP");
5307     g_string_append (result, ";unicast;client_port=%%u1-%%u2");
5308   } else if (protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
5309     GST_DEBUG_OBJECT (src, "adding UDP multicast");
5310
5311     /* we don't have to allocate any UDP ports yet, if the selected transport
5312      * turns out to be multicast we can create them and join the multicast
5313      * group indicated in the transport reply */
5314     if (result->len > 0)
5315       g_string_append (result, ",");
5316     g_string_append (result, "RTP/AVP");
5317     if (add_udp_str)
5318       g_string_append (result, "/UDP");
5319     g_string_append (result, ";multicast");
5320     if (src->next_port_num != 0) {
5321       if (src->client_port_range.max > 0 &&
5322           src->next_port_num >= src->client_port_range.max)
5323         goto no_ports;
5324
5325       g_string_append_printf (result, ";client_port=%d-%d",
5326           src->next_port_num, src->next_port_num + 1);
5327     }
5328   } else if (protocols & GST_RTSP_LOWER_TRANS_TCP) {
5329     GST_DEBUG_OBJECT (src, "adding TCP");
5330
5331     if (result->len > 0)
5332       g_string_append (result, ",");
5333     g_string_append (result, "RTP/AVP/TCP;unicast;interleaved=%%i1-%%i2");
5334   }
5335   *transports = g_string_free (result, FALSE);
5336
5337   GST_DEBUG_OBJECT (src, "prepared transports %s", GST_STR_NULL (*transports));
5338
5339   return GST_RTSP_OK;
5340
5341   /* ERRORS */
5342 failed:
5343   {
5344     GST_ERROR ("extension gave error %d", res);
5345     return res;
5346   }
5347 no_ports:
5348   {
5349     GST_ERROR ("no more ports available");
5350     return GST_RTSP_ERROR;
5351   }
5352 }
5353
5354 static GstRTSPResult
5355 gst_rtspsrc_prepare_transports (GstRTSPStream * stream, gchar ** transports,
5356     gint orig_rtpport, gint orig_rtcpport)
5357 {
5358   GstRTSPSrc *src;
5359   gint nr_udp, nr_int;
5360   gchar *next, *p;
5361   gint rtpport = 0, rtcpport = 0;
5362   GString *str;
5363
5364   src = stream->parent;
5365
5366   /* find number of placeholders first */
5367   if (strstr (*transports, "%%i2"))
5368     nr_int = 2;
5369   else if (strstr (*transports, "%%i1"))
5370     nr_int = 1;
5371   else
5372     nr_int = 0;
5373
5374   if (strstr (*transports, "%%u2"))
5375     nr_udp = 2;
5376   else if (strstr (*transports, "%%u1"))
5377     nr_udp = 1;
5378   else
5379     nr_udp = 0;
5380
5381   if (nr_udp == 0 && nr_int == 0)
5382     goto done;
5383
5384   if (nr_udp > 0) {
5385     if (!orig_rtpport || !orig_rtcpport) {
5386       if (!gst_rtspsrc_alloc_udp_ports (stream, &rtpport, &rtcpport))
5387         goto failed;
5388     } else {
5389       rtpport = orig_rtpport;
5390       rtcpport = orig_rtcpport;
5391     }
5392   }
5393
5394   str = g_string_new ("");
5395   p = *transports;
5396   while ((next = strstr (p, "%%"))) {
5397     g_string_append_len (str, p, next - p);
5398     if (next[2] == 'u') {
5399       if (next[3] == '1')
5400         g_string_append_printf (str, "%d", rtpport);
5401       else if (next[3] == '2')
5402         g_string_append_printf (str, "%d", rtcpport);
5403     }
5404     if (next[2] == 'i') {
5405       if (next[3] == '1')
5406         g_string_append_printf (str, "%d", src->free_channel);
5407       else if (next[3] == '2')
5408         g_string_append_printf (str, "%d", src->free_channel + 1);
5409     }
5410
5411     p = next + 4;
5412   }
5413   /* append final part */
5414   g_string_append (str, p);
5415
5416   g_free (*transports);
5417   *transports = g_string_free (str, FALSE);
5418
5419 done:
5420   return GST_RTSP_OK;
5421
5422   /* ERRORS */
5423 failed:
5424   {
5425     GST_ERROR ("failed to allocate udp ports");
5426     return GST_RTSP_ERROR;
5427   }
5428 }
5429
5430 static gboolean
5431 gst_rtspsrc_stream_is_real_media (GstRTSPStream * stream)
5432 {
5433   gboolean res = FALSE;
5434
5435   if (stream->caps) {
5436     GstStructure *s;
5437     const gchar *enc = NULL;
5438
5439     s = gst_caps_get_structure (stream->caps, 0);
5440     if ((enc = gst_structure_get_string (s, "encoding-name"))) {
5441       res = (strstr (enc, "-REAL") != NULL);
5442     }
5443   }
5444   return res;
5445 }
5446
5447 /* Perform the SETUP request for all the streams.
5448  *
5449  * We ask the server for a specific transport, which initially includes all the
5450  * ones we can support (UDP/TCP/MULTICAST). For the UDP transport we allocate
5451  * two local UDP ports that we send to the server.
5452  *
5453  * Once the server replied with a transport, we configure the other streams
5454  * with the same transport.
5455  *
5456  * This function will also configure the stream for the selected transport,
5457  * which basically means creating the pipeline.
5458  */
5459 static GstRTSPResult
5460 gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
5461 {
5462   GList *walk;
5463   GstRTSPResult res = GST_RTSP_ERROR;
5464   GstRTSPMessage request = { 0 };
5465   GstRTSPMessage response = { 0 };
5466   GstRTSPStream *stream = NULL;
5467   GstRTSPLowerTrans protocols;
5468   GstRTSPStatusCode code;
5469   gboolean unsupported_real = FALSE;
5470   gint rtpport, rtcpport;
5471   GstRTSPUrl *url;
5472   gchar *hval;
5473
5474   if (src->conninfo.connection) {
5475     url = gst_rtsp_connection_get_url (src->conninfo.connection);
5476     /* we initially allow all configured lower transports. based on the URL
5477      * transports and the replies from the server we narrow them down. */
5478     protocols = url->transports & src->cur_protocols;
5479   } else {
5480     url = NULL;
5481     protocols = src->cur_protocols;
5482   }
5483
5484   if (protocols == 0)
5485     goto no_protocols;
5486
5487   /* reset some state */
5488   src->free_channel = 0;
5489   src->interleaved = FALSE;
5490   src->need_activate = FALSE;
5491   /* keep track of next port number, 0 is random */
5492   src->next_port_num = src->client_port_range.min;
5493   rtpport = rtcpport = 0;
5494
5495   if (G_UNLIKELY (src->streams == NULL))
5496     goto no_streams;
5497
5498   for (walk = src->streams; walk; walk = g_list_next (walk)) {
5499     GstRTSPConnection *conn;
5500     gchar *transports;
5501     gint retry = 0;
5502     guint mask = 0;
5503     gboolean selected;
5504
5505     stream = (GstRTSPStream *) walk->data;
5506
5507     /* see if we need to configure this stream */
5508     if (!gst_rtsp_ext_list_configure_stream (src->extensions, stream->caps)) {
5509       GST_DEBUG_OBJECT (src, "skipping stream %p, disabled by extension",
5510           stream);
5511       stream->disabled = TRUE;
5512       continue;
5513     }
5514
5515     g_signal_emit (src, gst_rtspsrc_signals[SIGNAL_SELECT_STREAM], 0,
5516         stream->id, stream->caps, &selected);
5517     if (!selected) {
5518       GST_DEBUG_OBJECT (src, "skipping stream %p, disabled by signal", stream);
5519       stream->disabled = TRUE;
5520       continue;
5521     }
5522     stream->disabled = FALSE;
5523
5524     /* merge/overwrite global caps */
5525     if (stream->caps) {
5526       guint j, num;
5527       GstStructure *s;
5528
5529       s = gst_caps_get_structure (stream->caps, 0);
5530
5531       num = gst_structure_n_fields (src->props);
5532       for (j = 0; j < num; j++) {
5533         const gchar *name;
5534         const GValue *val;
5535
5536         name = gst_structure_nth_field_name (src->props, j);
5537         val = gst_structure_get_value (src->props, name);
5538         gst_structure_set_value (s, name, val);
5539
5540         GST_DEBUG_OBJECT (src, "copied %s", name);
5541       }
5542     }
5543
5544     /* skip setup if we have no URL for it */
5545     if (stream->conninfo.location == NULL) {
5546       GST_DEBUG_OBJECT (src, "skipping stream %p, no setup", stream);
5547       continue;
5548     }
5549
5550     if (src->conninfo.connection == NULL) {
5551       if (!gst_rtsp_conninfo_connect (src, &stream->conninfo, async)) {
5552         GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream);
5553         continue;
5554       }
5555       conn = stream->conninfo.connection;
5556     } else {
5557       conn = src->conninfo.connection;
5558     }
5559     GST_DEBUG_OBJECT (src, "doing setup of stream %p with %s", stream,
5560         stream->conninfo.location);
5561
5562     /* if we have a multicast connection, only suggest multicast from now on */
5563     if (stream->is_multicast)
5564       protocols &= GST_RTSP_LOWER_TRANS_UDP_MCAST;
5565
5566   next_protocol:
5567     /* first selectable protocol */
5568     while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
5569       mask++;
5570     if (!protocol_masks[mask])
5571       goto no_protocols;
5572
5573   retry:
5574     GST_DEBUG_OBJECT (src, "protocols = 0x%x, protocol mask = 0x%x", protocols,
5575         protocol_masks[mask]);
5576     /* create a string with first transport in line */
5577     transports = NULL;
5578     res = gst_rtspsrc_create_transports_string (src,
5579         protocols & protocol_masks[mask], &transports);
5580     if (res < 0 || transports == NULL)
5581       goto setup_transport_failed;
5582
5583     if (strlen (transports) == 0) {
5584       g_free (transports);
5585       GST_DEBUG_OBJECT (src, "no transports found");
5586       mask++;
5587       goto next_protocol;
5588     }
5589
5590     GST_DEBUG_OBJECT (src, "replace ports in %s", GST_STR_NULL (transports));
5591
5592     /* replace placeholders with real values, this function will optionally
5593      * allocate UDP ports and other info needed to execute the setup request */
5594     res = gst_rtspsrc_prepare_transports (stream, &transports,
5595         retry > 0 ? rtpport : 0, retry > 0 ? rtcpport : 0);
5596     if (res < 0) {
5597       g_free (transports);
5598       goto setup_transport_failed;
5599     }
5600
5601     GST_DEBUG_OBJECT (src, "transport is now %s", GST_STR_NULL (transports));
5602
5603     /* create SETUP request */
5604     res =
5605         gst_rtsp_message_init_request (&request, GST_RTSP_SETUP,
5606         stream->conninfo.location);
5607     if (res < 0) {
5608       g_free (transports);
5609       goto create_request_failed;
5610     }
5611
5612     /* select transport */
5613     gst_rtsp_message_take_header (&request, GST_RTSP_HDR_TRANSPORT, transports);
5614
5615     /* if the user wants a non default RTP packet size we add the blocksize
5616      * parameter */
5617     if (src->rtp_blocksize > 0) {
5618       hval = g_strdup_printf ("%d", src->rtp_blocksize);
5619       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_BLOCKSIZE, hval);
5620     }
5621
5622     if (async)
5623       GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("SETUP stream %d",
5624               stream->id));
5625
5626     /* handle the code ourselves */
5627     if ((res = gst_rtspsrc_send (src, conn, &request, &response, &code) < 0))
5628       goto send_error;
5629
5630     switch (code) {
5631       case GST_RTSP_STS_OK:
5632         break;
5633       case GST_RTSP_STS_UNSUPPORTED_TRANSPORT:
5634         gst_rtsp_message_unset (&request);
5635         gst_rtsp_message_unset (&response);
5636         /* cleanup of leftover transport */
5637         gst_rtspsrc_stream_free_udp (stream);
5638         /* MS WMServer RTSP MUST use same UDP pair in all SETUP requests;
5639          * we might be in this case */
5640         if (stream->container && rtpport && rtcpport && !retry) {
5641           GST_DEBUG_OBJECT (src, "retrying with original port pair %u-%u",
5642               rtpport, rtcpport);
5643           retry++;
5644           goto retry;
5645         }
5646         /* this transport did not go down well, but we may have others to try
5647          * that we did not send yet, try those and only give up then
5648          * but not without checking for lost cause/extension so we can
5649          * post a nicer/more useful error message later */
5650         if (!unsupported_real)
5651           unsupported_real = gst_rtspsrc_stream_is_real_media (stream);
5652         /* select next available protocol, give up on this stream if none */
5653         mask++;
5654         while (protocol_masks[mask] && !(protocols & protocol_masks[mask]))
5655           mask++;
5656         if (!protocol_masks[mask] || unsupported_real)
5657           continue;
5658         else
5659           goto retry;
5660       default:
5661         /* cleanup of leftover transport and move to the next stream */
5662         gst_rtspsrc_stream_free_udp (stream);
5663         goto response_error;
5664     }
5665
5666     /* parse response transport */
5667     {
5668       gchar *resptrans = NULL;
5669       GstRTSPTransport transport = { 0 };
5670
5671       gst_rtsp_message_get_header (&response, GST_RTSP_HDR_TRANSPORT,
5672           &resptrans, 0);
5673       if (!resptrans) {
5674         gst_rtspsrc_stream_free_udp (stream);
5675         goto no_transport;
5676       }
5677
5678       /* parse transport, go to next stream on parse error */
5679       if (gst_rtsp_transport_parse (resptrans, &transport) != GST_RTSP_OK) {
5680         GST_WARNING_OBJECT (src, "failed to parse transport %s", resptrans);
5681         goto next;
5682       }
5683
5684       /* update allowed transports for other streams. once the transport of
5685        * one stream has been determined, we make sure that all other streams
5686        * are configured in the same way */
5687       switch (transport.lower_transport) {
5688         case GST_RTSP_LOWER_TRANS_TCP:
5689           GST_DEBUG_OBJECT (src, "stream %p as TCP interleaved", stream);
5690           protocols = GST_RTSP_LOWER_TRANS_TCP;
5691           src->interleaved = TRUE;
5692           /* update free channels */
5693           src->free_channel =
5694               MAX (transport.interleaved.min, src->free_channel);
5695           src->free_channel =
5696               MAX (transport.interleaved.max, src->free_channel);
5697           src->free_channel++;
5698           break;
5699         case GST_RTSP_LOWER_TRANS_UDP_MCAST:
5700           /* only allow multicast for other streams */
5701           GST_DEBUG_OBJECT (src, "stream %p as UDP multicast", stream);
5702           protocols = GST_RTSP_LOWER_TRANS_UDP_MCAST;
5703           /* if the server selected our ports, increment our counters so that
5704            * we select a new port later */
5705           if (src->next_port_num == transport.port.min &&
5706               src->next_port_num + 1 == transport.port.max) {
5707             src->next_port_num += 2;
5708           }
5709           break;
5710         case GST_RTSP_LOWER_TRANS_UDP:
5711           /* only allow unicast for other streams */
5712           GST_DEBUG_OBJECT (src, "stream %p as UDP unicast", stream);
5713           protocols = GST_RTSP_LOWER_TRANS_UDP;
5714           break;
5715         default:
5716           GST_DEBUG_OBJECT (src, "stream %p unknown transport %d", stream,
5717               transport.lower_transport);
5718           break;
5719       }
5720
5721       if (!stream->container || (!src->interleaved && !retry)) {
5722         /* now configure the stream with the selected transport */
5723         if (!gst_rtspsrc_stream_configure_transport (stream, &transport)) {
5724           GST_DEBUG_OBJECT (src,
5725               "could not configure stream %p transport, skipping stream",
5726               stream);
5727           goto next;
5728         } else if (stream->udpsrc[0] && stream->udpsrc[1]) {
5729           /* retain the first allocated UDP port pair */
5730           g_object_get (G_OBJECT (stream->udpsrc[0]), "port", &rtpport, NULL);
5731           g_object_get (G_OBJECT (stream->udpsrc[1]), "port", &rtcpport, NULL);
5732         }
5733       }
5734       /* we need to activate at least one streams when we detect activity */
5735       src->need_activate = TRUE;
5736     next:
5737       /* clean up our transport struct */
5738       gst_rtsp_transport_init (&transport);
5739       /* clean up used RTSP messages */
5740       gst_rtsp_message_unset (&request);
5741       gst_rtsp_message_unset (&response);
5742     }
5743   }
5744
5745   /* store the transport protocol that was configured */
5746   src->cur_protocols = protocols;
5747
5748   gst_rtsp_ext_list_stream_select (src->extensions, url);
5749
5750   /* if there is nothing to activate, error out */
5751   if (!src->need_activate)
5752     goto nothing_to_activate;
5753
5754   return res;
5755
5756   /* ERRORS */
5757 no_protocols:
5758   {
5759     /* no transport possible, post an error and stop */
5760     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
5761         ("Could not connect to server, no protocols left"));
5762     return GST_RTSP_ERROR;
5763   }
5764 no_streams:
5765   {
5766     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
5767         ("SDP contains no streams"));
5768     return GST_RTSP_ERROR;
5769   }
5770 create_request_failed:
5771   {
5772     gchar *str = gst_rtsp_strresult (res);
5773
5774     GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL),
5775         ("Could not create request. (%s)", str));
5776     g_free (str);
5777     goto cleanup_error;
5778   }
5779 setup_transport_failed:
5780   {
5781     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
5782         ("Could not setup transport."));
5783     res = GST_RTSP_ERROR;
5784     goto cleanup_error;
5785   }
5786 response_error:
5787   {
5788     const gchar *str = gst_rtsp_status_as_text (code);
5789
5790     GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
5791         ("Error (%d): %s", code, GST_STR_NULL (str)));
5792     res = GST_RTSP_ERROR;
5793     goto cleanup_error;
5794   }
5795 send_error:
5796   {
5797     gchar *str = gst_rtsp_strresult (res);
5798
5799     if (res != GST_RTSP_EINTR) {
5800       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
5801           ("Could not send message. (%s)", str));
5802     } else {
5803       GST_WARNING_OBJECT (src, "send interrupted");
5804     }
5805     g_free (str);
5806     goto cleanup_error;
5807   }
5808 no_transport:
5809   {
5810     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
5811         ("Server did not select transport."));
5812     res = GST_RTSP_ERROR;
5813     goto cleanup_error;
5814   }
5815 nothing_to_activate:
5816   {
5817     /* none of the available error codes is really right .. */
5818     if (unsupported_real) {
5819       GST_ELEMENT_ERROR (src, STREAM, CODEC_NOT_FOUND,
5820           (_("No supported stream was found. You might need to install a "
5821                   "GStreamer RTSP extension plugin for Real media streams.")),
5822           (NULL));
5823     } else {
5824       GST_ELEMENT_ERROR (src, STREAM, CODEC_NOT_FOUND,
5825           (_("No supported stream was found. You might need to allow "
5826                   "more transport protocols or may otherwise be missing "
5827                   "the right GStreamer RTSP extension plugin.")), (NULL));
5828     }
5829     return GST_RTSP_ERROR;
5830   }
5831 cleanup_error:
5832   {
5833     gst_rtsp_message_unset (&request);
5834     gst_rtsp_message_unset (&response);
5835     return res;
5836   }
5837 }
5838
5839 static gboolean
5840 gst_rtspsrc_parse_range (GstRTSPSrc * src, const gchar * range,
5841     GstSegment * segment)
5842 {
5843   gint64 seconds;
5844   GstRTSPTimeRange *therange;
5845
5846   if (src->range)
5847     gst_rtsp_range_free (src->range);
5848
5849   if (gst_rtsp_range_parse (range, &therange) == GST_RTSP_OK) {
5850     GST_DEBUG_OBJECT (src, "parsed range %s", range);
5851     src->range = therange;
5852   } else {
5853     GST_DEBUG_OBJECT (src, "failed to parse range %s", range);
5854     src->range = NULL;
5855     gst_segment_init (segment, GST_FORMAT_TIME);
5856     return FALSE;
5857   }
5858
5859   GST_DEBUG_OBJECT (src, "range: type %d, min %f - type %d,  max %f ",
5860       therange->min.type, therange->min.seconds, therange->max.type,
5861       therange->max.seconds);
5862
5863   if (therange->min.type == GST_RTSP_TIME_NOW)
5864     seconds = 0;
5865   else if (therange->min.type == GST_RTSP_TIME_END)
5866     seconds = 0;
5867   else
5868     seconds = therange->min.seconds * GST_SECOND;
5869
5870   GST_DEBUG_OBJECT (src, "range: min %" GST_TIME_FORMAT,
5871       GST_TIME_ARGS (seconds));
5872
5873   /* we need to start playback without clipping from the position reported by
5874    * the server */
5875   segment->start = seconds;
5876   segment->position = seconds;
5877
5878   if (therange->max.type == GST_RTSP_TIME_NOW)
5879     seconds = -1;
5880   else if (therange->max.type == GST_RTSP_TIME_END)
5881     seconds = -1;
5882   else
5883     seconds = therange->max.seconds * GST_SECOND;
5884
5885   GST_DEBUG_OBJECT (src, "range: max %" GST_TIME_FORMAT,
5886       GST_TIME_ARGS (seconds));
5887
5888   /* live (WMS) server might send overflowed large max as its idea of infinity,
5889    * compensate to prevent problems later on */
5890   if (seconds != -1 && seconds < 0) {
5891     seconds = -1;
5892     GST_DEBUG_OBJECT (src, "insane range, set to NONE");
5893   }
5894
5895   /* live (WMS) might send min == max, which is not worth recording */
5896   if (segment->duration == -1 && seconds == segment->start)
5897     seconds = -1;
5898
5899   /* don't change duration with unknown value, we might have a valid value
5900    * there that we want to keep. */
5901   if (seconds != -1)
5902     segment->duration = seconds;
5903
5904   return TRUE;
5905 }
5906
5907 /* Parse clock profived by the server with following syntax:
5908  *
5909  * "GstNetTimeProvider <wrapped-clock> <server-IP:port> <clock-time>"
5910  */
5911 static gboolean
5912 gst_rtspsrc_parse_gst_clock (GstRTSPSrc * src, const gchar * gstclock)
5913 {
5914   gboolean res = FALSE;
5915
5916   if (g_str_has_prefix (gstclock, "GstNetTimeProvider ")) {
5917     gchar **fields = NULL, **parts = NULL;
5918     gchar *remote_ip, *str;
5919     gint port;
5920     GstClockTime base_time;
5921     GstClock *netclock;
5922
5923     fields = g_strsplit (gstclock, " ", 0);
5924
5925     /* wrapped clock, not very interesting for now */
5926     if (fields[1] == NULL)
5927       goto cleanup;
5928
5929     /* remote IP address and port */
5930     if ((str = fields[2]) == NULL)
5931       goto cleanup;
5932
5933     parts = g_strsplit (str, ":", 0);
5934
5935     if ((remote_ip = parts[0]) == NULL)
5936       goto cleanup;
5937
5938     if ((str = parts[1]) == NULL)
5939       goto cleanup;
5940
5941     port = atoi (str);
5942     if (port == 0)
5943       goto cleanup;
5944
5945     /* base-time */
5946     if ((str = fields[3]) == NULL)
5947       goto cleanup;
5948
5949     base_time = g_ascii_strtoull (str, NULL, 10);
5950
5951     netclock =
5952         gst_net_client_clock_new ((gchar *) "GstRTSPClock", remote_ip, port,
5953         base_time);
5954
5955     if (src->provided_clock)
5956       gst_object_unref (src->provided_clock);
5957     src->provided_clock = netclock;
5958
5959     gst_element_post_message (GST_ELEMENT_CAST (src),
5960         gst_message_new_clock_provide (GST_OBJECT_CAST (src),
5961             src->provided_clock, TRUE));
5962
5963     res = TRUE;
5964   cleanup:
5965     g_strfreev (fields);
5966     g_strfreev (parts);
5967   }
5968   return res;
5969 }
5970
5971 /* must be called with the RTSP state lock */
5972 static GstRTSPResult
5973 gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp,
5974     gboolean async)
5975 {
5976   GstRTSPResult res;
5977   gint i, n_streams;
5978
5979   /* prepare global stream caps properties */
5980   if (src->props)
5981     gst_structure_remove_all_fields (src->props);
5982   else
5983     src->props = gst_structure_new_empty ("RTSPProperties");
5984
5985   if (src->debug)
5986     gst_sdp_message_dump (sdp);
5987
5988   gst_rtsp_ext_list_parse_sdp (src->extensions, sdp, src->props);
5989
5990   /* let the app inspect and change the SDP */
5991   g_signal_emit (src, gst_rtspsrc_signals[SIGNAL_ON_SDP], 0, sdp);
5992
5993   gst_segment_init (&src->segment, GST_FORMAT_TIME);
5994
5995   /* parse range for duration reporting. */
5996   {
5997     const gchar *range;
5998
5999     for (i = 0;; i++) {
6000       range = gst_sdp_message_get_attribute_val_n (sdp, "range", i);
6001       if (range == NULL)
6002         break;
6003
6004       /* keep track of the range and configure it in the segment */
6005       if (gst_rtspsrc_parse_range (src, range, &src->segment))
6006         break;
6007     }
6008   }
6009   /* parse clock information. This is GStreamer specific, a server can tell the
6010    * client what clock it is using and wrap that in a network clock. The
6011    * advantage of that is that we can slave to it. */
6012   {
6013     const gchar *gstclock;
6014
6015     for (i = 0;; i++) {
6016       gstclock = gst_sdp_message_get_attribute_val_n (sdp, "x-gst-clock", i);
6017       if (gstclock == NULL)
6018         break;
6019
6020       /* parse the clock and expose it in the provide_clock method */
6021       if (gst_rtspsrc_parse_gst_clock (src, gstclock))
6022         break;
6023     }
6024   }
6025   /* try to find a global control attribute. Note that a '*' means that we should
6026    * do aggregate control with the current url (so we don't do anything and
6027    * leave the current connection as is) */
6028   {
6029     const gchar *control;
6030
6031     for (i = 0;; i++) {
6032       control = gst_sdp_message_get_attribute_val_n (sdp, "control", i);
6033       if (control == NULL)
6034         break;
6035
6036       /* only take fully qualified urls */
6037       if (g_str_has_prefix (control, "rtsp://"))
6038         break;
6039     }
6040     if (control) {
6041       g_free (src->conninfo.location);
6042       src->conninfo.location = g_strdup (control);
6043       /* make a connection for this, if there was a connection already, nothing
6044        * happens. */
6045       if (gst_rtsp_conninfo_connect (src, &src->conninfo, async) < 0) {
6046         GST_ERROR_OBJECT (src, "could not connect");
6047       }
6048     }
6049     /* we need to keep the control url separate from the connection url because
6050      * the rules for constructing the media control url need it */
6051     g_free (src->control);
6052     src->control = g_strdup (control);
6053   }
6054
6055   /* create streams */
6056   n_streams = gst_sdp_message_medias_len (sdp);
6057   for (i = 0; i < n_streams; i++) {
6058     gst_rtspsrc_create_stream (src, sdp, i);
6059   }
6060
6061   src->state = GST_RTSP_STATE_INIT;
6062
6063   /* setup streams */
6064   if ((res = gst_rtspsrc_setup_streams (src, async)) < 0)
6065     goto setup_failed;
6066
6067   /* reset our state */
6068   src->need_range = TRUE;
6069   src->skip = FALSE;
6070
6071   src->state = GST_RTSP_STATE_READY;
6072
6073   return res;
6074
6075   /* ERRORS */
6076 setup_failed:
6077   {
6078     GST_ERROR_OBJECT (src, "setup failed");
6079     gst_rtspsrc_cleanup (src);
6080     return res;
6081   }
6082 }
6083
6084 static GstRTSPResult
6085 gst_rtspsrc_retrieve_sdp (GstRTSPSrc * src, GstSDPMessage ** sdp,
6086     gboolean async)
6087 {
6088   GstRTSPResult res;
6089   GstRTSPMessage request = { 0 };
6090   GstRTSPMessage response = { 0 };
6091   guint8 *data;
6092   guint size;
6093   gchar *respcont = NULL;
6094
6095 restart:
6096   src->need_redirect = FALSE;
6097
6098   /* can't continue without a valid url */
6099   if (G_UNLIKELY (src->conninfo.url == NULL)) {
6100     res = GST_RTSP_EINVAL;
6101     goto no_url;
6102   }
6103   src->tried_url_auth = FALSE;
6104
6105   if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo, async)) < 0)
6106     goto connect_failed;
6107
6108   /* create OPTIONS */
6109   GST_DEBUG_OBJECT (src, "create options...");
6110   res =
6111       gst_rtsp_message_init_request (&request, GST_RTSP_OPTIONS,
6112       src->conninfo.url_str);
6113   if (res < 0)
6114     goto create_request_failed;
6115
6116   /* send OPTIONS */
6117   GST_DEBUG_OBJECT (src, "send options...");
6118
6119   if (async)
6120     GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving server options"));
6121
6122   if ((res =
6123           gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
6124               NULL)) < 0)
6125     goto send_error;
6126
6127   /* parse OPTIONS */
6128   if (!gst_rtspsrc_parse_methods (src, &response))
6129     goto methods_error;
6130
6131   /* create DESCRIBE */
6132   GST_DEBUG_OBJECT (src, "create describe...");
6133   res =
6134       gst_rtsp_message_init_request (&request, GST_RTSP_DESCRIBE,
6135       src->conninfo.url_str);
6136   if (res < 0)
6137     goto create_request_failed;
6138
6139   /* we only accept SDP for now */
6140   gst_rtsp_message_add_header (&request, GST_RTSP_HDR_ACCEPT,
6141       "application/sdp");
6142
6143   /* send DESCRIBE */
6144   GST_DEBUG_OBJECT (src, "send describe...");
6145
6146   if (async)
6147     GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving media info"));
6148
6149   if ((res =
6150           gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
6151               NULL)) < 0)
6152     goto send_error;
6153
6154   /* we only perform redirect for the describe, currently */
6155   if (src->need_redirect) {
6156     /* close connection, we don't have to send a TEARDOWN yet, ignore the
6157      * result. */
6158     gst_rtsp_conninfo_close (src, &src->conninfo, TRUE);
6159
6160     gst_rtsp_message_unset (&request);
6161     gst_rtsp_message_unset (&response);
6162
6163     /* and now retry */
6164     goto restart;
6165   }
6166
6167   /* it could be that the DESCRIBE method was not implemented */
6168   if (!src->methods & GST_RTSP_DESCRIBE)
6169     goto no_describe;
6170
6171   /* check if reply is SDP */
6172   gst_rtsp_message_get_header (&response, GST_RTSP_HDR_CONTENT_TYPE, &respcont,
6173       0);
6174   /* could not be set but since the request returned OK, we assume it
6175    * was SDP, else check it. */
6176   if (respcont) {
6177     if (!g_ascii_strcasecmp (respcont, "application/sdp") == 0)
6178       goto wrong_content_type;
6179   }
6180
6181   /* get message body and parse as SDP */
6182   gst_rtsp_message_get_body (&response, &data, &size);
6183   if (data == NULL || size == 0)
6184     goto no_describe;
6185
6186   GST_DEBUG_OBJECT (src, "parse SDP...");
6187   gst_sdp_message_new (sdp);
6188   gst_sdp_message_parse_buffer (data, size, *sdp);
6189
6190   /* clean up any messages */
6191   gst_rtsp_message_unset (&request);
6192   gst_rtsp_message_unset (&response);
6193
6194   return res;
6195
6196   /* ERRORS */
6197 no_url:
6198   {
6199     GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
6200         ("No valid RTSP URL was provided"));
6201     goto cleanup_error;
6202   }
6203 connect_failed:
6204   {
6205     gchar *str = gst_rtsp_strresult (res);
6206
6207     if (res != GST_RTSP_EINTR) {
6208       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
6209           ("Failed to connect. (%s)", str));
6210     } else {
6211       GST_WARNING_OBJECT (src, "connect interrupted");
6212     }
6213     g_free (str);
6214     goto cleanup_error;
6215   }
6216 create_request_failed:
6217   {
6218     gchar *str = gst_rtsp_strresult (res);
6219
6220     GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL),
6221         ("Could not create request. (%s)", str));
6222     g_free (str);
6223     goto cleanup_error;
6224   }
6225 send_error:
6226   {
6227     /* Don't post a message - the rtsp_send method will have
6228      * taken care of it because we passed NULL for the response code */
6229     goto cleanup_error;
6230   }
6231 methods_error:
6232   {
6233     /* error was posted */
6234     res = GST_RTSP_ERROR;
6235     goto cleanup_error;
6236   }
6237 wrong_content_type:
6238   {
6239     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
6240         ("Server does not support SDP, got %s.", respcont));
6241     res = GST_RTSP_ERROR;
6242     goto cleanup_error;
6243   }
6244 no_describe:
6245   {
6246     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
6247         ("Server can not provide an SDP."));
6248     res = GST_RTSP_ERROR;
6249     goto cleanup_error;
6250   }
6251 cleanup_error:
6252   {
6253     if (src->conninfo.connection) {
6254       GST_DEBUG_OBJECT (src, "free connection");
6255       gst_rtsp_conninfo_close (src, &src->conninfo, TRUE);
6256     }
6257     gst_rtsp_message_unset (&request);
6258     gst_rtsp_message_unset (&response);
6259     return res;
6260   }
6261 }
6262
6263 static GstRTSPResult
6264 gst_rtspsrc_open (GstRTSPSrc * src, gboolean async)
6265 {
6266   GstRTSPResult ret;
6267
6268   src->methods =
6269       GST_RTSP_SETUP | GST_RTSP_PLAY | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN;
6270
6271   if (src->sdp == NULL) {
6272     if ((ret = gst_rtspsrc_retrieve_sdp (src, &src->sdp, async)) < 0)
6273       goto no_sdp;
6274   }
6275
6276   if ((ret = gst_rtspsrc_open_from_sdp (src, src->sdp, async)) < 0)
6277     goto open_failed;
6278
6279 done:
6280   if (async)
6281     gst_rtspsrc_loop_end_cmd (src, CMD_OPEN, ret);
6282
6283   return ret;
6284
6285   /* ERRORS */
6286 no_sdp:
6287   {
6288     GST_WARNING_OBJECT (src, "can't get sdp");
6289     src->open_error = TRUE;
6290     goto done;
6291   }
6292 open_failed:
6293   {
6294     GST_WARNING_OBJECT (src, "can't setup streaming from sdp");
6295     src->open_error = TRUE;
6296     goto done;
6297   }
6298 }
6299
6300 static GstRTSPResult
6301 gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, gboolean only_close)
6302 {
6303   GstRTSPMessage request = { 0 };
6304   GstRTSPMessage response = { 0 };
6305   GstRTSPResult res = GST_RTSP_OK;
6306   GList *walk;
6307   const gchar *control;
6308
6309   GST_DEBUG_OBJECT (src, "TEARDOWN...");
6310
6311   gst_rtspsrc_set_state (src, GST_STATE_READY);
6312
6313   if (src->state < GST_RTSP_STATE_READY) {
6314     GST_DEBUG_OBJECT (src, "not ready, doing cleanup");
6315     goto close;
6316   }
6317
6318   if (only_close)
6319     goto close;
6320
6321   /* construct a control url */
6322   control = get_aggregate_control (src);
6323
6324   if (!(src->methods & (GST_RTSP_PLAY | GST_RTSP_TEARDOWN)))
6325     goto not_supported;
6326
6327   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6328     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6329     const gchar *setup_url;
6330     GstRTSPConnInfo *info;
6331
6332     /* try aggregate control first but do non-aggregate control otherwise */
6333     if (control)
6334       setup_url = control;
6335     else if ((setup_url = stream->conninfo.location) == NULL)
6336       continue;
6337
6338     if (src->conninfo.connection) {
6339       info = &src->conninfo;
6340     } else if (stream->conninfo.connection) {
6341       info = &stream->conninfo;
6342     } else {
6343       continue;
6344     }
6345     if (!info->connected)
6346       goto next;
6347
6348     /* do TEARDOWN */
6349     res =
6350         gst_rtsp_message_init_request (&request, GST_RTSP_TEARDOWN, setup_url);
6351     if (res < 0)
6352       goto create_request_failed;
6353
6354     if (async)
6355       GST_ELEMENT_PROGRESS (src, CONTINUE, "close", ("Closing stream"));
6356
6357     if ((res =
6358             gst_rtspsrc_send (src, info->connection, &request, &response,
6359                 NULL)) < 0)
6360       goto send_error;
6361
6362     /* FIXME, parse result? */
6363     gst_rtsp_message_unset (&request);
6364     gst_rtsp_message_unset (&response);
6365
6366   next:
6367     /* early exit when we did aggregate control */
6368     if (control)
6369       break;
6370   }
6371
6372 close:
6373   /* close connections */
6374   GST_DEBUG_OBJECT (src, "closing connection...");
6375   gst_rtsp_conninfo_close (src, &src->conninfo, TRUE);
6376   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6377     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6378     gst_rtsp_conninfo_close (src, &stream->conninfo, TRUE);
6379   }
6380
6381   /* cleanup */
6382   gst_rtspsrc_cleanup (src);
6383
6384   src->state = GST_RTSP_STATE_INVALID;
6385
6386   if (async)
6387     gst_rtspsrc_loop_end_cmd (src, CMD_CLOSE, res);
6388
6389   return res;
6390
6391   /* ERRORS */
6392 create_request_failed:
6393   {
6394     gchar *str = gst_rtsp_strresult (res);
6395
6396     GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL),
6397         ("Could not create request. (%s)", str));
6398     g_free (str);
6399     goto close;
6400   }
6401 send_error:
6402   {
6403     gchar *str = gst_rtsp_strresult (res);
6404
6405     gst_rtsp_message_unset (&request);
6406     if (res != GST_RTSP_EINTR) {
6407       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
6408           ("Could not send message. (%s)", str));
6409     } else {
6410       GST_WARNING_OBJECT (src, "TEARDOWN interrupted");
6411     }
6412     g_free (str);
6413     goto close;
6414   }
6415 not_supported:
6416   {
6417     GST_DEBUG_OBJECT (src,
6418         "TEARDOWN and PLAY not supported, can't do TEARDOWN");
6419     goto close;
6420   }
6421 }
6422
6423 /* RTP-Info is of the format:
6424  *
6425  * url=<URL>;[seq=<seqbase>;rtptime=<timebase>] [, url=...]
6426  *
6427  * rtptime corresponds to the timestamp for the NPT time given in the header
6428  * seqbase corresponds to the next sequence number we received. This number
6429  * indicates the first seqnum after the seek and should be used to discard
6430  * packets that are from before the seek.
6431  */
6432 static gboolean
6433 gst_rtspsrc_parse_rtpinfo (GstRTSPSrc * src, gchar * rtpinfo)
6434 {
6435   gchar **infos;
6436   gint i, j;
6437
6438   GST_DEBUG_OBJECT (src, "parsing RTP-Info %s", rtpinfo);
6439
6440   infos = g_strsplit (rtpinfo, ",", 0);
6441   for (i = 0; infos[i]; i++) {
6442     gchar **fields;
6443     GstRTSPStream *stream;
6444     gint32 seqbase;
6445     gint64 timebase;
6446
6447     GST_DEBUG_OBJECT (src, "parsing info %s", infos[i]);
6448
6449     /* init values, types of seqbase and timebase are bigger than needed so we
6450      * can store -1 as uninitialized values */
6451     stream = NULL;
6452     seqbase = -1;
6453     timebase = -1;
6454
6455     /* parse url, find stream for url.
6456      * parse seq and rtptime. The seq number should be configured in the rtp
6457      * depayloader or session manager to detect gaps. Same for the rtptime, it
6458      * should be used to create an initial time newsegment. */
6459     fields = g_strsplit (infos[i], ";", 0);
6460     for (j = 0; fields[j]; j++) {
6461       GST_DEBUG_OBJECT (src, "parsing field %s", fields[j]);
6462       /* remove leading whitespace */
6463       fields[j] = g_strchug (fields[j]);
6464       if (g_str_has_prefix (fields[j], "url=")) {
6465         /* get the url and the stream */
6466         stream =
6467             find_stream (src, (fields[j] + 4), (gpointer) find_stream_by_setup);
6468       } else if (g_str_has_prefix (fields[j], "seq=")) {
6469         seqbase = atoi (fields[j] + 4);
6470       } else if (g_str_has_prefix (fields[j], "rtptime=")) {
6471         timebase = g_ascii_strtoll (fields[j] + 8, NULL, 10);
6472       }
6473     }
6474     g_strfreev (fields);
6475     /* now we need to store the values for the caps of the stream */
6476     if (stream != NULL) {
6477       GST_DEBUG_OBJECT (src,
6478           "found stream %p, setting: seqbase %d, timebase %" G_GINT64_FORMAT,
6479           stream, seqbase, timebase);
6480
6481       /* we have a stream, configure detected params */
6482       stream->seqbase = seqbase;
6483       stream->timebase = timebase;
6484     }
6485   }
6486   g_strfreev (infos);
6487
6488   return TRUE;
6489 }
6490
6491 static void
6492 gst_rtspsrc_handle_rtcp_interval (GstRTSPSrc * src, gchar * rtcp)
6493 {
6494   guint64 interval;
6495   GList *walk;
6496
6497   interval = strtoul (rtcp, NULL, 10);
6498   GST_DEBUG_OBJECT (src, "rtcp interval: %" G_GUINT64_FORMAT " ms", interval);
6499
6500   if (!interval)
6501     return;
6502
6503   interval *= GST_MSECOND;
6504
6505   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6506     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6507
6508     /* already (optionally) retrieved this when configuring manager */
6509     if (stream->session) {
6510       GObject *rtpsession = stream->session;
6511
6512       GST_DEBUG_OBJECT (src, "configure rtcp interval in session %p",
6513           rtpsession);
6514       g_object_set (rtpsession, "rtcp-min-interval", interval, NULL);
6515     }
6516   }
6517
6518   /* now it happens that (Xenon) server sending this may also provide bogus
6519    * RTCP SR sync data (i.e. with quite some jitter), so never mind those
6520    * and just use RTP-Info to sync */
6521   if (src->manager) {
6522     GObjectClass *klass;
6523
6524     klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager));
6525     if (g_object_class_find_property (klass, "rtcp-sync")) {
6526       GST_DEBUG_OBJECT (src, "configuring rtp sync method");
6527       g_object_set (src->manager, "rtcp-sync", RTCP_SYNC_RTP, NULL);
6528     }
6529   }
6530 }
6531
6532 static gdouble
6533 gst_rtspsrc_get_float (const gchar * dstr)
6534 {
6535   gchar s[G_ASCII_DTOSTR_BUF_SIZE] = { 0, };
6536
6537   /* canonicalise floating point string so we can handle float strings
6538    * in the form "24.930" or "24,930" irrespective of the current locale */
6539   g_strlcpy (s, dstr, sizeof (s));
6540   g_strdelimit (s, ",", '.');
6541   return g_ascii_strtod (s, NULL);
6542 }
6543
6544 static gchar *
6545 gen_range_header (GstRTSPSrc * src, GstSegment * segment)
6546 {
6547   gchar val_str[G_ASCII_DTOSTR_BUF_SIZE] = { 0, };
6548
6549   if (src->range && src->range->min.type == GST_RTSP_TIME_NOW) {
6550     g_strlcpy (val_str, "now", sizeof (val_str));
6551   } else {
6552     if (segment->position == 0) {
6553       g_strlcpy (val_str, "0", sizeof (val_str));
6554     } else {
6555       g_ascii_dtostr (val_str, sizeof (val_str),
6556           ((gdouble) segment->position) / GST_SECOND);
6557     }
6558   }
6559   return g_strdup_printf ("npt=%s-", val_str);
6560 }
6561
6562 static void
6563 clear_rtp_base (GstRTSPSrc * src, GstRTSPStream * stream)
6564 {
6565   stream->timebase = -1;
6566   stream->seqbase = -1;
6567   if (stream->caps) {
6568     GstStructure *s;
6569
6570     stream->caps = gst_caps_make_writable (stream->caps);
6571     s = gst_caps_get_structure (stream->caps, 0);
6572     gst_structure_remove_fields (s, "clock-base", "seqnum-base", NULL);
6573   }
6574 }
6575
6576 static GstRTSPResult
6577 gst_rtspsrc_ensure_open (GstRTSPSrc * src, gboolean async)
6578 {
6579   GstRTSPResult res = GST_RTSP_OK;
6580
6581   if (src->state < GST_RTSP_STATE_READY) {
6582     res = GST_RTSP_ERROR;
6583     if (src->open_error) {
6584       GST_DEBUG_OBJECT (src, "the stream was in error");
6585       goto done;
6586     }
6587     if (async)
6588       gst_rtspsrc_loop_start_cmd (src, CMD_OPEN);
6589
6590     if ((res = gst_rtspsrc_open (src, async)) < 0) {
6591       GST_DEBUG_OBJECT (src, "failed to open stream");
6592       goto done;
6593     }
6594   }
6595
6596 done:
6597   return res;
6598 }
6599
6600 static GstRTSPResult
6601 gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async)
6602 {
6603   GstRTSPMessage request = { 0 };
6604   GstRTSPMessage response = { 0 };
6605   GstRTSPResult res = GST_RTSP_OK;
6606   GList *walk;
6607   gchar *hval;
6608   gint hval_idx;
6609   const gchar *control;
6610
6611   GST_DEBUG_OBJECT (src, "PLAY...");
6612
6613   if ((res = gst_rtspsrc_ensure_open (src, async)) < 0)
6614     goto open_failed;
6615
6616   if (!(src->methods & GST_RTSP_PLAY))
6617     goto not_supported;
6618
6619   if (src->state == GST_RTSP_STATE_PLAYING)
6620     goto was_playing;
6621
6622   if (!src->conninfo.connection || !src->conninfo.connected)
6623     goto done;
6624
6625   /* send some dummy packets before we activate the receive in the
6626    * udp sources */
6627   gst_rtspsrc_send_dummy_packets (src);
6628
6629   /* require new SR packets */
6630   if (src->manager)
6631     g_signal_emit_by_name (src->manager, "reset-sync", NULL);
6632
6633   gst_rtspsrc_set_state (src, GST_STATE_PLAYING);
6634
6635   /* construct a control url */
6636   control = get_aggregate_control (src);
6637
6638   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6639     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6640     const gchar *setup_url;
6641     GstRTSPConnection *conn;
6642
6643     /* try aggregate control first but do non-aggregate control otherwise */
6644     if (control)
6645       setup_url = control;
6646     else if ((setup_url = stream->conninfo.location) == NULL)
6647       continue;
6648
6649     if (src->conninfo.connection) {
6650       conn = src->conninfo.connection;
6651     } else if (stream->conninfo.connection) {
6652       conn = stream->conninfo.connection;
6653     } else {
6654       continue;
6655     }
6656
6657     /* do play */
6658     res = gst_rtsp_message_init_request (&request, GST_RTSP_PLAY, setup_url);
6659     if (res < 0)
6660       goto create_request_failed;
6661
6662     if (src->need_range) {
6663       hval = gen_range_header (src, segment);
6664
6665       gst_rtsp_message_take_header (&request, GST_RTSP_HDR_RANGE, hval);
6666
6667       /* store the newsegment event so it can be sent from the streaming thread. */
6668       if (src->start_segment)
6669         gst_event_unref (src->start_segment);
6670       src->start_segment = gst_event_new_segment (&src->segment);
6671     }
6672
6673     if (segment->rate != 1.0) {
6674       gchar hval[G_ASCII_DTOSTR_BUF_SIZE];
6675
6676       g_ascii_dtostr (hval, sizeof (hval), segment->rate);
6677       if (src->skip)
6678         gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SCALE, hval);
6679       else
6680         gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval);
6681     }
6682
6683     if (async)
6684       GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("Sending PLAY request"));
6685
6686     if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
6687       goto send_error;
6688
6689     /* seek may have silently failed as it is not supported */
6690     if (!(src->methods & GST_RTSP_PLAY)) {
6691       GST_DEBUG_OBJECT (src, "PLAY Range not supported; re-enable PLAY");
6692       /* obviously it is supported as we made it here */
6693       src->methods |= GST_RTSP_PLAY;
6694       src->seekable = FALSE;
6695       /* but there is nothing to parse in the response,
6696        * so convey we have no idea and not to expect anything particular */
6697       clear_rtp_base (src, stream);
6698       if (control) {
6699         GList *run;
6700
6701         /* need to do for all streams */
6702         for (run = src->streams; run; run = g_list_next (run))
6703           clear_rtp_base (src, (GstRTSPStream *) run->data);
6704       }
6705       /* NOTE the above also disables npt based eos detection */
6706       /* and below forces position to 0,
6707        * which is visible feedback we lost the plot */
6708       segment->start = segment->position = src->last_pos;
6709     }
6710
6711     gst_rtsp_message_unset (&request);
6712
6713     /* parse RTP npt field. This is the current position in the stream (Normal
6714      * Play Time) and should be put in the NEWSEGMENT position field. */
6715     if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RANGE, &hval,
6716             0) == GST_RTSP_OK)
6717       gst_rtspsrc_parse_range (src, hval, segment);
6718
6719     /* assume 1.0 rate now, overwrite when the SCALE or SPEED headers are present. */
6720     segment->rate = 1.0;
6721
6722     /* parse Speed header. This is the intended playback rate of the stream
6723      * and should be put in the NEWSEGMENT rate field. */
6724     if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_SPEED, &hval,
6725             0) == GST_RTSP_OK) {
6726       segment->rate = gst_rtspsrc_get_float (hval);
6727     } else if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_SCALE,
6728             &hval, 0) == GST_RTSP_OK) {
6729       segment->rate = gst_rtspsrc_get_float (hval);
6730     }
6731
6732     /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
6733      * for the RTP packets. If this is not present, we assume all starts from 0...
6734      * This is info for the RTP session manager that we pass to it in caps. */
6735     hval_idx = 0;
6736     while (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTP_INFO,
6737             &hval, hval_idx++) == GST_RTSP_OK)
6738       gst_rtspsrc_parse_rtpinfo (src, hval);
6739
6740     /* some servers indicate RTCP parameters in PLAY response,
6741      * rather than properly in SDP */
6742     if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_RTCP_INTERVAL,
6743             &hval, 0) == GST_RTSP_OK)
6744       gst_rtspsrc_handle_rtcp_interval (src, hval);
6745
6746     gst_rtsp_message_unset (&response);
6747
6748     /* early exit when we did aggregate control */
6749     if (control)
6750       break;
6751   }
6752   /* configure the caps of the streams after we parsed all headers. Only reset
6753    * the manager object when we set a new Range header (we did a seek) */
6754   gst_rtspsrc_configure_caps (src, segment, src->need_range);
6755
6756   /* set again when needed */
6757   src->need_range = FALSE;
6758
6759   src->running = TRUE;
6760   src->base_time = -1;
6761   src->state = GST_RTSP_STATE_PLAYING;
6762
6763   /* mark discont */
6764   GST_DEBUG_OBJECT (src, "mark DISCONT, we did a seek to another position");
6765   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6766     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6767     stream->discont = TRUE;
6768   }
6769
6770 done:
6771   if (async)
6772     gst_rtspsrc_loop_end_cmd (src, CMD_PLAY, res);
6773
6774   return res;
6775
6776   /* ERRORS */
6777 open_failed:
6778   {
6779     GST_DEBUG_OBJECT (src, "failed to open stream");
6780     goto done;
6781   }
6782 not_supported:
6783   {
6784     GST_DEBUG_OBJECT (src, "PLAY is not supported");
6785     goto done;
6786   }
6787 was_playing:
6788   {
6789     GST_DEBUG_OBJECT (src, "we were already PLAYING");
6790     goto done;
6791   }
6792 create_request_failed:
6793   {
6794     gchar *str = gst_rtsp_strresult (res);
6795
6796     GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL),
6797         ("Could not create request. (%s)", str));
6798     g_free (str);
6799     goto done;
6800   }
6801 send_error:
6802   {
6803     gchar *str = gst_rtsp_strresult (res);
6804
6805     gst_rtsp_message_unset (&request);
6806     if (res != GST_RTSP_EINTR) {
6807       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
6808           ("Could not send message. (%s)", str));
6809     } else {
6810       GST_WARNING_OBJECT (src, "PLAY interrupted");
6811     }
6812     g_free (str);
6813     goto done;
6814   }
6815 }
6816
6817 static GstRTSPResult
6818 gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
6819 {
6820   GstRTSPResult res = GST_RTSP_OK;
6821   GstRTSPMessage request = { 0 };
6822   GstRTSPMessage response = { 0 };
6823   GList *walk;
6824   const gchar *control;
6825
6826   GST_DEBUG_OBJECT (src, "PAUSE...");
6827
6828   if ((res = gst_rtspsrc_ensure_open (src, async)) < 0)
6829     goto open_failed;
6830
6831   if (!(src->methods & GST_RTSP_PAUSE))
6832     goto not_supported;
6833
6834   if (src->state == GST_RTSP_STATE_READY)
6835     goto was_paused;
6836
6837   if (!src->conninfo.connection || !src->conninfo.connected)
6838     goto no_connection;
6839
6840   /* construct a control url */
6841   control = get_aggregate_control (src);
6842
6843   /* loop over the streams. We might exit the loop early when we could do an
6844    * aggregate control */
6845   for (walk = src->streams; walk; walk = g_list_next (walk)) {
6846     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
6847     GstRTSPConnection *conn;
6848     const gchar *setup_url;
6849
6850     /* try aggregate control first but do non-aggregate control otherwise */
6851     if (control)
6852       setup_url = control;
6853     else if ((setup_url = stream->conninfo.location) == NULL)
6854       continue;
6855
6856     if (src->conninfo.connection) {
6857       conn = src->conninfo.connection;
6858     } else if (stream->conninfo.connection) {
6859       conn = stream->conninfo.connection;
6860     } else {
6861       continue;
6862     }
6863
6864     if (async)
6865       GST_ELEMENT_PROGRESS (src, CONTINUE, "request",
6866           ("Sending PAUSE request"));
6867
6868     if ((res =
6869             gst_rtsp_message_init_request (&request, GST_RTSP_PAUSE,
6870                 setup_url)) < 0)
6871       goto create_request_failed;
6872
6873     if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
6874       goto send_error;
6875
6876     gst_rtsp_message_unset (&request);
6877     gst_rtsp_message_unset (&response);
6878
6879     /* exit early when we did agregate control */
6880     if (control)
6881       break;
6882   }
6883
6884   /* change element states now */
6885   gst_rtspsrc_set_state (src, GST_STATE_PAUSED);
6886
6887 no_connection:
6888   src->state = GST_RTSP_STATE_READY;
6889
6890 done:
6891   if (async)
6892     gst_rtspsrc_loop_end_cmd (src, CMD_PAUSE, res);
6893
6894   return res;
6895
6896   /* ERRORS */
6897 open_failed:
6898   {
6899     GST_DEBUG_OBJECT (src, "failed to open stream");
6900     goto done;
6901   }
6902 not_supported:
6903   {
6904     GST_DEBUG_OBJECT (src, "PAUSE is not supported");
6905     goto done;
6906   }
6907 was_paused:
6908   {
6909     GST_DEBUG_OBJECT (src, "we were already PAUSED");
6910     goto done;
6911   }
6912 create_request_failed:
6913   {
6914     gchar *str = gst_rtsp_strresult (res);
6915
6916     GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL),
6917         ("Could not create request. (%s)", str));
6918     g_free (str);
6919     goto done;
6920   }
6921 send_error:
6922   {
6923     gchar *str = gst_rtsp_strresult (res);
6924
6925     gst_rtsp_message_unset (&request);
6926     if (res != GST_RTSP_EINTR) {
6927       GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
6928           ("Could not send message. (%s)", str));
6929     } else {
6930       GST_WARNING_OBJECT (src, "PAUSE interrupted");
6931     }
6932     g_free (str);
6933     goto done;
6934   }
6935 }
6936
6937 static void
6938 gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
6939 {
6940   GstRTSPSrc *rtspsrc;
6941
6942   rtspsrc = GST_RTSPSRC (bin);
6943
6944   switch (GST_MESSAGE_TYPE (message)) {
6945     case GST_MESSAGE_EOS:
6946       gst_message_unref (message);
6947       break;
6948     case GST_MESSAGE_ELEMENT:
6949     {
6950       const GstStructure *s = gst_message_get_structure (message);
6951
6952       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
6953         gboolean ignore_timeout;
6954
6955         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
6956
6957         GST_OBJECT_LOCK (rtspsrc);
6958         ignore_timeout = rtspsrc->ignore_timeout;
6959         rtspsrc->ignore_timeout = TRUE;
6960         GST_OBJECT_UNLOCK (rtspsrc);
6961
6962         /* we only act on the first udp timeout message, others are irrelevant
6963          * and can be ignored. */
6964         if (!ignore_timeout)
6965           gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT, CMD_LOOP);
6966         /* eat and free */
6967         gst_message_unref (message);
6968         return;
6969       }
6970       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
6971       break;
6972     }
6973     case GST_MESSAGE_ERROR:
6974     {
6975       GstObject *udpsrc;
6976       GstRTSPStream *stream;
6977       GstFlowReturn ret;
6978
6979       udpsrc = GST_MESSAGE_SRC (message);
6980
6981       GST_DEBUG_OBJECT (rtspsrc, "got error from %s",
6982           GST_ELEMENT_NAME (udpsrc));
6983
6984       stream = find_stream (rtspsrc, udpsrc, (gpointer) find_stream_by_udpsrc);
6985       if (!stream)
6986         goto forward;
6987
6988       /* we ignore the RTCP udpsrc */
6989       if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
6990         goto done;
6991
6992       /* if we get error messages from the udp sources, that's not a problem as
6993        * long as not all of them error out. We also don't really know what the
6994        * problem is, the message does not give enough detail... */
6995       ret = gst_rtspsrc_combine_flows (rtspsrc, stream, GST_FLOW_NOT_LINKED);
6996       GST_DEBUG_OBJECT (rtspsrc, "combined flows: %s", gst_flow_get_name (ret));
6997       if (ret != GST_FLOW_OK)
6998         goto forward;
6999
7000     done:
7001       gst_message_unref (message);
7002       break;
7003
7004     forward:
7005       /* fatal but not our message, forward */
7006       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
7007       break;
7008     }
7009     default:
7010     {
7011       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
7012       break;
7013     }
7014   }
7015 }
7016
7017 /* the thread where everything happens */
7018 static void
7019 gst_rtspsrc_thread (GstRTSPSrc * src)
7020 {
7021   gint cmd;
7022
7023   GST_OBJECT_LOCK (src);
7024   cmd = src->pending_cmd;
7025   if (cmd == CMD_RECONNECT || cmd == CMD_PLAY || cmd == CMD_PAUSE
7026       || cmd == CMD_LOOP || cmd == CMD_OPEN)
7027     src->pending_cmd = CMD_LOOP;
7028   else
7029     src->pending_cmd = CMD_WAIT;
7030   GST_DEBUG_OBJECT (src, "got command %d", cmd);
7031
7032   /* we got the message command, so ensure communication is possible again */
7033   gst_rtspsrc_connection_flush (src, FALSE);
7034
7035   src->busy_cmd = cmd;
7036   GST_OBJECT_UNLOCK (src);
7037
7038   switch (cmd) {
7039     case CMD_OPEN:
7040       gst_rtspsrc_open (src, TRUE);
7041       break;
7042     case CMD_PLAY:
7043       gst_rtspsrc_play (src, &src->segment, TRUE);
7044       break;
7045     case CMD_PAUSE:
7046       gst_rtspsrc_pause (src, TRUE);
7047       break;
7048     case CMD_CLOSE:
7049       gst_rtspsrc_close (src, TRUE, FALSE);
7050       break;
7051     case CMD_LOOP:
7052       gst_rtspsrc_loop (src);
7053       break;
7054     case CMD_RECONNECT:
7055       gst_rtspsrc_reconnect (src, FALSE);
7056       break;
7057     default:
7058       break;
7059   }
7060
7061   GST_OBJECT_LOCK (src);
7062   /* and go back to sleep */
7063   if (src->pending_cmd == CMD_WAIT) {
7064     if (src->task)
7065       gst_task_pause (src->task);
7066   }
7067   /* reset waiting */
7068   src->busy_cmd = CMD_WAIT;
7069   GST_OBJECT_UNLOCK (src);
7070 }
7071
7072 static gboolean
7073 gst_rtspsrc_start (GstRTSPSrc * src)
7074 {
7075   GST_DEBUG_OBJECT (src, "starting");
7076
7077   GST_OBJECT_LOCK (src);
7078
7079   src->pending_cmd = CMD_WAIT;
7080
7081   if (src->task == NULL) {
7082     src->task = gst_task_new ((GstTaskFunction) gst_rtspsrc_thread, src, NULL);
7083     if (src->task == NULL)
7084       goto task_error;
7085
7086     gst_task_set_lock (src->task, GST_RTSP_STREAM_GET_LOCK (src));
7087   }
7088   GST_OBJECT_UNLOCK (src);
7089
7090   return TRUE;
7091
7092   /* ERRORS */
7093 task_error:
7094   {
7095     GST_ERROR_OBJECT (src, "failed to create task");
7096     return FALSE;
7097   }
7098 }
7099
7100 static gboolean
7101 gst_rtspsrc_stop (GstRTSPSrc * src)
7102 {
7103   GstTask *task;
7104
7105   GST_DEBUG_OBJECT (src, "stopping");
7106
7107   /* also cancels pending task */
7108   gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, CMD_ALL);
7109
7110   GST_OBJECT_LOCK (src);
7111   if ((task = src->task)) {
7112     src->task = NULL;
7113     GST_OBJECT_UNLOCK (src);
7114
7115     gst_task_stop (task);
7116
7117     /* make sure it is not running */
7118     GST_RTSP_STREAM_LOCK (src);
7119     GST_RTSP_STREAM_UNLOCK (src);
7120
7121     /* now wait for the task to finish */
7122     gst_task_join (task);
7123
7124     /* and free the task */
7125     gst_object_unref (GST_OBJECT (task));
7126
7127     GST_OBJECT_LOCK (src);
7128   }
7129   GST_OBJECT_UNLOCK (src);
7130
7131   /* ensure synchronously all is closed and clean */
7132   gst_rtspsrc_close (src, FALSE, TRUE);
7133
7134   return TRUE;
7135 }
7136
7137 static GstStateChangeReturn
7138 gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
7139 {
7140   GstRTSPSrc *rtspsrc;
7141   GstStateChangeReturn ret;
7142
7143   rtspsrc = GST_RTSPSRC (element);
7144
7145   switch (transition) {
7146     case GST_STATE_CHANGE_NULL_TO_READY:
7147       if (!gst_rtspsrc_start (rtspsrc))
7148         goto start_failed;
7149       break;
7150     case GST_STATE_CHANGE_READY_TO_PAUSED:
7151       /* init some state */
7152       rtspsrc->cur_protocols = rtspsrc->protocols;
7153       /* first attempt, don't ignore timeouts */
7154       rtspsrc->ignore_timeout = FALSE;
7155       rtspsrc->open_error = FALSE;
7156       gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN, 0);
7157       break;
7158     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
7159       set_manager_buffer_mode (rtspsrc);
7160       /* fall-through */
7161     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
7162       /* unblock the tcp tasks and make the loop waiting */
7163       if (gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT, CMD_LOOP)) {
7164         /* make sure it is waiting before we send PAUSE or PLAY below */
7165         GST_RTSP_STREAM_LOCK (rtspsrc);
7166         GST_RTSP_STREAM_UNLOCK (rtspsrc);
7167       }
7168       break;
7169     case GST_STATE_CHANGE_PAUSED_TO_READY:
7170       break;
7171     default:
7172       break;
7173   }
7174
7175   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
7176   if (ret == GST_STATE_CHANGE_FAILURE)
7177     goto done;
7178
7179   switch (transition) {
7180     case GST_STATE_CHANGE_NULL_TO_READY:
7181       ret = GST_STATE_CHANGE_SUCCESS;
7182       break;
7183     case GST_STATE_CHANGE_READY_TO_PAUSED:
7184       ret = GST_STATE_CHANGE_NO_PREROLL;
7185       break;
7186     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
7187       gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY, 0);
7188       ret = GST_STATE_CHANGE_SUCCESS;
7189       break;
7190     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
7191       /* send pause request and keep the idle task around */
7192       gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE, CMD_LOOP);
7193       ret = GST_STATE_CHANGE_NO_PREROLL;
7194       break;
7195     case GST_STATE_CHANGE_PAUSED_TO_READY:
7196       gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE, CMD_PAUSE);
7197       ret = GST_STATE_CHANGE_SUCCESS;
7198       break;
7199     case GST_STATE_CHANGE_READY_TO_NULL:
7200       gst_rtspsrc_stop (rtspsrc);
7201       ret = GST_STATE_CHANGE_SUCCESS;
7202       break;
7203     default:
7204       break;
7205   }
7206
7207 done:
7208   return ret;
7209
7210 start_failed:
7211   {
7212     GST_DEBUG_OBJECT (rtspsrc, "start failed");
7213     return GST_STATE_CHANGE_FAILURE;
7214   }
7215 }
7216
7217 static gboolean
7218 gst_rtspsrc_send_event (GstElement * element, GstEvent * event)
7219 {
7220   gboolean res;
7221   GstRTSPSrc *rtspsrc;
7222
7223   rtspsrc = GST_RTSPSRC (element);
7224
7225   if (GST_EVENT_IS_DOWNSTREAM (event)) {
7226     res = gst_rtspsrc_push_event (rtspsrc, event);
7227   } else {
7228     res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
7229   }
7230
7231   return res;
7232 }
7233
7234
7235 /*** GSTURIHANDLER INTERFACE *************************************************/
7236
7237 static GstURIType
7238 gst_rtspsrc_uri_get_type (GType type)
7239 {
7240   return GST_URI_SRC;
7241 }
7242
7243 static const gchar *const *
7244 gst_rtspsrc_uri_get_protocols (GType type)
7245 {
7246   static const gchar *protocols[] =
7247       { "rtsp", "rtspu", "rtspt", "rtsph", "rtsp-sdp",
7248     "rtsps", "rtspsu", "rtspst", "rtspsh", NULL
7249   };
7250
7251   return protocols;
7252 }
7253
7254 static gchar *
7255 gst_rtspsrc_uri_get_uri (GstURIHandler * handler)
7256 {
7257   GstRTSPSrc *src = GST_RTSPSRC (handler);
7258
7259   /* FIXME: make thread-safe */
7260   return g_strdup (src->conninfo.location);
7261 }
7262
7263 static gboolean
7264 gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri,
7265     GError ** error)
7266 {
7267   GstRTSPSrc *src;
7268   GstRTSPResult res;
7269   GstRTSPUrl *newurl = NULL;
7270   GstSDPMessage *sdp = NULL;
7271
7272   src = GST_RTSPSRC (handler);
7273
7274   /* same URI, we're fine */
7275   if (src->conninfo.location && uri && !strcmp (uri, src->conninfo.location))
7276     goto was_ok;
7277
7278   if (g_str_has_prefix (uri, "rtsp-sdp://")) {
7279     if ((res = gst_sdp_message_new (&sdp) < 0))
7280       goto sdp_failed;
7281
7282     GST_DEBUG_OBJECT (src, "parsing SDP message");
7283     if ((res = gst_sdp_message_parse_uri (uri, sdp) < 0))
7284       goto invalid_sdp;
7285   } else {
7286     /* try to parse */
7287     GST_DEBUG_OBJECT (src, "parsing URI");
7288     if ((res = gst_rtsp_url_parse (uri, &newurl)) < 0)
7289       goto parse_error;
7290   }
7291
7292   /* if worked, free previous and store new url object along with the original
7293    * location. */
7294   GST_DEBUG_OBJECT (src, "configuring URI");
7295   g_free (src->conninfo.location);
7296   src->conninfo.location = g_strdup (uri);
7297   gst_rtsp_url_free (src->conninfo.url);
7298   src->conninfo.url = newurl;
7299   g_free (src->conninfo.url_str);
7300   if (newurl)
7301     src->conninfo.url_str = gst_rtsp_url_get_request_uri (src->conninfo.url);
7302   else
7303     src->conninfo.url_str = NULL;
7304
7305   if (src->sdp)
7306     gst_sdp_message_free (src->sdp);
7307   src->sdp = sdp;
7308   src->from_sdp = sdp != NULL;
7309
7310   GST_DEBUG_OBJECT (src, "set uri: %s", GST_STR_NULL (uri));
7311   GST_DEBUG_OBJECT (src, "request uri is: %s",
7312       GST_STR_NULL (src->conninfo.url_str));
7313
7314   return TRUE;
7315
7316   /* Special cases */
7317 was_ok:
7318   {
7319     GST_DEBUG_OBJECT (src, "URI was ok: '%s'", GST_STR_NULL (uri));
7320     return TRUE;
7321   }
7322 sdp_failed:
7323   {
7324     GST_ERROR_OBJECT (src, "Could not create new SDP (%d)", res);
7325     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
7326         "Could not create SDP");
7327     return FALSE;
7328   }
7329 invalid_sdp:
7330   {
7331     GST_ERROR_OBJECT (src, "Not a valid SDP (%d) '%s'", res,
7332         GST_STR_NULL (uri));
7333     gst_sdp_message_free (sdp);
7334     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
7335         "Invalid SDP");
7336     return FALSE;
7337   }
7338 parse_error:
7339   {
7340     GST_ERROR_OBJECT (src, "Not a valid RTSP url '%s' (%d)",
7341         GST_STR_NULL (uri), res);
7342     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
7343         "Invalid RTSP URI");
7344     return FALSE;
7345   }
7346 }
7347
7348 static void
7349 gst_rtspsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
7350 {
7351   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
7352
7353   iface->get_type = gst_rtspsrc_uri_get_type;
7354   iface->get_protocols = gst_rtspsrc_uri_get_protocols;
7355   iface->get_uri = gst_rtspsrc_uri_get_uri;
7356   iface->set_uri = gst_rtspsrc_uri_set_uri;
7357 }