media: enable per factory authorisations
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-funnel.h"
27 #include "rtsp-media.h"
28
29 #define DEFAULT_SHARED         FALSE
30 #define DEFAULT_REUSABLE       FALSE
31 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
32 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
33 #define DEFAULT_EOS_SHUTDOWN   FALSE
34
35 /* define to dump received RTCP packets */
36 #undef DUMP_STATS
37
38 enum
39 {
40   PROP_0,
41   PROP_SHARED,
42   PROP_REUSABLE,
43   PROP_PROTOCOLS,
44   PROP_EOS_SHUTDOWN,
45   PROP_LAST
46 };
47
48 enum
49 {
50   SIGNAL_PREPARED,
51   SIGNAL_UNPREPARED,
52   SIGNAL_NEW_STATE,
53   SIGNAL_LAST
54 };
55
56 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
57 #define GST_CAT_DEFAULT rtsp_media_debug
58
59 static GQuark ssrc_stream_map_key;
60
61 static void gst_rtsp_media_get_property (GObject * object, guint propid,
62     GValue * value, GParamSpec * pspec);
63 static void gst_rtsp_media_set_property (GObject * object, guint propid,
64     const GValue * value, GParamSpec * pspec);
65 static void gst_rtsp_media_finalize (GObject * obj);
66
67 static gpointer do_loop (GstRTSPMediaClass * klass);
68 static gboolean default_handle_message (GstRTSPMedia * media,
69     GstMessage * message);
70 static gboolean default_unprepare (GstRTSPMedia * media);
71 static void unlock_streams (GstRTSPMedia * media);
72
73 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
74
75 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
76
77 static void
78 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
79 {
80   GObjectClass *gobject_class;
81   GError *error = NULL;
82
83   gobject_class = G_OBJECT_CLASS (klass);
84
85   gobject_class->get_property = gst_rtsp_media_get_property;
86   gobject_class->set_property = gst_rtsp_media_set_property;
87   gobject_class->finalize = gst_rtsp_media_finalize;
88
89   g_object_class_install_property (gobject_class, PROP_SHARED,
90       g_param_spec_boolean ("shared", "Shared",
91           "If this media pipeline can be shared", DEFAULT_SHARED,
92           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
93
94   g_object_class_install_property (gobject_class, PROP_REUSABLE,
95       g_param_spec_boolean ("reusable", "Reusable",
96           "If this media pipeline can be reused after an unprepare",
97           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98
99   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
100       g_param_spec_flags ("protocols", "Protocols",
101           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
102           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
103
104   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
105       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
106           "Send an EOS event to the pipeline before unpreparing",
107           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
108
109   gst_rtsp_media_signals[SIGNAL_PREPARED] =
110       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
111       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
112       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
113
114   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
115       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
116       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
117       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
118
119   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
120       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
121       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
122       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
123
124   klass->context = g_main_context_new ();
125   klass->loop = g_main_loop_new (klass->context, TRUE);
126
127   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
128   if (error != NULL) {
129     g_critical ("could not start bus thread: %s", error->message);
130   }
131   klass->handle_message = default_handle_message;
132   klass->unprepare = default_unprepare;
133
134   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
135
136   gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
137 }
138
139 static void
140 gst_rtsp_media_init (GstRTSPMedia * media)
141 {
142   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
143   media->lock = g_mutex_new ();
144   media->cond = g_cond_new ();
145
146   media->shared = DEFAULT_SHARED;
147   media->reusable = DEFAULT_REUSABLE;
148   media->protocols = DEFAULT_PROTOCOLS;
149   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
150 }
151
152 /* FIXME. this should be done in multiudpsink */
153 typedef struct
154 {
155   gint count;
156   gchar *dest;
157   gint min, max;
158 } RTSPDestination;
159
160 static gint
161 dest_compare (RTSPDestination * a, RTSPDestination * b)
162 {
163   if ((a->min == b->min) && (a->max == b->max)
164       && (strcmp (a->dest, b->dest) == 0))
165     return 0;
166
167   return 1;
168 }
169
170 static RTSPDestination *
171 create_destination (const gchar * dest, gint min, gint max)
172 {
173   RTSPDestination *res;
174
175   res = g_slice_new (RTSPDestination);
176   res->count = 1;
177   res->dest = g_strdup (dest);
178   res->min = min;
179   res->max = max;
180
181   return res;
182 }
183
184 static void
185 free_destination (RTSPDestination * dest)
186 {
187   g_free (dest->dest);
188   g_slice_free (RTSPDestination, dest);
189 }
190
191 void
192 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
193 {
194   if (trans->transport) {
195     gst_rtsp_transport_free (trans->transport);
196     trans->transport = NULL;
197   }
198   if (trans->rtpsource) {
199     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
200     trans->rtpsource = NULL;
201   }
202 }
203
204 static void
205 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
206 {
207   if (stream->session)
208     g_object_unref (stream->session);
209
210   if (stream->caps)
211     gst_caps_unref (stream->caps);
212
213   if (stream->send_rtp_sink)
214     gst_object_unref (stream->send_rtp_sink);
215   if (stream->send_rtp_src)
216     gst_object_unref (stream->send_rtp_src);
217   if (stream->send_rtcp_src)
218     gst_object_unref (stream->send_rtcp_src);
219   if (stream->recv_rtcp_sink)
220     gst_object_unref (stream->recv_rtcp_sink);
221   if (stream->recv_rtp_sink)
222     gst_object_unref (stream->recv_rtp_sink);
223
224   g_list_free (stream->transports);
225
226   g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
227   g_list_free (stream->destinations);
228
229   g_free (stream);
230 }
231
232 static void
233 gst_rtsp_media_finalize (GObject * obj)
234 {
235   GstRTSPMedia *media;
236   guint i;
237
238   media = GST_RTSP_MEDIA (obj);
239
240   GST_INFO ("finalize media %p", media);
241
242   if (media->pipeline) {
243     unlock_streams (media);
244     gst_element_set_state (media->pipeline, GST_STATE_NULL);
245     gst_object_unref (media->pipeline);
246   }
247
248   for (i = 0; i < media->streams->len; i++) {
249     GstRTSPMediaStream *stream;
250
251     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
252
253     gst_rtsp_media_stream_free (stream);
254   }
255   g_array_free (media->streams, TRUE);
256
257   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
258   g_list_free (media->dynamic);
259
260   if (media->source) {
261     g_source_destroy (media->source);
262     g_source_unref (media->source);
263   }
264   g_mutex_free (media->lock);
265   g_cond_free (media->cond);
266
267   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
268 }
269
270 static void
271 gst_rtsp_media_get_property (GObject * object, guint propid,
272     GValue * value, GParamSpec * pspec)
273 {
274   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
275
276   switch (propid) {
277     case PROP_SHARED:
278       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
279       break;
280     case PROP_REUSABLE:
281       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
282       break;
283     case PROP_PROTOCOLS:
284       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
285       break;
286     case PROP_EOS_SHUTDOWN:
287       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
288       break;
289     default:
290       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
291   }
292 }
293
294 static void
295 gst_rtsp_media_set_property (GObject * object, guint propid,
296     const GValue * value, GParamSpec * pspec)
297 {
298   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
299
300   switch (propid) {
301     case PROP_SHARED:
302       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
303       break;
304     case PROP_REUSABLE:
305       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
306       break;
307     case PROP_PROTOCOLS:
308       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
309       break;
310     case PROP_EOS_SHUTDOWN:
311       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
312       break;
313     default:
314       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
315   }
316 }
317
318 static gpointer
319 do_loop (GstRTSPMediaClass * klass)
320 {
321   GST_INFO ("enter mainloop");
322   g_main_loop_run (klass->loop);
323   GST_INFO ("exit mainloop");
324
325   return NULL;
326 }
327
328 static void
329 collect_media_stats (GstRTSPMedia * media)
330 {
331   GstFormat format;
332   gint64 position, duration;
333
334   media->range.unit = GST_RTSP_RANGE_NPT;
335
336   if (media->is_live) {
337     media->range.min.type = GST_RTSP_TIME_NOW;
338     media->range.min.seconds = -1;
339     media->range.max.type = GST_RTSP_TIME_END;
340     media->range.max.seconds = -1;
341   } else {
342     /* get the position */
343     format = GST_FORMAT_TIME;
344     if (!gst_element_query_position (media->pipeline, &format, &position)) {
345       GST_INFO ("position query failed");
346       position = 0;
347     }
348
349     /* get the duration */
350     format = GST_FORMAT_TIME;
351     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
352       GST_INFO ("duration query failed");
353       duration = -1;
354     }
355
356     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
357         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
358
359     if (position == -1) {
360       media->range.min.type = GST_RTSP_TIME_NOW;
361       media->range.min.seconds = -1;
362     } else {
363       media->range.min.type = GST_RTSP_TIME_SECONDS;
364       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
365     }
366     if (duration == -1) {
367       media->range.max.type = GST_RTSP_TIME_END;
368       media->range.max.seconds = -1;
369     } else {
370       media->range.max.type = GST_RTSP_TIME_SECONDS;
371       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
372     }
373   }
374 }
375
376 /**
377  * gst_rtsp_media_new:
378  *
379  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
380  * element to produde RTP data for one or more related (audio/video/..) 
381  * streams.
382  *
383  * Returns: a new #GstRTSPMedia object.
384  */
385 GstRTSPMedia *
386 gst_rtsp_media_new (void)
387 {
388   GstRTSPMedia *result;
389
390   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
391
392   return result;
393 }
394
395 /**
396  * gst_rtsp_media_set_shared:
397  * @media: a #GstRTSPMedia
398  * @shared: the new value
399  *
400  * Set or unset if the pipeline for @media can be shared will multiple clients.
401  * When @shared is %TRUE, client requests for this media will share the media
402  * pipeline.
403  */
404 void
405 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
406 {
407   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
408
409   media->shared = shared;
410 }
411
412 /**
413  * gst_rtsp_media_is_shared:
414  * @media: a #GstRTSPMedia
415  *
416  * Check if the pipeline for @media can be shared between multiple clients.
417  *
418  * Returns: %TRUE if the media can be shared between clients.
419  */
420 gboolean
421 gst_rtsp_media_is_shared (GstRTSPMedia * media)
422 {
423   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
424
425   return media->shared;
426 }
427
428 /**
429  * gst_rtsp_media_set_reusable:
430  * @media: a #GstRTSPMedia
431  * @reusable: the new value
432  *
433  * Set or unset if the pipeline for @media can be reused after the pipeline has
434  * been unprepared.
435  */
436 void
437 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
438 {
439   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
440
441   media->reusable = reusable;
442 }
443
444 /**
445  * gst_rtsp_media_is_reusable:
446  * @media: a #GstRTSPMedia
447  *
448  * Check if the pipeline for @media can be reused after an unprepare.
449  *
450  * Returns: %TRUE if the media can be reused
451  */
452 gboolean
453 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
454 {
455   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
456
457   return media->reusable;
458 }
459
460 /**
461  * gst_rtsp_media_set_protocols:
462  * @media: a #GstRTSPMedia
463  * @protocols: the new flags
464  *
465  * Configure the allowed lower transport for @media.
466  */
467 void
468 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
469 {
470   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
471
472   media->protocols = protocols;
473 }
474
475 /**
476  * gst_rtsp_media_get_protocols:
477  * @media: a #GstRTSPMedia
478  *
479  * Get the allowed protocols of @media.
480  *
481  * Returns: a #GstRTSPLowerTrans
482  */
483 GstRTSPLowerTrans
484 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
485 {
486   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
487       GST_RTSP_LOWER_TRANS_UNKNOWN);
488
489   return media->protocols;
490 }
491
492 /**
493  * gst_rtsp_media_set_eos_shutdown:
494  * @media: a #GstRTSPMedia
495  * @eos_shutdown: the new value
496  *
497  * Set or unset if an EOS event will be sent to the pipeline for @media before
498  * it is unprepared.
499  */
500 void
501 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
502 {
503   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
504
505   media->eos_shutdown = eos_shutdown;
506 }
507
508 /**
509  * gst_rtsp_media_is_eos_shutdown:
510  * @media: a #GstRTSPMedia
511  *
512  * Check if the pipeline for @media will send an EOS down the pipeline before
513  * unpreparing.
514  *
515  * Returns: %TRUE if the media will send EOS before unpreparing.
516  */
517 gboolean
518 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
519 {
520   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
521
522   return media->eos_shutdown;
523 }
524
525 /**
526  * gst_rtsp_media_set_auth:
527  * @media: a #GstRTSPMedia
528  * @auth: a #GstRTSPAuth
529  *
530  * configure @auth to be used as the authentication manager of @media.
531  */
532 void
533 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
534 {
535   GstRTSPAuth *old;
536
537   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
538
539   old = media->auth;
540
541   if (old != auth) {
542     if (auth)
543       g_object_ref (auth);
544     media->auth = auth;
545     if (old)
546       g_object_unref (old);
547   }
548 }
549
550 /**
551  * gst_rtsp_media_get_auth:
552  * @media: a #GstRTSPMedia
553  *
554  * Get the #GstRTSPAuth used as the authentication manager of @media.
555  *
556  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
557  * usage.
558  */
559 GstRTSPAuth *
560 gst_rtsp_media_get_auth (GstRTSPMedia * media)
561 {
562   GstRTSPAuth *result;
563
564   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
565
566   if ((result = media->auth))
567     g_object_ref (result);
568
569   return result;
570 }
571
572
573 /**
574  * gst_rtsp_media_n_streams:
575  * @media: a #GstRTSPMedia
576  *
577  * Get the number of streams in this media.
578  *
579  * Returns: The number of streams.
580  */
581 guint
582 gst_rtsp_media_n_streams (GstRTSPMedia * media)
583 {
584   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
585
586   return media->streams->len;
587 }
588
589 /**
590  * gst_rtsp_media_get_stream:
591  * @media: a #GstRTSPMedia
592  * @idx: the stream index
593  *
594  * Retrieve the stream with index @idx from @media.
595  *
596  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
597  * that index did not exist.
598  */
599 GstRTSPMediaStream *
600 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
601 {
602   GstRTSPMediaStream *res;
603
604   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
605
606   if (idx < media->streams->len)
607     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
608   else
609     res = NULL;
610
611   return res;
612 }
613
614 /**
615  * gst_rtsp_media_get_range_string:
616  * @media: a #GstRTSPMedia
617  * @play: for the PLAY request
618  *
619  * Get the current range as a string.
620  *
621  * Returns: The range as a string, g_free() after usage.
622  */
623 gchar *
624 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
625 {
626   gchar *result;
627   GstRTSPTimeRange range;
628
629   /* make copy */
630   range = media->range;
631
632   if (!play && media->active > 0) {
633     range.min.type = GST_RTSP_TIME_NOW;
634     range.min.seconds = -1;
635   }
636
637   result = gst_rtsp_range_to_string (&range);
638
639   return result;
640 }
641
642 /**
643  * gst_rtsp_media_seek:
644  * @media: a #GstRTSPMedia
645  * @range: a #GstRTSPTimeRange
646  *
647  * Seek the pipeline to @range.
648  *
649  * Returns: %TRUE on success.
650  */
651 gboolean
652 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
653 {
654   GstSeekFlags flags;
655   gboolean res;
656   gint64 start, stop;
657   GstSeekType start_type, stop_type;
658
659   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
660   g_return_val_if_fail (range != NULL, FALSE);
661
662   if (range->unit != GST_RTSP_RANGE_NPT)
663     goto not_supported;
664
665   /* depends on the current playing state of the pipeline. We might need to
666    * queue this until we get EOS. */
667   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
668
669   start_type = stop_type = GST_SEEK_TYPE_NONE;
670
671   switch (range->min.type) {
672     case GST_RTSP_TIME_NOW:
673       start = -1;
674       break;
675     case GST_RTSP_TIME_SECONDS:
676       /* only seek when something changed */
677       if (media->range.min.seconds == range->min.seconds) {
678         start = -1;
679       } else {
680         start = range->min.seconds * GST_SECOND;
681         start_type = GST_SEEK_TYPE_SET;
682       }
683       break;
684     case GST_RTSP_TIME_END:
685     default:
686       goto weird_type;
687   }
688   switch (range->max.type) {
689     case GST_RTSP_TIME_SECONDS:
690       /* only seek when something changed */
691       if (media->range.max.seconds == range->max.seconds) {
692         stop = -1;
693       } else {
694         stop = range->max.seconds * GST_SECOND;
695         stop_type = GST_SEEK_TYPE_SET;
696       }
697       break;
698     case GST_RTSP_TIME_END:
699       stop = -1;
700       stop_type = GST_SEEK_TYPE_SET;
701       break;
702     case GST_RTSP_TIME_NOW:
703     default:
704       goto weird_type;
705   }
706
707   if (start != -1 || stop != -1) {
708     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
709         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
710
711     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
712         flags, start_type, start, stop_type, stop);
713
714     /* and block for the seek to complete */
715     GST_INFO ("done seeking %d", res);
716     gst_element_get_state (media->pipeline, NULL, NULL, -1);
717     GST_INFO ("prerolled again");
718
719     collect_media_stats (media);
720   } else {
721     GST_INFO ("no seek needed");
722     res = TRUE;
723   }
724
725   return res;
726
727   /* ERRORS */
728 not_supported:
729   {
730     GST_WARNING ("seek unit %d not supported", range->unit);
731     return FALSE;
732   }
733 weird_type:
734   {
735     GST_WARNING ("weird range type %d not supported", range->min.type);
736     return FALSE;
737   }
738 }
739
740 /**
741  * gst_rtsp_media_stream_rtp:
742  * @stream: a #GstRTSPMediaStream
743  * @buffer: a #GstBuffer
744  *
745  * Handle an RTP buffer for the stream. This method is usually called when a
746  * message has been received from a client using the TCP transport.
747  *
748  * This function takes ownership of @buffer.
749  *
750  * Returns: a GstFlowReturn.
751  */
752 GstFlowReturn
753 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
754 {
755   GstFlowReturn ret;
756
757   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
758
759   return ret;
760 }
761
762 /**
763  * gst_rtsp_media_stream_rtcp:
764  * @stream: a #GstRTSPMediaStream
765  * @buffer: a #GstBuffer
766  *
767  * Handle an RTCP buffer for the stream. This method is usually called when a
768  * message has been received from a client using the TCP transport.
769  *
770  * This function takes ownership of @buffer.
771  *
772  * Returns: a GstFlowReturn.
773  */
774 GstFlowReturn
775 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
776 {
777   GstFlowReturn ret;
778
779   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
780
781   return ret;
782 }
783
784 /* Allocate the udp ports and sockets */
785 static gboolean
786 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
787 {
788   GstStateChangeReturn ret;
789   GstElement *udpsrc0, *udpsrc1;
790   GstElement *udpsink0, *udpsink1;
791   gint tmp_rtp, tmp_rtcp;
792   guint count;
793   gint rtpport, rtcpport, sockfd;
794   const gchar *host;
795
796   udpsrc0 = NULL;
797   udpsrc1 = NULL;
798   udpsink0 = NULL;
799   udpsink1 = NULL;
800   count = 0;
801
802   /* Start with random port */
803   tmp_rtp = 0;
804
805   if (media->is_ipv6)
806     host = "udp://[::0]";
807   else
808     host = "udp://0.0.0.0";
809
810   /* try to allocate 2 UDP ports, the RTP port should be an even
811    * number and the RTCP port should be the next (uneven) port */
812 again:
813   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
814   if (udpsrc0 == NULL)
815     goto no_udp_protocol;
816   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
817
818   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
819   if (ret == GST_STATE_CHANGE_FAILURE) {
820     if (tmp_rtp != 0) {
821       tmp_rtp += 2;
822       if (++count > 20)
823         goto no_ports;
824
825       gst_element_set_state (udpsrc0, GST_STATE_NULL);
826       gst_object_unref (udpsrc0);
827
828       goto again;
829     }
830     goto no_udp_protocol;
831   }
832
833   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
834
835   /* check if port is even */
836   if ((tmp_rtp & 1) != 0) {
837     /* port not even, close and allocate another */
838     if (++count > 20)
839       goto no_ports;
840
841     gst_element_set_state (udpsrc0, GST_STATE_NULL);
842     gst_object_unref (udpsrc0);
843
844     tmp_rtp++;
845     goto again;
846   }
847
848   /* allocate port+1 for RTCP now */
849   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
850   if (udpsrc1 == NULL)
851     goto no_udp_rtcp_protocol;
852
853   /* set port */
854   tmp_rtcp = tmp_rtp + 1;
855   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
856
857   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
858   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
859   if (ret == GST_STATE_CHANGE_FAILURE) {
860
861     if (++count > 20)
862       goto no_ports;
863
864     gst_element_set_state (udpsrc0, GST_STATE_NULL);
865     gst_object_unref (udpsrc0);
866
867     gst_element_set_state (udpsrc1, GST_STATE_NULL);
868     gst_object_unref (udpsrc1);
869
870     tmp_rtp += 2;
871     goto again;
872   }
873
874   /* all fine, do port check */
875   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
876   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
877
878   /* this should not happen... */
879   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
880     goto port_error;
881
882   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
883   if (!udpsink0)
884     goto no_udp_protocol;
885
886   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
887   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
888   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
889
890   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
891   if (!udpsink1)
892     goto no_udp_protocol;
893
894   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
895           "send-duplicates")) {
896     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
897     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
898     stream->filter_duplicates = FALSE;
899   } else {
900     GST_WARNING ("multiudpsink version found without send-duplicates property");
901     stream->filter_duplicates = TRUE;
902   }
903
904   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
905           "buffer-size")) {
906     g_object_set (G_OBJECT (udpsink0), "buffer-size", 0x80000, NULL);
907   } else {
908     GST_WARNING ("multiudpsink version found without buffer-size property");
909   }
910
911   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
912   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
913   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
914   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
915   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
916
917   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
918   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
919   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
920   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
921
922   /* we keep these elements, we configure all in configure_transport when the
923    * server told us to really use the UDP ports. */
924   stream->udpsrc[0] = udpsrc0;
925   stream->udpsrc[1] = udpsrc1;
926   stream->udpsink[0] = udpsink0;
927   stream->udpsink[1] = udpsink1;
928   stream->server_port.min = rtpport;
929   stream->server_port.max = rtcpport;
930
931   return TRUE;
932
933   /* ERRORS */
934 no_udp_protocol:
935   {
936     goto cleanup;
937   }
938 no_ports:
939   {
940     goto cleanup;
941   }
942 no_udp_rtcp_protocol:
943   {
944     goto cleanup;
945   }
946 port_error:
947   {
948     goto cleanup;
949   }
950 cleanup:
951   {
952     if (udpsrc0) {
953       gst_element_set_state (udpsrc0, GST_STATE_NULL);
954       gst_object_unref (udpsrc0);
955     }
956     if (udpsrc1) {
957       gst_element_set_state (udpsrc1, GST_STATE_NULL);
958       gst_object_unref (udpsrc1);
959     }
960     if (udpsink0) {
961       gst_element_set_state (udpsink0, GST_STATE_NULL);
962       gst_object_unref (udpsink0);
963     }
964     if (udpsink1) {
965       gst_element_set_state (udpsink1, GST_STATE_NULL);
966       gst_object_unref (udpsink1);
967     }
968     return FALSE;
969   }
970 }
971
972 /* executed from streaming thread */
973 static void
974 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
975 {
976   gchar *capsstr;
977   GstCaps *newcaps, *oldcaps;
978
979   if ((newcaps = GST_PAD_CAPS (pad)))
980     gst_caps_ref (newcaps);
981
982   oldcaps = stream->caps;
983   stream->caps = newcaps;
984
985   if (oldcaps)
986     gst_caps_unref (oldcaps);
987
988   capsstr = gst_caps_to_string (newcaps);
989   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
990   g_free (capsstr);
991 }
992
993 static void
994 dump_structure (const GstStructure * s)
995 {
996   gchar *sstr;
997
998   sstr = gst_structure_to_string (s);
999   GST_INFO ("structure: %s", sstr);
1000   g_free (sstr);
1001 }
1002
1003 static GstRTSPMediaTrans *
1004 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1005 {
1006   GList *walk;
1007   GstRTSPMediaTrans *result = NULL;
1008   const gchar *tmp;
1009   gchar *dest;
1010   guint port;
1011
1012   if (rtcp_from == NULL)
1013     return NULL;
1014
1015   tmp = g_strrstr (rtcp_from, ":");
1016   if (tmp == NULL)
1017     return NULL;
1018
1019   port = atoi (tmp + 1);
1020   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1021
1022   GST_INFO ("finding %s:%d", dest, port);
1023
1024   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1025     GstRTSPMediaTrans *trans = walk->data;
1026     gint min, max;
1027
1028     min = trans->transport->client_port.min;
1029     max = trans->transport->client_port.max;
1030
1031     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1032             || max == port)) {
1033       result = trans;
1034       break;
1035     }
1036   }
1037   g_free (dest);
1038
1039   return result;
1040 }
1041
1042 static void
1043 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1044 {
1045   GstStructure *stats;
1046   GstRTSPMediaTrans *trans;
1047
1048   GST_INFO ("%p: new source %p", stream, source);
1049
1050   /* see if we have a stream to match with the origin of the RTCP packet */
1051   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1052   if (trans == NULL) {
1053     g_object_get (source, "stats", &stats, NULL);
1054     if (stats) {
1055       const gchar *rtcp_from;
1056
1057       dump_structure (stats);
1058
1059       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1060       if ((trans = find_transport (stream, rtcp_from))) {
1061         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1062             source);
1063
1064         /* keep ref to the source */
1065         trans->rtpsource = source;
1066
1067         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1068       }
1069       gst_structure_free (stats);
1070     }
1071   } else {
1072     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1073   }
1074 }
1075
1076 static void
1077 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1078 {
1079   GST_INFO ("%p: new SDES %p", stream, source);
1080 }
1081
1082 static void
1083 on_ssrc_active (GObject * session, GObject * source,
1084     GstRTSPMediaStream * stream)
1085 {
1086   GstRTSPMediaTrans *trans;
1087
1088   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1089
1090   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1091
1092   if (trans && trans->keep_alive)
1093     trans->keep_alive (trans->ka_user_data);
1094
1095 #ifdef DUMP_STATS
1096   {
1097     GstStructure *stats;
1098     g_object_get (source, "stats", &stats, NULL);
1099     if (stats) {
1100       dump_structure (stats);
1101       gst_structure_free (stats);
1102     }
1103   }
1104 #endif
1105 }
1106
1107 static void
1108 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1109 {
1110   GST_INFO ("%p: source %p bye", stream, source);
1111 }
1112
1113 static void
1114 on_bye_timeout (GObject * session, GObject * source,
1115     GstRTSPMediaStream * stream)
1116 {
1117   GstRTSPMediaTrans *trans;
1118
1119   GST_INFO ("%p: source %p bye timeout", stream, source);
1120
1121   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1122     trans->rtpsource = NULL;
1123     trans->timeout = TRUE;
1124   }
1125 }
1126
1127 static void
1128 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1129 {
1130   GstRTSPMediaTrans *trans;
1131
1132   GST_INFO ("%p: source %p timeout", stream, source);
1133
1134   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1135     trans->rtpsource = NULL;
1136     trans->timeout = TRUE;
1137   }
1138 }
1139
1140 static GstFlowReturn
1141 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1142 {
1143   GList *walk;
1144   GstBuffer *buffer;
1145   GstRTSPMediaStream *stream;
1146
1147   buffer = gst_app_sink_pull_buffer (sink);
1148   if (!buffer)
1149     return GST_FLOW_OK;
1150
1151   stream = (GstRTSPMediaStream *) user_data;
1152
1153   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1154     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1155
1156     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1157       if (tr->send_rtp)
1158         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1159     } else {
1160       if (tr->send_rtcp)
1161         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1162     }
1163   }
1164   gst_buffer_unref (buffer);
1165
1166   return GST_FLOW_OK;
1167 }
1168
1169 static GstFlowReturn
1170 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1171 {
1172   GList *walk;
1173   GstBufferList *blist;
1174   GstRTSPMediaStream *stream;
1175
1176   blist = gst_app_sink_pull_buffer_list (sink);
1177   if (!blist)
1178     return GST_FLOW_OK;
1179
1180   stream = (GstRTSPMediaStream *) user_data;
1181
1182   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1183     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1184
1185     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1186       if (tr->send_rtp_list)
1187         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1188             tr->user_data);
1189     } else {
1190       if (tr->send_rtcp_list)
1191         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1192             tr->user_data);
1193     }
1194   }
1195   gst_buffer_list_unref (blist);
1196
1197   return GST_FLOW_OK;
1198 }
1199
1200 static GstAppSinkCallbacks sink_cb = {
1201   NULL,                         /* not interested in EOS */
1202   NULL,                         /* not interested in preroll buffers */
1203   handle_new_buffer,
1204   handle_new_buffer_list
1205 };
1206
1207 /* prepare the pipeline objects to handle @stream in @media */
1208 static gboolean
1209 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1210 {
1211   gchar *name;
1212   GstPad *pad, *teepad, *selpad;
1213   GstPadLinkReturn ret;
1214   gint i;
1215
1216   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1217    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1218    * elements */
1219   if (!alloc_udp_ports (media, stream))
1220     return FALSE;
1221
1222   /* add the ports to the pipeline */
1223   for (i = 0; i < 2; i++) {
1224     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1225     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1226   }
1227
1228   /* create elements for the TCP transfer */
1229   for (i = 0; i < 2; i++) {
1230     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1231     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1232     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1233     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1234     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1235     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1236     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1237     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1238         &sink_cb, stream, NULL);
1239   }
1240
1241   /* hook up the stream to the RTP session elements. */
1242   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1243   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1244   g_free (name);
1245   name = g_strdup_printf ("send_rtp_src_%d", idx);
1246   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1247   g_free (name);
1248   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1249   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1250   g_free (name);
1251   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1252   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1253   g_free (name);
1254   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1255   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1256   g_free (name);
1257
1258   /* get the session */
1259   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1260       &stream->session);
1261
1262   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1263       stream);
1264   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1265       stream);
1266   g_signal_connect (stream->session, "on-ssrc-active",
1267       (GCallback) on_ssrc_active, stream);
1268   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1269       stream);
1270   g_signal_connect (stream->session, "on-bye-timeout",
1271       (GCallback) on_bye_timeout, stream);
1272   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1273       stream);
1274
1275   /* link the RTP pad to the session manager */
1276   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1277   if (ret != GST_PAD_LINK_OK)
1278     goto link_failed;
1279
1280   /* make tee for RTP and link to stream */
1281   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1282   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1283
1284   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1285   gst_pad_link (stream->send_rtp_src, pad);
1286   gst_object_unref (pad);
1287
1288   /* link RTP sink, we're pretty sure this will work. */
1289   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1290   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1291   gst_pad_link (teepad, pad);
1292   gst_object_unref (pad);
1293   gst_object_unref (teepad);
1294
1295   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1296   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1297   gst_pad_link (teepad, pad);
1298   gst_object_unref (pad);
1299   gst_object_unref (teepad);
1300
1301   /* make tee for RTCP */
1302   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1303   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1304
1305   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1306   gst_pad_link (stream->send_rtcp_src, pad);
1307   gst_object_unref (pad);
1308
1309   /* link RTCP elements */
1310   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1311   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1312   gst_pad_link (teepad, pad);
1313   gst_object_unref (pad);
1314   gst_object_unref (teepad);
1315
1316   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1317   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1318   gst_pad_link (teepad, pad);
1319   gst_object_unref (pad);
1320   gst_object_unref (teepad);
1321
1322   /* make selector for the RTP receivers */
1323   stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
1324   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1325
1326   pad = gst_element_get_static_pad (stream->selector[0], "src");
1327   gst_pad_link (pad, stream->recv_rtp_sink);
1328   gst_object_unref (pad);
1329
1330   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1331   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1332   gst_pad_link (pad, selpad);
1333   gst_object_unref (pad);
1334   gst_object_unref (selpad);
1335
1336   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1337   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1338   gst_pad_link (pad, selpad);
1339   gst_object_unref (pad);
1340   gst_object_unref (selpad);
1341
1342   /* make selector for the RTCP receivers */
1343   stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
1344   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1345
1346   pad = gst_element_get_static_pad (stream->selector[1], "src");
1347   gst_pad_link (pad, stream->recv_rtcp_sink);
1348   gst_object_unref (pad);
1349
1350   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1351   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1352   gst_pad_link (pad, selpad);
1353   gst_object_unref (pad);
1354   gst_object_unref (selpad);
1355
1356   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1357   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1358   gst_pad_link (pad, selpad);
1359   gst_object_unref (pad);
1360   gst_object_unref (selpad);
1361
1362   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1363    * values */
1364   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1365   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1366   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1367   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1368
1369   /* be notified of caps changes */
1370   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1371       (GCallback) caps_notify, stream);
1372
1373   stream->prepared = TRUE;
1374
1375   return TRUE;
1376
1377   /* ERRORS */
1378 link_failed:
1379   {
1380     GST_WARNING ("failed to link stream %d", idx);
1381     return FALSE;
1382   }
1383 }
1384
1385 static void
1386 unlock_streams (GstRTSPMedia * media)
1387 {
1388   guint i, n_streams;
1389
1390   /* unlock the udp src elements */
1391   n_streams = gst_rtsp_media_n_streams (media);
1392   for (i = 0; i < n_streams; i++) {
1393     GstRTSPMediaStream *stream;
1394
1395     stream = gst_rtsp_media_get_stream (media, i);
1396
1397     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1398     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1399   }
1400 }
1401
1402 static void
1403 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1404 {
1405   g_mutex_lock (media->lock);
1406   /* never overwrite the error status */
1407   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1408     media->status = status;
1409   GST_DEBUG ("setting new status to %d", status);
1410   g_cond_broadcast (media->cond);
1411   g_mutex_unlock (media->lock);
1412 }
1413
1414 static GstRTSPMediaStatus
1415 gst_rtsp_media_get_status (GstRTSPMedia * media)
1416 {
1417   GstRTSPMediaStatus result;
1418   GTimeVal timeout;
1419
1420   g_mutex_lock (media->lock);
1421   g_get_current_time (&timeout);
1422   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1423   /* while we are preparing, wait */
1424   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1425     GST_DEBUG ("waiting for status change");
1426     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1427       GST_DEBUG ("timeout, assuming error status");
1428       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1429     }
1430   }
1431   /* could be success or error */
1432   result = media->status;
1433   GST_DEBUG ("got status %d", result);
1434   g_mutex_unlock (media->lock);
1435
1436   return result;
1437 }
1438
1439 static gboolean
1440 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1441 {
1442   GstMessageType type;
1443
1444   type = GST_MESSAGE_TYPE (message);
1445
1446   switch (type) {
1447     case GST_MESSAGE_STATE_CHANGED:
1448       break;
1449     case GST_MESSAGE_BUFFERING:
1450     {
1451       gint percent;
1452
1453       gst_message_parse_buffering (message, &percent);
1454
1455       /* no state management needed for live pipelines */
1456       if (media->is_live)
1457         break;
1458
1459       if (percent == 100) {
1460         /* a 100% message means buffering is done */
1461         media->buffering = FALSE;
1462         /* if the desired state is playing, go back */
1463         if (media->target_state == GST_STATE_PLAYING) {
1464           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1465           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1466         } else {
1467           GST_INFO ("Buffering done");
1468         }
1469       } else {
1470         /* buffering busy */
1471         if (media->buffering == FALSE) {
1472           if (media->target_state == GST_STATE_PLAYING) {
1473             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1474             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1475             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1476           } else {
1477             GST_INFO ("Buffering ...");
1478           }
1479         }
1480         media->buffering = TRUE;
1481       }
1482       break;
1483     }
1484     case GST_MESSAGE_LATENCY:
1485     {
1486       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1487       break;
1488     }
1489     case GST_MESSAGE_ERROR:
1490     {
1491       GError *gerror;
1492       gchar *debug;
1493
1494       gst_message_parse_error (message, &gerror, &debug);
1495       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1496       g_error_free (gerror);
1497       g_free (debug);
1498
1499       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1500       break;
1501     }
1502     case GST_MESSAGE_WARNING:
1503     {
1504       GError *gerror;
1505       gchar *debug;
1506
1507       gst_message_parse_warning (message, &gerror, &debug);
1508       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1509       g_error_free (gerror);
1510       g_free (debug);
1511       break;
1512     }
1513     case GST_MESSAGE_ELEMENT:
1514       break;
1515     case GST_MESSAGE_STREAM_STATUS:
1516       break;
1517     case GST_MESSAGE_ASYNC_DONE:
1518       if (!media->adding) {
1519         /* when we are dynamically adding pads, the addition of the udpsrc will
1520          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1521          * wait for the final ASYNC_DONE after everything prerolled */
1522         GST_INFO ("%p: got ASYNC_DONE", media);
1523         collect_media_stats (media);
1524
1525         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1526       } else {
1527         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1528       }
1529       break;
1530     case GST_MESSAGE_EOS:
1531       GST_INFO ("%p: got EOS", media);
1532       if (media->eos_pending) {
1533         GST_DEBUG ("shutting down after EOS");
1534         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1535         media->eos_pending = FALSE;
1536         g_object_unref (media);
1537       }
1538       break;
1539     default:
1540       GST_INFO ("%p: got message type %s", media,
1541           gst_message_type_get_name (type));
1542       break;
1543   }
1544   return TRUE;
1545 }
1546
1547 static gboolean
1548 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1549 {
1550   GstRTSPMediaClass *klass;
1551   gboolean ret;
1552
1553   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1554
1555   if (klass->handle_message)
1556     ret = klass->handle_message (media, message);
1557   else
1558     ret = FALSE;
1559
1560   return ret;
1561 }
1562
1563 /* called from streaming threads */
1564 static void
1565 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1566 {
1567   GstRTSPMediaStream *stream;
1568   gchar *name;
1569   gint i;
1570
1571   i = media->streams->len + 1;
1572
1573   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1574
1575   stream = g_new0 (GstRTSPMediaStream, 1);
1576   stream->payloader = element;
1577
1578   name = g_strdup_printf ("dynpay%d", i);
1579
1580   media->adding = TRUE;
1581
1582   /* ghost the pad of the payloader to the element */
1583   stream->srcpad = gst_ghost_pad_new (name, pad);
1584   gst_pad_set_active (stream->srcpad, TRUE);
1585   gst_element_add_pad (media->element, stream->srcpad);
1586   g_free (name);
1587
1588   /* add stream now */
1589   g_array_append_val (media->streams, stream);
1590
1591   setup_stream (stream, i, media);
1592
1593   for (i = 0; i < 2; i++) {
1594     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1595     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1596     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1597     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1598     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1599   }
1600   media->adding = FALSE;
1601 }
1602
1603 static void
1604 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1605 {
1606   GST_INFO ("no more pads");
1607   if (media->fakesink) {
1608     gst_object_ref (media->fakesink);
1609     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1610     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1611     gst_object_unref (media->fakesink);
1612     media->fakesink = NULL;
1613     GST_INFO ("removed fakesink");
1614   }
1615 }
1616
1617 /**
1618  * gst_rtsp_media_prepare:
1619  * @media: a #GstRTSPMedia
1620  *
1621  * Prepare @media for streaming. This function will create the pipeline and
1622  * other objects to manage the streaming.
1623  *
1624  * It will preroll the pipeline and collect vital information about the streams
1625  * such as the duration.
1626  *
1627  * Returns: %TRUE on success.
1628  */
1629 gboolean
1630 gst_rtsp_media_prepare (GstRTSPMedia * media)
1631 {
1632   GstStateChangeReturn ret;
1633   GstRTSPMediaStatus status;
1634   guint i, n_streams;
1635   GstRTSPMediaClass *klass;
1636   GstBus *bus;
1637   GList *walk;
1638
1639   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1640     goto was_prepared;
1641
1642   if (!media->reusable && media->reused)
1643     goto is_reused;
1644
1645   GST_INFO ("preparing media %p", media);
1646
1647   /* reset some variables */
1648   media->is_live = FALSE;
1649   media->buffering = FALSE;
1650   /* we're preparing now */
1651   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1652
1653   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1654
1655   /* add the pipeline bus to our custom mainloop */
1656   media->source = gst_bus_create_watch (bus);
1657   gst_object_unref (bus);
1658
1659   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1660
1661   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1662   media->id = g_source_attach (media->source, klass->context);
1663
1664   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1665
1666   /* add stuff to the bin */
1667   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1668
1669   /* link streams we already have, other streams might appear when we have
1670    * dynamic elements */
1671   n_streams = gst_rtsp_media_n_streams (media);
1672   for (i = 0; i < n_streams; i++) {
1673     GstRTSPMediaStream *stream;
1674
1675     stream = gst_rtsp_media_get_stream (media, i);
1676
1677     setup_stream (stream, i, media);
1678   }
1679
1680   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1681     GstElement *elem = walk->data;
1682
1683     GST_INFO ("adding callbacks for dynamic element %p", elem);
1684
1685     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1686     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1687
1688     /* we add a fakesink here in order to make the state change async. We remove
1689      * the fakesink again in the no-more-pads callback. */
1690     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1691     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1692   }
1693
1694   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1695   /* first go to PAUSED */
1696   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1697   media->target_state = GST_STATE_PAUSED;
1698
1699   switch (ret) {
1700     case GST_STATE_CHANGE_SUCCESS:
1701       GST_INFO ("SUCCESS state change for media %p", media);
1702       break;
1703     case GST_STATE_CHANGE_ASYNC:
1704       GST_INFO ("ASYNC state change for media %p", media);
1705       break;
1706     case GST_STATE_CHANGE_NO_PREROLL:
1707       /* we need to go to PLAYING */
1708       GST_INFO ("NO_PREROLL state change: live media %p", media);
1709       media->is_live = TRUE;
1710       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1711       if (ret == GST_STATE_CHANGE_FAILURE)
1712         goto state_failed;
1713       break;
1714     case GST_STATE_CHANGE_FAILURE:
1715       goto state_failed;
1716   }
1717
1718   /* now wait for all pads to be prerolled */
1719   status = gst_rtsp_media_get_status (media);
1720   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1721     goto state_failed;
1722
1723   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1724
1725   GST_INFO ("object %p is prerolled", media);
1726
1727   return TRUE;
1728
1729   /* OK */
1730 was_prepared:
1731   {
1732     return TRUE;
1733   }
1734   /* ERRORS */
1735 is_reused:
1736   {
1737     GST_WARNING ("can not reuse media %p", media);
1738     return FALSE;
1739   }
1740 state_failed:
1741   {
1742     GST_WARNING ("failed to preroll pipeline");
1743     unlock_streams (media);
1744     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1745     gst_rtsp_media_unprepare (media);
1746     return FALSE;
1747   }
1748 }
1749
1750 /**
1751  * gst_rtsp_media_unprepare:
1752  * @media: a #GstRTSPMedia
1753  *
1754  * Unprepare @media. After this call, the media should be prepared again before
1755  * it can be used again. If the media is set to be non-reusable, a new instance
1756  * must be created.
1757  *
1758  * Returns: %TRUE on success.
1759  */
1760 gboolean
1761 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1762 {
1763   GstRTSPMediaClass *klass;
1764   gboolean success;
1765
1766   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1767     return TRUE;
1768
1769   GST_INFO ("unprepare media %p", media);
1770   media->target_state = GST_STATE_NULL;
1771
1772   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1773   if (klass->unprepare)
1774     success = klass->unprepare (media);
1775   else
1776     success = TRUE;
1777
1778   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1779   media->reused = TRUE;
1780
1781   /* when the media is not reusable, this will effectively unref the media and
1782    * recreate it */
1783   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1784
1785   return success;
1786 }
1787
1788 static gboolean
1789 default_unprepare (GstRTSPMedia * media)
1790 {
1791   if (media->eos_shutdown) {
1792     GST_DEBUG ("sending EOS for shutdown");
1793     /* ref so that we don't disappear */
1794     g_object_ref (media);
1795     media->eos_pending = TRUE;
1796     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1797     /* we need to go to playing again for the EOS to propagate, normally in this
1798      * state, nothing is receiving data from us anymore so this is ok. */
1799     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1800   } else {
1801     GST_DEBUG ("shutting down");
1802     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1803   }
1804   return TRUE;
1805 }
1806
1807 static void
1808 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1809     gchar * dest, gint min, gint max)
1810 {
1811   gboolean do_add = TRUE;
1812   RTSPDestination *ndest;
1813
1814   if (stream->filter_duplicates) {
1815     RTSPDestination fdest;
1816     GList *find;
1817
1818     fdest.dest = dest;
1819     fdest.min = min;
1820     fdest.max = max;
1821
1822     /* first see if we already added this destination */
1823     find =
1824         g_list_find_custom (stream->destinations, &fdest,
1825         (GCompareFunc) dest_compare);
1826     if (find) {
1827       ndest = (RTSPDestination *) find->data;
1828
1829       GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
1830           ndest->count);
1831       ndest->count++;
1832       do_add = FALSE;
1833     }
1834   }
1835
1836   if (do_add) {
1837     GST_INFO ("adding %s:%d-%d", dest, min, max);
1838     g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1839     g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1840
1841     if (stream->filter_duplicates) {
1842       ndest = create_destination (dest, min, max);
1843       stream->destinations = g_list_prepend (stream->destinations, ndest);
1844     }
1845   }
1846 }
1847
1848 static void
1849 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1850     gchar * dest, gint min, gint max)
1851 {
1852   gboolean do_remove = TRUE;
1853   RTSPDestination *ndest = NULL;
1854   GList *find = NULL;
1855
1856   if (stream->filter_duplicates) {
1857     RTSPDestination fdest;
1858
1859     fdest.dest = dest;
1860     fdest.min = min;
1861     fdest.max = max;
1862
1863     /* first see if we already added this destination */
1864     find =
1865         g_list_find_custom (stream->destinations, &fdest,
1866         (GCompareFunc) dest_compare);
1867     if (!find)
1868       return;
1869
1870     ndest = (RTSPDestination *) find->data;
1871     if (--ndest->count > 0) {
1872       do_remove = FALSE;
1873       GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
1874           ndest->count);
1875     }
1876   }
1877
1878   if (do_remove) {
1879     GST_INFO ("removing %s:%d-%d", dest, min, max);
1880     g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1881     g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1882
1883     if (stream->filter_duplicates) {
1884       stream->destinations = g_list_delete_link (stream->destinations, find);
1885       free_destination (ndest);
1886     }
1887   }
1888 }
1889
1890 /**
1891  * gst_rtsp_media_set_state:
1892  * @media: a #GstRTSPMedia
1893  * @state: the target state of the media
1894  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1895  *
1896  * Set the state of @media to @state and for the transports in @transports.
1897  *
1898  * Returns: %TRUE on success.
1899  */
1900 gboolean
1901 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1902     GArray * transports)
1903 {
1904   gint i;
1905   GstStateChangeReturn ret;
1906   gboolean add, remove, do_state;
1907   gint old_active;
1908
1909   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1910   g_return_val_if_fail (transports != NULL, FALSE);
1911
1912   /* NULL and READY are the same */
1913   if (state == GST_STATE_READY)
1914     state = GST_STATE_NULL;
1915
1916   add = remove = FALSE;
1917
1918   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1919       media);
1920
1921   switch (state) {
1922     case GST_STATE_NULL:
1923       /* unlock the streams so that they follow the state changes from now on */
1924       unlock_streams (media);
1925       /* fallthrough */
1926     case GST_STATE_PAUSED:
1927       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1928       if (media->target_state == GST_STATE_PLAYING)
1929         remove = TRUE;
1930       break;
1931     case GST_STATE_PLAYING:
1932       /* we're going to PLAYING, add */
1933       add = TRUE;
1934       break;
1935     default:
1936       break;
1937   }
1938   old_active = media->active;
1939
1940   for (i = 0; i < transports->len; i++) {
1941     GstRTSPMediaTrans *tr;
1942     GstRTSPMediaStream *stream;
1943     GstRTSPTransport *trans;
1944
1945     /* we need a non-NULL entry in the array */
1946     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1947     if (tr == NULL)
1948       continue;
1949
1950     /* we need a transport */
1951     if (!(trans = tr->transport))
1952       continue;
1953
1954     /* get the stream and add the destinations */
1955     stream = gst_rtsp_media_get_stream (media, tr->idx);
1956     switch (trans->lower_transport) {
1957       case GST_RTSP_LOWER_TRANS_UDP:
1958       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1959       {
1960         gchar *dest;
1961         gint min, max;
1962
1963         dest = trans->destination;
1964         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1965           min = trans->port.min;
1966           max = trans->port.max;
1967         } else {
1968           min = trans->client_port.min;
1969           max = trans->client_port.max;
1970         }
1971
1972         if (add && !tr->active) {
1973           add_udp_destination (media, stream, dest, min, max);
1974           stream->transports = g_list_prepend (stream->transports, tr);
1975           tr->active = TRUE;
1976           media->active++;
1977         } else if (remove && tr->active) {
1978           remove_udp_destination (media, stream, dest, min, max);
1979           stream->transports = g_list_remove (stream->transports, tr);
1980           tr->active = FALSE;
1981           media->active--;
1982         }
1983         break;
1984       }
1985       case GST_RTSP_LOWER_TRANS_TCP:
1986         if (add && !tr->active) {
1987           GST_INFO ("adding TCP %s", trans->destination);
1988           stream->transports = g_list_prepend (stream->transports, tr);
1989           tr->active = TRUE;
1990           media->active++;
1991         } else if (remove && tr->active) {
1992           GST_INFO ("removing TCP %s", trans->destination);
1993           stream->transports = g_list_remove (stream->transports, tr);
1994           tr->active = FALSE;
1995           media->active--;
1996         }
1997         break;
1998       default:
1999         GST_INFO ("Unknown transport %d", trans->lower_transport);
2000         break;
2001     }
2002   }
2003
2004   /* we just added the first media, do the playing state change */
2005   if (old_active == 0 && add)
2006     do_state = TRUE;
2007   /* if we have no more active media, do the downward state changes */
2008   else if (media->active == 0)
2009     do_state = TRUE;
2010   else
2011     do_state = FALSE;
2012
2013   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2014       media, do_state);
2015
2016   if (media->target_state != state) {
2017     if (do_state) {
2018       if (state == GST_STATE_NULL) {
2019         gst_rtsp_media_unprepare (media);
2020       } else {
2021         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2022             media);
2023         media->target_state = state;
2024         ret = gst_element_set_state (media->pipeline, state);
2025       }
2026     }
2027     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2028         NULL);
2029   }
2030
2031   /* remember where we are */
2032   if (state == GST_STATE_PAUSED || old_active != media->active)
2033     collect_media_stats (media);
2034
2035   return TRUE;
2036 }
2037
2038 /**
2039  * gst_rtsp_media_remove_elements:
2040  * @media: a #GstRTSPMedia
2041  *
2042  * Remove all elements and the pipeline controlled by @media.
2043  */
2044 void
2045 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2046 {
2047   gint i, j;
2048
2049   unlock_streams (media);
2050
2051   for (i = 0; i < media->streams->len; i++) {
2052     GstRTSPMediaStream *stream;
2053
2054     GST_INFO ("Removing elements of stream %d from pipeline", i);
2055
2056     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2057
2058     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2059
2060     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2061
2062     for (j = 0; j < 2; j++) {
2063       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2064       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2065       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2066       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2067       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2068       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2069
2070       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2071       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2072       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2073       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2074       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2075       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2076     }
2077     if (stream->caps)
2078       gst_caps_unref (stream->caps);
2079     stream->caps = NULL;
2080     gst_rtsp_media_stream_free (stream);
2081   }
2082   g_array_remove_range (media->streams, 0, media->streams->len);
2083
2084   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2085   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2086
2087   gst_object_unref (media->pipeline);
2088   media->pipeline = NULL;
2089 }