25decae8acd30fccf35351855cbd0887cb873f49
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED         FALSE
29 #define DEFAULT_REUSABLE       FALSE
30 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN   FALSE
33
34 /* define to dump received RTCP packets */
35 #undef DUMP_STATS
36
37 enum
38 {
39   PROP_0,
40   PROP_SHARED,
41   PROP_REUSABLE,
42   PROP_PROTOCOLS,
43   PROP_EOS_SHUTDOWN,
44   PROP_LAST
45 };
46
47 enum
48 {
49   SIGNAL_PREPARED,
50   SIGNAL_UNPREPARED,
51   SIGNAL_NEW_STATE,
52   SIGNAL_LAST
53 };
54
55 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
56 #define GST_CAT_DEFAULT rtsp_media_debug
57
58 static GQuark ssrc_stream_map_key;
59
60 static void gst_rtsp_media_get_property (GObject * object, guint propid,
61     GValue * value, GParamSpec * pspec);
62 static void gst_rtsp_media_set_property (GObject * object, guint propid,
63     const GValue * value, GParamSpec * pspec);
64 static void gst_rtsp_media_finalize (GObject * obj);
65
66 static gpointer do_loop (GstRTSPMediaClass * klass);
67 static gboolean default_handle_message (GstRTSPMedia * media,
68     GstMessage * message);
69 static gboolean default_unprepare (GstRTSPMedia * media);
70 static void unlock_streams (GstRTSPMedia * media);
71
72 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
73
74 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
75
76 static void
77 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
78 {
79   GObjectClass *gobject_class;
80   GError *error = NULL;
81
82   gobject_class = G_OBJECT_CLASS (klass);
83
84   gobject_class->get_property = gst_rtsp_media_get_property;
85   gobject_class->set_property = gst_rtsp_media_set_property;
86   gobject_class->finalize = gst_rtsp_media_finalize;
87
88   g_object_class_install_property (gobject_class, PROP_SHARED,
89       g_param_spec_boolean ("shared", "Shared",
90           "If this media pipeline can be shared", DEFAULT_SHARED,
91           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
92
93   g_object_class_install_property (gobject_class, PROP_REUSABLE,
94       g_param_spec_boolean ("reusable", "Reusable",
95           "If this media pipeline can be reused after an unprepare",
96           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
97
98   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
99       g_param_spec_flags ("protocols", "Protocols",
100           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
101           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
102
103   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
104       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
105           "Send an EOS event to the pipeline before unpreparing",
106           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107
108   gst_rtsp_media_signals[SIGNAL_PREPARED] =
109       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
110       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
111       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
112
113   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
114       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
115       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
116       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
117
118   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
119       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
120       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
121       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
122
123   klass->context = g_main_context_new ();
124   klass->loop = g_main_loop_new (klass->context, TRUE);
125
126   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
127   if (error != NULL) {
128     g_critical ("could not start bus thread: %s", error->message);
129   }
130   klass->handle_message = default_handle_message;
131   klass->unprepare = default_unprepare;
132
133   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
134 }
135
136 static void
137 gst_rtsp_media_init (GstRTSPMedia * media)
138 {
139   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
140   media->lock = g_mutex_new ();
141   media->cond = g_cond_new ();
142
143   media->shared = DEFAULT_SHARED;
144   media->reusable = DEFAULT_REUSABLE;
145   media->protocols = DEFAULT_PROTOCOLS;
146   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
147 }
148
149 /* FIXME. this should be done in multiudpsink */
150 typedef struct
151 {
152   gint count;
153   gchar *dest;
154   gint min, max;
155 } RTSPDestination;
156
157 static gint
158 dest_compare (RTSPDestination * a, RTSPDestination * b)
159 {
160   if ((a->min == b->min) && (a->max == b->max)
161       && (strcmp (a->dest, b->dest) == 0))
162     return 0;
163
164   return 1;
165 }
166
167 static RTSPDestination *
168 create_destination (const gchar * dest, gint min, gint max)
169 {
170   RTSPDestination *res;
171
172   res = g_slice_new (RTSPDestination);
173   res->count = 1;
174   res->dest = g_strdup (dest);
175   res->min = min;
176   res->max = max;
177
178   return res;
179 }
180
181 static void
182 free_destination (RTSPDestination * dest)
183 {
184   g_free (dest->dest);
185   g_slice_free (RTSPDestination, dest);
186 }
187
188 void
189 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
190 {
191   if (trans->transport) {
192     gst_rtsp_transport_free (trans->transport);
193     trans->transport = NULL;
194   }
195   if (trans->rtpsource) {
196     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
197     trans->rtpsource = NULL;
198   }
199 }
200
201 static void
202 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
203 {
204   if (stream->session)
205     g_object_unref (stream->session);
206
207   if (stream->caps)
208     gst_caps_unref (stream->caps);
209
210   if (stream->send_rtp_sink)
211     gst_object_unref (stream->send_rtp_sink);
212   if (stream->send_rtp_src)
213     gst_object_unref (stream->send_rtp_src);
214   if (stream->send_rtcp_src)
215     gst_object_unref (stream->send_rtcp_src);
216   if (stream->recv_rtcp_sink)
217     gst_object_unref (stream->recv_rtcp_sink);
218   if (stream->recv_rtp_sink)
219     gst_object_unref (stream->recv_rtp_sink);
220
221   g_list_free (stream->transports);
222
223   g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
224   g_list_free (stream->destinations);
225
226   g_free (stream);
227 }
228
229 static void
230 gst_rtsp_media_finalize (GObject * obj)
231 {
232   GstRTSPMedia *media;
233   guint i;
234
235   media = GST_RTSP_MEDIA (obj);
236
237   GST_INFO ("finalize media %p", media);
238
239   if (media->pipeline) {
240     unlock_streams (media);
241     gst_element_set_state (media->pipeline, GST_STATE_NULL);
242     gst_object_unref (media->pipeline);
243   }
244
245   for (i = 0; i < media->streams->len; i++) {
246     GstRTSPMediaStream *stream;
247
248     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
249
250     gst_rtsp_media_stream_free (stream);
251   }
252   g_array_free (media->streams, TRUE);
253
254   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
255   g_list_free (media->dynamic);
256
257   if (media->source) {
258     g_source_destroy (media->source);
259     g_source_unref (media->source);
260   }
261   g_mutex_free (media->lock);
262   g_cond_free (media->cond);
263
264   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
265 }
266
267 static void
268 gst_rtsp_media_get_property (GObject * object, guint propid,
269     GValue * value, GParamSpec * pspec)
270 {
271   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
272
273   switch (propid) {
274     case PROP_SHARED:
275       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
276       break;
277     case PROP_REUSABLE:
278       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
279       break;
280     case PROP_PROTOCOLS:
281       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
282       break;
283     case PROP_EOS_SHUTDOWN:
284       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
285       break;
286     default:
287       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
288   }
289 }
290
291 static void
292 gst_rtsp_media_set_property (GObject * object, guint propid,
293     const GValue * value, GParamSpec * pspec)
294 {
295   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
296
297   switch (propid) {
298     case PROP_SHARED:
299       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
300       break;
301     case PROP_REUSABLE:
302       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
303       break;
304     case PROP_PROTOCOLS:
305       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
306       break;
307     case PROP_EOS_SHUTDOWN:
308       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
309       break;
310     default:
311       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
312   }
313 }
314
315 static gpointer
316 do_loop (GstRTSPMediaClass * klass)
317 {
318   GST_INFO ("enter mainloop");
319   g_main_loop_run (klass->loop);
320   GST_INFO ("exit mainloop");
321
322   return NULL;
323 }
324
325 static void
326 collect_media_stats (GstRTSPMedia * media)
327 {
328   GstFormat format;
329   gint64 position, duration;
330
331   media->range.unit = GST_RTSP_RANGE_NPT;
332
333   if (media->is_live) {
334     media->range.min.type = GST_RTSP_TIME_NOW;
335     media->range.min.seconds = -1;
336     media->range.max.type = GST_RTSP_TIME_END;
337     media->range.max.seconds = -1;
338   } else {
339     /* get the position */
340     format = GST_FORMAT_TIME;
341     if (!gst_element_query_position (media->pipeline, &format, &position)) {
342       GST_INFO ("position query failed");
343       position = 0;
344     }
345
346     /* get the duration */
347     format = GST_FORMAT_TIME;
348     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
349       GST_INFO ("duration query failed");
350       duration = -1;
351     }
352
353     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
354         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
355
356     if (position == -1) {
357       media->range.min.type = GST_RTSP_TIME_NOW;
358       media->range.min.seconds = -1;
359     } else {
360       media->range.min.type = GST_RTSP_TIME_SECONDS;
361       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
362     }
363     if (duration == -1) {
364       media->range.max.type = GST_RTSP_TIME_END;
365       media->range.max.seconds = -1;
366     } else {
367       media->range.max.type = GST_RTSP_TIME_SECONDS;
368       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
369     }
370   }
371 }
372
373 /**
374  * gst_rtsp_media_new:
375  *
376  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
377  * element to produde RTP data for one or more related (audio/video/..) 
378  * streams.
379  *
380  * Returns: a new #GstRTSPMedia object.
381  */
382 GstRTSPMedia *
383 gst_rtsp_media_new (void)
384 {
385   GstRTSPMedia *result;
386
387   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
388
389   return result;
390 }
391
392 /**
393  * gst_rtsp_media_set_shared:
394  * @media: a #GstRTSPMedia
395  * @shared: the new value
396  *
397  * Set or unset if the pipeline for @media can be shared will multiple clients.
398  * When @shared is %TRUE, client requests for this media will share the media
399  * pipeline.
400  */
401 void
402 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
403 {
404   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
405
406   media->shared = shared;
407 }
408
409 /**
410  * gst_rtsp_media_is_shared:
411  * @media: a #GstRTSPMedia
412  *
413  * Check if the pipeline for @media can be shared between multiple clients.
414  *
415  * Returns: %TRUE if the media can be shared between clients.
416  */
417 gboolean
418 gst_rtsp_media_is_shared (GstRTSPMedia * media)
419 {
420   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
421
422   return media->shared;
423 }
424
425 /**
426  * gst_rtsp_media_set_reusable:
427  * @media: a #GstRTSPMedia
428  * @reusable: the new value
429  *
430  * Set or unset if the pipeline for @media can be reused after the pipeline has
431  * been unprepared.
432  */
433 void
434 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
435 {
436   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
437
438   media->reusable = reusable;
439 }
440
441 /**
442  * gst_rtsp_media_is_reusable:
443  * @media: a #GstRTSPMedia
444  *
445  * Check if the pipeline for @media can be reused after an unprepare.
446  *
447  * Returns: %TRUE if the media can be reused
448  */
449 gboolean
450 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
451 {
452   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
453
454   return media->reusable;
455 }
456
457 /**
458  * gst_rtsp_media_set_protocols:
459  * @media: a #GstRTSPMedia
460  * @protocols: the new flags
461  *
462  * Configure the allowed lower transport for @media.
463  */
464 void
465 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
466 {
467   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
468
469   media->protocols = protocols;
470 }
471
472 /**
473  * gst_rtsp_media_get_protocols:
474  * @media: a #GstRTSPMedia
475  *
476  * Get the allowed protocols of @media.
477  *
478  * Returns: a #GstRTSPLowerTrans
479  */
480 GstRTSPLowerTrans
481 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
482 {
483   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
484       GST_RTSP_LOWER_TRANS_UNKNOWN);
485
486   return media->protocols;
487 }
488
489 /**
490  * gst_rtsp_media_set_eos_shutdown:
491  * @media: a #GstRTSPMedia
492  * @eos_shutdown: the new value
493  *
494  * Set or unset if an EOS event will be sent to the pipeline for @media before
495  * it is unprepared.
496  */
497 void
498 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
499 {
500   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
501
502   media->eos_shutdown = eos_shutdown;
503 }
504
505 /**
506  * gst_rtsp_media_is_eos_shutdown:
507  * @media: a #GstRTSPMedia
508  *
509  * Check if the pipeline for @media will send an EOS down the pipeline before
510  * unpreparing.
511  *
512  * Returns: %TRUE if the media will send EOS before unpreparing.
513  */
514 gboolean
515 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
516 {
517   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
518
519   return media->eos_shutdown;
520 }
521
522 /**
523  * gst_rtsp_media_n_streams:
524  * @media: a #GstRTSPMedia
525  *
526  * Get the number of streams in this media.
527  *
528  * Returns: The number of streams.
529  */
530 guint
531 gst_rtsp_media_n_streams (GstRTSPMedia * media)
532 {
533   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
534
535   return media->streams->len;
536 }
537
538 /**
539  * gst_rtsp_media_get_stream:
540  * @media: a #GstRTSPMedia
541  * @idx: the stream index
542  *
543  * Retrieve the stream with index @idx from @media.
544  *
545  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
546  * that index did not exist.
547  */
548 GstRTSPMediaStream *
549 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
550 {
551   GstRTSPMediaStream *res;
552
553   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
554
555   if (idx < media->streams->len)
556     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
557   else
558     res = NULL;
559
560   return res;
561 }
562
563 /**
564  * gst_rtsp_media_get_range_string:
565  * @media: a #GstRTSPMedia
566  * @play: for the PLAY request
567  *
568  * Get the current range as a string.
569  *
570  * Returns: The range as a string, g_free() after usage.
571  */
572 gchar *
573 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
574 {
575   gchar *result;
576   GstRTSPTimeRange range;
577
578   /* make copy */
579   range = media->range;
580
581   if (!play && media->active > 0) {
582     range.min.type = GST_RTSP_TIME_NOW;
583     range.min.seconds = -1;
584   }
585
586   result = gst_rtsp_range_to_string (&range);
587
588   return result;
589 }
590
591 /**
592  * gst_rtsp_media_seek:
593  * @media: a #GstRTSPMedia
594  * @range: a #GstRTSPTimeRange
595  *
596  * Seek the pipeline to @range.
597  *
598  * Returns: %TRUE on success.
599  */
600 gboolean
601 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
602 {
603   GstSeekFlags flags;
604   gboolean res;
605   gint64 start, stop;
606   GstSeekType start_type, stop_type;
607
608   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
609   g_return_val_if_fail (range != NULL, FALSE);
610
611   if (range->unit != GST_RTSP_RANGE_NPT)
612     goto not_supported;
613
614   /* depends on the current playing state of the pipeline. We might need to
615    * queue this until we get EOS. */
616   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
617
618   start_type = stop_type = GST_SEEK_TYPE_NONE;
619
620   switch (range->min.type) {
621     case GST_RTSP_TIME_NOW:
622       start = -1;
623       break;
624     case GST_RTSP_TIME_SECONDS:
625       /* only seek when something changed */
626       if (media->range.min.seconds == range->min.seconds) {
627         start = -1;
628       } else {
629         start = range->min.seconds * GST_SECOND;
630         start_type = GST_SEEK_TYPE_SET;
631       }
632       break;
633     case GST_RTSP_TIME_END:
634     default:
635       goto weird_type;
636   }
637   switch (range->max.type) {
638     case GST_RTSP_TIME_SECONDS:
639       /* only seek when something changed */
640       if (media->range.max.seconds == range->max.seconds) {
641         stop = -1;
642       } else {
643         stop = range->max.seconds * GST_SECOND;
644         stop_type = GST_SEEK_TYPE_SET;
645       }
646       break;
647     case GST_RTSP_TIME_END:
648       stop = -1;
649       stop_type = GST_SEEK_TYPE_SET;
650       break;
651     case GST_RTSP_TIME_NOW:
652     default:
653       goto weird_type;
654   }
655
656   if (start != -1 || stop != -1) {
657     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
658         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
659
660     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
661         flags, start_type, start, stop_type, stop);
662
663     /* and block for the seek to complete */
664     GST_INFO ("done seeking %d", res);
665     gst_element_get_state (media->pipeline, NULL, NULL, -1);
666     GST_INFO ("prerolled again");
667
668     collect_media_stats (media);
669   } else {
670     GST_INFO ("no seek needed");
671     res = TRUE;
672   }
673
674   return res;
675
676   /* ERRORS */
677 not_supported:
678   {
679     GST_WARNING ("seek unit %d not supported", range->unit);
680     return FALSE;
681   }
682 weird_type:
683   {
684     GST_WARNING ("weird range type %d not supported", range->min.type);
685     return FALSE;
686   }
687 }
688
689 /**
690  * gst_rtsp_media_stream_rtp:
691  * @stream: a #GstRTSPMediaStream
692  * @buffer: a #GstBuffer
693  *
694  * Handle an RTP buffer for the stream. This method is usually called when a
695  * message has been received from a client using the TCP transport.
696  *
697  * This function takes ownership of @buffer.
698  *
699  * Returns: a GstFlowReturn.
700  */
701 GstFlowReturn
702 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
703 {
704   GstFlowReturn ret;
705
706   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
707
708   return ret;
709 }
710
711 /**
712  * gst_rtsp_media_stream_rtcp:
713  * @stream: a #GstRTSPMediaStream
714  * @buffer: a #GstBuffer
715  *
716  * Handle an RTCP buffer for the stream. This method is usually called when a
717  * message has been received from a client using the TCP transport.
718  *
719  * This function takes ownership of @buffer.
720  *
721  * Returns: a GstFlowReturn.
722  */
723 GstFlowReturn
724 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
725 {
726   GstFlowReturn ret;
727
728   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
729
730   return ret;
731 }
732
733 /* Allocate the udp ports and sockets */
734 static gboolean
735 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
736 {
737   GstStateChangeReturn ret;
738   GstElement *udpsrc0, *udpsrc1;
739   GstElement *udpsink0, *udpsink1;
740   gint tmp_rtp, tmp_rtcp;
741   guint count;
742   gint rtpport, rtcpport, sockfd;
743   const gchar *host;
744
745   udpsrc0 = NULL;
746   udpsrc1 = NULL;
747   udpsink0 = NULL;
748   udpsink1 = NULL;
749   count = 0;
750
751   /* Start with random port */
752   tmp_rtp = 0;
753
754   if (media->is_ipv6)
755     host = "udp://[::0]";
756   else
757     host = "udp://0.0.0.0";
758
759   /* try to allocate 2 UDP ports, the RTP port should be an even
760    * number and the RTCP port should be the next (uneven) port */
761 again:
762   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
763   if (udpsrc0 == NULL)
764     goto no_udp_protocol;
765   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
766
767   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
768   if (ret == GST_STATE_CHANGE_FAILURE) {
769     if (tmp_rtp != 0) {
770       tmp_rtp += 2;
771       if (++count > 20)
772         goto no_ports;
773
774       gst_element_set_state (udpsrc0, GST_STATE_NULL);
775       gst_object_unref (udpsrc0);
776
777       goto again;
778     }
779     goto no_udp_protocol;
780   }
781
782   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
783
784   /* check if port is even */
785   if ((tmp_rtp & 1) != 0) {
786     /* port not even, close and allocate another */
787     if (++count > 20)
788       goto no_ports;
789
790     gst_element_set_state (udpsrc0, GST_STATE_NULL);
791     gst_object_unref (udpsrc0);
792
793     tmp_rtp++;
794     goto again;
795   }
796
797   /* allocate port+1 for RTCP now */
798   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
799   if (udpsrc1 == NULL)
800     goto no_udp_rtcp_protocol;
801
802   /* set port */
803   tmp_rtcp = tmp_rtp + 1;
804   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
805
806   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
807   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
808   if (ret == GST_STATE_CHANGE_FAILURE) {
809
810     if (++count > 20)
811       goto no_ports;
812
813     gst_element_set_state (udpsrc0, GST_STATE_NULL);
814     gst_object_unref (udpsrc0);
815
816     gst_element_set_state (udpsrc1, GST_STATE_NULL);
817     gst_object_unref (udpsrc1);
818
819     tmp_rtp += 2;
820     goto again;
821   }
822
823   /* all fine, do port check */
824   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
825   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
826
827   /* this should not happen... */
828   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
829     goto port_error;
830
831   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
832   if (!udpsink0)
833     goto no_udp_protocol;
834
835   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
836   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
837   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
838
839   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
840   if (!udpsink1)
841     goto no_udp_protocol;
842
843   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
844           "send-duplicates")) {
845     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
846     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
847     stream->filter_duplicates = FALSE;
848   } else {
849     GST_WARNING ("multiudpsink version found without send-duplicates property");
850     stream->filter_duplicates = TRUE;
851   }
852
853   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
854           "buffer-size")) {
855     g_object_set (G_OBJECT (udpsink0), "buffer-size", 0x80000, NULL);
856   } else {
857     GST_WARNING ("multiudpsink version found without buffer-size property");
858   }
859
860   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
861   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
862   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
863   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
864   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
865
866   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
867   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
868   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
869   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
870
871   /* we keep these elements, we configure all in configure_transport when the
872    * server told us to really use the UDP ports. */
873   stream->udpsrc[0] = udpsrc0;
874   stream->udpsrc[1] = udpsrc1;
875   stream->udpsink[0] = udpsink0;
876   stream->udpsink[1] = udpsink1;
877   stream->server_port.min = rtpport;
878   stream->server_port.max = rtcpport;
879
880   return TRUE;
881
882   /* ERRORS */
883 no_udp_protocol:
884   {
885     goto cleanup;
886   }
887 no_ports:
888   {
889     goto cleanup;
890   }
891 no_udp_rtcp_protocol:
892   {
893     goto cleanup;
894   }
895 port_error:
896   {
897     goto cleanup;
898   }
899 cleanup:
900   {
901     if (udpsrc0) {
902       gst_element_set_state (udpsrc0, GST_STATE_NULL);
903       gst_object_unref (udpsrc0);
904     }
905     if (udpsrc1) {
906       gst_element_set_state (udpsrc1, GST_STATE_NULL);
907       gst_object_unref (udpsrc1);
908     }
909     if (udpsink0) {
910       gst_element_set_state (udpsink0, GST_STATE_NULL);
911       gst_object_unref (udpsink0);
912     }
913     if (udpsink1) {
914       gst_element_set_state (udpsink1, GST_STATE_NULL);
915       gst_object_unref (udpsink1);
916     }
917     return FALSE;
918   }
919 }
920
921 /* executed from streaming thread */
922 static void
923 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
924 {
925   gchar *capsstr;
926   GstCaps *newcaps, *oldcaps;
927
928   if ((newcaps = GST_PAD_CAPS (pad)))
929     gst_caps_ref (newcaps);
930
931   oldcaps = stream->caps;
932   stream->caps = newcaps;
933
934   if (oldcaps)
935     gst_caps_unref (oldcaps);
936
937   capsstr = gst_caps_to_string (newcaps);
938   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
939   g_free (capsstr);
940 }
941
942 static void
943 dump_structure (const GstStructure * s)
944 {
945   gchar *sstr;
946
947   sstr = gst_structure_to_string (s);
948   GST_INFO ("structure: %s", sstr);
949   g_free (sstr);
950 }
951
952 static GstRTSPMediaTrans *
953 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
954 {
955   GList *walk;
956   GstRTSPMediaTrans *result = NULL;
957   const gchar *tmp;
958   gchar *dest;
959   guint port;
960
961   if (rtcp_from == NULL)
962     return NULL;
963
964   tmp = g_strrstr (rtcp_from, ":");
965   if (tmp == NULL)
966     return NULL;
967
968   port = atoi (tmp + 1);
969   dest = g_strndup (rtcp_from, tmp - rtcp_from);
970
971   GST_INFO ("finding %s:%d", dest, port);
972
973   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
974     GstRTSPMediaTrans *trans = walk->data;
975     gint min, max;
976
977     min = trans->transport->client_port.min;
978     max = trans->transport->client_port.max;
979
980     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
981             || max == port)) {
982       result = trans;
983       break;
984     }
985   }
986   g_free (dest);
987
988   return result;
989 }
990
991 static void
992 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
993 {
994   GstStructure *stats;
995   GstRTSPMediaTrans *trans;
996
997   GST_INFO ("%p: new source %p", stream, source);
998
999   /* see if we have a stream to match with the origin of the RTCP packet */
1000   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1001   if (trans == NULL) {
1002     g_object_get (source, "stats", &stats, NULL);
1003     if (stats) {
1004       const gchar *rtcp_from;
1005
1006       dump_structure (stats);
1007
1008       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1009       if ((trans = find_transport (stream, rtcp_from))) {
1010         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1011             source);
1012
1013         /* keep ref to the source */
1014         trans->rtpsource = source;
1015
1016         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1017       }
1018       gst_structure_free (stats);
1019     }
1020   } else {
1021     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1022   }
1023 }
1024
1025 static void
1026 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1027 {
1028   GST_INFO ("%p: new SDES %p", stream, source);
1029 }
1030
1031 static void
1032 on_ssrc_active (GObject * session, GObject * source,
1033     GstRTSPMediaStream * stream)
1034 {
1035   GstRTSPMediaTrans *trans;
1036
1037   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1038
1039   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1040
1041   if (trans && trans->keep_alive)
1042     trans->keep_alive (trans->ka_user_data);
1043
1044 #ifdef DUMP_STATS
1045   {
1046     GstStructure *stats;
1047     g_object_get (source, "stats", &stats, NULL);
1048     if (stats) {
1049       dump_structure (stats);
1050       gst_structure_free (stats);
1051     }
1052   }
1053 #endif
1054 }
1055
1056 static void
1057 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1058 {
1059   GST_INFO ("%p: source %p bye", stream, source);
1060 }
1061
1062 static void
1063 on_bye_timeout (GObject * session, GObject * source,
1064     GstRTSPMediaStream * stream)
1065 {
1066   GstRTSPMediaTrans *trans;
1067
1068   GST_INFO ("%p: source %p bye timeout", stream, source);
1069
1070   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1071     trans->rtpsource = NULL;
1072     trans->timeout = TRUE;
1073   }
1074 }
1075
1076 static void
1077 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1078 {
1079   GstRTSPMediaTrans *trans;
1080
1081   GST_INFO ("%p: source %p timeout", stream, source);
1082
1083   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1084     trans->rtpsource = NULL;
1085     trans->timeout = TRUE;
1086   }
1087 }
1088
1089 static GstFlowReturn
1090 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1091 {
1092   GList *walk;
1093   GstBuffer *buffer;
1094   GstRTSPMediaStream *stream;
1095
1096   buffer = gst_app_sink_pull_buffer (sink);
1097   if (!buffer)
1098     return GST_FLOW_OK;
1099
1100   stream = (GstRTSPMediaStream *) user_data;
1101
1102   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1103     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1104
1105     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1106       if (tr->send_rtp)
1107         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1108     } else {
1109       if (tr->send_rtcp)
1110         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1111     }
1112   }
1113   gst_buffer_unref (buffer);
1114
1115   return GST_FLOW_OK;
1116 }
1117
1118 static GstFlowReturn
1119 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1120 {
1121   GList *walk;
1122   GstBufferList *blist;
1123   GstRTSPMediaStream *stream;
1124
1125   blist = gst_app_sink_pull_buffer_list (sink);
1126   if (!blist)
1127     return GST_FLOW_OK;
1128
1129   stream = (GstRTSPMediaStream *) user_data;
1130
1131   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1132     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1133
1134     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1135       if (tr->send_rtp_list)
1136         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1137             tr->user_data);
1138     } else {
1139       if (tr->send_rtcp_list)
1140         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1141             tr->user_data);
1142     }
1143   }
1144   gst_buffer_list_unref (blist);
1145
1146   return GST_FLOW_OK;
1147 }
1148
1149 static GstAppSinkCallbacks sink_cb = {
1150   NULL,                         /* not interested in EOS */
1151   NULL,                         /* not interested in preroll buffers */
1152   handle_new_buffer,
1153   handle_new_buffer_list
1154 };
1155
1156 /* prepare the pipeline objects to handle @stream in @media */
1157 static gboolean
1158 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1159 {
1160   gchar *name;
1161   GstPad *pad, *teepad, *selpad;
1162   GstPadLinkReturn ret;
1163   gint i;
1164
1165   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1166    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1167    * elements */
1168   if (!alloc_udp_ports (media, stream))
1169     return FALSE;
1170
1171   /* add the ports to the pipeline */
1172   for (i = 0; i < 2; i++) {
1173     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1174     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1175   }
1176
1177   /* create elements for the TCP transfer */
1178   for (i = 0; i < 2; i++) {
1179     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1180     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1181     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1182     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1183     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1184     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1185     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1186     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1187         &sink_cb, stream, NULL);
1188   }
1189
1190   /* hook up the stream to the RTP session elements. */
1191   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1192   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1193   g_free (name);
1194   name = g_strdup_printf ("send_rtp_src_%d", idx);
1195   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1196   g_free (name);
1197   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1198   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1199   g_free (name);
1200   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1201   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1202   g_free (name);
1203   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1204   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1205   g_free (name);
1206
1207   /* get the session */
1208   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1209       &stream->session);
1210
1211   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1212       stream);
1213   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1214       stream);
1215   g_signal_connect (stream->session, "on-ssrc-active",
1216       (GCallback) on_ssrc_active, stream);
1217   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1218       stream);
1219   g_signal_connect (stream->session, "on-bye-timeout",
1220       (GCallback) on_bye_timeout, stream);
1221   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1222       stream);
1223
1224   /* link the RTP pad to the session manager */
1225   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1226   if (ret != GST_PAD_LINK_OK)
1227     goto link_failed;
1228
1229   /* make tee for RTP and link to stream */
1230   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1231   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1232
1233   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1234   gst_pad_link (stream->send_rtp_src, pad);
1235   gst_object_unref (pad);
1236
1237   /* link RTP sink, we're pretty sure this will work. */
1238   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1239   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1240   gst_pad_link (teepad, pad);
1241   gst_object_unref (pad);
1242   gst_object_unref (teepad);
1243
1244   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1245   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1246   gst_pad_link (teepad, pad);
1247   gst_object_unref (pad);
1248   gst_object_unref (teepad);
1249
1250   /* make tee for RTCP */
1251   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1252   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1253
1254   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1255   gst_pad_link (stream->send_rtcp_src, pad);
1256   gst_object_unref (pad);
1257
1258   /* link RTCP elements */
1259   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1260   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1261   gst_pad_link (teepad, pad);
1262   gst_object_unref (pad);
1263   gst_object_unref (teepad);
1264
1265   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1266   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1267   gst_pad_link (teepad, pad);
1268   gst_object_unref (pad);
1269   gst_object_unref (teepad);
1270
1271   /* make selector for the RTP receivers */
1272   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1273   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1274   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1275
1276   pad = gst_element_get_static_pad (stream->selector[0], "src");
1277   gst_pad_link (pad, stream->recv_rtp_sink);
1278   gst_object_unref (pad);
1279
1280   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1281   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1282   gst_pad_link (pad, selpad);
1283   gst_object_unref (pad);
1284   gst_object_unref (selpad);
1285
1286   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1287   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1288   gst_pad_link (pad, selpad);
1289   gst_object_unref (pad);
1290   gst_object_unref (selpad);
1291
1292   /* make selector for the RTCP receivers */
1293   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1294   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1295   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1296
1297   pad = gst_element_get_static_pad (stream->selector[1], "src");
1298   gst_pad_link (pad, stream->recv_rtcp_sink);
1299   gst_object_unref (pad);
1300
1301   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1302   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1303   gst_pad_link (pad, selpad);
1304   gst_object_unref (pad);
1305   gst_object_unref (selpad);
1306
1307   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1308   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1309   gst_pad_link (pad, selpad);
1310   gst_object_unref (pad);
1311   gst_object_unref (selpad);
1312
1313   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1314    * values */
1315   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1316   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1317   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1318   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1319
1320   /* be notified of caps changes */
1321   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1322       (GCallback) caps_notify, stream);
1323
1324   stream->prepared = TRUE;
1325
1326   return TRUE;
1327
1328   /* ERRORS */
1329 link_failed:
1330   {
1331     GST_WARNING ("failed to link stream %d", idx);
1332     return FALSE;
1333   }
1334 }
1335
1336 static void
1337 unlock_streams (GstRTSPMedia * media)
1338 {
1339   guint i, n_streams;
1340
1341   /* unlock the udp src elements */
1342   n_streams = gst_rtsp_media_n_streams (media);
1343   for (i = 0; i < n_streams; i++) {
1344     GstRTSPMediaStream *stream;
1345
1346     stream = gst_rtsp_media_get_stream (media, i);
1347
1348     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1349     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1350   }
1351 }
1352
1353 static void
1354 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1355 {
1356   g_mutex_lock (media->lock);
1357   /* never overwrite the error status */
1358   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1359     media->status = status;
1360   GST_DEBUG ("setting new status to %d", status);
1361   g_cond_broadcast (media->cond);
1362   g_mutex_unlock (media->lock);
1363 }
1364
1365 static GstRTSPMediaStatus
1366 gst_rtsp_media_get_status (GstRTSPMedia * media)
1367 {
1368   GstRTSPMediaStatus result;
1369   GTimeVal timeout;
1370
1371   g_mutex_lock (media->lock);
1372   g_get_current_time (&timeout);
1373   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1374   /* while we are preparing, wait */
1375   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1376     GST_DEBUG ("waiting for status change");
1377     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1378       GST_DEBUG ("timeout, assuming error status");
1379       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1380     }
1381   }
1382   /* could be success or error */
1383   result = media->status;
1384   GST_DEBUG ("got status %d", result);
1385   g_mutex_unlock (media->lock);
1386
1387   return result;
1388 }
1389
1390 static gboolean
1391 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1392 {
1393   GstMessageType type;
1394
1395   type = GST_MESSAGE_TYPE (message);
1396
1397   switch (type) {
1398     case GST_MESSAGE_STATE_CHANGED:
1399       break;
1400     case GST_MESSAGE_BUFFERING:
1401     {
1402       gint percent;
1403
1404       gst_message_parse_buffering (message, &percent);
1405
1406       /* no state management needed for live pipelines */
1407       if (media->is_live)
1408         break;
1409
1410       if (percent == 100) {
1411         /* a 100% message means buffering is done */
1412         media->buffering = FALSE;
1413         /* if the desired state is playing, go back */
1414         if (media->target_state == GST_STATE_PLAYING) {
1415           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1416           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1417         } else {
1418           GST_INFO ("Buffering done");
1419         }
1420       } else {
1421         /* buffering busy */
1422         if (media->buffering == FALSE) {
1423           if (media->target_state == GST_STATE_PLAYING) {
1424             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1425             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1426             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1427           } else {
1428             GST_INFO ("Buffering ...");
1429           }
1430         }
1431         media->buffering = TRUE;
1432       }
1433       break;
1434     }
1435     case GST_MESSAGE_LATENCY:
1436     {
1437       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1438       break;
1439     }
1440     case GST_MESSAGE_ERROR:
1441     {
1442       GError *gerror;
1443       gchar *debug;
1444
1445       gst_message_parse_error (message, &gerror, &debug);
1446       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1447       g_error_free (gerror);
1448       g_free (debug);
1449
1450       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1451       break;
1452     }
1453     case GST_MESSAGE_WARNING:
1454     {
1455       GError *gerror;
1456       gchar *debug;
1457
1458       gst_message_parse_warning (message, &gerror, &debug);
1459       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1460       g_error_free (gerror);
1461       g_free (debug);
1462       break;
1463     }
1464     case GST_MESSAGE_ELEMENT:
1465       break;
1466     case GST_MESSAGE_STREAM_STATUS:
1467       break;
1468     case GST_MESSAGE_ASYNC_DONE:
1469       if (!media->adding) {
1470         /* when we are dynamically adding pads, the addition of the udpsrc will
1471          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1472          * wait for the final ASYNC_DONE after everything prerolled */
1473         GST_INFO ("%p: got ASYNC_DONE", media);
1474         collect_media_stats (media);
1475
1476         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1477       } else {
1478         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1479       }
1480       break;
1481     case GST_MESSAGE_EOS:
1482       GST_INFO ("%p: got EOS", media);
1483       if (media->eos_pending) {
1484         GST_DEBUG ("shutting down after EOS");
1485         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1486         media->eos_pending = FALSE;
1487         g_object_unref (media);
1488       }
1489       break;
1490     default:
1491       GST_INFO ("%p: got message type %s", media,
1492           gst_message_type_get_name (type));
1493       break;
1494   }
1495   return TRUE;
1496 }
1497
1498 static gboolean
1499 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1500 {
1501   GstRTSPMediaClass *klass;
1502   gboolean ret;
1503
1504   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1505
1506   if (klass->handle_message)
1507     ret = klass->handle_message (media, message);
1508   else
1509     ret = FALSE;
1510
1511   return ret;
1512 }
1513
1514 /* called from streaming threads */
1515 static void
1516 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1517 {
1518   GstRTSPMediaStream *stream;
1519   gchar *name;
1520   gint i;
1521
1522   i = media->streams->len + 1;
1523
1524   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1525
1526   stream = g_new0 (GstRTSPMediaStream, 1);
1527   stream->payloader = element;
1528
1529   name = g_strdup_printf ("dynpay%d", i);
1530
1531   media->adding = TRUE;
1532
1533   /* ghost the pad of the payloader to the element */
1534   stream->srcpad = gst_ghost_pad_new (name, pad);
1535   gst_pad_set_active (stream->srcpad, TRUE);
1536   gst_element_add_pad (media->element, stream->srcpad);
1537   g_free (name);
1538
1539   /* add stream now */
1540   g_array_append_val (media->streams, stream);
1541
1542   setup_stream (stream, i, media);
1543
1544   for (i = 0; i < 2; i++) {
1545     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1546     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1547     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1548     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1549     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1550   }
1551   media->adding = FALSE;
1552 }
1553
1554 static void
1555 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1556 {
1557   GST_INFO ("no more pads");
1558   if (media->fakesink) {
1559     gst_object_ref (media->fakesink);
1560     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1561     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1562     gst_object_unref (media->fakesink);
1563     media->fakesink = NULL;
1564     GST_INFO ("removed fakesink");
1565   }
1566 }
1567
1568 /**
1569  * gst_rtsp_media_prepare:
1570  * @media: a #GstRTSPMedia
1571  *
1572  * Prepare @media for streaming. This function will create the pipeline and
1573  * other objects to manage the streaming.
1574  *
1575  * It will preroll the pipeline and collect vital information about the streams
1576  * such as the duration.
1577  *
1578  * Returns: %TRUE on success.
1579  */
1580 gboolean
1581 gst_rtsp_media_prepare (GstRTSPMedia * media)
1582 {
1583   GstStateChangeReturn ret;
1584   GstRTSPMediaStatus status;
1585   guint i, n_streams;
1586   GstRTSPMediaClass *klass;
1587   GstBus *bus;
1588   GList *walk;
1589
1590   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1591     goto was_prepared;
1592
1593   if (!media->reusable && media->reused)
1594     goto is_reused;
1595
1596   GST_INFO ("preparing media %p", media);
1597
1598   /* reset some variables */
1599   media->is_live = FALSE;
1600   media->buffering = FALSE;
1601   /* we're preparing now */
1602   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1603
1604   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1605
1606   /* add the pipeline bus to our custom mainloop */
1607   media->source = gst_bus_create_watch (bus);
1608   gst_object_unref (bus);
1609
1610   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1611
1612   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1613   media->id = g_source_attach (media->source, klass->context);
1614
1615   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1616
1617   /* add stuff to the bin */
1618   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1619
1620   /* link streams we already have, other streams might appear when we have
1621    * dynamic elements */
1622   n_streams = gst_rtsp_media_n_streams (media);
1623   for (i = 0; i < n_streams; i++) {
1624     GstRTSPMediaStream *stream;
1625
1626     stream = gst_rtsp_media_get_stream (media, i);
1627
1628     setup_stream (stream, i, media);
1629   }
1630
1631   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1632     GstElement *elem = walk->data;
1633
1634     GST_INFO ("adding callbacks for dynamic element %p", elem);
1635
1636     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1637     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1638
1639     /* we add a fakesink here in order to make the state change async. We remove
1640      * the fakesink again in the no-more-pads callback. */
1641     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1642     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1643   }
1644
1645   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1646   /* first go to PAUSED */
1647   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1648   media->target_state = GST_STATE_PAUSED;
1649
1650   switch (ret) {
1651     case GST_STATE_CHANGE_SUCCESS:
1652       GST_INFO ("SUCCESS state change for media %p", media);
1653       break;
1654     case GST_STATE_CHANGE_ASYNC:
1655       GST_INFO ("ASYNC state change for media %p", media);
1656       break;
1657     case GST_STATE_CHANGE_NO_PREROLL:
1658       /* we need to go to PLAYING */
1659       GST_INFO ("NO_PREROLL state change: live media %p", media);
1660       media->is_live = TRUE;
1661       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1662       if (ret == GST_STATE_CHANGE_FAILURE)
1663         goto state_failed;
1664       break;
1665     case GST_STATE_CHANGE_FAILURE:
1666       goto state_failed;
1667   }
1668
1669   /* now wait for all pads to be prerolled */
1670   status = gst_rtsp_media_get_status (media);
1671   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1672     goto state_failed;
1673
1674   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1675
1676   GST_INFO ("object %p is prerolled", media);
1677
1678   return TRUE;
1679
1680   /* OK */
1681 was_prepared:
1682   {
1683     return TRUE;
1684   }
1685   /* ERRORS */
1686 is_reused:
1687   {
1688     GST_WARNING ("can not reuse media %p", media);
1689     return FALSE;
1690   }
1691 state_failed:
1692   {
1693     GST_WARNING ("failed to preroll pipeline");
1694     unlock_streams (media);
1695     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1696     gst_rtsp_media_unprepare (media);
1697     return FALSE;
1698   }
1699 }
1700
1701 /**
1702  * gst_rtsp_media_unprepare:
1703  * @media: a #GstRTSPMedia
1704  *
1705  * Unprepare @media. After this call, the media should be prepared again before
1706  * it can be used again. If the media is set to be non-reusable, a new instance
1707  * must be created.
1708  *
1709  * Returns: %TRUE on success.
1710  */
1711 gboolean
1712 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1713 {
1714   GstRTSPMediaClass *klass;
1715   gboolean success;
1716
1717   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1718     return TRUE;
1719
1720   GST_INFO ("unprepare media %p", media);
1721   media->target_state = GST_STATE_NULL;
1722
1723   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1724   if (klass->unprepare)
1725     success = klass->unprepare (media);
1726   else
1727     success = TRUE;
1728
1729   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1730   media->reused = TRUE;
1731
1732   /* when the media is not reusable, this will effectively unref the media and
1733    * recreate it */
1734   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1735
1736   return success;
1737 }
1738
1739 static gboolean
1740 default_unprepare (GstRTSPMedia * media)
1741 {
1742   if (media->eos_shutdown) {
1743     GST_DEBUG ("sending EOS for shutdown");
1744     /* ref so that we don't disappear */
1745     g_object_ref (media);
1746     media->eos_pending = TRUE;
1747     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1748     /* we need to go to playing again for the EOS to propagate, normally in this
1749      * state, nothing is receiving data from us anymore so this is ok. */
1750     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1751   } else {
1752     GST_DEBUG ("shutting down");
1753     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1754   }
1755   return TRUE;
1756 }
1757
1758 static void
1759 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1760     gchar * dest, gint min, gint max)
1761 {
1762   gboolean do_add = TRUE;
1763   RTSPDestination *ndest;
1764
1765   if (stream->filter_duplicates) {
1766     RTSPDestination fdest;
1767     GList *find;
1768
1769     fdest.dest = dest;
1770     fdest.min = min;
1771     fdest.max = max;
1772
1773     /* first see if we already added this destination */
1774     find =
1775         g_list_find_custom (stream->destinations, &fdest,
1776         (GCompareFunc) dest_compare);
1777     if (find) {
1778       ndest = (RTSPDestination *) find->data;
1779
1780       GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
1781           ndest->count);
1782       ndest->count++;
1783       do_add = FALSE;
1784     }
1785   }
1786
1787   if (do_add) {
1788     GST_INFO ("adding %s:%d-%d", dest, min, max);
1789     g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1790     g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1791
1792     if (stream->filter_duplicates) {
1793       ndest = create_destination (dest, min, max);
1794       stream->destinations = g_list_prepend (stream->destinations, ndest);
1795     }
1796   }
1797 }
1798
1799 static void
1800 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1801     gchar * dest, gint min, gint max)
1802 {
1803   gboolean do_remove = TRUE;
1804   RTSPDestination *ndest = NULL;
1805   GList *find = NULL;
1806
1807   if (stream->filter_duplicates) {
1808     RTSPDestination fdest;
1809
1810     fdest.dest = dest;
1811     fdest.min = min;
1812     fdest.max = max;
1813
1814     /* first see if we already added this destination */
1815     find =
1816         g_list_find_custom (stream->destinations, &fdest,
1817         (GCompareFunc) dest_compare);
1818     if (!find)
1819       return;
1820
1821     ndest = (RTSPDestination *) find->data;
1822     if (--ndest->count > 0) {
1823       do_remove = FALSE;
1824       GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
1825           ndest->count);
1826     }
1827   }
1828
1829   if (do_remove) {
1830     GST_INFO ("removing %s:%d-%d", dest, min, max);
1831     g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1832     g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1833
1834     if (stream->filter_duplicates) {
1835       stream->destinations = g_list_delete_link (stream->destinations, find);
1836       free_destination (ndest);
1837     }
1838   }
1839 }
1840
1841 /**
1842  * gst_rtsp_media_set_state:
1843  * @media: a #GstRTSPMedia
1844  * @state: the target state of the media
1845  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1846  *
1847  * Set the state of @media to @state and for the transports in @transports.
1848  *
1849  * Returns: %TRUE on success.
1850  */
1851 gboolean
1852 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1853     GArray * transports)
1854 {
1855   gint i;
1856   GstStateChangeReturn ret;
1857   gboolean add, remove, do_state;
1858   gint old_active;
1859
1860   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1861   g_return_val_if_fail (transports != NULL, FALSE);
1862
1863   /* NULL and READY are the same */
1864   if (state == GST_STATE_READY)
1865     state = GST_STATE_NULL;
1866
1867   add = remove = FALSE;
1868
1869   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1870       media);
1871
1872   switch (state) {
1873     case GST_STATE_NULL:
1874       /* unlock the streams so that they follow the state changes from now on */
1875       unlock_streams (media);
1876       /* fallthrough */
1877     case GST_STATE_PAUSED:
1878       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1879       if (media->target_state == GST_STATE_PLAYING)
1880         remove = TRUE;
1881       break;
1882     case GST_STATE_PLAYING:
1883       /* we're going to PLAYING, add */
1884       add = TRUE;
1885       break;
1886     default:
1887       break;
1888   }
1889   old_active = media->active;
1890
1891   for (i = 0; i < transports->len; i++) {
1892     GstRTSPMediaTrans *tr;
1893     GstRTSPMediaStream *stream;
1894     GstRTSPTransport *trans;
1895
1896     /* we need a non-NULL entry in the array */
1897     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1898     if (tr == NULL)
1899       continue;
1900
1901     /* we need a transport */
1902     if (!(trans = tr->transport))
1903       continue;
1904
1905     /* get the stream and add the destinations */
1906     stream = gst_rtsp_media_get_stream (media, tr->idx);
1907     switch (trans->lower_transport) {
1908       case GST_RTSP_LOWER_TRANS_UDP:
1909       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1910       {
1911         gchar *dest;
1912         gint min, max;
1913
1914         dest = trans->destination;
1915         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1916           min = trans->port.min;
1917           max = trans->port.max;
1918         } else {
1919           min = trans->client_port.min;
1920           max = trans->client_port.max;
1921         }
1922
1923         if (add && !tr->active) {
1924           add_udp_destination (media, stream, dest, min, max);
1925           stream->transports = g_list_prepend (stream->transports, tr);
1926           tr->active = TRUE;
1927           media->active++;
1928         } else if (remove && tr->active) {
1929           remove_udp_destination (media, stream, dest, min, max);
1930           stream->transports = g_list_remove (stream->transports, tr);
1931           tr->active = FALSE;
1932           media->active--;
1933         }
1934         break;
1935       }
1936       case GST_RTSP_LOWER_TRANS_TCP:
1937         if (add && !tr->active) {
1938           GST_INFO ("adding TCP %s", trans->destination);
1939           stream->transports = g_list_prepend (stream->transports, tr);
1940           tr->active = TRUE;
1941           media->active++;
1942         } else if (remove && tr->active) {
1943           GST_INFO ("removing TCP %s", trans->destination);
1944           stream->transports = g_list_remove (stream->transports, tr);
1945           tr->active = FALSE;
1946           media->active--;
1947         }
1948         break;
1949       default:
1950         GST_INFO ("Unknown transport %d", trans->lower_transport);
1951         break;
1952     }
1953   }
1954
1955   /* we just added the first media, do the playing state change */
1956   if (old_active == 0 && add)
1957     do_state = TRUE;
1958   /* if we have no more active media, do the downward state changes */
1959   else if (media->active == 0)
1960     do_state = TRUE;
1961   else
1962     do_state = FALSE;
1963
1964   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
1965       media, do_state);
1966
1967   if (media->target_state != state) {
1968     if (do_state) {
1969       if (state == GST_STATE_NULL) {
1970         gst_rtsp_media_unprepare (media);
1971       } else {
1972         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
1973             media);
1974         media->target_state = state;
1975         ret = gst_element_set_state (media->pipeline, state);
1976       }
1977     }
1978     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
1979         NULL);
1980   }
1981
1982   /* remember where we are */
1983   if (state == GST_STATE_PAUSED || old_active != media->active)
1984     collect_media_stats (media);
1985
1986   return TRUE;
1987 }
1988
1989 /**
1990  * gst_rtsp_media_remove_elements:
1991  * @media: a #GstRTSPMedia
1992  *
1993  * Remove all elements and the pipeline controlled by @media.
1994  */
1995 void
1996 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1997 {
1998   gint i, j;
1999
2000   unlock_streams (media);
2001
2002   for (i = 0; i < media->streams->len; i++) {
2003     GstRTSPMediaStream *stream;
2004
2005     GST_INFO ("Removing elements of stream %d from pipeline", i);
2006
2007     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2008
2009     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2010
2011     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2012
2013     for (j = 0; j < 2; j++) {
2014       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2015       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2016       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2017       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2018       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2019       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2020
2021       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2022       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2023       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2024       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2025       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2026       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2027     }
2028     if (stream->caps)
2029       gst_caps_unref (stream->caps);
2030     stream->caps = NULL;
2031     gst_rtsp_media_stream_free (stream);
2032   }
2033   g_array_remove_range (media->streams, 0, media->streams->len);
2034
2035   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2036   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2037
2038   gst_object_unref (media->pipeline);
2039   media->pipeline = NULL;
2040 }