c4127f8ce962cfc7b8680dd9c7e320118907fe8d
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED         FALSE
29 #define DEFAULT_REUSABLE       FALSE
30 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN   FALSE
33 #define DEFAULT_BUFFER_SIZE    0x80000
34
35 /* define to dump received RTCP packets */
36 #undef DUMP_STATS
37
38 enum
39 {
40   PROP_0,
41   PROP_SHARED,
42   PROP_REUSABLE,
43   PROP_PROTOCOLS,
44   PROP_EOS_SHUTDOWN,
45   PROP_BUFFER_SIZE,
46   PROP_LAST
47 };
48
49 enum
50 {
51   SIGNAL_PREPARED,
52   SIGNAL_UNPREPARED,
53   SIGNAL_NEW_STATE,
54   SIGNAL_LAST
55 };
56
57 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
58 #define GST_CAT_DEFAULT rtsp_media_debug
59
60 static GQuark ssrc_stream_map_key;
61
62 static void gst_rtsp_media_get_property (GObject * object, guint propid,
63     GValue * value, GParamSpec * pspec);
64 static void gst_rtsp_media_set_property (GObject * object, guint propid,
65     const GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_finalize (GObject * obj);
67
68 static gpointer do_loop (GstRTSPMediaClass * klass);
69 static gboolean default_handle_message (GstRTSPMedia * media,
70     GstMessage * message);
71 static gboolean default_unprepare (GstRTSPMedia * media);
72 static void unlock_streams (GstRTSPMedia * media);
73
74 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
75
76 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
77
78 static void
79 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
80 {
81   GObjectClass *gobject_class;
82   GError *error = NULL;
83
84   gobject_class = G_OBJECT_CLASS (klass);
85
86   gobject_class->get_property = gst_rtsp_media_get_property;
87   gobject_class->set_property = gst_rtsp_media_set_property;
88   gobject_class->finalize = gst_rtsp_media_finalize;
89
90   g_object_class_install_property (gobject_class, PROP_SHARED,
91       g_param_spec_boolean ("shared", "Shared",
92           "If this media pipeline can be shared", DEFAULT_SHARED,
93           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
94
95   g_object_class_install_property (gobject_class, PROP_REUSABLE,
96       g_param_spec_boolean ("reusable", "Reusable",
97           "If this media pipeline can be reused after an unprepare",
98           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
99
100   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
101       g_param_spec_flags ("protocols", "Protocols",
102           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
103           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104
105   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
106       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
107           "Send an EOS event to the pipeline before unpreparing",
108           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109
110   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
111       g_param_spec_uint ("buffer-size", "Buffer Size",
112           "The kernel UDP buffer size to use", 0, G_MAXUINT,
113           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
114
115   gst_rtsp_media_signals[SIGNAL_PREPARED] =
116       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
117       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
118       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
119
120   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
121       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
122       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
123       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
124
125   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
126       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
127       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
128       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
129
130   klass->context = g_main_context_new ();
131   klass->loop = g_main_loop_new (klass->context, TRUE);
132
133   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
134
135   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
136   if (error != NULL) {
137     g_critical ("could not start bus thread: %s", error->message);
138   }
139   klass->handle_message = default_handle_message;
140   klass->unprepare = default_unprepare;
141
142   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
143 }
144
145 static void
146 gst_rtsp_media_init (GstRTSPMedia * media)
147 {
148   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
149   media->lock = g_mutex_new ();
150   media->cond = g_cond_new ();
151
152   media->shared = DEFAULT_SHARED;
153   media->reusable = DEFAULT_REUSABLE;
154   media->protocols = DEFAULT_PROTOCOLS;
155   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
156   media->buffer_size = DEFAULT_BUFFER_SIZE;
157 }
158
159 void
160 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
161 {
162   if (trans->transport) {
163     gst_rtsp_transport_free (trans->transport);
164     trans->transport = NULL;
165   }
166   if (trans->rtpsource) {
167     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
168     trans->rtpsource = NULL;
169   }
170 }
171
172 static void
173 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
174 {
175   if (stream->session)
176     g_object_unref (stream->session);
177
178   if (stream->caps)
179     gst_caps_unref (stream->caps);
180
181   if (stream->send_rtp_sink)
182     gst_object_unref (stream->send_rtp_sink);
183   if (stream->send_rtp_src)
184     gst_object_unref (stream->send_rtp_src);
185   if (stream->send_rtcp_src)
186     gst_object_unref (stream->send_rtcp_src);
187   if (stream->recv_rtcp_sink)
188     gst_object_unref (stream->recv_rtcp_sink);
189   if (stream->recv_rtp_sink)
190     gst_object_unref (stream->recv_rtp_sink);
191
192   g_list_free (stream->transports);
193
194   g_free (stream);
195 }
196
197 static void
198 gst_rtsp_media_finalize (GObject * obj)
199 {
200   GstRTSPMedia *media;
201   guint i;
202
203   media = GST_RTSP_MEDIA (obj);
204
205   GST_INFO ("finalize media %p", media);
206
207   if (media->pipeline) {
208     unlock_streams (media);
209     gst_element_set_state (media->pipeline, GST_STATE_NULL);
210     gst_object_unref (media->pipeline);
211   }
212
213   for (i = 0; i < media->streams->len; i++) {
214     GstRTSPMediaStream *stream;
215
216     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
217
218     gst_rtsp_media_stream_free (stream);
219   }
220   g_array_free (media->streams, TRUE);
221
222   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
223   g_list_free (media->dynamic);
224
225   if (media->source) {
226     g_source_destroy (media->source);
227     g_source_unref (media->source);
228   }
229   g_mutex_free (media->lock);
230   g_cond_free (media->cond);
231
232   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
233 }
234
235 static void
236 gst_rtsp_media_get_property (GObject * object, guint propid,
237     GValue * value, GParamSpec * pspec)
238 {
239   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
240
241   switch (propid) {
242     case PROP_SHARED:
243       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
244       break;
245     case PROP_REUSABLE:
246       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
247       break;
248     case PROP_PROTOCOLS:
249       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
250       break;
251     case PROP_EOS_SHUTDOWN:
252       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
253       break;
254     case PROP_BUFFER_SIZE:
255       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
256       break;
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
259   }
260 }
261
262 static void
263 gst_rtsp_media_set_property (GObject * object, guint propid,
264     const GValue * value, GParamSpec * pspec)
265 {
266   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
267
268   switch (propid) {
269     case PROP_SHARED:
270       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
271       break;
272     case PROP_REUSABLE:
273       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
274       break;
275     case PROP_PROTOCOLS:
276       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
277       break;
278     case PROP_EOS_SHUTDOWN:
279       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
280       break;
281     case PROP_BUFFER_SIZE:
282       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
283       break;
284     default:
285       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
286   }
287 }
288
289 static gpointer
290 do_loop (GstRTSPMediaClass * klass)
291 {
292   GST_INFO ("enter mainloop");
293   g_main_loop_run (klass->loop);
294   GST_INFO ("exit mainloop");
295
296   return NULL;
297 }
298
299 static void
300 collect_media_stats (GstRTSPMedia * media)
301 {
302   gint64 position, duration;
303
304   media->range.unit = GST_RTSP_RANGE_NPT;
305
306   if (media->is_live) {
307     media->range.min.type = GST_RTSP_TIME_NOW;
308     media->range.min.seconds = -1;
309     media->range.max.type = GST_RTSP_TIME_END;
310     media->range.max.seconds = -1;
311   } else {
312     /* get the position */
313     if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME,
314             &position)) {
315       GST_INFO ("position query failed");
316       position = 0;
317     }
318
319     /* get the duration */
320     if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME,
321             &duration)) {
322       GST_INFO ("duration query failed");
323       duration = -1;
324     }
325
326     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
327         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
328
329     if (position == -1) {
330       media->range.min.type = GST_RTSP_TIME_NOW;
331       media->range.min.seconds = -1;
332     } else {
333       media->range.min.type = GST_RTSP_TIME_SECONDS;
334       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
335     }
336     if (duration == -1) {
337       media->range.max.type = GST_RTSP_TIME_END;
338       media->range.max.seconds = -1;
339     } else {
340       media->range.max.type = GST_RTSP_TIME_SECONDS;
341       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
342     }
343   }
344 }
345
346 /**
347  * gst_rtsp_media_new:
348  *
349  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
350  * element to produde RTP data for one or more related (audio/video/..) 
351  * streams.
352  *
353  * Returns: a new #GstRTSPMedia object.
354  */
355 GstRTSPMedia *
356 gst_rtsp_media_new (void)
357 {
358   GstRTSPMedia *result;
359
360   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
361
362   return result;
363 }
364
365 /**
366  * gst_rtsp_media_set_shared:
367  * @media: a #GstRTSPMedia
368  * @shared: the new value
369  *
370  * Set or unset if the pipeline for @media can be shared will multiple clients.
371  * When @shared is %TRUE, client requests for this media will share the media
372  * pipeline.
373  */
374 void
375 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
376 {
377   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
378
379   media->shared = shared;
380 }
381
382 /**
383  * gst_rtsp_media_is_shared:
384  * @media: a #GstRTSPMedia
385  *
386  * Check if the pipeline for @media can be shared between multiple clients.
387  *
388  * Returns: %TRUE if the media can be shared between clients.
389  */
390 gboolean
391 gst_rtsp_media_is_shared (GstRTSPMedia * media)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
394
395   return media->shared;
396 }
397
398 /**
399  * gst_rtsp_media_set_reusable:
400  * @media: a #GstRTSPMedia
401  * @reusable: the new value
402  *
403  * Set or unset if the pipeline for @media can be reused after the pipeline has
404  * been unprepared.
405  */
406 void
407 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
408 {
409   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
410
411   media->reusable = reusable;
412 }
413
414 /**
415  * gst_rtsp_media_is_reusable:
416  * @media: a #GstRTSPMedia
417  *
418  * Check if the pipeline for @media can be reused after an unprepare.
419  *
420  * Returns: %TRUE if the media can be reused
421  */
422 gboolean
423 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
424 {
425   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
426
427   return media->reusable;
428 }
429
430 /**
431  * gst_rtsp_media_set_protocols:
432  * @media: a #GstRTSPMedia
433  * @protocols: the new flags
434  *
435  * Configure the allowed lower transport for @media.
436  */
437 void
438 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
439 {
440   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
441
442   media->protocols = protocols;
443 }
444
445 /**
446  * gst_rtsp_media_get_protocols:
447  * @media: a #GstRTSPMedia
448  *
449  * Get the allowed protocols of @media.
450  *
451  * Returns: a #GstRTSPLowerTrans
452  */
453 GstRTSPLowerTrans
454 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
455 {
456   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
457       GST_RTSP_LOWER_TRANS_UNKNOWN);
458
459   return media->protocols;
460 }
461
462 /**
463  * gst_rtsp_media_set_eos_shutdown:
464  * @media: a #GstRTSPMedia
465  * @eos_shutdown: the new value
466  *
467  * Set or unset if an EOS event will be sent to the pipeline for @media before
468  * it is unprepared.
469  */
470 void
471 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
472 {
473   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
474
475   media->eos_shutdown = eos_shutdown;
476 }
477
478 /**
479  * gst_rtsp_media_is_eos_shutdown:
480  * @media: a #GstRTSPMedia
481  *
482  * Check if the pipeline for @media will send an EOS down the pipeline before
483  * unpreparing.
484  *
485  * Returns: %TRUE if the media will send EOS before unpreparing.
486  */
487 gboolean
488 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
489 {
490   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
491
492   return media->eos_shutdown;
493 }
494
495 /**
496  * gst_rtsp_media_set_buffer_size:
497  * @media: a #GstRTSPMedia
498  * @size: the new value
499  *
500  * Set the kernel UDP buffer size.
501  */
502 void
503 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
504 {
505   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
506
507   media->buffer_size = size;
508 }
509
510 /**
511  * gst_rtsp_media_get_buffer_size:
512  * @media: a #GstRTSPMedia
513  *
514  * Get the kernel UDP buffer size.
515  *
516  * Returns: the kernel UDP buffer size.
517  */
518 guint
519 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
520 {
521   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
522
523   return media->buffer_size;
524 }
525
526 /**
527  * gst_rtsp_media_set_auth:
528  * @media: a #GstRTSPMedia
529  * @auth: a #GstRTSPAuth
530  *
531  * configure @auth to be used as the authentication manager of @media.
532  */
533 void
534 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
535 {
536   GstRTSPAuth *old;
537
538   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
539
540   old = media->auth;
541
542   if (old != auth) {
543     if (auth)
544       g_object_ref (auth);
545     media->auth = auth;
546     if (old)
547       g_object_unref (old);
548   }
549 }
550
551 /**
552  * gst_rtsp_media_get_auth:
553  * @media: a #GstRTSPMedia
554  *
555  * Get the #GstRTSPAuth used as the authentication manager of @media.
556  *
557  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
558  * usage.
559  */
560 GstRTSPAuth *
561 gst_rtsp_media_get_auth (GstRTSPMedia * media)
562 {
563   GstRTSPAuth *result;
564
565   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
566
567   if ((result = media->auth))
568     g_object_ref (result);
569
570   return result;
571 }
572
573
574 /**
575  * gst_rtsp_media_n_streams:
576  * @media: a #GstRTSPMedia
577  *
578  * Get the number of streams in this media.
579  *
580  * Returns: The number of streams.
581  */
582 guint
583 gst_rtsp_media_n_streams (GstRTSPMedia * media)
584 {
585   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
586
587   return media->streams->len;
588 }
589
590 /**
591  * gst_rtsp_media_get_stream:
592  * @media: a #GstRTSPMedia
593  * @idx: the stream index
594  *
595  * Retrieve the stream with index @idx from @media.
596  *
597  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
598  * that index did not exist.
599  */
600 GstRTSPMediaStream *
601 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
602 {
603   GstRTSPMediaStream *res;
604
605   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
606
607   if (idx < media->streams->len)
608     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
609   else
610     res = NULL;
611
612   return res;
613 }
614
615 /**
616  * gst_rtsp_media_get_range_string:
617  * @media: a #GstRTSPMedia
618  * @play: for the PLAY request
619  *
620  * Get the current range as a string.
621  *
622  * Returns: The range as a string, g_free() after usage.
623  */
624 gchar *
625 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
626 {
627   gchar *result;
628   GstRTSPTimeRange range;
629
630   /* make copy */
631   range = media->range;
632
633   if (!play && media->active > 0) {
634     range.min.type = GST_RTSP_TIME_NOW;
635     range.min.seconds = -1;
636   }
637
638   result = gst_rtsp_range_to_string (&range);
639
640   return result;
641 }
642
643 /**
644  * gst_rtsp_media_seek:
645  * @media: a #GstRTSPMedia
646  * @range: a #GstRTSPTimeRange
647  *
648  * Seek the pipeline to @range.
649  *
650  * Returns: %TRUE on success.
651  */
652 gboolean
653 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
654 {
655   GstSeekFlags flags;
656   gboolean res;
657   gint64 start, stop;
658   GstSeekType start_type, stop_type;
659
660   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
661   g_return_val_if_fail (range != NULL, FALSE);
662
663   if (range->unit != GST_RTSP_RANGE_NPT)
664     goto not_supported;
665
666   /* depends on the current playing state of the pipeline. We might need to
667    * queue this until we get EOS. */
668   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
669
670   start_type = stop_type = GST_SEEK_TYPE_NONE;
671
672   switch (range->min.type) {
673     case GST_RTSP_TIME_NOW:
674       start = -1;
675       break;
676     case GST_RTSP_TIME_SECONDS:
677       /* only seek when something changed */
678       if (media->range.min.seconds == range->min.seconds) {
679         start = -1;
680       } else {
681         start = range->min.seconds * GST_SECOND;
682         start_type = GST_SEEK_TYPE_SET;
683       }
684       break;
685     case GST_RTSP_TIME_END:
686     default:
687       goto weird_type;
688   }
689   switch (range->max.type) {
690     case GST_RTSP_TIME_SECONDS:
691       /* only seek when something changed */
692       if (media->range.max.seconds == range->max.seconds) {
693         stop = -1;
694       } else {
695         stop = range->max.seconds * GST_SECOND;
696         stop_type = GST_SEEK_TYPE_SET;
697       }
698       break;
699     case GST_RTSP_TIME_END:
700       stop = -1;
701       stop_type = GST_SEEK_TYPE_SET;
702       break;
703     case GST_RTSP_TIME_NOW:
704     default:
705       goto weird_type;
706   }
707
708   if (start != -1 || stop != -1) {
709     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
710         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
711
712     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
713         flags, start_type, start, stop_type, stop);
714
715     /* and block for the seek to complete */
716     GST_INFO ("done seeking %d", res);
717     gst_element_get_state (media->pipeline, NULL, NULL, -1);
718     GST_INFO ("prerolled again");
719
720     collect_media_stats (media);
721   } else {
722     GST_INFO ("no seek needed");
723     res = TRUE;
724   }
725
726   return res;
727
728   /* ERRORS */
729 not_supported:
730   {
731     GST_WARNING ("seek unit %d not supported", range->unit);
732     return FALSE;
733   }
734 weird_type:
735   {
736     GST_WARNING ("weird range type %d not supported", range->min.type);
737     return FALSE;
738   }
739 }
740
741 /**
742  * gst_rtsp_media_stream_rtp:
743  * @stream: a #GstRTSPMediaStream
744  * @buffer: a #GstBuffer
745  *
746  * Handle an RTP buffer for the stream. This method is usually called when a
747  * message has been received from a client using the TCP transport.
748  *
749  * This function takes ownership of @buffer.
750  *
751  * Returns: a GstFlowReturn.
752  */
753 GstFlowReturn
754 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
755 {
756   GstFlowReturn ret;
757
758   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
759
760   return ret;
761 }
762
763 /**
764  * gst_rtsp_media_stream_rtcp:
765  * @stream: a #GstRTSPMediaStream
766  * @buffer: a #GstBuffer
767  *
768  * Handle an RTCP buffer for the stream. This method is usually called when a
769  * message has been received from a client using the TCP transport.
770  *
771  * This function takes ownership of @buffer.
772  *
773  * Returns: a GstFlowReturn.
774  */
775 GstFlowReturn
776 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
777 {
778   GstFlowReturn ret;
779
780   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
781
782   return ret;
783 }
784
785 /* Allocate the udp ports and sockets */
786 static gboolean
787 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
788 {
789   GstStateChangeReturn ret;
790   GstElement *udpsrc0, *udpsrc1;
791   GstElement *udpsink0, *udpsink1;
792   gint tmp_rtp, tmp_rtcp;
793   guint count;
794   gint rtpport, rtcpport, sockfd;
795   const gchar *host;
796
797   udpsrc0 = NULL;
798   udpsrc1 = NULL;
799   udpsink0 = NULL;
800   udpsink1 = NULL;
801   count = 0;
802
803   /* Start with random port */
804   tmp_rtp = 0;
805
806   if (media->is_ipv6)
807     host = "udp://[::0]";
808   else
809     host = "udp://0.0.0.0";
810
811   /* try to allocate 2 UDP ports, the RTP port should be an even
812    * number and the RTCP port should be the next (uneven) port */
813 again:
814   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
815   if (udpsrc0 == NULL)
816     goto no_udp_protocol;
817   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
818
819   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
820   if (ret == GST_STATE_CHANGE_FAILURE) {
821     if (tmp_rtp != 0) {
822       tmp_rtp += 2;
823       if (++count > 20)
824         goto no_ports;
825
826       gst_element_set_state (udpsrc0, GST_STATE_NULL);
827       gst_object_unref (udpsrc0);
828
829       goto again;
830     }
831     goto no_udp_protocol;
832   }
833
834   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
835
836   /* check if port is even */
837   if ((tmp_rtp & 1) != 0) {
838     /* port not even, close and allocate another */
839     if (++count > 20)
840       goto no_ports;
841
842     gst_element_set_state (udpsrc0, GST_STATE_NULL);
843     gst_object_unref (udpsrc0);
844
845     tmp_rtp++;
846     goto again;
847   }
848
849   /* allocate port+1 for RTCP now */
850   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
851   if (udpsrc1 == NULL)
852     goto no_udp_rtcp_protocol;
853
854   /* set port */
855   tmp_rtcp = tmp_rtp + 1;
856   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
857
858   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
859   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
860   if (ret == GST_STATE_CHANGE_FAILURE) {
861
862     if (++count > 20)
863       goto no_ports;
864
865     gst_element_set_state (udpsrc0, GST_STATE_NULL);
866     gst_object_unref (udpsrc0);
867
868     gst_element_set_state (udpsrc1, GST_STATE_NULL);
869     gst_object_unref (udpsrc1);
870
871     tmp_rtp += 2;
872     goto again;
873   }
874
875   /* all fine, do port check */
876   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
877   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
878
879   /* this should not happen... */
880   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
881     goto port_error;
882
883   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
884   if (!udpsink0)
885     goto no_udp_protocol;
886
887   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
888   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
889   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
890
891   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
892   if (!udpsink1)
893     goto no_udp_protocol;
894
895   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
896           "send-duplicates")) {
897     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
898     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
899   } else {
900     g_warning
901         ("old multiudpsink version found without send-duplicates property");
902   }
903
904   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
905           "buffer-size")) {
906     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
907   } else {
908     GST_WARNING ("multiudpsink version found without buffer-size property");
909   }
910
911   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
912   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
913   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
914   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
915   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
916
917   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
918   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
919   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
920   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
921
922   /* we keep these elements, we configure all in configure_transport when the
923    * server told us to really use the UDP ports. */
924   stream->udpsrc[0] = udpsrc0;
925   stream->udpsrc[1] = udpsrc1;
926   stream->udpsink[0] = udpsink0;
927   stream->udpsink[1] = udpsink1;
928   stream->server_port.min = rtpport;
929   stream->server_port.max = rtcpport;
930
931   return TRUE;
932
933   /* ERRORS */
934 no_udp_protocol:
935   {
936     goto cleanup;
937   }
938 no_ports:
939   {
940     goto cleanup;
941   }
942 no_udp_rtcp_protocol:
943   {
944     goto cleanup;
945   }
946 port_error:
947   {
948     goto cleanup;
949   }
950 cleanup:
951   {
952     if (udpsrc0) {
953       gst_element_set_state (udpsrc0, GST_STATE_NULL);
954       gst_object_unref (udpsrc0);
955     }
956     if (udpsrc1) {
957       gst_element_set_state (udpsrc1, GST_STATE_NULL);
958       gst_object_unref (udpsrc1);
959     }
960     if (udpsink0) {
961       gst_element_set_state (udpsink0, GST_STATE_NULL);
962       gst_object_unref (udpsink0);
963     }
964     if (udpsink1) {
965       gst_element_set_state (udpsink1, GST_STATE_NULL);
966       gst_object_unref (udpsink1);
967     }
968     return FALSE;
969   }
970 }
971
972 /* executed from streaming thread */
973 static void
974 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
975 {
976   gchar *capsstr;
977   GstCaps *newcaps, *oldcaps;
978
979   newcaps = gst_pad_get_current_caps (pad);
980
981   oldcaps = stream->caps;
982   stream->caps = newcaps;
983
984   if (oldcaps)
985     gst_caps_unref (oldcaps);
986
987   capsstr = gst_caps_to_string (newcaps);
988   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
989   g_free (capsstr);
990 }
991
992 static void
993 dump_structure (const GstStructure * s)
994 {
995   gchar *sstr;
996
997   sstr = gst_structure_to_string (s);
998   GST_INFO ("structure: %s", sstr);
999   g_free (sstr);
1000 }
1001
1002 static GstRTSPMediaTrans *
1003 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1004 {
1005   GList *walk;
1006   GstRTSPMediaTrans *result = NULL;
1007   const gchar *tmp;
1008   gchar *dest;
1009   guint port;
1010
1011   if (rtcp_from == NULL)
1012     return NULL;
1013
1014   tmp = g_strrstr (rtcp_from, ":");
1015   if (tmp == NULL)
1016     return NULL;
1017
1018   port = atoi (tmp + 1);
1019   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1020
1021   GST_INFO ("finding %s:%d", dest, port);
1022
1023   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1024     GstRTSPMediaTrans *trans = walk->data;
1025     gint min, max;
1026
1027     min = trans->transport->client_port.min;
1028     max = trans->transport->client_port.max;
1029
1030     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1031             || max == port)) {
1032       result = trans;
1033       break;
1034     }
1035   }
1036   g_free (dest);
1037
1038   return result;
1039 }
1040
1041 static void
1042 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1043 {
1044   GstStructure *stats;
1045   GstRTSPMediaTrans *trans;
1046
1047   GST_INFO ("%p: new source %p", stream, source);
1048
1049   /* see if we have a stream to match with the origin of the RTCP packet */
1050   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1051   if (trans == NULL) {
1052     g_object_get (source, "stats", &stats, NULL);
1053     if (stats) {
1054       const gchar *rtcp_from;
1055
1056       dump_structure (stats);
1057
1058       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1059       if ((trans = find_transport (stream, rtcp_from))) {
1060         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1061             source);
1062
1063         /* keep ref to the source */
1064         trans->rtpsource = source;
1065
1066         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1067       }
1068       gst_structure_free (stats);
1069     }
1070   } else {
1071     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1072   }
1073 }
1074
1075 static void
1076 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1077 {
1078   GST_INFO ("%p: new SDES %p", stream, source);
1079 }
1080
1081 static void
1082 on_ssrc_active (GObject * session, GObject * source,
1083     GstRTSPMediaStream * stream)
1084 {
1085   GstRTSPMediaTrans *trans;
1086
1087   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1088
1089   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1090
1091   if (trans && trans->keep_alive)
1092     trans->keep_alive (trans->ka_user_data);
1093
1094 #ifdef DUMP_STATS
1095   {
1096     GstStructure *stats;
1097     g_object_get (source, "stats", &stats, NULL);
1098     if (stats) {
1099       dump_structure (stats);
1100       gst_structure_free (stats);
1101     }
1102   }
1103 #endif
1104 }
1105
1106 static void
1107 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1108 {
1109   GST_INFO ("%p: source %p bye", stream, source);
1110 }
1111
1112 static void
1113 on_bye_timeout (GObject * session, GObject * source,
1114     GstRTSPMediaStream * stream)
1115 {
1116   GstRTSPMediaTrans *trans;
1117
1118   GST_INFO ("%p: source %p bye timeout", stream, source);
1119
1120   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1121     trans->rtpsource = NULL;
1122     trans->timeout = TRUE;
1123   }
1124 }
1125
1126 static void
1127 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1128 {
1129   GstRTSPMediaTrans *trans;
1130
1131   GST_INFO ("%p: source %p timeout", stream, source);
1132
1133   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1134     trans->rtpsource = NULL;
1135     trans->timeout = TRUE;
1136   }
1137 }
1138
1139 static GstFlowReturn
1140 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1141 {
1142   GList *walk;
1143   GstBuffer *buffer;
1144   GstRTSPMediaStream *stream;
1145
1146   buffer = gst_app_sink_pull_buffer (sink);
1147   if (!buffer)
1148     return GST_FLOW_OK;
1149
1150   stream = (GstRTSPMediaStream *) user_data;
1151
1152   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1153     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1154
1155     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1156       if (tr->send_rtp)
1157         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1158     } else {
1159       if (tr->send_rtcp)
1160         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1161     }
1162   }
1163   gst_buffer_unref (buffer);
1164
1165   return GST_FLOW_OK;
1166 }
1167
1168 static GstFlowReturn
1169 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1170 {
1171   GList *walk;
1172   GstBufferList *blist;
1173   GstRTSPMediaStream *stream;
1174
1175   blist = gst_app_sink_pull_buffer_list (sink);
1176   if (!blist)
1177     return GST_FLOW_OK;
1178
1179   stream = (GstRTSPMediaStream *) user_data;
1180
1181   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1182     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1183
1184     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1185       if (tr->send_rtp_list)
1186         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1187             tr->user_data);
1188     } else {
1189       if (tr->send_rtcp_list)
1190         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1191             tr->user_data);
1192     }
1193   }
1194   gst_buffer_list_unref (blist);
1195
1196   return GST_FLOW_OK;
1197 }
1198
1199 static GstAppSinkCallbacks sink_cb = {
1200   NULL,                         /* not interested in EOS */
1201   NULL,                         /* not interested in preroll buffers */
1202   handle_new_buffer,
1203   handle_new_buffer_list
1204 };
1205
1206 /* prepare the pipeline objects to handle @stream in @media */
1207 static gboolean
1208 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1209 {
1210   gchar *name;
1211   GstPad *pad, *teepad, *selpad;
1212   GstPadLinkReturn ret;
1213   gint i;
1214
1215   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1216    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1217    * elements */
1218   if (!alloc_udp_ports (media, stream))
1219     return FALSE;
1220
1221   /* add the ports to the pipeline */
1222   for (i = 0; i < 2; i++) {
1223     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1224     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1225   }
1226
1227   /* create elements for the TCP transfer */
1228   for (i = 0; i < 2; i++) {
1229     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1230     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1231     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1232     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1233     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1234     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1235     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1236     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1237         &sink_cb, stream, NULL);
1238   }
1239
1240   /* hook up the stream to the RTP session elements. */
1241   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1242   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1243   g_free (name);
1244   name = g_strdup_printf ("send_rtp_src_%d", idx);
1245   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1246   g_free (name);
1247   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1248   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1249   g_free (name);
1250   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1251   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1252   g_free (name);
1253   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1254   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1255   g_free (name);
1256
1257   /* get the session */
1258   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1259       &stream->session);
1260
1261   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1262       stream);
1263   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1264       stream);
1265   g_signal_connect (stream->session, "on-ssrc-active",
1266       (GCallback) on_ssrc_active, stream);
1267   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1268       stream);
1269   g_signal_connect (stream->session, "on-bye-timeout",
1270       (GCallback) on_bye_timeout, stream);
1271   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1272       stream);
1273
1274   /* link the RTP pad to the session manager */
1275   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1276   if (ret != GST_PAD_LINK_OK)
1277     goto link_failed;
1278
1279   /* make tee for RTP and link to stream */
1280   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1281   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1282
1283   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1284   gst_pad_link (stream->send_rtp_src, pad);
1285   gst_object_unref (pad);
1286
1287   /* link RTP sink, we're pretty sure this will work. */
1288   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1289   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1290   gst_pad_link (teepad, pad);
1291   gst_object_unref (pad);
1292   gst_object_unref (teepad);
1293
1294   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1295   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1296   gst_pad_link (teepad, pad);
1297   gst_object_unref (pad);
1298   gst_object_unref (teepad);
1299
1300   /* make tee for RTCP */
1301   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1302   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1303
1304   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1305   gst_pad_link (stream->send_rtcp_src, pad);
1306   gst_object_unref (pad);
1307
1308   /* link RTCP elements */
1309   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1310   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1311   gst_pad_link (teepad, pad);
1312   gst_object_unref (pad);
1313   gst_object_unref (teepad);
1314
1315   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1316   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1317   gst_pad_link (teepad, pad);
1318   gst_object_unref (pad);
1319   gst_object_unref (teepad);
1320
1321   /* make selector for the RTP receivers */
1322   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1323   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1324
1325   pad = gst_element_get_static_pad (stream->selector[0], "src");
1326   gst_pad_link (pad, stream->recv_rtp_sink);
1327   gst_object_unref (pad);
1328
1329   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1330   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1331   gst_pad_link (pad, selpad);
1332   gst_object_unref (pad);
1333   gst_object_unref (selpad);
1334
1335   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1336   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1337   gst_pad_link (pad, selpad);
1338   gst_object_unref (pad);
1339   gst_object_unref (selpad);
1340
1341   /* make selector for the RTCP receivers */
1342   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1343   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1344
1345   pad = gst_element_get_static_pad (stream->selector[1], "src");
1346   gst_pad_link (pad, stream->recv_rtcp_sink);
1347   gst_object_unref (pad);
1348
1349   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1350   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1351   gst_pad_link (pad, selpad);
1352   gst_object_unref (pad);
1353   gst_object_unref (selpad);
1354
1355   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1356   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1357   gst_pad_link (pad, selpad);
1358   gst_object_unref (pad);
1359   gst_object_unref (selpad);
1360
1361   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1362    * values */
1363   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1364   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1365   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1366   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1367
1368   /* be notified of caps changes */
1369   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1370       (GCallback) caps_notify, stream);
1371
1372   stream->prepared = TRUE;
1373
1374   return TRUE;
1375
1376   /* ERRORS */
1377 link_failed:
1378   {
1379     GST_WARNING ("failed to link stream %d", idx);
1380     return FALSE;
1381   }
1382 }
1383
1384 static void
1385 unlock_streams (GstRTSPMedia * media)
1386 {
1387   guint i, n_streams;
1388
1389   /* unlock the udp src elements */
1390   n_streams = gst_rtsp_media_n_streams (media);
1391   for (i = 0; i < n_streams; i++) {
1392     GstRTSPMediaStream *stream;
1393
1394     stream = gst_rtsp_media_get_stream (media, i);
1395
1396     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1397     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1398   }
1399 }
1400
1401 static void
1402 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1403 {
1404   g_mutex_lock (media->lock);
1405   /* never overwrite the error status */
1406   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1407     media->status = status;
1408   GST_DEBUG ("setting new status to %d", status);
1409   g_cond_broadcast (media->cond);
1410   g_mutex_unlock (media->lock);
1411 }
1412
1413 static GstRTSPMediaStatus
1414 gst_rtsp_media_get_status (GstRTSPMedia * media)
1415 {
1416   GstRTSPMediaStatus result;
1417   GTimeVal timeout;
1418
1419   g_mutex_lock (media->lock);
1420   g_get_current_time (&timeout);
1421   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1422   /* while we are preparing, wait */
1423   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1424     GST_DEBUG ("waiting for status change");
1425     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1426       GST_DEBUG ("timeout, assuming error status");
1427       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1428     }
1429   }
1430   /* could be success or error */
1431   result = media->status;
1432   GST_DEBUG ("got status %d", result);
1433   g_mutex_unlock (media->lock);
1434
1435   return result;
1436 }
1437
1438 static gboolean
1439 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1440 {
1441   GstMessageType type;
1442
1443   type = GST_MESSAGE_TYPE (message);
1444
1445   switch (type) {
1446     case GST_MESSAGE_STATE_CHANGED:
1447       break;
1448     case GST_MESSAGE_BUFFERING:
1449     {
1450       gint percent;
1451
1452       gst_message_parse_buffering (message, &percent);
1453
1454       /* no state management needed for live pipelines */
1455       if (media->is_live)
1456         break;
1457
1458       if (percent == 100) {
1459         /* a 100% message means buffering is done */
1460         media->buffering = FALSE;
1461         /* if the desired state is playing, go back */
1462         if (media->target_state == GST_STATE_PLAYING) {
1463           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1464           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1465         } else {
1466           GST_INFO ("Buffering done");
1467         }
1468       } else {
1469         /* buffering busy */
1470         if (media->buffering == FALSE) {
1471           if (media->target_state == GST_STATE_PLAYING) {
1472             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1473             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1474             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1475           } else {
1476             GST_INFO ("Buffering ...");
1477           }
1478         }
1479         media->buffering = TRUE;
1480       }
1481       break;
1482     }
1483     case GST_MESSAGE_LATENCY:
1484     {
1485       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1486       break;
1487     }
1488     case GST_MESSAGE_ERROR:
1489     {
1490       GError *gerror;
1491       gchar *debug;
1492
1493       gst_message_parse_error (message, &gerror, &debug);
1494       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1495       g_error_free (gerror);
1496       g_free (debug);
1497
1498       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1499       break;
1500     }
1501     case GST_MESSAGE_WARNING:
1502     {
1503       GError *gerror;
1504       gchar *debug;
1505
1506       gst_message_parse_warning (message, &gerror, &debug);
1507       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1508       g_error_free (gerror);
1509       g_free (debug);
1510       break;
1511     }
1512     case GST_MESSAGE_ELEMENT:
1513       break;
1514     case GST_MESSAGE_STREAM_STATUS:
1515       break;
1516     case GST_MESSAGE_ASYNC_DONE:
1517       if (!media->adding) {
1518         /* when we are dynamically adding pads, the addition of the udpsrc will
1519          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1520          * wait for the final ASYNC_DONE after everything prerolled */
1521         GST_INFO ("%p: got ASYNC_DONE", media);
1522         collect_media_stats (media);
1523
1524         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1525       } else {
1526         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1527       }
1528       break;
1529     case GST_MESSAGE_EOS:
1530       GST_INFO ("%p: got EOS", media);
1531       if (media->eos_pending) {
1532         GST_DEBUG ("shutting down after EOS");
1533         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1534         media->eos_pending = FALSE;
1535         g_object_unref (media);
1536       }
1537       break;
1538     default:
1539       GST_INFO ("%p: got message type %s", media,
1540           gst_message_type_get_name (type));
1541       break;
1542   }
1543   return TRUE;
1544 }
1545
1546 static gboolean
1547 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1548 {
1549   GstRTSPMediaClass *klass;
1550   gboolean ret;
1551
1552   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1553
1554   if (klass->handle_message)
1555     ret = klass->handle_message (media, message);
1556   else
1557     ret = FALSE;
1558
1559   return ret;
1560 }
1561
1562 /* called from streaming threads */
1563 static void
1564 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1565 {
1566   GstRTSPMediaStream *stream;
1567   gchar *name;
1568   gint i;
1569
1570   i = media->streams->len + 1;
1571
1572   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1573
1574   stream = g_new0 (GstRTSPMediaStream, 1);
1575   stream->payloader = element;
1576
1577   name = g_strdup_printf ("dynpay%d", i);
1578
1579   media->adding = TRUE;
1580
1581   /* ghost the pad of the payloader to the element */
1582   stream->srcpad = gst_ghost_pad_new (name, pad);
1583   gst_pad_set_active (stream->srcpad, TRUE);
1584   gst_element_add_pad (media->element, stream->srcpad);
1585   g_free (name);
1586
1587   /* add stream now */
1588   g_array_append_val (media->streams, stream);
1589
1590   setup_stream (stream, i, media);
1591
1592   for (i = 0; i < 2; i++) {
1593     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1594     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1595     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1596     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1597     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1598   }
1599   media->adding = FALSE;
1600 }
1601
1602 static void
1603 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1604 {
1605   GST_INFO ("no more pads");
1606   if (media->fakesink) {
1607     gst_object_ref (media->fakesink);
1608     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1609     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1610     gst_object_unref (media->fakesink);
1611     media->fakesink = NULL;
1612     GST_INFO ("removed fakesink");
1613   }
1614 }
1615
1616 /**
1617  * gst_rtsp_media_prepare:
1618  * @media: a #GstRTSPMedia
1619  *
1620  * Prepare @media for streaming. This function will create the pipeline and
1621  * other objects to manage the streaming.
1622  *
1623  * It will preroll the pipeline and collect vital information about the streams
1624  * such as the duration.
1625  *
1626  * Returns: %TRUE on success.
1627  */
1628 gboolean
1629 gst_rtsp_media_prepare (GstRTSPMedia * media)
1630 {
1631   GstStateChangeReturn ret;
1632   GstRTSPMediaStatus status;
1633   guint i, n_streams;
1634   GstRTSPMediaClass *klass;
1635   GstBus *bus;
1636   GList *walk;
1637
1638   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1639     goto was_prepared;
1640
1641   if (!media->reusable && media->reused)
1642     goto is_reused;
1643
1644   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1645   if (media->rtpbin == NULL)
1646     goto no_gstrtpbin;
1647
1648   GST_INFO ("preparing media %p", media);
1649
1650   /* reset some variables */
1651   media->is_live = FALSE;
1652   media->buffering = FALSE;
1653   /* we're preparing now */
1654   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1655
1656   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1657
1658   /* add the pipeline bus to our custom mainloop */
1659   media->source = gst_bus_create_watch (bus);
1660   gst_object_unref (bus);
1661
1662   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1663
1664   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1665   media->id = g_source_attach (media->source, klass->context);
1666
1667   /* add stuff to the bin */
1668   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1669
1670   /* link streams we already have, other streams might appear when we have
1671    * dynamic elements */
1672   n_streams = gst_rtsp_media_n_streams (media);
1673   for (i = 0; i < n_streams; i++) {
1674     GstRTSPMediaStream *stream;
1675
1676     stream = gst_rtsp_media_get_stream (media, i);
1677
1678     setup_stream (stream, i, media);
1679   }
1680
1681   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1682     GstElement *elem = walk->data;
1683
1684     GST_INFO ("adding callbacks for dynamic element %p", elem);
1685
1686     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1687     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1688
1689     /* we add a fakesink here in order to make the state change async. We remove
1690      * the fakesink again in the no-more-pads callback. */
1691     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1692     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1693   }
1694
1695   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1696   /* first go to PAUSED */
1697   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1698   media->target_state = GST_STATE_PAUSED;
1699
1700   switch (ret) {
1701     case GST_STATE_CHANGE_SUCCESS:
1702       GST_INFO ("SUCCESS state change for media %p", media);
1703       break;
1704     case GST_STATE_CHANGE_ASYNC:
1705       GST_INFO ("ASYNC state change for media %p", media);
1706       break;
1707     case GST_STATE_CHANGE_NO_PREROLL:
1708       /* we need to go to PLAYING */
1709       GST_INFO ("NO_PREROLL state change: live media %p", media);
1710       media->is_live = TRUE;
1711       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1712       if (ret == GST_STATE_CHANGE_FAILURE)
1713         goto state_failed;
1714       break;
1715     case GST_STATE_CHANGE_FAILURE:
1716       goto state_failed;
1717   }
1718
1719   /* now wait for all pads to be prerolled */
1720   status = gst_rtsp_media_get_status (media);
1721   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1722     goto state_failed;
1723
1724   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1725
1726   GST_INFO ("object %p is prerolled", media);
1727
1728   return TRUE;
1729
1730   /* OK */
1731 was_prepared:
1732   {
1733     return TRUE;
1734   }
1735   /* ERRORS */
1736 is_reused:
1737   {
1738     GST_WARNING ("can not reuse media %p", media);
1739     return FALSE;
1740   }
1741 no_gstrtpbin:
1742   {
1743     GST_WARNING ("no gstrtpbin element");
1744     g_warning ("failed to create element 'gstrtpbin', check your installation");
1745     return FALSE;
1746   }
1747 state_failed:
1748   {
1749     GST_WARNING ("failed to preroll pipeline");
1750     unlock_streams (media);
1751     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1752     gst_rtsp_media_unprepare (media);
1753     return FALSE;
1754   }
1755 }
1756
1757 /**
1758  * gst_rtsp_media_unprepare:
1759  * @media: a #GstRTSPMedia
1760  *
1761  * Unprepare @media. After this call, the media should be prepared again before
1762  * it can be used again. If the media is set to be non-reusable, a new instance
1763  * must be created.
1764  *
1765  * Returns: %TRUE on success.
1766  */
1767 gboolean
1768 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1769 {
1770   GstRTSPMediaClass *klass;
1771   gboolean success;
1772
1773   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1774     return TRUE;
1775
1776   GST_INFO ("unprepare media %p", media);
1777   media->target_state = GST_STATE_NULL;
1778
1779   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1780   if (klass->unprepare)
1781     success = klass->unprepare (media);
1782   else
1783     success = TRUE;
1784
1785   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1786   media->reused = TRUE;
1787
1788   /* when the media is not reusable, this will effectively unref the media and
1789    * recreate it */
1790   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1791
1792   return success;
1793 }
1794
1795 static gboolean
1796 default_unprepare (GstRTSPMedia * media)
1797 {
1798   if (media->eos_shutdown) {
1799     GST_DEBUG ("sending EOS for shutdown");
1800     /* ref so that we don't disappear */
1801     g_object_ref (media);
1802     media->eos_pending = TRUE;
1803     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1804     /* we need to go to playing again for the EOS to propagate, normally in this
1805      * state, nothing is receiving data from us anymore so this is ok. */
1806     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1807   } else {
1808     GST_DEBUG ("shutting down");
1809     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1810   }
1811   return TRUE;
1812 }
1813
1814 static void
1815 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1816     gchar * dest, gint min, gint max)
1817 {
1818   GST_INFO ("adding %s:%d-%d", dest, min, max);
1819   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1820   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1821 }
1822
1823 static void
1824 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1825     gchar * dest, gint min, gint max)
1826 {
1827   GST_INFO ("removing %s:%d-%d", dest, min, max);
1828   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1829   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1830 }
1831
1832 /**
1833  * gst_rtsp_media_set_state:
1834  * @media: a #GstRTSPMedia
1835  * @state: the target state of the media
1836  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1837  *
1838  * Set the state of @media to @state and for the transports in @transports.
1839  *
1840  * Returns: %TRUE on success.
1841  */
1842 gboolean
1843 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1844     GArray * transports)
1845 {
1846   gint i;
1847   gboolean add, remove, do_state;
1848   gint old_active;
1849
1850   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1851   g_return_val_if_fail (transports != NULL, FALSE);
1852
1853   /* NULL and READY are the same */
1854   if (state == GST_STATE_READY)
1855     state = GST_STATE_NULL;
1856
1857   add = remove = FALSE;
1858
1859   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1860       media);
1861
1862   switch (state) {
1863     case GST_STATE_NULL:
1864       /* unlock the streams so that they follow the state changes from now on */
1865       unlock_streams (media);
1866       /* fallthrough */
1867     case GST_STATE_PAUSED:
1868       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1869       if (media->target_state == GST_STATE_PLAYING)
1870         remove = TRUE;
1871       break;
1872     case GST_STATE_PLAYING:
1873       /* we're going to PLAYING, add */
1874       add = TRUE;
1875       break;
1876     default:
1877       break;
1878   }
1879   old_active = media->active;
1880
1881   for (i = 0; i < transports->len; i++) {
1882     GstRTSPMediaTrans *tr;
1883     GstRTSPMediaStream *stream;
1884     GstRTSPTransport *trans;
1885
1886     /* we need a non-NULL entry in the array */
1887     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1888     if (tr == NULL)
1889       continue;
1890
1891     /* we need a transport */
1892     if (!(trans = tr->transport))
1893       continue;
1894
1895     /* get the stream and add the destinations */
1896     stream = gst_rtsp_media_get_stream (media, tr->idx);
1897     switch (trans->lower_transport) {
1898       case GST_RTSP_LOWER_TRANS_UDP:
1899       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1900       {
1901         gchar *dest;
1902         gint min, max;
1903
1904         dest = trans->destination;
1905         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1906           min = trans->port.min;
1907           max = trans->port.max;
1908         } else {
1909           min = trans->client_port.min;
1910           max = trans->client_port.max;
1911         }
1912
1913         if (add && !tr->active) {
1914           add_udp_destination (media, stream, dest, min, max);
1915           stream->transports = g_list_prepend (stream->transports, tr);
1916           tr->active = TRUE;
1917           media->active++;
1918         } else if (remove && tr->active) {
1919           remove_udp_destination (media, stream, dest, min, max);
1920           stream->transports = g_list_remove (stream->transports, tr);
1921           tr->active = FALSE;
1922           media->active--;
1923         }
1924         break;
1925       }
1926       case GST_RTSP_LOWER_TRANS_TCP:
1927         if (add && !tr->active) {
1928           GST_INFO ("adding TCP %s", trans->destination);
1929           stream->transports = g_list_prepend (stream->transports, tr);
1930           tr->active = TRUE;
1931           media->active++;
1932         } else if (remove && tr->active) {
1933           GST_INFO ("removing TCP %s", trans->destination);
1934           stream->transports = g_list_remove (stream->transports, tr);
1935           tr->active = FALSE;
1936           media->active--;
1937         }
1938         break;
1939       default:
1940         GST_INFO ("Unknown transport %d", trans->lower_transport);
1941         break;
1942     }
1943   }
1944
1945   /* we just added the first media, do the playing state change */
1946   if (old_active == 0 && add)
1947     do_state = TRUE;
1948   /* if we have no more active media, do the downward state changes */
1949   else if (media->active == 0)
1950     do_state = TRUE;
1951   else
1952     do_state = FALSE;
1953
1954   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
1955       media, do_state);
1956
1957   if (media->target_state != state) {
1958     if (do_state) {
1959       if (state == GST_STATE_NULL) {
1960         gst_rtsp_media_unprepare (media);
1961       } else {
1962         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
1963             media);
1964         media->target_state = state;
1965         gst_element_set_state (media->pipeline, state);
1966       }
1967     }
1968     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
1969         NULL);
1970   }
1971
1972   /* remember where we are */
1973   if (state == GST_STATE_PAUSED || old_active != media->active)
1974     collect_media_stats (media);
1975
1976   return TRUE;
1977 }
1978
1979 /**
1980  * gst_rtsp_media_remove_elements:
1981  * @media: a #GstRTSPMedia
1982  *
1983  * Remove all elements and the pipeline controlled by @media.
1984  */
1985 void
1986 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1987 {
1988   gint i, j;
1989
1990   unlock_streams (media);
1991
1992   for (i = 0; i < media->streams->len; i++) {
1993     GstRTSPMediaStream *stream;
1994
1995     GST_INFO ("Removing elements of stream %d from pipeline", i);
1996
1997     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1998
1999     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2000
2001     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2002
2003     for (j = 0; j < 2; j++) {
2004       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2005       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2006       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2007       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2008       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2009       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2010
2011       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2012       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2013       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2014       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2015       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2016       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2017     }
2018     if (stream->caps)
2019       gst_caps_unref (stream->caps);
2020     stream->caps = NULL;
2021     gst_rtsp_media_stream_free (stream);
2022   }
2023   g_array_remove_range (media->streams, 0, media->streams->len);
2024
2025   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2026   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2027
2028   gst_object_unref (media->pipeline);
2029   media->pipeline = NULL;
2030 }