Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED         FALSE
29 #define DEFAULT_REUSABLE       FALSE
30 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN   FALSE
33 #define DEFAULT_BUFFER_SIZE    0x80000
34
35 /* define to dump received RTCP packets */
36 #undef DUMP_STATS
37
38 enum
39 {
40   PROP_0,
41   PROP_SHARED,
42   PROP_REUSABLE,
43   PROP_PROTOCOLS,
44   PROP_EOS_SHUTDOWN,
45   PROP_BUFFER_SIZE,
46   PROP_LAST
47 };
48
49 enum
50 {
51   SIGNAL_PREPARED,
52   SIGNAL_UNPREPARED,
53   SIGNAL_NEW_STATE,
54   SIGNAL_LAST
55 };
56
57 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
58 #define GST_CAT_DEFAULT rtsp_media_debug
59
60 static GQuark ssrc_stream_map_key;
61
62 static void gst_rtsp_media_get_property (GObject * object, guint propid,
63     GValue * value, GParamSpec * pspec);
64 static void gst_rtsp_media_set_property (GObject * object, guint propid,
65     const GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_finalize (GObject * obj);
67
68 static gpointer do_loop (GstRTSPMediaClass * klass);
69 static gboolean default_handle_message (GstRTSPMedia * media,
70     GstMessage * message);
71 static gboolean default_unprepare (GstRTSPMedia * media);
72 static void unlock_streams (GstRTSPMedia * media);
73
74 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
75
76 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
77
78 static void
79 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
80 {
81   GObjectClass *gobject_class;
82   GError *error = NULL;
83
84   gobject_class = G_OBJECT_CLASS (klass);
85
86   gobject_class->get_property = gst_rtsp_media_get_property;
87   gobject_class->set_property = gst_rtsp_media_set_property;
88   gobject_class->finalize = gst_rtsp_media_finalize;
89
90   g_object_class_install_property (gobject_class, PROP_SHARED,
91       g_param_spec_boolean ("shared", "Shared",
92           "If this media pipeline can be shared", DEFAULT_SHARED,
93           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
94
95   g_object_class_install_property (gobject_class, PROP_REUSABLE,
96       g_param_spec_boolean ("reusable", "Reusable",
97           "If this media pipeline can be reused after an unprepare",
98           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
99
100   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
101       g_param_spec_flags ("protocols", "Protocols",
102           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
103           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104
105   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
106       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
107           "Send an EOS event to the pipeline before unpreparing",
108           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109
110   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
111       g_param_spec_uint ("buffer-size", "Buffer Size",
112           "The kernel UDP buffer size to use", 0, G_MAXUINT,
113           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
114
115   gst_rtsp_media_signals[SIGNAL_PREPARED] =
116       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
117       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
118       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
119
120   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
121       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
122       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
123       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
124
125   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
126       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
127       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
128       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
129
130   klass->context = g_main_context_new ();
131   klass->loop = g_main_loop_new (klass->context, TRUE);
132
133   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
134
135   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
136   if (error != NULL) {
137     g_critical ("could not start bus thread: %s", error->message);
138   }
139   klass->handle_message = default_handle_message;
140   klass->unprepare = default_unprepare;
141
142   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
143 }
144
145 static void
146 gst_rtsp_media_init (GstRTSPMedia * media)
147 {
148   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
149   media->lock = g_mutex_new ();
150   media->cond = g_cond_new ();
151
152   media->shared = DEFAULT_SHARED;
153   media->reusable = DEFAULT_REUSABLE;
154   media->protocols = DEFAULT_PROTOCOLS;
155   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
156   media->buffer_size = DEFAULT_BUFFER_SIZE;
157 }
158
159 void
160 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
161 {
162   if (trans->transport) {
163     gst_rtsp_transport_free (trans->transport);
164     trans->transport = NULL;
165   }
166   if (trans->rtpsource) {
167     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
168     trans->rtpsource = NULL;
169   }
170 }
171
172 static void
173 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
174 {
175   if (stream->session)
176     g_object_unref (stream->session);
177
178   if (stream->caps)
179     gst_caps_unref (stream->caps);
180
181   if (stream->send_rtp_sink)
182     gst_object_unref (stream->send_rtp_sink);
183   if (stream->send_rtp_src)
184     gst_object_unref (stream->send_rtp_src);
185   if (stream->send_rtcp_src)
186     gst_object_unref (stream->send_rtcp_src);
187   if (stream->recv_rtcp_sink)
188     gst_object_unref (stream->recv_rtcp_sink);
189   if (stream->recv_rtp_sink)
190     gst_object_unref (stream->recv_rtp_sink);
191
192   g_list_free (stream->transports);
193
194   g_free (stream);
195 }
196
197 static void
198 gst_rtsp_media_finalize (GObject * obj)
199 {
200   GstRTSPMedia *media;
201   guint i;
202
203   media = GST_RTSP_MEDIA (obj);
204
205   GST_INFO ("finalize media %p", media);
206
207   if (media->pipeline) {
208     unlock_streams (media);
209     gst_element_set_state (media->pipeline, GST_STATE_NULL);
210     gst_object_unref (media->pipeline);
211   }
212
213   for (i = 0; i < media->streams->len; i++) {
214     GstRTSPMediaStream *stream;
215
216     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
217
218     gst_rtsp_media_stream_free (stream);
219   }
220   g_array_free (media->streams, TRUE);
221
222   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
223   g_list_free (media->dynamic);
224
225   if (media->source) {
226     g_source_destroy (media->source);
227     g_source_unref (media->source);
228   }
229   g_mutex_free (media->lock);
230   g_cond_free (media->cond);
231
232   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
233 }
234
235 static void
236 gst_rtsp_media_get_property (GObject * object, guint propid,
237     GValue * value, GParamSpec * pspec)
238 {
239   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
240
241   switch (propid) {
242     case PROP_SHARED:
243       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
244       break;
245     case PROP_REUSABLE:
246       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
247       break;
248     case PROP_PROTOCOLS:
249       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
250       break;
251     case PROP_EOS_SHUTDOWN:
252       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
253       break;
254     case PROP_BUFFER_SIZE:
255       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
256       break;
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
259   }
260 }
261
262 static void
263 gst_rtsp_media_set_property (GObject * object, guint propid,
264     const GValue * value, GParamSpec * pspec)
265 {
266   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
267
268   switch (propid) {
269     case PROP_SHARED:
270       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
271       break;
272     case PROP_REUSABLE:
273       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
274       break;
275     case PROP_PROTOCOLS:
276       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
277       break;
278     case PROP_EOS_SHUTDOWN:
279       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
280       break;
281     case PROP_BUFFER_SIZE:
282       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
283       break;
284     default:
285       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
286   }
287 }
288
289 static gpointer
290 do_loop (GstRTSPMediaClass * klass)
291 {
292   GST_INFO ("enter mainloop");
293   g_main_loop_run (klass->loop);
294   GST_INFO ("exit mainloop");
295
296   return NULL;
297 }
298
299 static void
300 collect_media_stats (GstRTSPMedia * media)
301 {
302   GstFormat format;
303   gint64 position, duration;
304
305   media->range.unit = GST_RTSP_RANGE_NPT;
306
307   if (media->is_live) {
308     media->range.min.type = GST_RTSP_TIME_NOW;
309     media->range.min.seconds = -1;
310     media->range.max.type = GST_RTSP_TIME_END;
311     media->range.max.seconds = -1;
312   } else {
313     /* get the position */
314     format = GST_FORMAT_TIME;
315     if (!gst_element_query_position (media->pipeline, &format, &position)) {
316       GST_INFO ("position query failed");
317       position = 0;
318     }
319
320     /* get the duration */
321     format = GST_FORMAT_TIME;
322     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
323       GST_INFO ("duration query failed");
324       duration = -1;
325     }
326
327     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
328         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
329
330     if (position == -1) {
331       media->range.min.type = GST_RTSP_TIME_NOW;
332       media->range.min.seconds = -1;
333     } else {
334       media->range.min.type = GST_RTSP_TIME_SECONDS;
335       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
336     }
337     if (duration == -1) {
338       media->range.max.type = GST_RTSP_TIME_END;
339       media->range.max.seconds = -1;
340     } else {
341       media->range.max.type = GST_RTSP_TIME_SECONDS;
342       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
343     }
344   }
345 }
346
347 /**
348  * gst_rtsp_media_new:
349  *
350  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
351  * element to produde RTP data for one or more related (audio/video/..) 
352  * streams.
353  *
354  * Returns: a new #GstRTSPMedia object.
355  */
356 GstRTSPMedia *
357 gst_rtsp_media_new (void)
358 {
359   GstRTSPMedia *result;
360
361   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
362
363   return result;
364 }
365
366 /**
367  * gst_rtsp_media_set_shared:
368  * @media: a #GstRTSPMedia
369  * @shared: the new value
370  *
371  * Set or unset if the pipeline for @media can be shared will multiple clients.
372  * When @shared is %TRUE, client requests for this media will share the media
373  * pipeline.
374  */
375 void
376 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
377 {
378   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
379
380   media->shared = shared;
381 }
382
383 /**
384  * gst_rtsp_media_is_shared:
385  * @media: a #GstRTSPMedia
386  *
387  * Check if the pipeline for @media can be shared between multiple clients.
388  *
389  * Returns: %TRUE if the media can be shared between clients.
390  */
391 gboolean
392 gst_rtsp_media_is_shared (GstRTSPMedia * media)
393 {
394   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
395
396   return media->shared;
397 }
398
399 /**
400  * gst_rtsp_media_set_reusable:
401  * @media: a #GstRTSPMedia
402  * @reusable: the new value
403  *
404  * Set or unset if the pipeline for @media can be reused after the pipeline has
405  * been unprepared.
406  */
407 void
408 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
409 {
410   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
411
412   media->reusable = reusable;
413 }
414
415 /**
416  * gst_rtsp_media_is_reusable:
417  * @media: a #GstRTSPMedia
418  *
419  * Check if the pipeline for @media can be reused after an unprepare.
420  *
421  * Returns: %TRUE if the media can be reused
422  */
423 gboolean
424 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
425 {
426   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
427
428   return media->reusable;
429 }
430
431 /**
432  * gst_rtsp_media_set_protocols:
433  * @media: a #GstRTSPMedia
434  * @protocols: the new flags
435  *
436  * Configure the allowed lower transport for @media.
437  */
438 void
439 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
440 {
441   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
442
443   media->protocols = protocols;
444 }
445
446 /**
447  * gst_rtsp_media_get_protocols:
448  * @media: a #GstRTSPMedia
449  *
450  * Get the allowed protocols of @media.
451  *
452  * Returns: a #GstRTSPLowerTrans
453  */
454 GstRTSPLowerTrans
455 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
456 {
457   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
458       GST_RTSP_LOWER_TRANS_UNKNOWN);
459
460   return media->protocols;
461 }
462
463 /**
464  * gst_rtsp_media_set_eos_shutdown:
465  * @media: a #GstRTSPMedia
466  * @eos_shutdown: the new value
467  *
468  * Set or unset if an EOS event will be sent to the pipeline for @media before
469  * it is unprepared.
470  */
471 void
472 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
473 {
474   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
475
476   media->eos_shutdown = eos_shutdown;
477 }
478
479 /**
480  * gst_rtsp_media_is_eos_shutdown:
481  * @media: a #GstRTSPMedia
482  *
483  * Check if the pipeline for @media will send an EOS down the pipeline before
484  * unpreparing.
485  *
486  * Returns: %TRUE if the media will send EOS before unpreparing.
487  */
488 gboolean
489 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
490 {
491   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
492
493   return media->eos_shutdown;
494 }
495
496 /**
497  * gst_rtsp_media_set_buffer_size:
498  * @media: a #GstRTSPMedia
499  * @size: the new value
500  *
501  * Set the kernel UDP buffer size.
502  */
503 void
504 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
505 {
506   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
507
508   media->buffer_size = size;
509 }
510
511 /**
512  * gst_rtsp_media_get_buffer_size:
513  * @media: a #GstRTSPMedia
514  *
515  * Get the kernel UDP buffer size.
516  *
517  * Returns: the kernel UDP buffer size.
518  */
519 guint
520 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
521 {
522   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
523
524   return media->buffer_size;
525 }
526
527 /**
528  * gst_rtsp_media_set_auth:
529  * @media: a #GstRTSPMedia
530  * @auth: a #GstRTSPAuth
531  *
532  * configure @auth to be used as the authentication manager of @media.
533  */
534 void
535 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
536 {
537   GstRTSPAuth *old;
538
539   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
540
541   old = media->auth;
542
543   if (old != auth) {
544     if (auth)
545       g_object_ref (auth);
546     media->auth = auth;
547     if (old)
548       g_object_unref (old);
549   }
550 }
551
552 /**
553  * gst_rtsp_media_get_auth:
554  * @media: a #GstRTSPMedia
555  *
556  * Get the #GstRTSPAuth used as the authentication manager of @media.
557  *
558  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
559  * usage.
560  */
561 GstRTSPAuth *
562 gst_rtsp_media_get_auth (GstRTSPMedia * media)
563 {
564   GstRTSPAuth *result;
565
566   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
567
568   if ((result = media->auth))
569     g_object_ref (result);
570
571   return result;
572 }
573
574
575 /**
576  * gst_rtsp_media_n_streams:
577  * @media: a #GstRTSPMedia
578  *
579  * Get the number of streams in this media.
580  *
581  * Returns: The number of streams.
582  */
583 guint
584 gst_rtsp_media_n_streams (GstRTSPMedia * media)
585 {
586   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
587
588   return media->streams->len;
589 }
590
591 /**
592  * gst_rtsp_media_get_stream:
593  * @media: a #GstRTSPMedia
594  * @idx: the stream index
595  *
596  * Retrieve the stream with index @idx from @media.
597  *
598  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
599  * that index did not exist.
600  */
601 GstRTSPMediaStream *
602 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
603 {
604   GstRTSPMediaStream *res;
605
606   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
607
608   if (idx < media->streams->len)
609     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
610   else
611     res = NULL;
612
613   return res;
614 }
615
616 /**
617  * gst_rtsp_media_get_range_string:
618  * @media: a #GstRTSPMedia
619  * @play: for the PLAY request
620  *
621  * Get the current range as a string.
622  *
623  * Returns: The range as a string, g_free() after usage.
624  */
625 gchar *
626 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
627 {
628   gchar *result;
629   GstRTSPTimeRange range;
630
631   /* make copy */
632   range = media->range;
633
634   if (!play && media->active > 0) {
635     range.min.type = GST_RTSP_TIME_NOW;
636     range.min.seconds = -1;
637   }
638
639   result = gst_rtsp_range_to_string (&range);
640
641   return result;
642 }
643
644 /**
645  * gst_rtsp_media_seek:
646  * @media: a #GstRTSPMedia
647  * @range: a #GstRTSPTimeRange
648  *
649  * Seek the pipeline to @range.
650  *
651  * Returns: %TRUE on success.
652  */
653 gboolean
654 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
655 {
656   GstSeekFlags flags;
657   gboolean res;
658   gint64 start, stop;
659   GstSeekType start_type, stop_type;
660
661   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
662   g_return_val_if_fail (range != NULL, FALSE);
663
664   if (range->unit != GST_RTSP_RANGE_NPT)
665     goto not_supported;
666
667   /* depends on the current playing state of the pipeline. We might need to
668    * queue this until we get EOS. */
669   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
670
671   start_type = stop_type = GST_SEEK_TYPE_NONE;
672
673   switch (range->min.type) {
674     case GST_RTSP_TIME_NOW:
675       start = -1;
676       break;
677     case GST_RTSP_TIME_SECONDS:
678       /* only seek when something changed */
679       if (media->range.min.seconds == range->min.seconds) {
680         start = -1;
681       } else {
682         start = range->min.seconds * GST_SECOND;
683         start_type = GST_SEEK_TYPE_SET;
684       }
685       break;
686     case GST_RTSP_TIME_END:
687     default:
688       goto weird_type;
689   }
690   switch (range->max.type) {
691     case GST_RTSP_TIME_SECONDS:
692       /* only seek when something changed */
693       if (media->range.max.seconds == range->max.seconds) {
694         stop = -1;
695       } else {
696         stop = range->max.seconds * GST_SECOND;
697         stop_type = GST_SEEK_TYPE_SET;
698       }
699       break;
700     case GST_RTSP_TIME_END:
701       stop = -1;
702       stop_type = GST_SEEK_TYPE_SET;
703       break;
704     case GST_RTSP_TIME_NOW:
705     default:
706       goto weird_type;
707   }
708
709   if (start != -1 || stop != -1) {
710     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
711         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
712
713     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
714         flags, start_type, start, stop_type, stop);
715
716     /* and block for the seek to complete */
717     GST_INFO ("done seeking %d", res);
718     gst_element_get_state (media->pipeline, NULL, NULL, -1);
719     GST_INFO ("prerolled again");
720
721     collect_media_stats (media);
722   } else {
723     GST_INFO ("no seek needed");
724     res = TRUE;
725   }
726
727   return res;
728
729   /* ERRORS */
730 not_supported:
731   {
732     GST_WARNING ("seek unit %d not supported", range->unit);
733     return FALSE;
734   }
735 weird_type:
736   {
737     GST_WARNING ("weird range type %d not supported", range->min.type);
738     return FALSE;
739   }
740 }
741
742 /**
743  * gst_rtsp_media_stream_rtp:
744  * @stream: a #GstRTSPMediaStream
745  * @buffer: a #GstBuffer
746  *
747  * Handle an RTP buffer for the stream. This method is usually called when a
748  * message has been received from a client using the TCP transport.
749  *
750  * This function takes ownership of @buffer.
751  *
752  * Returns: a GstFlowReturn.
753  */
754 GstFlowReturn
755 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
756 {
757   GstFlowReturn ret;
758
759   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
760
761   return ret;
762 }
763
764 /**
765  * gst_rtsp_media_stream_rtcp:
766  * @stream: a #GstRTSPMediaStream
767  * @buffer: a #GstBuffer
768  *
769  * Handle an RTCP buffer for the stream. This method is usually called when a
770  * message has been received from a client using the TCP transport.
771  *
772  * This function takes ownership of @buffer.
773  *
774  * Returns: a GstFlowReturn.
775  */
776 GstFlowReturn
777 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
778 {
779   GstFlowReturn ret;
780
781   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
782
783   return ret;
784 }
785
786 /* Allocate the udp ports and sockets */
787 static gboolean
788 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
789 {
790   GstStateChangeReturn ret;
791   GstElement *udpsrc0, *udpsrc1;
792   GstElement *udpsink0, *udpsink1;
793   gint tmp_rtp, tmp_rtcp;
794   guint count;
795   gint rtpport, rtcpport, sockfd;
796   const gchar *host;
797
798   udpsrc0 = NULL;
799   udpsrc1 = NULL;
800   udpsink0 = NULL;
801   udpsink1 = NULL;
802   count = 0;
803
804   /* Start with random port */
805   tmp_rtp = 0;
806
807   if (media->is_ipv6)
808     host = "udp://[::0]";
809   else
810     host = "udp://0.0.0.0";
811
812   /* try to allocate 2 UDP ports, the RTP port should be an even
813    * number and the RTCP port should be the next (uneven) port */
814 again:
815   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
816   if (udpsrc0 == NULL)
817     goto no_udp_protocol;
818   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
819
820   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
821   if (ret == GST_STATE_CHANGE_FAILURE) {
822     if (tmp_rtp != 0) {
823       tmp_rtp += 2;
824       if (++count > 20)
825         goto no_ports;
826
827       gst_element_set_state (udpsrc0, GST_STATE_NULL);
828       gst_object_unref (udpsrc0);
829
830       goto again;
831     }
832     goto no_udp_protocol;
833   }
834
835   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
836
837   /* check if port is even */
838   if ((tmp_rtp & 1) != 0) {
839     /* port not even, close and allocate another */
840     if (++count > 20)
841       goto no_ports;
842
843     gst_element_set_state (udpsrc0, GST_STATE_NULL);
844     gst_object_unref (udpsrc0);
845
846     tmp_rtp++;
847     goto again;
848   }
849
850   /* allocate port+1 for RTCP now */
851   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
852   if (udpsrc1 == NULL)
853     goto no_udp_rtcp_protocol;
854
855   /* set port */
856   tmp_rtcp = tmp_rtp + 1;
857   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
858
859   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
860   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
861   if (ret == GST_STATE_CHANGE_FAILURE) {
862
863     if (++count > 20)
864       goto no_ports;
865
866     gst_element_set_state (udpsrc0, GST_STATE_NULL);
867     gst_object_unref (udpsrc0);
868
869     gst_element_set_state (udpsrc1, GST_STATE_NULL);
870     gst_object_unref (udpsrc1);
871
872     tmp_rtp += 2;
873     goto again;
874   }
875
876   /* all fine, do port check */
877   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
878   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
879
880   /* this should not happen... */
881   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
882     goto port_error;
883
884   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
885   if (!udpsink0)
886     goto no_udp_protocol;
887
888   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
889   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
890   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
891
892   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
893   if (!udpsink1)
894     goto no_udp_protocol;
895
896   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
897           "send-duplicates")) {
898     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
899     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
900   } else {
901     g_warning
902         ("old multiudpsink version found without send-duplicates property");
903   }
904
905   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
906           "buffer-size")) {
907     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
908   } else {
909     GST_WARNING ("multiudpsink version found without buffer-size property");
910   }
911
912   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
913   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
914   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
915   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
916   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
917
918   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
919   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
920   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
921   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
922
923   /* we keep these elements, we configure all in configure_transport when the
924    * server told us to really use the UDP ports. */
925   stream->udpsrc[0] = udpsrc0;
926   stream->udpsrc[1] = udpsrc1;
927   stream->udpsink[0] = udpsink0;
928   stream->udpsink[1] = udpsink1;
929   stream->server_port.min = rtpport;
930   stream->server_port.max = rtcpport;
931
932   return TRUE;
933
934   /* ERRORS */
935 no_udp_protocol:
936   {
937     goto cleanup;
938   }
939 no_ports:
940   {
941     goto cleanup;
942   }
943 no_udp_rtcp_protocol:
944   {
945     goto cleanup;
946   }
947 port_error:
948   {
949     goto cleanup;
950   }
951 cleanup:
952   {
953     if (udpsrc0) {
954       gst_element_set_state (udpsrc0, GST_STATE_NULL);
955       gst_object_unref (udpsrc0);
956     }
957     if (udpsrc1) {
958       gst_element_set_state (udpsrc1, GST_STATE_NULL);
959       gst_object_unref (udpsrc1);
960     }
961     if (udpsink0) {
962       gst_element_set_state (udpsink0, GST_STATE_NULL);
963       gst_object_unref (udpsink0);
964     }
965     if (udpsink1) {
966       gst_element_set_state (udpsink1, GST_STATE_NULL);
967       gst_object_unref (udpsink1);
968     }
969     return FALSE;
970   }
971 }
972
973 /* executed from streaming thread */
974 static void
975 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
976 {
977   gchar *capsstr;
978   GstCaps *newcaps, *oldcaps;
979
980   if ((newcaps = GST_PAD_CAPS (pad)))
981     gst_caps_ref (newcaps);
982
983   oldcaps = stream->caps;
984   stream->caps = newcaps;
985
986   if (oldcaps)
987     gst_caps_unref (oldcaps);
988
989   capsstr = gst_caps_to_string (newcaps);
990   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
991   g_free (capsstr);
992 }
993
994 static void
995 dump_structure (const GstStructure * s)
996 {
997   gchar *sstr;
998
999   sstr = gst_structure_to_string (s);
1000   GST_INFO ("structure: %s", sstr);
1001   g_free (sstr);
1002 }
1003
1004 static GstRTSPMediaTrans *
1005 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1006 {
1007   GList *walk;
1008   GstRTSPMediaTrans *result = NULL;
1009   const gchar *tmp;
1010   gchar *dest;
1011   guint port;
1012
1013   if (rtcp_from == NULL)
1014     return NULL;
1015
1016   tmp = g_strrstr (rtcp_from, ":");
1017   if (tmp == NULL)
1018     return NULL;
1019
1020   port = atoi (tmp + 1);
1021   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1022
1023   GST_INFO ("finding %s:%d", dest, port);
1024
1025   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1026     GstRTSPMediaTrans *trans = walk->data;
1027     gint min, max;
1028
1029     min = trans->transport->client_port.min;
1030     max = trans->transport->client_port.max;
1031
1032     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1033             || max == port)) {
1034       result = trans;
1035       break;
1036     }
1037   }
1038   g_free (dest);
1039
1040   return result;
1041 }
1042
1043 static void
1044 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1045 {
1046   GstStructure *stats;
1047   GstRTSPMediaTrans *trans;
1048
1049   GST_INFO ("%p: new source %p", stream, source);
1050
1051   /* see if we have a stream to match with the origin of the RTCP packet */
1052   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1053   if (trans == NULL) {
1054     g_object_get (source, "stats", &stats, NULL);
1055     if (stats) {
1056       const gchar *rtcp_from;
1057
1058       dump_structure (stats);
1059
1060       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1061       if ((trans = find_transport (stream, rtcp_from))) {
1062         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1063             source);
1064
1065         /* keep ref to the source */
1066         trans->rtpsource = source;
1067
1068         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1069       }
1070       gst_structure_free (stats);
1071     }
1072   } else {
1073     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1074   }
1075 }
1076
1077 static void
1078 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1079 {
1080   GST_INFO ("%p: new SDES %p", stream, source);
1081 }
1082
1083 static void
1084 on_ssrc_active (GObject * session, GObject * source,
1085     GstRTSPMediaStream * stream)
1086 {
1087   GstRTSPMediaTrans *trans;
1088
1089   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1090
1091   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1092
1093   if (trans && trans->keep_alive)
1094     trans->keep_alive (trans->ka_user_data);
1095
1096 #ifdef DUMP_STATS
1097   {
1098     GstStructure *stats;
1099     g_object_get (source, "stats", &stats, NULL);
1100     if (stats) {
1101       dump_structure (stats);
1102       gst_structure_free (stats);
1103     }
1104   }
1105 #endif
1106 }
1107
1108 static void
1109 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1110 {
1111   GST_INFO ("%p: source %p bye", stream, source);
1112 }
1113
1114 static void
1115 on_bye_timeout (GObject * session, GObject * source,
1116     GstRTSPMediaStream * stream)
1117 {
1118   GstRTSPMediaTrans *trans;
1119
1120   GST_INFO ("%p: source %p bye timeout", stream, source);
1121
1122   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1123     trans->rtpsource = NULL;
1124     trans->timeout = TRUE;
1125   }
1126 }
1127
1128 static void
1129 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1130 {
1131   GstRTSPMediaTrans *trans;
1132
1133   GST_INFO ("%p: source %p timeout", stream, source);
1134
1135   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1136     trans->rtpsource = NULL;
1137     trans->timeout = TRUE;
1138   }
1139 }
1140
1141 static GstFlowReturn
1142 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1143 {
1144   GList *walk;
1145   GstBuffer *buffer;
1146   GstRTSPMediaStream *stream;
1147
1148   buffer = gst_app_sink_pull_buffer (sink);
1149   if (!buffer)
1150     return GST_FLOW_OK;
1151
1152   stream = (GstRTSPMediaStream *) user_data;
1153
1154   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1155     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1156
1157     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1158       if (tr->send_rtp)
1159         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1160     } else {
1161       if (tr->send_rtcp)
1162         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1163     }
1164   }
1165   gst_buffer_unref (buffer);
1166
1167   return GST_FLOW_OK;
1168 }
1169
1170 static GstFlowReturn
1171 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1172 {
1173   GList *walk;
1174   GstBufferList *blist;
1175   GstRTSPMediaStream *stream;
1176
1177   blist = gst_app_sink_pull_buffer_list (sink);
1178   if (!blist)
1179     return GST_FLOW_OK;
1180
1181   stream = (GstRTSPMediaStream *) user_data;
1182
1183   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1184     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1185
1186     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1187       if (tr->send_rtp_list)
1188         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1189             tr->user_data);
1190     } else {
1191       if (tr->send_rtcp_list)
1192         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1193             tr->user_data);
1194     }
1195   }
1196   gst_buffer_list_unref (blist);
1197
1198   return GST_FLOW_OK;
1199 }
1200
1201 static GstAppSinkCallbacks sink_cb = {
1202   NULL,                         /* not interested in EOS */
1203   NULL,                         /* not interested in preroll buffers */
1204   handle_new_buffer,
1205   handle_new_buffer_list
1206 };
1207
1208 /* prepare the pipeline objects to handle @stream in @media */
1209 static gboolean
1210 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1211 {
1212   gchar *name;
1213   GstPad *pad, *teepad, *selpad;
1214   GstPadLinkReturn ret;
1215   gint i;
1216
1217   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1218    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1219    * elements */
1220   if (!alloc_udp_ports (media, stream))
1221     return FALSE;
1222
1223   /* add the ports to the pipeline */
1224   for (i = 0; i < 2; i++) {
1225     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1226     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1227   }
1228
1229   /* create elements for the TCP transfer */
1230   for (i = 0; i < 2; i++) {
1231     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1232     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1233     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1234     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1235     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1236     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1237     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1238     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1239         &sink_cb, stream, NULL);
1240   }
1241
1242   /* hook up the stream to the RTP session elements. */
1243   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1244   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1245   g_free (name);
1246   name = g_strdup_printf ("send_rtp_src_%d", idx);
1247   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1248   g_free (name);
1249   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1250   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1251   g_free (name);
1252   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1253   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1254   g_free (name);
1255   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1256   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1257   g_free (name);
1258
1259   /* get the session */
1260   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1261       &stream->session);
1262
1263   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1264       stream);
1265   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1266       stream);
1267   g_signal_connect (stream->session, "on-ssrc-active",
1268       (GCallback) on_ssrc_active, stream);
1269   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1270       stream);
1271   g_signal_connect (stream->session, "on-bye-timeout",
1272       (GCallback) on_bye_timeout, stream);
1273   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1274       stream);
1275
1276   /* link the RTP pad to the session manager */
1277   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1278   if (ret != GST_PAD_LINK_OK)
1279     goto link_failed;
1280
1281   /* make tee for RTP and link to stream */
1282   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1283   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1284
1285   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1286   gst_pad_link (stream->send_rtp_src, pad);
1287   gst_object_unref (pad);
1288
1289   /* link RTP sink, we're pretty sure this will work. */
1290   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1291   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1292   gst_pad_link (teepad, pad);
1293   gst_object_unref (pad);
1294   gst_object_unref (teepad);
1295
1296   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1297   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1298   gst_pad_link (teepad, pad);
1299   gst_object_unref (pad);
1300   gst_object_unref (teepad);
1301
1302   /* make tee for RTCP */
1303   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1304   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1305
1306   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1307   gst_pad_link (stream->send_rtcp_src, pad);
1308   gst_object_unref (pad);
1309
1310   /* link RTCP elements */
1311   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1312   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1313   gst_pad_link (teepad, pad);
1314   gst_object_unref (pad);
1315   gst_object_unref (teepad);
1316
1317   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1318   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1319   gst_pad_link (teepad, pad);
1320   gst_object_unref (pad);
1321   gst_object_unref (teepad);
1322
1323   /* make selector for the RTP receivers */
1324   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1325   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1326
1327   pad = gst_element_get_static_pad (stream->selector[0], "src");
1328   gst_pad_link (pad, stream->recv_rtp_sink);
1329   gst_object_unref (pad);
1330
1331   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1332   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1333   gst_pad_link (pad, selpad);
1334   gst_object_unref (pad);
1335   gst_object_unref (selpad);
1336
1337   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1338   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1339   gst_pad_link (pad, selpad);
1340   gst_object_unref (pad);
1341   gst_object_unref (selpad);
1342
1343   /* make selector for the RTCP receivers */
1344   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1345   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1346
1347   pad = gst_element_get_static_pad (stream->selector[1], "src");
1348   gst_pad_link (pad, stream->recv_rtcp_sink);
1349   gst_object_unref (pad);
1350
1351   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1352   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1353   gst_pad_link (pad, selpad);
1354   gst_object_unref (pad);
1355   gst_object_unref (selpad);
1356
1357   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1358   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1359   gst_pad_link (pad, selpad);
1360   gst_object_unref (pad);
1361   gst_object_unref (selpad);
1362
1363   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1364    * values */
1365   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1366   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1367   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1368   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1369
1370   /* be notified of caps changes */
1371   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1372       (GCallback) caps_notify, stream);
1373
1374   stream->prepared = TRUE;
1375
1376   return TRUE;
1377
1378   /* ERRORS */
1379 link_failed:
1380   {
1381     GST_WARNING ("failed to link stream %d", idx);
1382     return FALSE;
1383   }
1384 }
1385
1386 static void
1387 unlock_streams (GstRTSPMedia * media)
1388 {
1389   guint i, n_streams;
1390
1391   /* unlock the udp src elements */
1392   n_streams = gst_rtsp_media_n_streams (media);
1393   for (i = 0; i < n_streams; i++) {
1394     GstRTSPMediaStream *stream;
1395
1396     stream = gst_rtsp_media_get_stream (media, i);
1397
1398     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1399     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1400   }
1401 }
1402
1403 static void
1404 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1405 {
1406   g_mutex_lock (media->lock);
1407   /* never overwrite the error status */
1408   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1409     media->status = status;
1410   GST_DEBUG ("setting new status to %d", status);
1411   g_cond_broadcast (media->cond);
1412   g_mutex_unlock (media->lock);
1413 }
1414
1415 static GstRTSPMediaStatus
1416 gst_rtsp_media_get_status (GstRTSPMedia * media)
1417 {
1418   GstRTSPMediaStatus result;
1419   GTimeVal timeout;
1420
1421   g_mutex_lock (media->lock);
1422   g_get_current_time (&timeout);
1423   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1424   /* while we are preparing, wait */
1425   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1426     GST_DEBUG ("waiting for status change");
1427     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1428       GST_DEBUG ("timeout, assuming error status");
1429       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1430     }
1431   }
1432   /* could be success or error */
1433   result = media->status;
1434   GST_DEBUG ("got status %d", result);
1435   g_mutex_unlock (media->lock);
1436
1437   return result;
1438 }
1439
1440 static gboolean
1441 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1442 {
1443   GstMessageType type;
1444
1445   type = GST_MESSAGE_TYPE (message);
1446
1447   switch (type) {
1448     case GST_MESSAGE_STATE_CHANGED:
1449       break;
1450     case GST_MESSAGE_BUFFERING:
1451     {
1452       gint percent;
1453
1454       gst_message_parse_buffering (message, &percent);
1455
1456       /* no state management needed for live pipelines */
1457       if (media->is_live)
1458         break;
1459
1460       if (percent == 100) {
1461         /* a 100% message means buffering is done */
1462         media->buffering = FALSE;
1463         /* if the desired state is playing, go back */
1464         if (media->target_state == GST_STATE_PLAYING) {
1465           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1466           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1467         } else {
1468           GST_INFO ("Buffering done");
1469         }
1470       } else {
1471         /* buffering busy */
1472         if (media->buffering == FALSE) {
1473           if (media->target_state == GST_STATE_PLAYING) {
1474             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1475             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1476             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1477           } else {
1478             GST_INFO ("Buffering ...");
1479           }
1480         }
1481         media->buffering = TRUE;
1482       }
1483       break;
1484     }
1485     case GST_MESSAGE_LATENCY:
1486     {
1487       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1488       break;
1489     }
1490     case GST_MESSAGE_ERROR:
1491     {
1492       GError *gerror;
1493       gchar *debug;
1494
1495       gst_message_parse_error (message, &gerror, &debug);
1496       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1497       g_error_free (gerror);
1498       g_free (debug);
1499
1500       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1501       break;
1502     }
1503     case GST_MESSAGE_WARNING:
1504     {
1505       GError *gerror;
1506       gchar *debug;
1507
1508       gst_message_parse_warning (message, &gerror, &debug);
1509       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1510       g_error_free (gerror);
1511       g_free (debug);
1512       break;
1513     }
1514     case GST_MESSAGE_ELEMENT:
1515       break;
1516     case GST_MESSAGE_STREAM_STATUS:
1517       break;
1518     case GST_MESSAGE_ASYNC_DONE:
1519       if (!media->adding) {
1520         /* when we are dynamically adding pads, the addition of the udpsrc will
1521          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1522          * wait for the final ASYNC_DONE after everything prerolled */
1523         GST_INFO ("%p: got ASYNC_DONE", media);
1524         collect_media_stats (media);
1525
1526         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1527       } else {
1528         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1529       }
1530       break;
1531     case GST_MESSAGE_EOS:
1532       GST_INFO ("%p: got EOS", media);
1533       if (media->eos_pending) {
1534         GST_DEBUG ("shutting down after EOS");
1535         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1536         media->eos_pending = FALSE;
1537         g_object_unref (media);
1538       }
1539       break;
1540     default:
1541       GST_INFO ("%p: got message type %s", media,
1542           gst_message_type_get_name (type));
1543       break;
1544   }
1545   return TRUE;
1546 }
1547
1548 static gboolean
1549 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1550 {
1551   GstRTSPMediaClass *klass;
1552   gboolean ret;
1553
1554   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1555
1556   if (klass->handle_message)
1557     ret = klass->handle_message (media, message);
1558   else
1559     ret = FALSE;
1560
1561   return ret;
1562 }
1563
1564 /* called from streaming threads */
1565 static void
1566 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1567 {
1568   GstRTSPMediaStream *stream;
1569   gchar *name;
1570   gint i;
1571
1572   i = media->streams->len + 1;
1573
1574   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1575
1576   stream = g_new0 (GstRTSPMediaStream, 1);
1577   stream->payloader = element;
1578
1579   name = g_strdup_printf ("dynpay%d", i);
1580
1581   media->adding = TRUE;
1582
1583   /* ghost the pad of the payloader to the element */
1584   stream->srcpad = gst_ghost_pad_new (name, pad);
1585   gst_pad_set_active (stream->srcpad, TRUE);
1586   gst_element_add_pad (media->element, stream->srcpad);
1587   g_free (name);
1588
1589   /* add stream now */
1590   g_array_append_val (media->streams, stream);
1591
1592   setup_stream (stream, i, media);
1593
1594   for (i = 0; i < 2; i++) {
1595     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1596     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1597     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1598     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1599     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1600   }
1601   media->adding = FALSE;
1602 }
1603
1604 static void
1605 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1606 {
1607   GST_INFO ("no more pads");
1608   if (media->fakesink) {
1609     gst_object_ref (media->fakesink);
1610     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1611     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1612     gst_object_unref (media->fakesink);
1613     media->fakesink = NULL;
1614     GST_INFO ("removed fakesink");
1615   }
1616 }
1617
1618 /**
1619  * gst_rtsp_media_prepare:
1620  * @media: a #GstRTSPMedia
1621  *
1622  * Prepare @media for streaming. This function will create the pipeline and
1623  * other objects to manage the streaming.
1624  *
1625  * It will preroll the pipeline and collect vital information about the streams
1626  * such as the duration.
1627  *
1628  * Returns: %TRUE on success.
1629  */
1630 gboolean
1631 gst_rtsp_media_prepare (GstRTSPMedia * media)
1632 {
1633   GstStateChangeReturn ret;
1634   GstRTSPMediaStatus status;
1635   guint i, n_streams;
1636   GstRTSPMediaClass *klass;
1637   GstBus *bus;
1638   GList *walk;
1639
1640   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1641     goto was_prepared;
1642
1643   if (!media->reusable && media->reused)
1644     goto is_reused;
1645
1646   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1647   if (media->rtpbin == NULL)
1648     goto no_gstrtpbin;
1649
1650   GST_INFO ("preparing media %p", media);
1651
1652   /* reset some variables */
1653   media->is_live = FALSE;
1654   media->buffering = FALSE;
1655   /* we're preparing now */
1656   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1657
1658   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1659
1660   /* add the pipeline bus to our custom mainloop */
1661   media->source = gst_bus_create_watch (bus);
1662   gst_object_unref (bus);
1663
1664   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1665
1666   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1667   media->id = g_source_attach (media->source, klass->context);
1668
1669   /* add stuff to the bin */
1670   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1671
1672   /* link streams we already have, other streams might appear when we have
1673    * dynamic elements */
1674   n_streams = gst_rtsp_media_n_streams (media);
1675   for (i = 0; i < n_streams; i++) {
1676     GstRTSPMediaStream *stream;
1677
1678     stream = gst_rtsp_media_get_stream (media, i);
1679
1680     setup_stream (stream, i, media);
1681   }
1682
1683   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1684     GstElement *elem = walk->data;
1685
1686     GST_INFO ("adding callbacks for dynamic element %p", elem);
1687
1688     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1689     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1690
1691     /* we add a fakesink here in order to make the state change async. We remove
1692      * the fakesink again in the no-more-pads callback. */
1693     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1694     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1695   }
1696
1697   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1698   /* first go to PAUSED */
1699   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1700   media->target_state = GST_STATE_PAUSED;
1701
1702   switch (ret) {
1703     case GST_STATE_CHANGE_SUCCESS:
1704       GST_INFO ("SUCCESS state change for media %p", media);
1705       break;
1706     case GST_STATE_CHANGE_ASYNC:
1707       GST_INFO ("ASYNC state change for media %p", media);
1708       break;
1709     case GST_STATE_CHANGE_NO_PREROLL:
1710       /* we need to go to PLAYING */
1711       GST_INFO ("NO_PREROLL state change: live media %p", media);
1712       media->is_live = TRUE;
1713       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1714       if (ret == GST_STATE_CHANGE_FAILURE)
1715         goto state_failed;
1716       break;
1717     case GST_STATE_CHANGE_FAILURE:
1718       goto state_failed;
1719   }
1720
1721   /* now wait for all pads to be prerolled */
1722   status = gst_rtsp_media_get_status (media);
1723   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1724     goto state_failed;
1725
1726   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1727
1728   GST_INFO ("object %p is prerolled", media);
1729
1730   return TRUE;
1731
1732   /* OK */
1733 was_prepared:
1734   {
1735     return TRUE;
1736   }
1737   /* ERRORS */
1738 is_reused:
1739   {
1740     GST_WARNING ("can not reuse media %p", media);
1741     return FALSE;
1742   }
1743 no_gstrtpbin:
1744   {
1745     GST_WARNING ("no gstrtpbin element");
1746     g_warning ("failed to create element 'gstrtpbin', check your installation");
1747     return FALSE;
1748   }
1749 state_failed:
1750   {
1751     GST_WARNING ("failed to preroll pipeline");
1752     unlock_streams (media);
1753     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1754     gst_rtsp_media_unprepare (media);
1755     return FALSE;
1756   }
1757 }
1758
1759 /**
1760  * gst_rtsp_media_unprepare:
1761  * @media: a #GstRTSPMedia
1762  *
1763  * Unprepare @media. After this call, the media should be prepared again before
1764  * it can be used again. If the media is set to be non-reusable, a new instance
1765  * must be created.
1766  *
1767  * Returns: %TRUE on success.
1768  */
1769 gboolean
1770 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1771 {
1772   GstRTSPMediaClass *klass;
1773   gboolean success;
1774
1775   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1776     return TRUE;
1777
1778   GST_INFO ("unprepare media %p", media);
1779   media->target_state = GST_STATE_NULL;
1780
1781   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1782   if (klass->unprepare)
1783     success = klass->unprepare (media);
1784   else
1785     success = TRUE;
1786
1787   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1788   media->reused = TRUE;
1789
1790   /* when the media is not reusable, this will effectively unref the media and
1791    * recreate it */
1792   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1793
1794   return success;
1795 }
1796
1797 static gboolean
1798 default_unprepare (GstRTSPMedia * media)
1799 {
1800   if (media->eos_shutdown) {
1801     GST_DEBUG ("sending EOS for shutdown");
1802     /* ref so that we don't disappear */
1803     g_object_ref (media);
1804     media->eos_pending = TRUE;
1805     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1806     /* we need to go to playing again for the EOS to propagate, normally in this
1807      * state, nothing is receiving data from us anymore so this is ok. */
1808     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1809   } else {
1810     GST_DEBUG ("shutting down");
1811     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1812   }
1813   return TRUE;
1814 }
1815
1816 static void
1817 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1818     gchar * dest, gint min, gint max)
1819 {
1820   GST_INFO ("adding %s:%d-%d", dest, min, max);
1821   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1822   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1823 }
1824
1825 static void
1826 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1827     gchar * dest, gint min, gint max)
1828 {
1829   GST_INFO ("removing %s:%d-%d", dest, min, max);
1830   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1831   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1832 }
1833
1834 /**
1835  * gst_rtsp_media_set_state:
1836  * @media: a #GstRTSPMedia
1837  * @state: the target state of the media
1838  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1839  *
1840  * Set the state of @media to @state and for the transports in @transports.
1841  *
1842  * Returns: %TRUE on success.
1843  */
1844 gboolean
1845 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1846     GArray * transports)
1847 {
1848   gint i;
1849   GstStateChangeReturn ret;
1850   gboolean add, remove, do_state;
1851   gint old_active;
1852
1853   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1854   g_return_val_if_fail (transports != NULL, FALSE);
1855
1856   /* NULL and READY are the same */
1857   if (state == GST_STATE_READY)
1858     state = GST_STATE_NULL;
1859
1860   add = remove = FALSE;
1861
1862   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1863       media);
1864
1865   switch (state) {
1866     case GST_STATE_NULL:
1867       /* unlock the streams so that they follow the state changes from now on */
1868       unlock_streams (media);
1869       /* fallthrough */
1870     case GST_STATE_PAUSED:
1871       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1872       if (media->target_state == GST_STATE_PLAYING)
1873         remove = TRUE;
1874       break;
1875     case GST_STATE_PLAYING:
1876       /* we're going to PLAYING, add */
1877       add = TRUE;
1878       break;
1879     default:
1880       break;
1881   }
1882   old_active = media->active;
1883
1884   for (i = 0; i < transports->len; i++) {
1885     GstRTSPMediaTrans *tr;
1886     GstRTSPMediaStream *stream;
1887     GstRTSPTransport *trans;
1888
1889     /* we need a non-NULL entry in the array */
1890     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1891     if (tr == NULL)
1892       continue;
1893
1894     /* we need a transport */
1895     if (!(trans = tr->transport))
1896       continue;
1897
1898     /* get the stream and add the destinations */
1899     stream = gst_rtsp_media_get_stream (media, tr->idx);
1900     switch (trans->lower_transport) {
1901       case GST_RTSP_LOWER_TRANS_UDP:
1902       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1903       {
1904         gchar *dest;
1905         gint min, max;
1906
1907         dest = trans->destination;
1908         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1909           min = trans->port.min;
1910           max = trans->port.max;
1911         } else {
1912           min = trans->client_port.min;
1913           max = trans->client_port.max;
1914         }
1915
1916         if (add && !tr->active) {
1917           add_udp_destination (media, stream, dest, min, max);
1918           stream->transports = g_list_prepend (stream->transports, tr);
1919           tr->active = TRUE;
1920           media->active++;
1921         } else if (remove && tr->active) {
1922           remove_udp_destination (media, stream, dest, min, max);
1923           stream->transports = g_list_remove (stream->transports, tr);
1924           tr->active = FALSE;
1925           media->active--;
1926         }
1927         break;
1928       }
1929       case GST_RTSP_LOWER_TRANS_TCP:
1930         if (add && !tr->active) {
1931           GST_INFO ("adding TCP %s", trans->destination);
1932           stream->transports = g_list_prepend (stream->transports, tr);
1933           tr->active = TRUE;
1934           media->active++;
1935         } else if (remove && tr->active) {
1936           GST_INFO ("removing TCP %s", trans->destination);
1937           stream->transports = g_list_remove (stream->transports, tr);
1938           tr->active = FALSE;
1939           media->active--;
1940         }
1941         break;
1942       default:
1943         GST_INFO ("Unknown transport %d", trans->lower_transport);
1944         break;
1945     }
1946   }
1947
1948   /* we just added the first media, do the playing state change */
1949   if (old_active == 0 && add)
1950     do_state = TRUE;
1951   /* if we have no more active media, do the downward state changes */
1952   else if (media->active == 0)
1953     do_state = TRUE;
1954   else
1955     do_state = FALSE;
1956
1957   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
1958       media, do_state);
1959
1960   if (media->target_state != state) {
1961     if (do_state) {
1962       if (state == GST_STATE_NULL) {
1963         gst_rtsp_media_unprepare (media);
1964       } else {
1965         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
1966             media);
1967         media->target_state = state;
1968         ret = gst_element_set_state (media->pipeline, state);
1969       }
1970     }
1971     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
1972         NULL);
1973   }
1974
1975   /* remember where we are */
1976   if (state == GST_STATE_PAUSED || old_active != media->active)
1977     collect_media_stats (media);
1978
1979   return TRUE;
1980 }
1981
1982 /**
1983  * gst_rtsp_media_remove_elements:
1984  * @media: a #GstRTSPMedia
1985  *
1986  * Remove all elements and the pipeline controlled by @media.
1987  */
1988 void
1989 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1990 {
1991   gint i, j;
1992
1993   unlock_streams (media);
1994
1995   for (i = 0; i < media->streams->len; i++) {
1996     GstRTSPMediaStream *stream;
1997
1998     GST_INFO ("Removing elements of stream %d from pipeline", i);
1999
2000     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2001
2002     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2003
2004     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2005
2006     for (j = 0; j < 2; j++) {
2007       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2008       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2009       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2010       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2011       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2012       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2013
2014       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2015       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2016       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2017       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2018       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2019       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2020     }
2021     if (stream->caps)
2022       gst_caps_unref (stream->caps);
2023     stream->caps = NULL;
2024     gst_rtsp_media_stream_free (stream);
2025   }
2026   g_array_remove_range (media->streams, 0, media->streams->len);
2027
2028   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2029   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2030
2031   gst_object_unref (media->pipeline);
2032   media->pipeline = NULL;
2033 }