fix compiler warnings about unused variables
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-funnel.h"
27 #include "rtsp-media.h"
28
29 #define DEFAULT_SHARED         FALSE
30 #define DEFAULT_REUSABLE       FALSE
31 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
32 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
33 #define DEFAULT_EOS_SHUTDOWN   FALSE
34 #define DEFAULT_BUFFER_SIZE    0x80000
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_LAST
48 };
49
50 enum
51 {
52   SIGNAL_PREPARED,
53   SIGNAL_UNPREPARED,
54   SIGNAL_NEW_STATE,
55   SIGNAL_LAST
56 };
57
58 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
59 #define GST_CAT_DEFAULT rtsp_media_debug
60
61 static GQuark ssrc_stream_map_key;
62
63 static void gst_rtsp_media_get_property (GObject * object, guint propid,
64     GValue * value, GParamSpec * pspec);
65 static void gst_rtsp_media_set_property (GObject * object, guint propid,
66     const GValue * value, GParamSpec * pspec);
67 static void gst_rtsp_media_finalize (GObject * obj);
68
69 static gpointer do_loop (GstRTSPMediaClass * klass);
70 static gboolean default_handle_message (GstRTSPMedia * media,
71     GstMessage * message);
72 static gboolean default_unprepare (GstRTSPMedia * media);
73 static void unlock_streams (GstRTSPMedia * media);
74
75 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
76
77 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
78
79 static void
80 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
81 {
82   GObjectClass *gobject_class;
83   GError *error = NULL;
84
85   gobject_class = G_OBJECT_CLASS (klass);
86
87   gobject_class->get_property = gst_rtsp_media_get_property;
88   gobject_class->set_property = gst_rtsp_media_set_property;
89   gobject_class->finalize = gst_rtsp_media_finalize;
90
91   g_object_class_install_property (gobject_class, PROP_SHARED,
92       g_param_spec_boolean ("shared", "Shared",
93           "If this media pipeline can be shared", DEFAULT_SHARED,
94           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
95
96   g_object_class_install_property (gobject_class, PROP_REUSABLE,
97       g_param_spec_boolean ("reusable", "Reusable",
98           "If this media pipeline can be reused after an unprepare",
99           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100
101   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
102       g_param_spec_flags ("protocols", "Protocols",
103           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
104           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
107       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
108           "Send an EOS event to the pipeline before unpreparing",
109           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
110
111   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
112       g_param_spec_uint ("buffer-size", "Buffer Size",
113           "The kernel UDP buffer size to use", 0, G_MAXUINT,
114           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115
116   gst_rtsp_media_signals[SIGNAL_PREPARED] =
117       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
118       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
119       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
120
121   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
122       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
123       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
124       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
125
126   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
127       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
128       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
129       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
130
131   klass->context = g_main_context_new ();
132   klass->loop = g_main_loop_new (klass->context, TRUE);
133
134   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
135
136   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
137   if (error != NULL) {
138     g_critical ("could not start bus thread: %s", error->message);
139   }
140   klass->handle_message = default_handle_message;
141   klass->unprepare = default_unprepare;
142
143   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
144
145   gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
146
147 }
148
149 static void
150 gst_rtsp_media_init (GstRTSPMedia * media)
151 {
152   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
153   media->lock = g_mutex_new ();
154   media->cond = g_cond_new ();
155
156   media->shared = DEFAULT_SHARED;
157   media->reusable = DEFAULT_REUSABLE;
158   media->protocols = DEFAULT_PROTOCOLS;
159   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
160   media->buffer_size = DEFAULT_BUFFER_SIZE;
161 }
162
163 void
164 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
165 {
166   if (trans->transport) {
167     gst_rtsp_transport_free (trans->transport);
168     trans->transport = NULL;
169   }
170   if (trans->rtpsource) {
171     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
172     trans->rtpsource = NULL;
173   }
174 }
175
176 static void
177 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
178 {
179   if (stream->session)
180     g_object_unref (stream->session);
181
182   if (stream->caps)
183     gst_caps_unref (stream->caps);
184
185   if (stream->send_rtp_sink)
186     gst_object_unref (stream->send_rtp_sink);
187   if (stream->send_rtp_src)
188     gst_object_unref (stream->send_rtp_src);
189   if (stream->send_rtcp_src)
190     gst_object_unref (stream->send_rtcp_src);
191   if (stream->recv_rtcp_sink)
192     gst_object_unref (stream->recv_rtcp_sink);
193   if (stream->recv_rtp_sink)
194     gst_object_unref (stream->recv_rtp_sink);
195
196   g_list_free (stream->transports);
197
198   g_free (stream);
199 }
200
201 static void
202 gst_rtsp_media_finalize (GObject * obj)
203 {
204   GstRTSPMedia *media;
205   guint i;
206
207   media = GST_RTSP_MEDIA (obj);
208
209   GST_INFO ("finalize media %p", media);
210
211   if (media->pipeline) {
212     unlock_streams (media);
213     gst_element_set_state (media->pipeline, GST_STATE_NULL);
214     gst_object_unref (media->pipeline);
215   }
216
217   for (i = 0; i < media->streams->len; i++) {
218     GstRTSPMediaStream *stream;
219
220     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
221
222     gst_rtsp_media_stream_free (stream);
223   }
224   g_array_free (media->streams, TRUE);
225
226   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
227   g_list_free (media->dynamic);
228
229   if (media->source) {
230     g_source_destroy (media->source);
231     g_source_unref (media->source);
232   }
233   g_mutex_free (media->lock);
234   g_cond_free (media->cond);
235
236   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
237 }
238
239 static void
240 gst_rtsp_media_get_property (GObject * object, guint propid,
241     GValue * value, GParamSpec * pspec)
242 {
243   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
244
245   switch (propid) {
246     case PROP_SHARED:
247       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
248       break;
249     case PROP_REUSABLE:
250       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
251       break;
252     case PROP_PROTOCOLS:
253       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
254       break;
255     case PROP_EOS_SHUTDOWN:
256       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
257       break;
258     case PROP_BUFFER_SIZE:
259       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
260       break;
261     default:
262       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
263   }
264 }
265
266 static void
267 gst_rtsp_media_set_property (GObject * object, guint propid,
268     const GValue * value, GParamSpec * pspec)
269 {
270   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
271
272   switch (propid) {
273     case PROP_SHARED:
274       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
275       break;
276     case PROP_REUSABLE:
277       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
278       break;
279     case PROP_PROTOCOLS:
280       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
281       break;
282     case PROP_EOS_SHUTDOWN:
283       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
284       break;
285     case PROP_BUFFER_SIZE:
286       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
287       break;
288     default:
289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
290   }
291 }
292
293 static gpointer
294 do_loop (GstRTSPMediaClass * klass)
295 {
296   GST_INFO ("enter mainloop");
297   g_main_loop_run (klass->loop);
298   GST_INFO ("exit mainloop");
299
300   return NULL;
301 }
302
303 static void
304 collect_media_stats (GstRTSPMedia * media)
305 {
306   GstFormat format;
307   gint64 position, duration;
308
309   media->range.unit = GST_RTSP_RANGE_NPT;
310
311   if (media->is_live) {
312     media->range.min.type = GST_RTSP_TIME_NOW;
313     media->range.min.seconds = -1;
314     media->range.max.type = GST_RTSP_TIME_END;
315     media->range.max.seconds = -1;
316   } else {
317     /* get the position */
318     format = GST_FORMAT_TIME;
319     if (!gst_element_query_position (media->pipeline, &format, &position)) {
320       GST_INFO ("position query failed");
321       position = 0;
322     }
323
324     /* get the duration */
325     format = GST_FORMAT_TIME;
326     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
327       GST_INFO ("duration query failed");
328       duration = -1;
329     }
330
331     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
332         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
333
334     if (position == -1) {
335       media->range.min.type = GST_RTSP_TIME_NOW;
336       media->range.min.seconds = -1;
337     } else {
338       media->range.min.type = GST_RTSP_TIME_SECONDS;
339       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
340     }
341     if (duration == -1) {
342       media->range.max.type = GST_RTSP_TIME_END;
343       media->range.max.seconds = -1;
344     } else {
345       media->range.max.type = GST_RTSP_TIME_SECONDS;
346       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
347     }
348   }
349 }
350
351 /**
352  * gst_rtsp_media_new:
353  *
354  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
355  * element to produde RTP data for one or more related (audio/video/..) 
356  * streams.
357  *
358  * Returns: a new #GstRTSPMedia object.
359  */
360 GstRTSPMedia *
361 gst_rtsp_media_new (void)
362 {
363   GstRTSPMedia *result;
364
365   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
366
367   return result;
368 }
369
370 /**
371  * gst_rtsp_media_set_shared:
372  * @media: a #GstRTSPMedia
373  * @shared: the new value
374  *
375  * Set or unset if the pipeline for @media can be shared will multiple clients.
376  * When @shared is %TRUE, client requests for this media will share the media
377  * pipeline.
378  */
379 void
380 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
381 {
382   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
383
384   media->shared = shared;
385 }
386
387 /**
388  * gst_rtsp_media_is_shared:
389  * @media: a #GstRTSPMedia
390  *
391  * Check if the pipeline for @media can be shared between multiple clients.
392  *
393  * Returns: %TRUE if the media can be shared between clients.
394  */
395 gboolean
396 gst_rtsp_media_is_shared (GstRTSPMedia * media)
397 {
398   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
399
400   return media->shared;
401 }
402
403 /**
404  * gst_rtsp_media_set_reusable:
405  * @media: a #GstRTSPMedia
406  * @reusable: the new value
407  *
408  * Set or unset if the pipeline for @media can be reused after the pipeline has
409  * been unprepared.
410  */
411 void
412 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
413 {
414   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
415
416   media->reusable = reusable;
417 }
418
419 /**
420  * gst_rtsp_media_is_reusable:
421  * @media: a #GstRTSPMedia
422  *
423  * Check if the pipeline for @media can be reused after an unprepare.
424  *
425  * Returns: %TRUE if the media can be reused
426  */
427 gboolean
428 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
429 {
430   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
431
432   return media->reusable;
433 }
434
435 /**
436  * gst_rtsp_media_set_protocols:
437  * @media: a #GstRTSPMedia
438  * @protocols: the new flags
439  *
440  * Configure the allowed lower transport for @media.
441  */
442 void
443 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
444 {
445   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
446
447   media->protocols = protocols;
448 }
449
450 /**
451  * gst_rtsp_media_get_protocols:
452  * @media: a #GstRTSPMedia
453  *
454  * Get the allowed protocols of @media.
455  *
456  * Returns: a #GstRTSPLowerTrans
457  */
458 GstRTSPLowerTrans
459 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
460 {
461   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
462       GST_RTSP_LOWER_TRANS_UNKNOWN);
463
464   return media->protocols;
465 }
466
467 /**
468  * gst_rtsp_media_set_eos_shutdown:
469  * @media: a #GstRTSPMedia
470  * @eos_shutdown: the new value
471  *
472  * Set or unset if an EOS event will be sent to the pipeline for @media before
473  * it is unprepared.
474  */
475 void
476 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
477 {
478   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
479
480   media->eos_shutdown = eos_shutdown;
481 }
482
483 /**
484  * gst_rtsp_media_is_eos_shutdown:
485  * @media: a #GstRTSPMedia
486  *
487  * Check if the pipeline for @media will send an EOS down the pipeline before
488  * unpreparing.
489  *
490  * Returns: %TRUE if the media will send EOS before unpreparing.
491  */
492 gboolean
493 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
494 {
495   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
496
497   return media->eos_shutdown;
498 }
499
500 /**
501  * gst_rtsp_media_set_buffer_size:
502  * @media: a #GstRTSPMedia
503  * @size: the new value
504  *
505  * Set the kernel UDP buffer size.
506  */
507 void
508 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
509 {
510   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
511
512   media->buffer_size = size;
513 }
514
515 /**
516  * gst_rtsp_media_get_buffer_size:
517  * @media: a #GstRTSPMedia
518  *
519  * Get the kernel UDP buffer size.
520  *
521  * Returns: the kernel UDP buffer size.
522  */
523 guint
524 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
525 {
526   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
527
528   return media->buffer_size;
529 }
530
531 /**
532  * gst_rtsp_media_set_auth:
533  * @media: a #GstRTSPMedia
534  * @auth: a #GstRTSPAuth
535  *
536  * configure @auth to be used as the authentication manager of @media.
537  */
538 void
539 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
540 {
541   GstRTSPAuth *old;
542
543   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
544
545   old = media->auth;
546
547   if (old != auth) {
548     if (auth)
549       g_object_ref (auth);
550     media->auth = auth;
551     if (old)
552       g_object_unref (old);
553   }
554 }
555
556 /**
557  * gst_rtsp_media_get_auth:
558  * @media: a #GstRTSPMedia
559  *
560  * Get the #GstRTSPAuth used as the authentication manager of @media.
561  *
562  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
563  * usage.
564  */
565 GstRTSPAuth *
566 gst_rtsp_media_get_auth (GstRTSPMedia * media)
567 {
568   GstRTSPAuth *result;
569
570   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
571
572   if ((result = media->auth))
573     g_object_ref (result);
574
575   return result;
576 }
577
578
579 /**
580  * gst_rtsp_media_n_streams:
581  * @media: a #GstRTSPMedia
582  *
583  * Get the number of streams in this media.
584  *
585  * Returns: The number of streams.
586  */
587 guint
588 gst_rtsp_media_n_streams (GstRTSPMedia * media)
589 {
590   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
591
592   return media->streams->len;
593 }
594
595 /**
596  * gst_rtsp_media_get_stream:
597  * @media: a #GstRTSPMedia
598  * @idx: the stream index
599  *
600  * Retrieve the stream with index @idx from @media.
601  *
602  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
603  * that index did not exist.
604  */
605 GstRTSPMediaStream *
606 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
607 {
608   GstRTSPMediaStream *res;
609
610   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
611
612   if (idx < media->streams->len)
613     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
614   else
615     res = NULL;
616
617   return res;
618 }
619
620 /**
621  * gst_rtsp_media_get_range_string:
622  * @media: a #GstRTSPMedia
623  * @play: for the PLAY request
624  *
625  * Get the current range as a string.
626  *
627  * Returns: The range as a string, g_free() after usage.
628  */
629 gchar *
630 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
631 {
632   gchar *result;
633   GstRTSPTimeRange range;
634
635   /* make copy */
636   range = media->range;
637
638   if (!play && media->active > 0) {
639     range.min.type = GST_RTSP_TIME_NOW;
640     range.min.seconds = -1;
641   }
642
643   result = gst_rtsp_range_to_string (&range);
644
645   return result;
646 }
647
648 /**
649  * gst_rtsp_media_seek:
650  * @media: a #GstRTSPMedia
651  * @range: a #GstRTSPTimeRange
652  *
653  * Seek the pipeline to @range.
654  *
655  * Returns: %TRUE on success.
656  */
657 gboolean
658 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
659 {
660   GstSeekFlags flags;
661   gboolean res;
662   gint64 start, stop;
663   GstSeekType start_type, stop_type;
664
665   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
666   g_return_val_if_fail (range != NULL, FALSE);
667
668   if (range->unit != GST_RTSP_RANGE_NPT)
669     goto not_supported;
670
671   /* depends on the current playing state of the pipeline. We might need to
672    * queue this until we get EOS. */
673   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
674
675   start_type = stop_type = GST_SEEK_TYPE_NONE;
676
677   switch (range->min.type) {
678     case GST_RTSP_TIME_NOW:
679       start = -1;
680       break;
681     case GST_RTSP_TIME_SECONDS:
682       /* only seek when something changed */
683       if (media->range.min.seconds == range->min.seconds) {
684         start = -1;
685       } else {
686         start = range->min.seconds * GST_SECOND;
687         start_type = GST_SEEK_TYPE_SET;
688       }
689       break;
690     case GST_RTSP_TIME_END:
691     default:
692       goto weird_type;
693   }
694   switch (range->max.type) {
695     case GST_RTSP_TIME_SECONDS:
696       /* only seek when something changed */
697       if (media->range.max.seconds == range->max.seconds) {
698         stop = -1;
699       } else {
700         stop = range->max.seconds * GST_SECOND;
701         stop_type = GST_SEEK_TYPE_SET;
702       }
703       break;
704     case GST_RTSP_TIME_END:
705       stop = -1;
706       stop_type = GST_SEEK_TYPE_SET;
707       break;
708     case GST_RTSP_TIME_NOW:
709     default:
710       goto weird_type;
711   }
712
713   if (start != -1 || stop != -1) {
714     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
715         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
716
717     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
718         flags, start_type, start, stop_type, stop);
719
720     /* and block for the seek to complete */
721     GST_INFO ("done seeking %d", res);
722     gst_element_get_state (media->pipeline, NULL, NULL, -1);
723     GST_INFO ("prerolled again");
724
725     collect_media_stats (media);
726   } else {
727     GST_INFO ("no seek needed");
728     res = TRUE;
729   }
730
731   return res;
732
733   /* ERRORS */
734 not_supported:
735   {
736     GST_WARNING ("seek unit %d not supported", range->unit);
737     return FALSE;
738   }
739 weird_type:
740   {
741     GST_WARNING ("weird range type %d not supported", range->min.type);
742     return FALSE;
743   }
744 }
745
746 /**
747  * gst_rtsp_media_stream_rtp:
748  * @stream: a #GstRTSPMediaStream
749  * @buffer: a #GstBuffer
750  *
751  * Handle an RTP buffer for the stream. This method is usually called when a
752  * message has been received from a client using the TCP transport.
753  *
754  * This function takes ownership of @buffer.
755  *
756  * Returns: a GstFlowReturn.
757  */
758 GstFlowReturn
759 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
760 {
761   GstFlowReturn ret;
762
763   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
764
765   return ret;
766 }
767
768 /**
769  * gst_rtsp_media_stream_rtcp:
770  * @stream: a #GstRTSPMediaStream
771  * @buffer: a #GstBuffer
772  *
773  * Handle an RTCP buffer for the stream. This method is usually called when a
774  * message has been received from a client using the TCP transport.
775  *
776  * This function takes ownership of @buffer.
777  *
778  * Returns: a GstFlowReturn.
779  */
780 GstFlowReturn
781 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
782 {
783   GstFlowReturn ret;
784
785   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
786
787   return ret;
788 }
789
790 /* Allocate the udp ports and sockets */
791 static gboolean
792 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
793 {
794   GstStateChangeReturn ret;
795   GstElement *udpsrc0, *udpsrc1;
796   GstElement *udpsink0, *udpsink1;
797   gint tmp_rtp, tmp_rtcp;
798   guint count;
799   gint rtpport, rtcpport, sockfd;
800   const gchar *host;
801
802   udpsrc0 = NULL;
803   udpsrc1 = NULL;
804   udpsink0 = NULL;
805   udpsink1 = NULL;
806   count = 0;
807
808   /* Start with random port */
809   tmp_rtp = 0;
810
811   if (media->is_ipv6)
812     host = "udp://[::0]";
813   else
814     host = "udp://0.0.0.0";
815
816   /* try to allocate 2 UDP ports, the RTP port should be an even
817    * number and the RTCP port should be the next (uneven) port */
818 again:
819   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
820   if (udpsrc0 == NULL)
821     goto no_udp_protocol;
822   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
823
824   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
825   if (ret == GST_STATE_CHANGE_FAILURE) {
826     if (tmp_rtp != 0) {
827       tmp_rtp += 2;
828       if (++count > 20)
829         goto no_ports;
830
831       gst_element_set_state (udpsrc0, GST_STATE_NULL);
832       gst_object_unref (udpsrc0);
833
834       goto again;
835     }
836     goto no_udp_protocol;
837   }
838
839   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
840
841   /* check if port is even */
842   if ((tmp_rtp & 1) != 0) {
843     /* port not even, close and allocate another */
844     if (++count > 20)
845       goto no_ports;
846
847     gst_element_set_state (udpsrc0, GST_STATE_NULL);
848     gst_object_unref (udpsrc0);
849
850     tmp_rtp++;
851     goto again;
852   }
853
854   /* allocate port+1 for RTCP now */
855   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
856   if (udpsrc1 == NULL)
857     goto no_udp_rtcp_protocol;
858
859   /* set port */
860   tmp_rtcp = tmp_rtp + 1;
861   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
862
863   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
864   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
865   if (ret == GST_STATE_CHANGE_FAILURE) {
866
867     if (++count > 20)
868       goto no_ports;
869
870     gst_element_set_state (udpsrc0, GST_STATE_NULL);
871     gst_object_unref (udpsrc0);
872
873     gst_element_set_state (udpsrc1, GST_STATE_NULL);
874     gst_object_unref (udpsrc1);
875
876     tmp_rtp += 2;
877     goto again;
878   }
879
880   /* all fine, do port check */
881   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
882   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
883
884   /* this should not happen... */
885   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
886     goto port_error;
887
888   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
889   if (!udpsink0)
890     goto no_udp_protocol;
891
892   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
893   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
894   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
895
896   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
897   if (!udpsink1)
898     goto no_udp_protocol;
899
900   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
901           "send-duplicates")) {
902     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
903     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
904   } else {
905     g_warning
906         ("old multiudpsink version found without send-duplicates property");
907   }
908
909   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
910           "buffer-size")) {
911     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
912   } else {
913     GST_WARNING ("multiudpsink version found without buffer-size property");
914   }
915
916   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
917   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
918   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
919   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
920   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
921
922   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
923   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
924   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
925   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
926
927   /* we keep these elements, we configure all in configure_transport when the
928    * server told us to really use the UDP ports. */
929   stream->udpsrc[0] = udpsrc0;
930   stream->udpsrc[1] = udpsrc1;
931   stream->udpsink[0] = udpsink0;
932   stream->udpsink[1] = udpsink1;
933   stream->server_port.min = rtpport;
934   stream->server_port.max = rtcpport;
935
936   return TRUE;
937
938   /* ERRORS */
939 no_udp_protocol:
940   {
941     goto cleanup;
942   }
943 no_ports:
944   {
945     goto cleanup;
946   }
947 no_udp_rtcp_protocol:
948   {
949     goto cleanup;
950   }
951 port_error:
952   {
953     goto cleanup;
954   }
955 cleanup:
956   {
957     if (udpsrc0) {
958       gst_element_set_state (udpsrc0, GST_STATE_NULL);
959       gst_object_unref (udpsrc0);
960     }
961     if (udpsrc1) {
962       gst_element_set_state (udpsrc1, GST_STATE_NULL);
963       gst_object_unref (udpsrc1);
964     }
965     if (udpsink0) {
966       gst_element_set_state (udpsink0, GST_STATE_NULL);
967       gst_object_unref (udpsink0);
968     }
969     if (udpsink1) {
970       gst_element_set_state (udpsink1, GST_STATE_NULL);
971       gst_object_unref (udpsink1);
972     }
973     return FALSE;
974   }
975 }
976
977 /* executed from streaming thread */
978 static void
979 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
980 {
981   gchar *capsstr;
982   GstCaps *newcaps, *oldcaps;
983
984   if ((newcaps = GST_PAD_CAPS (pad)))
985     gst_caps_ref (newcaps);
986
987   oldcaps = stream->caps;
988   stream->caps = newcaps;
989
990   if (oldcaps)
991     gst_caps_unref (oldcaps);
992
993   capsstr = gst_caps_to_string (newcaps);
994   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
995   g_free (capsstr);
996 }
997
998 static void
999 dump_structure (const GstStructure * s)
1000 {
1001   gchar *sstr;
1002
1003   sstr = gst_structure_to_string (s);
1004   GST_INFO ("structure: %s", sstr);
1005   g_free (sstr);
1006 }
1007
1008 static GstRTSPMediaTrans *
1009 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1010 {
1011   GList *walk;
1012   GstRTSPMediaTrans *result = NULL;
1013   const gchar *tmp;
1014   gchar *dest;
1015   guint port;
1016
1017   if (rtcp_from == NULL)
1018     return NULL;
1019
1020   tmp = g_strrstr (rtcp_from, ":");
1021   if (tmp == NULL)
1022     return NULL;
1023
1024   port = atoi (tmp + 1);
1025   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1026
1027   GST_INFO ("finding %s:%d", dest, port);
1028
1029   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1030     GstRTSPMediaTrans *trans = walk->data;
1031     gint min, max;
1032
1033     min = trans->transport->client_port.min;
1034     max = trans->transport->client_port.max;
1035
1036     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1037             || max == port)) {
1038       result = trans;
1039       break;
1040     }
1041   }
1042   g_free (dest);
1043
1044   return result;
1045 }
1046
1047 static void
1048 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1049 {
1050   GstStructure *stats;
1051   GstRTSPMediaTrans *trans;
1052
1053   GST_INFO ("%p: new source %p", stream, source);
1054
1055   /* see if we have a stream to match with the origin of the RTCP packet */
1056   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1057   if (trans == NULL) {
1058     g_object_get (source, "stats", &stats, NULL);
1059     if (stats) {
1060       const gchar *rtcp_from;
1061
1062       dump_structure (stats);
1063
1064       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1065       if ((trans = find_transport (stream, rtcp_from))) {
1066         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1067             source);
1068
1069         /* keep ref to the source */
1070         trans->rtpsource = source;
1071
1072         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1073       }
1074       gst_structure_free (stats);
1075     }
1076   } else {
1077     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1078   }
1079 }
1080
1081 static void
1082 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1083 {
1084   GST_INFO ("%p: new SDES %p", stream, source);
1085 }
1086
1087 static void
1088 on_ssrc_active (GObject * session, GObject * source,
1089     GstRTSPMediaStream * stream)
1090 {
1091   GstRTSPMediaTrans *trans;
1092
1093   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1094
1095   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1096
1097   if (trans && trans->keep_alive)
1098     trans->keep_alive (trans->ka_user_data);
1099
1100 #ifdef DUMP_STATS
1101   {
1102     GstStructure *stats;
1103     g_object_get (source, "stats", &stats, NULL);
1104     if (stats) {
1105       dump_structure (stats);
1106       gst_structure_free (stats);
1107     }
1108   }
1109 #endif
1110 }
1111
1112 static void
1113 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1114 {
1115   GST_INFO ("%p: source %p bye", stream, source);
1116 }
1117
1118 static void
1119 on_bye_timeout (GObject * session, GObject * source,
1120     GstRTSPMediaStream * stream)
1121 {
1122   GstRTSPMediaTrans *trans;
1123
1124   GST_INFO ("%p: source %p bye timeout", stream, source);
1125
1126   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1127     trans->rtpsource = NULL;
1128     trans->timeout = TRUE;
1129   }
1130 }
1131
1132 static void
1133 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1134 {
1135   GstRTSPMediaTrans *trans;
1136
1137   GST_INFO ("%p: source %p timeout", stream, source);
1138
1139   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1140     trans->rtpsource = NULL;
1141     trans->timeout = TRUE;
1142   }
1143 }
1144
1145 static GstFlowReturn
1146 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1147 {
1148   GList *walk;
1149   GstBuffer *buffer;
1150   GstRTSPMediaStream *stream;
1151
1152   buffer = gst_app_sink_pull_buffer (sink);
1153   if (!buffer)
1154     return GST_FLOW_OK;
1155
1156   stream = (GstRTSPMediaStream *) user_data;
1157
1158   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1159     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1160
1161     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1162       if (tr->send_rtp)
1163         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1164     } else {
1165       if (tr->send_rtcp)
1166         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1167     }
1168   }
1169   gst_buffer_unref (buffer);
1170
1171   return GST_FLOW_OK;
1172 }
1173
1174 static GstFlowReturn
1175 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1176 {
1177   GList *walk;
1178   GstBufferList *blist;
1179   GstRTSPMediaStream *stream;
1180
1181   blist = gst_app_sink_pull_buffer_list (sink);
1182   if (!blist)
1183     return GST_FLOW_OK;
1184
1185   stream = (GstRTSPMediaStream *) user_data;
1186
1187   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1188     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1189
1190     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1191       if (tr->send_rtp_list)
1192         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1193             tr->user_data);
1194     } else {
1195       if (tr->send_rtcp_list)
1196         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1197             tr->user_data);
1198     }
1199   }
1200   gst_buffer_list_unref (blist);
1201
1202   return GST_FLOW_OK;
1203 }
1204
1205 static GstAppSinkCallbacks sink_cb = {
1206   NULL,                         /* not interested in EOS */
1207   NULL,                         /* not interested in preroll buffers */
1208   handle_new_buffer,
1209   handle_new_buffer_list
1210 };
1211
1212 /* prepare the pipeline objects to handle @stream in @media */
1213 static gboolean
1214 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1215 {
1216   gchar *name;
1217   GstPad *pad, *teepad, *selpad;
1218   GstPadLinkReturn ret;
1219   gint i;
1220
1221   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1222    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1223    * elements */
1224   if (!alloc_udp_ports (media, stream))
1225     return FALSE;
1226
1227   /* add the ports to the pipeline */
1228   for (i = 0; i < 2; i++) {
1229     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1230     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1231   }
1232
1233   /* create elements for the TCP transfer */
1234   for (i = 0; i < 2; i++) {
1235     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1236     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1237     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1238     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1239     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1240     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1241     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1242     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1243         &sink_cb, stream, NULL);
1244   }
1245
1246   /* hook up the stream to the RTP session elements. */
1247   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1248   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1249   g_free (name);
1250   name = g_strdup_printf ("send_rtp_src_%d", idx);
1251   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1252   g_free (name);
1253   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1254   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1255   g_free (name);
1256   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1257   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1258   g_free (name);
1259   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1260   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1261   g_free (name);
1262
1263   /* get the session */
1264   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1265       &stream->session);
1266
1267   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1268       stream);
1269   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1270       stream);
1271   g_signal_connect (stream->session, "on-ssrc-active",
1272       (GCallback) on_ssrc_active, stream);
1273   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1274       stream);
1275   g_signal_connect (stream->session, "on-bye-timeout",
1276       (GCallback) on_bye_timeout, stream);
1277   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1278       stream);
1279
1280   /* link the RTP pad to the session manager */
1281   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1282   if (ret != GST_PAD_LINK_OK)
1283     goto link_failed;
1284
1285   /* make tee for RTP and link to stream */
1286   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1287   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1288
1289   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1290   gst_pad_link (stream->send_rtp_src, pad);
1291   gst_object_unref (pad);
1292
1293   /* link RTP sink, we're pretty sure this will work. */
1294   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1295   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1296   gst_pad_link (teepad, pad);
1297   gst_object_unref (pad);
1298   gst_object_unref (teepad);
1299
1300   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1301   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1302   gst_pad_link (teepad, pad);
1303   gst_object_unref (pad);
1304   gst_object_unref (teepad);
1305
1306   /* make tee for RTCP */
1307   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1308   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1309
1310   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1311   gst_pad_link (stream->send_rtcp_src, pad);
1312   gst_object_unref (pad);
1313
1314   /* link RTCP elements */
1315   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1316   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1317   gst_pad_link (teepad, pad);
1318   gst_object_unref (pad);
1319   gst_object_unref (teepad);
1320
1321   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1322   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1323   gst_pad_link (teepad, pad);
1324   gst_object_unref (pad);
1325   gst_object_unref (teepad);
1326
1327   /* make selector for the RTP receivers */
1328   stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
1329   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1330
1331   pad = gst_element_get_static_pad (stream->selector[0], "src");
1332   gst_pad_link (pad, stream->recv_rtp_sink);
1333   gst_object_unref (pad);
1334
1335   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1336   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1337   gst_pad_link (pad, selpad);
1338   gst_object_unref (pad);
1339   gst_object_unref (selpad);
1340
1341   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1342   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1343   gst_pad_link (pad, selpad);
1344   gst_object_unref (pad);
1345   gst_object_unref (selpad);
1346
1347   /* make selector for the RTCP receivers */
1348   stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
1349   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1350
1351   pad = gst_element_get_static_pad (stream->selector[1], "src");
1352   gst_pad_link (pad, stream->recv_rtcp_sink);
1353   gst_object_unref (pad);
1354
1355   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1356   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1357   gst_pad_link (pad, selpad);
1358   gst_object_unref (pad);
1359   gst_object_unref (selpad);
1360
1361   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1362   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1363   gst_pad_link (pad, selpad);
1364   gst_object_unref (pad);
1365   gst_object_unref (selpad);
1366
1367   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1368    * values */
1369   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1370   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1371   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1372   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1373
1374   /* be notified of caps changes */
1375   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1376       (GCallback) caps_notify, stream);
1377
1378   stream->prepared = TRUE;
1379
1380   return TRUE;
1381
1382   /* ERRORS */
1383 link_failed:
1384   {
1385     GST_WARNING ("failed to link stream %d", idx);
1386     return FALSE;
1387   }
1388 }
1389
1390 static void
1391 unlock_streams (GstRTSPMedia * media)
1392 {
1393   guint i, n_streams;
1394
1395   /* unlock the udp src elements */
1396   n_streams = gst_rtsp_media_n_streams (media);
1397   for (i = 0; i < n_streams; i++) {
1398     GstRTSPMediaStream *stream;
1399
1400     stream = gst_rtsp_media_get_stream (media, i);
1401
1402     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1403     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1404   }
1405 }
1406
1407 static void
1408 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1409 {
1410   g_mutex_lock (media->lock);
1411   /* never overwrite the error status */
1412   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1413     media->status = status;
1414   GST_DEBUG ("setting new status to %d", status);
1415   g_cond_broadcast (media->cond);
1416   g_mutex_unlock (media->lock);
1417 }
1418
1419 static GstRTSPMediaStatus
1420 gst_rtsp_media_get_status (GstRTSPMedia * media)
1421 {
1422   GstRTSPMediaStatus result;
1423   GTimeVal timeout;
1424
1425   g_mutex_lock (media->lock);
1426   g_get_current_time (&timeout);
1427   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1428   /* while we are preparing, wait */
1429   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1430     GST_DEBUG ("waiting for status change");
1431     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1432       GST_DEBUG ("timeout, assuming error status");
1433       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1434     }
1435   }
1436   /* could be success or error */
1437   result = media->status;
1438   GST_DEBUG ("got status %d", result);
1439   g_mutex_unlock (media->lock);
1440
1441   return result;
1442 }
1443
1444 static gboolean
1445 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1446 {
1447   GstMessageType type;
1448
1449   type = GST_MESSAGE_TYPE (message);
1450
1451   switch (type) {
1452     case GST_MESSAGE_STATE_CHANGED:
1453       break;
1454     case GST_MESSAGE_BUFFERING:
1455     {
1456       gint percent;
1457
1458       gst_message_parse_buffering (message, &percent);
1459
1460       /* no state management needed for live pipelines */
1461       if (media->is_live)
1462         break;
1463
1464       if (percent == 100) {
1465         /* a 100% message means buffering is done */
1466         media->buffering = FALSE;
1467         /* if the desired state is playing, go back */
1468         if (media->target_state == GST_STATE_PLAYING) {
1469           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1470           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1471         } else {
1472           GST_INFO ("Buffering done");
1473         }
1474       } else {
1475         /* buffering busy */
1476         if (media->buffering == FALSE) {
1477           if (media->target_state == GST_STATE_PLAYING) {
1478             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1479             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1480             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1481           } else {
1482             GST_INFO ("Buffering ...");
1483           }
1484         }
1485         media->buffering = TRUE;
1486       }
1487       break;
1488     }
1489     case GST_MESSAGE_LATENCY:
1490     {
1491       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1492       break;
1493     }
1494     case GST_MESSAGE_ERROR:
1495     {
1496       GError *gerror;
1497       gchar *debug;
1498
1499       gst_message_parse_error (message, &gerror, &debug);
1500       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1501       g_error_free (gerror);
1502       g_free (debug);
1503
1504       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1505       break;
1506     }
1507     case GST_MESSAGE_WARNING:
1508     {
1509       GError *gerror;
1510       gchar *debug;
1511
1512       gst_message_parse_warning (message, &gerror, &debug);
1513       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1514       g_error_free (gerror);
1515       g_free (debug);
1516       break;
1517     }
1518     case GST_MESSAGE_ELEMENT:
1519       break;
1520     case GST_MESSAGE_STREAM_STATUS:
1521       break;
1522     case GST_MESSAGE_ASYNC_DONE:
1523       if (!media->adding) {
1524         /* when we are dynamically adding pads, the addition of the udpsrc will
1525          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1526          * wait for the final ASYNC_DONE after everything prerolled */
1527         GST_INFO ("%p: got ASYNC_DONE", media);
1528         collect_media_stats (media);
1529
1530         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1531       } else {
1532         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1533       }
1534       break;
1535     case GST_MESSAGE_EOS:
1536       GST_INFO ("%p: got EOS", media);
1537       if (media->eos_pending) {
1538         GST_DEBUG ("shutting down after EOS");
1539         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1540         media->eos_pending = FALSE;
1541         g_object_unref (media);
1542       }
1543       break;
1544     default:
1545       GST_INFO ("%p: got message type %s", media,
1546           gst_message_type_get_name (type));
1547       break;
1548   }
1549   return TRUE;
1550 }
1551
1552 static gboolean
1553 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1554 {
1555   GstRTSPMediaClass *klass;
1556   gboolean ret;
1557
1558   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1559
1560   if (klass->handle_message)
1561     ret = klass->handle_message (media, message);
1562   else
1563     ret = FALSE;
1564
1565   return ret;
1566 }
1567
1568 /* called from streaming threads */
1569 static void
1570 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1571 {
1572   GstRTSPMediaStream *stream;
1573   gchar *name;
1574   gint i;
1575
1576   i = media->streams->len + 1;
1577
1578   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1579
1580   stream = g_new0 (GstRTSPMediaStream, 1);
1581   stream->payloader = element;
1582
1583   name = g_strdup_printf ("dynpay%d", i);
1584
1585   media->adding = TRUE;
1586
1587   /* ghost the pad of the payloader to the element */
1588   stream->srcpad = gst_ghost_pad_new (name, pad);
1589   gst_pad_set_active (stream->srcpad, TRUE);
1590   gst_element_add_pad (media->element, stream->srcpad);
1591   g_free (name);
1592
1593   /* add stream now */
1594   g_array_append_val (media->streams, stream);
1595
1596   setup_stream (stream, i, media);
1597
1598   for (i = 0; i < 2; i++) {
1599     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1600     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1601     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1602     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1603     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1604   }
1605   media->adding = FALSE;
1606 }
1607
1608 static void
1609 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1610 {
1611   GST_INFO ("no more pads");
1612   if (media->fakesink) {
1613     gst_object_ref (media->fakesink);
1614     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1615     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1616     gst_object_unref (media->fakesink);
1617     media->fakesink = NULL;
1618     GST_INFO ("removed fakesink");
1619   }
1620 }
1621
1622 /**
1623  * gst_rtsp_media_prepare:
1624  * @media: a #GstRTSPMedia
1625  *
1626  * Prepare @media for streaming. This function will create the pipeline and
1627  * other objects to manage the streaming.
1628  *
1629  * It will preroll the pipeline and collect vital information about the streams
1630  * such as the duration.
1631  *
1632  * Returns: %TRUE on success.
1633  */
1634 gboolean
1635 gst_rtsp_media_prepare (GstRTSPMedia * media)
1636 {
1637   GstStateChangeReturn ret;
1638   GstRTSPMediaStatus status;
1639   guint i, n_streams;
1640   GstRTSPMediaClass *klass;
1641   GstBus *bus;
1642   GList *walk;
1643
1644   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1645     goto was_prepared;
1646
1647   if (!media->reusable && media->reused)
1648     goto is_reused;
1649
1650   GST_INFO ("preparing media %p", media);
1651
1652   /* reset some variables */
1653   media->is_live = FALSE;
1654   media->buffering = FALSE;
1655   /* we're preparing now */
1656   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1657
1658   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1659
1660   /* add the pipeline bus to our custom mainloop */
1661   media->source = gst_bus_create_watch (bus);
1662   gst_object_unref (bus);
1663
1664   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1665
1666   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1667   media->id = g_source_attach (media->source, klass->context);
1668
1669   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1670
1671   /* add stuff to the bin */
1672   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1673
1674   /* link streams we already have, other streams might appear when we have
1675    * dynamic elements */
1676   n_streams = gst_rtsp_media_n_streams (media);
1677   for (i = 0; i < n_streams; i++) {
1678     GstRTSPMediaStream *stream;
1679
1680     stream = gst_rtsp_media_get_stream (media, i);
1681
1682     setup_stream (stream, i, media);
1683   }
1684
1685   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1686     GstElement *elem = walk->data;
1687
1688     GST_INFO ("adding callbacks for dynamic element %p", elem);
1689
1690     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1691     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1692
1693     /* we add a fakesink here in order to make the state change async. We remove
1694      * the fakesink again in the no-more-pads callback. */
1695     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1696     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1697   }
1698
1699   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1700   /* first go to PAUSED */
1701   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1702   media->target_state = GST_STATE_PAUSED;
1703
1704   switch (ret) {
1705     case GST_STATE_CHANGE_SUCCESS:
1706       GST_INFO ("SUCCESS state change for media %p", media);
1707       break;
1708     case GST_STATE_CHANGE_ASYNC:
1709       GST_INFO ("ASYNC state change for media %p", media);
1710       break;
1711     case GST_STATE_CHANGE_NO_PREROLL:
1712       /* we need to go to PLAYING */
1713       GST_INFO ("NO_PREROLL state change: live media %p", media);
1714       media->is_live = TRUE;
1715       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1716       if (ret == GST_STATE_CHANGE_FAILURE)
1717         goto state_failed;
1718       break;
1719     case GST_STATE_CHANGE_FAILURE:
1720       goto state_failed;
1721   }
1722
1723   /* now wait for all pads to be prerolled */
1724   status = gst_rtsp_media_get_status (media);
1725   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1726     goto state_failed;
1727
1728   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1729
1730   GST_INFO ("object %p is prerolled", media);
1731
1732   return TRUE;
1733
1734   /* OK */
1735 was_prepared:
1736   {
1737     return TRUE;
1738   }
1739   /* ERRORS */
1740 is_reused:
1741   {
1742     GST_WARNING ("can not reuse media %p", media);
1743     return FALSE;
1744   }
1745 state_failed:
1746   {
1747     GST_WARNING ("failed to preroll pipeline");
1748     unlock_streams (media);
1749     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1750     gst_rtsp_media_unprepare (media);
1751     return FALSE;
1752   }
1753 }
1754
1755 /**
1756  * gst_rtsp_media_unprepare:
1757  * @media: a #GstRTSPMedia
1758  *
1759  * Unprepare @media. After this call, the media should be prepared again before
1760  * it can be used again. If the media is set to be non-reusable, a new instance
1761  * must be created.
1762  *
1763  * Returns: %TRUE on success.
1764  */
1765 gboolean
1766 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1767 {
1768   GstRTSPMediaClass *klass;
1769   gboolean success;
1770
1771   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1772     return TRUE;
1773
1774   GST_INFO ("unprepare media %p", media);
1775   media->target_state = GST_STATE_NULL;
1776
1777   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1778   if (klass->unprepare)
1779     success = klass->unprepare (media);
1780   else
1781     success = TRUE;
1782
1783   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1784   media->reused = TRUE;
1785
1786   /* when the media is not reusable, this will effectively unref the media and
1787    * recreate it */
1788   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1789
1790   return success;
1791 }
1792
1793 static gboolean
1794 default_unprepare (GstRTSPMedia * media)
1795 {
1796   if (media->eos_shutdown) {
1797     GST_DEBUG ("sending EOS for shutdown");
1798     /* ref so that we don't disappear */
1799     g_object_ref (media);
1800     media->eos_pending = TRUE;
1801     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1802     /* we need to go to playing again for the EOS to propagate, normally in this
1803      * state, nothing is receiving data from us anymore so this is ok. */
1804     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1805   } else {
1806     GST_DEBUG ("shutting down");
1807     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1808   }
1809   return TRUE;
1810 }
1811
1812 static void
1813 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1814     gchar * dest, gint min, gint max)
1815 {
1816   GST_INFO ("adding %s:%d-%d", dest, min, max);
1817   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1818   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1819 }
1820
1821 static void
1822 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1823     gchar * dest, gint min, gint max)
1824 {
1825   GST_INFO ("removing %s:%d-%d", dest, min, max);
1826   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1827   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1828 }
1829
1830 /**
1831  * gst_rtsp_media_set_state:
1832  * @media: a #GstRTSPMedia
1833  * @state: the target state of the media
1834  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1835  *
1836  * Set the state of @media to @state and for the transports in @transports.
1837  *
1838  * Returns: %TRUE on success.
1839  */
1840 gboolean
1841 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1842     GArray * transports)
1843 {
1844   gint i;
1845   gboolean add, remove, do_state;
1846   gint old_active;
1847
1848   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1849   g_return_val_if_fail (transports != NULL, FALSE);
1850
1851   /* NULL and READY are the same */
1852   if (state == GST_STATE_READY)
1853     state = GST_STATE_NULL;
1854
1855   add = remove = FALSE;
1856
1857   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1858       media);
1859
1860   switch (state) {
1861     case GST_STATE_NULL:
1862       /* unlock the streams so that they follow the state changes from now on */
1863       unlock_streams (media);
1864       /* fallthrough */
1865     case GST_STATE_PAUSED:
1866       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1867       if (media->target_state == GST_STATE_PLAYING)
1868         remove = TRUE;
1869       break;
1870     case GST_STATE_PLAYING:
1871       /* we're going to PLAYING, add */
1872       add = TRUE;
1873       break;
1874     default:
1875       break;
1876   }
1877   old_active = media->active;
1878
1879   for (i = 0; i < transports->len; i++) {
1880     GstRTSPMediaTrans *tr;
1881     GstRTSPMediaStream *stream;
1882     GstRTSPTransport *trans;
1883
1884     /* we need a non-NULL entry in the array */
1885     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1886     if (tr == NULL)
1887       continue;
1888
1889     /* we need a transport */
1890     if (!(trans = tr->transport))
1891       continue;
1892
1893     /* get the stream and add the destinations */
1894     stream = gst_rtsp_media_get_stream (media, tr->idx);
1895     switch (trans->lower_transport) {
1896       case GST_RTSP_LOWER_TRANS_UDP:
1897       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1898       {
1899         gchar *dest;
1900         gint min, max;
1901
1902         dest = trans->destination;
1903         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1904           min = trans->port.min;
1905           max = trans->port.max;
1906         } else {
1907           min = trans->client_port.min;
1908           max = trans->client_port.max;
1909         }
1910
1911         if (add && !tr->active) {
1912           add_udp_destination (media, stream, dest, min, max);
1913           stream->transports = g_list_prepend (stream->transports, tr);
1914           tr->active = TRUE;
1915           media->active++;
1916         } else if (remove && tr->active) {
1917           remove_udp_destination (media, stream, dest, min, max);
1918           stream->transports = g_list_remove (stream->transports, tr);
1919           tr->active = FALSE;
1920           media->active--;
1921         }
1922         break;
1923       }
1924       case GST_RTSP_LOWER_TRANS_TCP:
1925         if (add && !tr->active) {
1926           GST_INFO ("adding TCP %s", trans->destination);
1927           stream->transports = g_list_prepend (stream->transports, tr);
1928           tr->active = TRUE;
1929           media->active++;
1930         } else if (remove && tr->active) {
1931           GST_INFO ("removing TCP %s", trans->destination);
1932           stream->transports = g_list_remove (stream->transports, tr);
1933           tr->active = FALSE;
1934           media->active--;
1935         }
1936         break;
1937       default:
1938         GST_INFO ("Unknown transport %d", trans->lower_transport);
1939         break;
1940     }
1941   }
1942
1943   /* we just added the first media, do the playing state change */
1944   if (old_active == 0 && add)
1945     do_state = TRUE;
1946   /* if we have no more active media, do the downward state changes */
1947   else if (media->active == 0)
1948     do_state = TRUE;
1949   else
1950     do_state = FALSE;
1951
1952   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
1953       media, do_state);
1954
1955   if (media->target_state != state) {
1956     if (do_state) {
1957       if (state == GST_STATE_NULL) {
1958         gst_rtsp_media_unprepare (media);
1959       } else {
1960         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
1961             media);
1962         media->target_state = state;
1963         gst_element_set_state (media->pipeline, state);
1964       }
1965     }
1966     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
1967         NULL);
1968   }
1969
1970   /* remember where we are */
1971   if (state == GST_STATE_PAUSED || old_active != media->active)
1972     collect_media_stats (media);
1973
1974   return TRUE;
1975 }
1976
1977 /**
1978  * gst_rtsp_media_remove_elements:
1979  * @media: a #GstRTSPMedia
1980  *
1981  * Remove all elements and the pipeline controlled by @media.
1982  */
1983 void
1984 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1985 {
1986   gint i, j;
1987
1988   unlock_streams (media);
1989
1990   for (i = 0; i < media->streams->len; i++) {
1991     GstRTSPMediaStream *stream;
1992
1993     GST_INFO ("Removing elements of stream %d from pipeline", i);
1994
1995     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1996
1997     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1998
1999     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2000
2001     for (j = 0; j < 2; j++) {
2002       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2003       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2004       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2005       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2006       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2007       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2008
2009       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2010       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2011       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2012       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2013       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2014       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2015     }
2016     if (stream->caps)
2017       gst_caps_unref (stream->caps);
2018     stream->caps = NULL;
2019     gst_rtsp_media_stream_free (stream);
2020   }
2021   g_array_remove_range (media->streams, 0, media->streams->len);
2022
2023   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2024   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2025
2026   gst_object_unref (media->pipeline);
2027   media->pipeline = NULL;
2028 }