fef99caa0afef3374459000be25e7eb3f11f7d45
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED          FALSE
29 #define DEFAULT_REUSABLE        FALSE
30 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN    FALSE
33 #define DEFAULT_BUFFER_SIZE     0x80000
34 #define DEFAULT_MULTICAST_GROUP "224.2.0.1"
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_MULTICAST_GROUP,
48   PROP_LAST
49 };
50
51 enum
52 {
53   SIGNAL_PREPARED,
54   SIGNAL_UNPREPARED,
55   SIGNAL_NEW_STATE,
56   SIGNAL_LAST
57 };
58
59 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
60 #define GST_CAT_DEFAULT rtsp_media_debug
61
62 static GQuark ssrc_stream_map_key;
63
64 static void gst_rtsp_media_get_property (GObject * object, guint propid,
65     GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_set_property (GObject * object, guint propid,
67     const GValue * value, GParamSpec * pspec);
68 static void gst_rtsp_media_finalize (GObject * obj);
69
70 static gpointer do_loop (GstRTSPMediaClass * klass);
71 static gboolean default_handle_message (GstRTSPMedia * media,
72     GstMessage * message);
73 static gboolean default_unprepare (GstRTSPMedia * media);
74 static void unlock_streams (GstRTSPMedia * media);
75
76 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
77
78 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
79
80 static void
81 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
82 {
83   GObjectClass *gobject_class;
84   GError *error = NULL;
85
86   gobject_class = G_OBJECT_CLASS (klass);
87
88   gobject_class->get_property = gst_rtsp_media_get_property;
89   gobject_class->set_property = gst_rtsp_media_set_property;
90   gobject_class->finalize = gst_rtsp_media_finalize;
91
92   g_object_class_install_property (gobject_class, PROP_SHARED,
93       g_param_spec_boolean ("shared", "Shared",
94           "If this media pipeline can be shared", DEFAULT_SHARED,
95           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96
97   g_object_class_install_property (gobject_class, PROP_REUSABLE,
98       g_param_spec_boolean ("reusable", "Reusable",
99           "If this media pipeline can be reused after an unprepare",
100           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
101
102   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
103       g_param_spec_flags ("protocols", "Protocols",
104           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
105           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
106
107   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
108       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
109           "Send an EOS event to the pipeline before unpreparing",
110           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111
112   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
113       g_param_spec_uint ("buffer-size", "Buffer Size",
114           "The kernel UDP buffer size to use", 0, G_MAXUINT,
115           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116
117   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
118       g_param_spec_string ("multicast-group", "Multicast Group",
119           "The Multicast group to send media to",
120           DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121
122   gst_rtsp_media_signals[SIGNAL_PREPARED] =
123       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
124       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
125       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
126
127   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
128       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
129       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
130       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
131
132   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
133       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
134       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
135       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
136
137   klass->context = g_main_context_new ();
138   klass->loop = g_main_loop_new (klass->context, TRUE);
139
140   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
141
142   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
143   if (error != NULL) {
144     g_critical ("could not start bus thread: %s", error->message);
145   }
146   klass->handle_message = default_handle_message;
147   klass->unprepare = default_unprepare;
148
149   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
150 }
151
152 static void
153 gst_rtsp_media_init (GstRTSPMedia * media)
154 {
155   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
156   media->lock = g_mutex_new ();
157   media->cond = g_cond_new ();
158
159   media->shared = DEFAULT_SHARED;
160   media->reusable = DEFAULT_REUSABLE;
161   media->protocols = DEFAULT_PROTOCOLS;
162   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
163   media->buffer_size = DEFAULT_BUFFER_SIZE;
164   media->multicast_group = g_strdup (DEFAULT_MULTICAST_GROUP);
165 }
166
167 void
168 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
169 {
170   if (trans->transport) {
171     gst_rtsp_transport_free (trans->transport);
172     trans->transport = NULL;
173   }
174   if (trans->rtpsource) {
175     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
176     trans->rtpsource = NULL;
177   }
178 }
179
180 static void
181 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
182 {
183   if (stream->session)
184     g_object_unref (stream->session);
185
186   if (stream->caps)
187     gst_caps_unref (stream->caps);
188
189   if (stream->send_rtp_sink)
190     gst_object_unref (stream->send_rtp_sink);
191   if (stream->send_rtp_src)
192     gst_object_unref (stream->send_rtp_src);
193   if (stream->send_rtcp_src)
194     gst_object_unref (stream->send_rtcp_src);
195   if (stream->recv_rtcp_sink)
196     gst_object_unref (stream->recv_rtcp_sink);
197   if (stream->recv_rtp_sink)
198     gst_object_unref (stream->recv_rtp_sink);
199
200   g_list_free (stream->transports);
201
202   g_free (stream);
203 }
204
205 static void
206 gst_rtsp_media_finalize (GObject * obj)
207 {
208   GstRTSPMedia *media;
209   guint i;
210
211   media = GST_RTSP_MEDIA (obj);
212
213   GST_INFO ("finalize media %p", media);
214
215   if (media->pipeline) {
216     unlock_streams (media);
217     gst_element_set_state (media->pipeline, GST_STATE_NULL);
218     gst_object_unref (media->pipeline);
219   }
220
221   for (i = 0; i < media->streams->len; i++) {
222     GstRTSPMediaStream *stream;
223
224     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
225
226     gst_rtsp_media_stream_free (stream);
227   }
228   g_array_free (media->streams, TRUE);
229
230   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
231   g_list_free (media->dynamic);
232
233   if (media->source) {
234     g_source_destroy (media->source);
235     g_source_unref (media->source);
236   }
237   g_free (media->multicast_group);
238   g_mutex_free (media->lock);
239   g_cond_free (media->cond);
240
241   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
242 }
243
244 static void
245 gst_rtsp_media_get_property (GObject * object, guint propid,
246     GValue * value, GParamSpec * pspec)
247 {
248   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
249
250   switch (propid) {
251     case PROP_SHARED:
252       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
253       break;
254     case PROP_REUSABLE:
255       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
256       break;
257     case PROP_PROTOCOLS:
258       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
259       break;
260     case PROP_EOS_SHUTDOWN:
261       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
262       break;
263     case PROP_BUFFER_SIZE:
264       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
265       break;
266     case PROP_MULTICAST_GROUP:
267       g_value_take_string (value, gst_rtsp_media_get_multicast_group (media));
268       break;
269     default:
270       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
271   }
272 }
273
274 static void
275 gst_rtsp_media_set_property (GObject * object, guint propid,
276     const GValue * value, GParamSpec * pspec)
277 {
278   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
279
280   switch (propid) {
281     case PROP_SHARED:
282       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
283       break;
284     case PROP_REUSABLE:
285       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
286       break;
287     case PROP_PROTOCOLS:
288       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
289       break;
290     case PROP_EOS_SHUTDOWN:
291       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
292       break;
293     case PROP_BUFFER_SIZE:
294       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
295       break;
296     case PROP_MULTICAST_GROUP:
297       gst_rtsp_media_set_multicast_group (media, g_value_get_string (value));
298       break;
299     default:
300       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
301   }
302 }
303
304 static gpointer
305 do_loop (GstRTSPMediaClass * klass)
306 {
307   GST_INFO ("enter mainloop");
308   g_main_loop_run (klass->loop);
309   GST_INFO ("exit mainloop");
310
311   return NULL;
312 }
313
314 static void
315 collect_media_stats (GstRTSPMedia * media)
316 {
317   gint64 position, duration;
318
319   media->range.unit = GST_RTSP_RANGE_NPT;
320
321   if (media->is_live) {
322     media->range.min.type = GST_RTSP_TIME_NOW;
323     media->range.min.seconds = -1;
324     media->range.max.type = GST_RTSP_TIME_END;
325     media->range.max.seconds = -1;
326   } else {
327     /* get the position */
328     if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME,
329             &position)) {
330       GST_INFO ("position query failed");
331       position = 0;
332     }
333
334     /* get the duration */
335     if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME,
336             &duration)) {
337       GST_INFO ("duration query failed");
338       duration = -1;
339     }
340
341     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
342         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
343
344     if (position == -1) {
345       media->range.min.type = GST_RTSP_TIME_NOW;
346       media->range.min.seconds = -1;
347     } else {
348       media->range.min.type = GST_RTSP_TIME_SECONDS;
349       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
350     }
351     if (duration == -1) {
352       media->range.max.type = GST_RTSP_TIME_END;
353       media->range.max.seconds = -1;
354     } else {
355       media->range.max.type = GST_RTSP_TIME_SECONDS;
356       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
357     }
358   }
359 }
360
361 /**
362  * gst_rtsp_media_new:
363  *
364  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
365  * element to produde RTP data for one or more related (audio/video/..) 
366  * streams.
367  *
368  * Returns: a new #GstRTSPMedia object.
369  */
370 GstRTSPMedia *
371 gst_rtsp_media_new (void)
372 {
373   GstRTSPMedia *result;
374
375   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
376
377   return result;
378 }
379
380 /**
381  * gst_rtsp_media_set_shared:
382  * @media: a #GstRTSPMedia
383  * @shared: the new value
384  *
385  * Set or unset if the pipeline for @media can be shared will multiple clients.
386  * When @shared is %TRUE, client requests for this media will share the media
387  * pipeline.
388  */
389 void
390 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
391 {
392   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
393
394   media->shared = shared;
395 }
396
397 /**
398  * gst_rtsp_media_is_shared:
399  * @media: a #GstRTSPMedia
400  *
401  * Check if the pipeline for @media can be shared between multiple clients.
402  *
403  * Returns: %TRUE if the media can be shared between clients.
404  */
405 gboolean
406 gst_rtsp_media_is_shared (GstRTSPMedia * media)
407 {
408   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
409
410   return media->shared;
411 }
412
413 /**
414  * gst_rtsp_media_set_reusable:
415  * @media: a #GstRTSPMedia
416  * @reusable: the new value
417  *
418  * Set or unset if the pipeline for @media can be reused after the pipeline has
419  * been unprepared.
420  */
421 void
422 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
423 {
424   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
425
426   media->reusable = reusable;
427 }
428
429 /**
430  * gst_rtsp_media_is_reusable:
431  * @media: a #GstRTSPMedia
432  *
433  * Check if the pipeline for @media can be reused after an unprepare.
434  *
435  * Returns: %TRUE if the media can be reused
436  */
437 gboolean
438 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
439 {
440   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
441
442   return media->reusable;
443 }
444
445 /**
446  * gst_rtsp_media_set_protocols:
447  * @media: a #GstRTSPMedia
448  * @protocols: the new flags
449  *
450  * Configure the allowed lower transport for @media.
451  */
452 void
453 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
454 {
455   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
456
457   media->protocols = protocols;
458 }
459
460 /**
461  * gst_rtsp_media_get_protocols:
462  * @media: a #GstRTSPMedia
463  *
464  * Get the allowed protocols of @media.
465  *
466  * Returns: a #GstRTSPLowerTrans
467  */
468 GstRTSPLowerTrans
469 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
470 {
471   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
472       GST_RTSP_LOWER_TRANS_UNKNOWN);
473
474   return media->protocols;
475 }
476
477 /**
478  * gst_rtsp_media_set_eos_shutdown:
479  * @media: a #GstRTSPMedia
480  * @eos_shutdown: the new value
481  *
482  * Set or unset if an EOS event will be sent to the pipeline for @media before
483  * it is unprepared.
484  */
485 void
486 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
487 {
488   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
489
490   media->eos_shutdown = eos_shutdown;
491 }
492
493 /**
494  * gst_rtsp_media_is_eos_shutdown:
495  * @media: a #GstRTSPMedia
496  *
497  * Check if the pipeline for @media will send an EOS down the pipeline before
498  * unpreparing.
499  *
500  * Returns: %TRUE if the media will send EOS before unpreparing.
501  */
502 gboolean
503 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
504 {
505   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
506
507   return media->eos_shutdown;
508 }
509
510 /**
511  * gst_rtsp_media_set_buffer_size:
512  * @media: a #GstRTSPMedia
513  * @size: the new value
514  *
515  * Set the kernel UDP buffer size.
516  */
517 void
518 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
519 {
520   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
521
522   media->buffer_size = size;
523 }
524
525 /**
526  * gst_rtsp_media_get_buffer_size:
527  * @media: a #GstRTSPMedia
528  *
529  * Get the kernel UDP buffer size.
530  *
531  * Returns: the kernel UDP buffer size.
532  */
533 guint
534 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
535 {
536   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
537
538   return media->buffer_size;
539 }
540
541 /**
542  * gst_rtsp_media_set_multicast_group:
543  * @media: a #GstRTSPMedia
544  * @mc: the new multicast group
545  *
546  * Set the multicast group that media from @media will be streamed to.
547  */
548 void
549 gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
550 {
551   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
552
553   g_mutex_lock (media->lock);
554   g_free (media->multicast_group);
555   media->multicast_group = g_strdup (mc);
556   g_mutex_unlock (media->lock);
557 }
558
559 /**
560  * gst_rtsp_media_get_multicast_group:
561  * @media: a #GstRTSPMedia
562  *
563  * Get the multicast group that media from @media will be streamed to.
564  *
565  * Returns: the multicast group
566  */
567 gchar *
568 gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
569 {
570   gchar *result;
571
572   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
573
574   g_mutex_lock (media->lock);
575   result = g_strdup (media->multicast_group);
576   g_mutex_unlock (media->lock);
577
578   return result;
579 }
580
581 /**
582  * gst_rtsp_media_set_auth:
583  * @media: a #GstRTSPMedia
584  * @auth: a #GstRTSPAuth
585  *
586  * configure @auth to be used as the authentication manager of @media.
587  */
588 void
589 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
590 {
591   GstRTSPAuth *old;
592
593   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
594
595   old = media->auth;
596
597   if (old != auth) {
598     if (auth)
599       g_object_ref (auth);
600     media->auth = auth;
601     if (old)
602       g_object_unref (old);
603   }
604 }
605
606 /**
607  * gst_rtsp_media_get_auth:
608  * @media: a #GstRTSPMedia
609  *
610  * Get the #GstRTSPAuth used as the authentication manager of @media.
611  *
612  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
613  * usage.
614  */
615 GstRTSPAuth *
616 gst_rtsp_media_get_auth (GstRTSPMedia * media)
617 {
618   GstRTSPAuth *result;
619
620   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
621
622   if ((result = media->auth))
623     g_object_ref (result);
624
625   return result;
626 }
627
628
629 /**
630  * gst_rtsp_media_n_streams:
631  * @media: a #GstRTSPMedia
632  *
633  * Get the number of streams in this media.
634  *
635  * Returns: The number of streams.
636  */
637 guint
638 gst_rtsp_media_n_streams (GstRTSPMedia * media)
639 {
640   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
641
642   return media->streams->len;
643 }
644
645 /**
646  * gst_rtsp_media_get_stream:
647  * @media: a #GstRTSPMedia
648  * @idx: the stream index
649  *
650  * Retrieve the stream with index @idx from @media.
651  *
652  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
653  * that index did not exist.
654  */
655 GstRTSPMediaStream *
656 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
657 {
658   GstRTSPMediaStream *res;
659
660   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
661
662   if (idx < media->streams->len)
663     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
664   else
665     res = NULL;
666
667   return res;
668 }
669
670 /**
671  * gst_rtsp_media_get_range_string:
672  * @media: a #GstRTSPMedia
673  * @play: for the PLAY request
674  *
675  * Get the current range as a string.
676  *
677  * Returns: The range as a string, g_free() after usage.
678  */
679 gchar *
680 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
681 {
682   gchar *result;
683   GstRTSPTimeRange range;
684
685   /* make copy */
686   range = media->range;
687
688   if (!play && media->active > 0) {
689     range.min.type = GST_RTSP_TIME_NOW;
690     range.min.seconds = -1;
691   }
692
693   result = gst_rtsp_range_to_string (&range);
694
695   return result;
696 }
697
698 /**
699  * gst_rtsp_media_seek:
700  * @media: a #GstRTSPMedia
701  * @range: a #GstRTSPTimeRange
702  *
703  * Seek the pipeline to @range.
704  *
705  * Returns: %TRUE on success.
706  */
707 gboolean
708 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
709 {
710   GstSeekFlags flags;
711   gboolean res;
712   gint64 start, stop;
713   GstSeekType start_type, stop_type;
714
715   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
716   g_return_val_if_fail (range != NULL, FALSE);
717
718   if (media->seekable) {
719     GST_INFO ("pipeline is not seekable");
720     return TRUE;
721   }
722
723   if (range->unit != GST_RTSP_RANGE_NPT)
724     goto not_supported;
725
726   /* depends on the current playing state of the pipeline. We might need to
727    * queue this until we get EOS. */
728   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
729
730   start_type = stop_type = GST_SEEK_TYPE_NONE;
731
732   switch (range->min.type) {
733     case GST_RTSP_TIME_NOW:
734       start = -1;
735       break;
736     case GST_RTSP_TIME_SECONDS:
737       /* only seek when something changed */
738       if (media->range.min.seconds == range->min.seconds) {
739         start = -1;
740       } else {
741         start = range->min.seconds * GST_SECOND;
742         start_type = GST_SEEK_TYPE_SET;
743       }
744       break;
745     case GST_RTSP_TIME_END:
746     default:
747       goto weird_type;
748   }
749   switch (range->max.type) {
750     case GST_RTSP_TIME_SECONDS:
751       /* only seek when something changed */
752       if (media->range.max.seconds == range->max.seconds) {
753         stop = -1;
754       } else {
755         stop = range->max.seconds * GST_SECOND;
756         stop_type = GST_SEEK_TYPE_SET;
757       }
758       break;
759     case GST_RTSP_TIME_END:
760       stop = -1;
761       stop_type = GST_SEEK_TYPE_SET;
762       break;
763     case GST_RTSP_TIME_NOW:
764     default:
765       goto weird_type;
766   }
767
768   if (start != -1 || stop != -1) {
769     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
770         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
771
772     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
773         flags, start_type, start, stop_type, stop);
774
775     /* and block for the seek to complete */
776     GST_INFO ("done seeking %d", res);
777     gst_element_get_state (media->pipeline, NULL, NULL, -1);
778     GST_INFO ("prerolled again");
779
780     collect_media_stats (media);
781   } else {
782     GST_INFO ("no seek needed");
783     res = TRUE;
784   }
785
786   return res;
787
788   /* ERRORS */
789 not_supported:
790   {
791     GST_WARNING ("seek unit %d not supported", range->unit);
792     return FALSE;
793   }
794 weird_type:
795   {
796     GST_WARNING ("weird range type %d not supported", range->min.type);
797     return FALSE;
798   }
799 }
800
801 /**
802  * gst_rtsp_media_stream_rtp:
803  * @stream: a #GstRTSPMediaStream
804  * @buffer: a #GstBuffer
805  *
806  * Handle an RTP buffer for the stream. This method is usually called when a
807  * message has been received from a client using the TCP transport.
808  *
809  * This function takes ownership of @buffer.
810  *
811  * Returns: a GstFlowReturn.
812  */
813 GstFlowReturn
814 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
815 {
816   GstFlowReturn ret;
817
818   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
819
820   return ret;
821 }
822
823 /**
824  * gst_rtsp_media_stream_rtcp:
825  * @stream: a #GstRTSPMediaStream
826  * @buffer: a #GstBuffer
827  *
828  * Handle an RTCP buffer for the stream. This method is usually called when a
829  * message has been received from a client using the TCP transport.
830  *
831  * This function takes ownership of @buffer.
832  *
833  * Returns: a GstFlowReturn.
834  */
835 GstFlowReturn
836 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
837 {
838   GstFlowReturn ret;
839
840   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
841
842   return ret;
843 }
844
845 /* Allocate the udp ports and sockets */
846 static gboolean
847 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
848 {
849   GstStateChangeReturn ret;
850   GstElement *udpsrc0, *udpsrc1;
851   GstElement *udpsink0, *udpsink1;
852   gint tmp_rtp, tmp_rtcp;
853   guint count;
854   gint rtpport, rtcpport;
855   GSocket *socket;
856   const gchar *host;
857
858   udpsrc0 = NULL;
859   udpsrc1 = NULL;
860   udpsink0 = NULL;
861   udpsink1 = NULL;
862   count = 0;
863
864   /* Start with random port */
865   tmp_rtp = 0;
866
867   if (media->is_ipv6)
868     host = "udp://[::0]";
869   else
870     host = "udp://0.0.0.0";
871
872   /* try to allocate 2 UDP ports, the RTP port should be an even
873    * number and the RTCP port should be the next (uneven) port */
874 again:
875   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
876   if (udpsrc0 == NULL)
877     goto no_udp_protocol;
878   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
879
880   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
881   if (ret == GST_STATE_CHANGE_FAILURE) {
882     if (tmp_rtp != 0) {
883       tmp_rtp += 2;
884       if (++count > 20)
885         goto no_ports;
886
887       gst_element_set_state (udpsrc0, GST_STATE_NULL);
888       gst_object_unref (udpsrc0);
889
890       goto again;
891     }
892     goto no_udp_protocol;
893   }
894
895   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
896
897   /* check if port is even */
898   if ((tmp_rtp & 1) != 0) {
899     /* port not even, close and allocate another */
900     if (++count > 20)
901       goto no_ports;
902
903     gst_element_set_state (udpsrc0, GST_STATE_NULL);
904     gst_object_unref (udpsrc0);
905
906     tmp_rtp++;
907     goto again;
908   }
909
910   /* allocate port+1 for RTCP now */
911   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
912   if (udpsrc1 == NULL)
913     goto no_udp_rtcp_protocol;
914
915   /* set port */
916   tmp_rtcp = tmp_rtp + 1;
917   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
918
919   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
920   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
921   if (ret == GST_STATE_CHANGE_FAILURE) {
922
923     if (++count > 20)
924       goto no_ports;
925
926     gst_element_set_state (udpsrc0, GST_STATE_NULL);
927     gst_object_unref (udpsrc0);
928
929     gst_element_set_state (udpsrc1, GST_STATE_NULL);
930     gst_object_unref (udpsrc1);
931
932     tmp_rtp += 2;
933     goto again;
934   }
935
936   /* all fine, do port check */
937   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
938   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
939
940   /* this should not happen... */
941   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
942     goto port_error;
943
944   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
945   if (!udpsink0)
946     goto no_udp_protocol;
947
948   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
949   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
950   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
951
952   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
953   if (!udpsink1)
954     goto no_udp_protocol;
955
956   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
957           "send-duplicates")) {
958     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
959     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
960   } else {
961     g_warning
962         ("old multiudpsink version found without send-duplicates property");
963   }
964
965   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
966           "buffer-size")) {
967     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
968   } else {
969     GST_WARNING ("multiudpsink version found without buffer-size property");
970   }
971
972   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
973   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
974   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
976   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
977
978   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
979   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
980   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
981   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
982
983   /* we keep these elements, we configure all in configure_transport when the
984    * server told us to really use the UDP ports. */
985   stream->udpsrc[0] = udpsrc0;
986   stream->udpsrc[1] = udpsrc1;
987   stream->udpsink[0] = udpsink0;
988   stream->udpsink[1] = udpsink1;
989   stream->server_port.min = rtpport;
990   stream->server_port.max = rtcpport;
991
992   return TRUE;
993
994   /* ERRORS */
995 no_udp_protocol:
996   {
997     goto cleanup;
998   }
999 no_ports:
1000   {
1001     goto cleanup;
1002   }
1003 no_udp_rtcp_protocol:
1004   {
1005     goto cleanup;
1006   }
1007 port_error:
1008   {
1009     goto cleanup;
1010   }
1011 cleanup:
1012   {
1013     if (udpsrc0) {
1014       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1015       gst_object_unref (udpsrc0);
1016     }
1017     if (udpsrc1) {
1018       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1019       gst_object_unref (udpsrc1);
1020     }
1021     if (udpsink0) {
1022       gst_element_set_state (udpsink0, GST_STATE_NULL);
1023       gst_object_unref (udpsink0);
1024     }
1025     if (udpsink1) {
1026       gst_element_set_state (udpsink1, GST_STATE_NULL);
1027       gst_object_unref (udpsink1);
1028     }
1029     return FALSE;
1030   }
1031 }
1032
1033 /* executed from streaming thread */
1034 static void
1035 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1036 {
1037   gchar *capsstr;
1038   GstCaps *newcaps, *oldcaps;
1039
1040   newcaps = gst_pad_get_current_caps (pad);
1041
1042   oldcaps = stream->caps;
1043   stream->caps = newcaps;
1044
1045   if (oldcaps)
1046     gst_caps_unref (oldcaps);
1047
1048   capsstr = gst_caps_to_string (newcaps);
1049   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1050   g_free (capsstr);
1051 }
1052
1053 static void
1054 dump_structure (const GstStructure * s)
1055 {
1056   gchar *sstr;
1057
1058   sstr = gst_structure_to_string (s);
1059   GST_INFO ("structure: %s", sstr);
1060   g_free (sstr);
1061 }
1062
1063 static GstRTSPMediaTrans *
1064 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1065 {
1066   GList *walk;
1067   GstRTSPMediaTrans *result = NULL;
1068   const gchar *tmp;
1069   gchar *dest;
1070   guint port;
1071
1072   if (rtcp_from == NULL)
1073     return NULL;
1074
1075   tmp = g_strrstr (rtcp_from, ":");
1076   if (tmp == NULL)
1077     return NULL;
1078
1079   port = atoi (tmp + 1);
1080   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1081
1082   GST_INFO ("finding %s:%d", dest, port);
1083
1084   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1085     GstRTSPMediaTrans *trans = walk->data;
1086     gint min, max;
1087
1088     min = trans->transport->client_port.min;
1089     max = trans->transport->client_port.max;
1090
1091     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1092             || max == port)) {
1093       result = trans;
1094       break;
1095     }
1096   }
1097   g_free (dest);
1098
1099   return result;
1100 }
1101
1102 static void
1103 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1104 {
1105   GstStructure *stats;
1106   GstRTSPMediaTrans *trans;
1107
1108   GST_INFO ("%p: new source %p", stream, source);
1109
1110   /* see if we have a stream to match with the origin of the RTCP packet */
1111   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1112   if (trans == NULL) {
1113     g_object_get (source, "stats", &stats, NULL);
1114     if (stats) {
1115       const gchar *rtcp_from;
1116
1117       dump_structure (stats);
1118
1119       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1120       if ((trans = find_transport (stream, rtcp_from))) {
1121         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1122             source);
1123
1124         /* keep ref to the source */
1125         trans->rtpsource = source;
1126
1127         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1128       }
1129       gst_structure_free (stats);
1130     }
1131   } else {
1132     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1133   }
1134 }
1135
1136 static void
1137 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1138 {
1139   GST_INFO ("%p: new SDES %p", stream, source);
1140 }
1141
1142 static void
1143 on_ssrc_active (GObject * session, GObject * source,
1144     GstRTSPMediaStream * stream)
1145 {
1146   GstRTSPMediaTrans *trans;
1147
1148   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1149
1150   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1151
1152   if (trans && trans->keep_alive)
1153     trans->keep_alive (trans->ka_user_data);
1154
1155 #ifdef DUMP_STATS
1156   {
1157     GstStructure *stats;
1158     g_object_get (source, "stats", &stats, NULL);
1159     if (stats) {
1160       dump_structure (stats);
1161       gst_structure_free (stats);
1162     }
1163   }
1164 #endif
1165 }
1166
1167 static void
1168 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1169 {
1170   GST_INFO ("%p: source %p bye", stream, source);
1171 }
1172
1173 static void
1174 on_bye_timeout (GObject * session, GObject * source,
1175     GstRTSPMediaStream * stream)
1176 {
1177   GstRTSPMediaTrans *trans;
1178
1179   GST_INFO ("%p: source %p bye timeout", stream, source);
1180
1181   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1182     trans->rtpsource = NULL;
1183     trans->timeout = TRUE;
1184   }
1185 }
1186
1187 static void
1188 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1189 {
1190   GstRTSPMediaTrans *trans;
1191
1192   GST_INFO ("%p: source %p timeout", stream, source);
1193
1194   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1195     trans->rtpsource = NULL;
1196     trans->timeout = TRUE;
1197   }
1198 }
1199
1200 static GstFlowReturn
1201 handle_new_sample (GstAppSink * sink, gpointer user_data)
1202 {
1203   GList *walk;
1204   GstSample *sample;
1205   GstBuffer *buffer;
1206   GstRTSPMediaStream *stream;
1207
1208   sample = gst_app_sink_pull_sample (sink);
1209   if (!sample)
1210     return GST_FLOW_OK;
1211
1212   stream = (GstRTSPMediaStream *) user_data;
1213   buffer = gst_sample_get_buffer (sample);
1214
1215   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1216     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1217
1218     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1219       if (tr->send_rtp)
1220         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1221     } else {
1222       if (tr->send_rtcp)
1223         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1224     }
1225   }
1226   gst_sample_unref (sample);
1227
1228   return GST_FLOW_OK;
1229 }
1230
1231 static GstAppSinkCallbacks sink_cb = {
1232   NULL,                         /* not interested in EOS */
1233   NULL,                         /* not interested in preroll samples */
1234   handle_new_sample,
1235 };
1236
1237 /* prepare the pipeline objects to handle @stream in @media */
1238 static gboolean
1239 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1240 {
1241   gchar *name;
1242   GstPad *pad, *teepad, *queuepad, *selpad;
1243   GstPadLinkReturn ret;
1244   gint i;
1245
1246   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1247    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1248    * elements */
1249   if (!alloc_udp_ports (media, stream))
1250     return FALSE;
1251
1252   /* add the ports to the pipeline */
1253   for (i = 0; i < 2; i++) {
1254     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1255     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1256   }
1257
1258   /* create elements for the TCP transfer */
1259   for (i = 0; i < 2; i++) {
1260     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1261     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
1262     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1263     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1264     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1265     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appqueue[i]);
1266     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1267     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1268     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1269         &sink_cb, stream, NULL);
1270   }
1271
1272   /* hook up the stream to the RTP session elements. */
1273   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1274   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1275   g_free (name);
1276   name = g_strdup_printf ("send_rtp_src_%u", idx);
1277   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1278   g_free (name);
1279   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1280   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1281   g_free (name);
1282   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1283   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1284   g_free (name);
1285   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1286   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1287   g_free (name);
1288
1289   /* get the session */
1290   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1291       &stream->session);
1292
1293   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1294       stream);
1295   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1296       stream);
1297   g_signal_connect (stream->session, "on-ssrc-active",
1298       (GCallback) on_ssrc_active, stream);
1299   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1300       stream);
1301   g_signal_connect (stream->session, "on-bye-timeout",
1302       (GCallback) on_bye_timeout, stream);
1303   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1304       stream);
1305
1306   /* link the RTP pad to the session manager */
1307   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1308   if (ret != GST_PAD_LINK_OK)
1309     goto link_failed;
1310
1311   /* make tee for RTP and link to stream */
1312   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1313   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1314
1315   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1316   gst_pad_link (stream->send_rtp_src, pad);
1317   gst_object_unref (pad);
1318
1319   /* link RTP sink, we're pretty sure this will work. */
1320   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1321   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1322   gst_pad_link (teepad, pad);
1323   gst_object_unref (pad);
1324   gst_object_unref (teepad);
1325
1326   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1327   pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
1328   gst_pad_link (teepad, pad);
1329   gst_object_unref (pad);
1330   gst_object_unref (teepad);
1331
1332   queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
1333   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1334   gst_pad_link (queuepad, pad);
1335   gst_object_unref (pad);
1336   gst_object_unref (queuepad);
1337
1338   /* make tee for RTCP */
1339   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1340   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1341
1342   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1343   gst_pad_link (stream->send_rtcp_src, pad);
1344   gst_object_unref (pad);
1345
1346   /* link RTCP elements */
1347   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1348   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1349   gst_pad_link (teepad, pad);
1350   gst_object_unref (pad);
1351   gst_object_unref (teepad);
1352
1353   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1354   pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
1355   gst_pad_link (teepad, pad);
1356   gst_object_unref (pad);
1357   gst_object_unref (teepad);
1358
1359   queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
1360   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1361   gst_pad_link (queuepad, pad);
1362   gst_object_unref (pad);
1363   gst_object_unref (queuepad);
1364
1365   /* make selector for the RTP receivers */
1366   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1367   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1368
1369   pad = gst_element_get_static_pad (stream->selector[0], "src");
1370   gst_pad_link (pad, stream->recv_rtp_sink);
1371   gst_object_unref (pad);
1372
1373   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1374   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1375   gst_pad_link (pad, selpad);
1376   gst_object_unref (pad);
1377   gst_object_unref (selpad);
1378
1379   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1380   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1381   gst_pad_link (pad, selpad);
1382   gst_object_unref (pad);
1383   gst_object_unref (selpad);
1384
1385   /* make selector for the RTCP receivers */
1386   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1387   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1388
1389   pad = gst_element_get_static_pad (stream->selector[1], "src");
1390   gst_pad_link (pad, stream->recv_rtcp_sink);
1391   gst_object_unref (pad);
1392
1393   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1394   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1395   gst_pad_link (pad, selpad);
1396   gst_object_unref (pad);
1397   gst_object_unref (selpad);
1398
1399   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1400   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1401   gst_pad_link (pad, selpad);
1402   gst_object_unref (pad);
1403   gst_object_unref (selpad);
1404
1405   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1406    * values */
1407   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1408   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1409   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1410   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1411
1412   /* be notified of caps changes */
1413   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1414       (GCallback) caps_notify, stream);
1415
1416   stream->prepared = TRUE;
1417
1418   return TRUE;
1419
1420   /* ERRORS */
1421 link_failed:
1422   {
1423     GST_WARNING ("failed to link stream %d", idx);
1424     return FALSE;
1425   }
1426 }
1427
1428 static void
1429 unlock_streams (GstRTSPMedia * media)
1430 {
1431   guint i, n_streams;
1432
1433   /* unlock the udp src elements */
1434   n_streams = gst_rtsp_media_n_streams (media);
1435   for (i = 0; i < n_streams; i++) {
1436     GstRTSPMediaStream *stream;
1437
1438     stream = gst_rtsp_media_get_stream (media, i);
1439
1440     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1441     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1442   }
1443 }
1444
1445 static void
1446 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1447 {
1448   g_mutex_lock (media->lock);
1449   /* never overwrite the error status */
1450   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1451     media->status = status;
1452   GST_DEBUG ("setting new status to %d", status);
1453   g_cond_broadcast (media->cond);
1454   g_mutex_unlock (media->lock);
1455 }
1456
1457 static GstRTSPMediaStatus
1458 gst_rtsp_media_get_status (GstRTSPMedia * media)
1459 {
1460   GstRTSPMediaStatus result;
1461   GTimeVal timeout;
1462
1463   g_mutex_lock (media->lock);
1464   g_get_current_time (&timeout);
1465   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1466   /* while we are preparing, wait */
1467   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1468     GST_DEBUG ("waiting for status change");
1469     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1470       GST_DEBUG ("timeout, assuming error status");
1471       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1472     }
1473   }
1474   /* could be success or error */
1475   result = media->status;
1476   GST_DEBUG ("got status %d", result);
1477   g_mutex_unlock (media->lock);
1478
1479   return result;
1480 }
1481
1482 static gboolean
1483 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1484 {
1485   GstMessageType type;
1486
1487   type = GST_MESSAGE_TYPE (message);
1488
1489   switch (type) {
1490     case GST_MESSAGE_STATE_CHANGED:
1491       break;
1492     case GST_MESSAGE_BUFFERING:
1493     {
1494       gint percent;
1495
1496       gst_message_parse_buffering (message, &percent);
1497
1498       /* no state management needed for live pipelines */
1499       if (media->is_live)
1500         break;
1501
1502       if (percent == 100) {
1503         /* a 100% message means buffering is done */
1504         media->buffering = FALSE;
1505         /* if the desired state is playing, go back */
1506         if (media->target_state == GST_STATE_PLAYING) {
1507           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1508           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1509         } else {
1510           GST_INFO ("Buffering done");
1511         }
1512       } else {
1513         /* buffering busy */
1514         if (media->buffering == FALSE) {
1515           if (media->target_state == GST_STATE_PLAYING) {
1516             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1517             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1518             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1519           } else {
1520             GST_INFO ("Buffering ...");
1521           }
1522         }
1523         media->buffering = TRUE;
1524       }
1525       break;
1526     }
1527     case GST_MESSAGE_LATENCY:
1528     {
1529       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1530       break;
1531     }
1532     case GST_MESSAGE_ERROR:
1533     {
1534       GError *gerror;
1535       gchar *debug;
1536
1537       gst_message_parse_error (message, &gerror, &debug);
1538       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1539       g_error_free (gerror);
1540       g_free (debug);
1541
1542       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1543       break;
1544     }
1545     case GST_MESSAGE_WARNING:
1546     {
1547       GError *gerror;
1548       gchar *debug;
1549
1550       gst_message_parse_warning (message, &gerror, &debug);
1551       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1552       g_error_free (gerror);
1553       g_free (debug);
1554       break;
1555     }
1556     case GST_MESSAGE_ELEMENT:
1557       break;
1558     case GST_MESSAGE_STREAM_STATUS:
1559       break;
1560     case GST_MESSAGE_ASYNC_DONE:
1561       if (!media->adding) {
1562         /* when we are dynamically adding pads, the addition of the udpsrc will
1563          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1564          * wait for the final ASYNC_DONE after everything prerolled */
1565         GST_INFO ("%p: got ASYNC_DONE", media);
1566         collect_media_stats (media);
1567
1568         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1569       } else {
1570         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1571       }
1572       break;
1573     case GST_MESSAGE_EOS:
1574       GST_INFO ("%p: got EOS", media);
1575       if (media->eos_pending) {
1576         GST_DEBUG ("shutting down after EOS");
1577         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1578         media->eos_pending = FALSE;
1579         g_object_unref (media);
1580       }
1581       break;
1582     default:
1583       GST_INFO ("%p: got message type %s", media,
1584           gst_message_type_get_name (type));
1585       break;
1586   }
1587   return TRUE;
1588 }
1589
1590 static gboolean
1591 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1592 {
1593   GstRTSPMediaClass *klass;
1594   gboolean ret;
1595
1596   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1597
1598   if (klass->handle_message)
1599     ret = klass->handle_message (media, message);
1600   else
1601     ret = FALSE;
1602
1603   return ret;
1604 }
1605
1606 /* called from streaming threads */
1607 static void
1608 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1609 {
1610   GstRTSPMediaStream *stream;
1611   gchar *name;
1612   gint i;
1613
1614   i = media->streams->len + 1;
1615
1616   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1617
1618   stream = g_new0 (GstRTSPMediaStream, 1);
1619   stream->payloader = element;
1620
1621   name = g_strdup_printf ("dynpay%d", i);
1622
1623   media->adding = TRUE;
1624
1625   /* ghost the pad of the payloader to the element */
1626   stream->srcpad = gst_ghost_pad_new (name, pad);
1627   gst_pad_set_active (stream->srcpad, TRUE);
1628   gst_element_add_pad (media->element, stream->srcpad);
1629   g_free (name);
1630
1631   /* add stream now */
1632   g_array_append_val (media->streams, stream);
1633
1634   setup_stream (stream, i, media);
1635
1636   for (i = 0; i < 2; i++) {
1637     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1638     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1639     gst_element_set_state (stream->appqueue[i], GST_STATE_PAUSED);
1640     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1641     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1642     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1643   }
1644   media->adding = FALSE;
1645 }
1646
1647 static void
1648 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1649 {
1650   GST_INFO ("no more pads");
1651   if (media->fakesink) {
1652     gst_object_ref (media->fakesink);
1653     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1654     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1655     gst_object_unref (media->fakesink);
1656     media->fakesink = NULL;
1657     GST_INFO ("removed fakesink");
1658   }
1659 }
1660
1661 /**
1662  * gst_rtsp_media_prepare:
1663  * @media: a #GstRTSPMedia
1664  *
1665  * Prepare @media for streaming. This function will create the pipeline and
1666  * other objects to manage the streaming.
1667  *
1668  * It will preroll the pipeline and collect vital information about the streams
1669  * such as the duration.
1670  *
1671  * Returns: %TRUE on success.
1672  */
1673 gboolean
1674 gst_rtsp_media_prepare (GstRTSPMedia * media)
1675 {
1676   GstStateChangeReturn ret;
1677   GstRTSPMediaStatus status;
1678   guint i, n_streams;
1679   GstRTSPMediaClass *klass;
1680   GstBus *bus;
1681   GList *walk;
1682
1683   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1684     goto was_prepared;
1685
1686   if (!media->reusable && media->reused)
1687     goto is_reused;
1688
1689   media->rtpbin = gst_element_factory_make ("rtpbin", NULL);
1690   if (media->rtpbin == NULL)
1691     goto no_rtpbin;
1692
1693   GST_INFO ("preparing media %p", media);
1694
1695   /* reset some variables */
1696   media->is_live = FALSE;
1697   media->seekable = FALSE;
1698   media->buffering = FALSE;
1699   /* we're preparing now */
1700   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1701
1702   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1703
1704   /* add the pipeline bus to our custom mainloop */
1705   media->source = gst_bus_create_watch (bus);
1706   gst_object_unref (bus);
1707
1708   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1709
1710   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1711   media->id = g_source_attach (media->source, klass->context);
1712
1713   /* add stuff to the bin */
1714   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1715
1716   /* link streams we already have, other streams might appear when we have
1717    * dynamic elements */
1718   n_streams = gst_rtsp_media_n_streams (media);
1719   for (i = 0; i < n_streams; i++) {
1720     GstRTSPMediaStream *stream;
1721
1722     stream = gst_rtsp_media_get_stream (media, i);
1723
1724     setup_stream (stream, i, media);
1725   }
1726
1727   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1728     GstElement *elem = walk->data;
1729
1730     GST_INFO ("adding callbacks for dynamic element %p", elem);
1731
1732     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1733     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1734
1735     /* we add a fakesink here in order to make the state change async. We remove
1736      * the fakesink again in the no-more-pads callback. */
1737     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1738     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1739   }
1740
1741   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1742   /* first go to PAUSED */
1743   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1744   media->target_state = GST_STATE_PAUSED;
1745
1746   switch (ret) {
1747     case GST_STATE_CHANGE_SUCCESS:
1748       GST_INFO ("SUCCESS state change for media %p", media);
1749       media->seekable = TRUE;
1750       break;
1751     case GST_STATE_CHANGE_ASYNC:
1752       GST_INFO ("ASYNC state change for media %p", media);
1753       media->seekable = TRUE;
1754       break;
1755     case GST_STATE_CHANGE_NO_PREROLL:
1756       /* we need to go to PLAYING */
1757       GST_INFO ("NO_PREROLL state change: live media %p", media);
1758       /* FIXME we disable seeking for live streams for now. We should perform a
1759        * seeking query in preroll instead and do a seeking query. */
1760       media->seekable = FALSE;
1761       media->is_live = TRUE;
1762       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1763       if (ret == GST_STATE_CHANGE_FAILURE)
1764         goto state_failed;
1765       break;
1766     case GST_STATE_CHANGE_FAILURE:
1767       goto state_failed;
1768   }
1769
1770   /* now wait for all pads to be prerolled */
1771   status = gst_rtsp_media_get_status (media);
1772   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1773     goto state_failed;
1774
1775   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1776
1777   GST_INFO ("object %p is prerolled", media);
1778
1779   return TRUE;
1780
1781   /* OK */
1782 was_prepared:
1783   {
1784     return TRUE;
1785   }
1786   /* ERRORS */
1787 is_reused:
1788   {
1789     GST_WARNING ("can not reuse media %p", media);
1790     return FALSE;
1791   }
1792 no_rtpbin:
1793   {
1794     GST_WARNING ("no rtpbin element");
1795     g_warning ("failed to create element 'rtpbin', check your installation");
1796     return FALSE;
1797   }
1798 state_failed:
1799   {
1800     GST_WARNING ("failed to preroll pipeline");
1801     unlock_streams (media);
1802     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1803     gst_rtsp_media_unprepare (media);
1804     return FALSE;
1805   }
1806 }
1807
1808 /**
1809  * gst_rtsp_media_unprepare:
1810  * @media: a #GstRTSPMedia
1811  *
1812  * Unprepare @media. After this call, the media should be prepared again before
1813  * it can be used again. If the media is set to be non-reusable, a new instance
1814  * must be created.
1815  *
1816  * Returns: %TRUE on success.
1817  */
1818 gboolean
1819 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1820 {
1821   GstRTSPMediaClass *klass;
1822   gboolean success;
1823
1824   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1825     return TRUE;
1826
1827   GST_INFO ("unprepare media %p", media);
1828   media->target_state = GST_STATE_NULL;
1829
1830   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1831   if (klass->unprepare)
1832     success = klass->unprepare (media);
1833   else
1834     success = TRUE;
1835
1836   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1837   media->reused = TRUE;
1838
1839   /* when the media is not reusable, this will effectively unref the media and
1840    * recreate it */
1841   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1842
1843   return success;
1844 }
1845
1846 static gboolean
1847 default_unprepare (GstRTSPMedia * media)
1848 {
1849   if (media->eos_shutdown) {
1850     GST_DEBUG ("sending EOS for shutdown");
1851     /* ref so that we don't disappear */
1852     g_object_ref (media);
1853     media->eos_pending = TRUE;
1854     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1855     /* we need to go to playing again for the EOS to propagate, normally in this
1856      * state, nothing is receiving data from us anymore so this is ok. */
1857     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1858   } else {
1859     GST_DEBUG ("shutting down");
1860     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1861   }
1862   return TRUE;
1863 }
1864
1865 static void
1866 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1867     gchar * dest, gint min, gint max)
1868 {
1869   GST_INFO ("adding %s:%d-%d", dest, min, max);
1870   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1871   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1872 }
1873
1874 static void
1875 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1876     gchar * dest, gint min, gint max)
1877 {
1878   GST_INFO ("removing %s:%d-%d", dest, min, max);
1879   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1880   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1881 }
1882
1883 /**
1884  * gst_rtsp_media_set_state:
1885  * @media: a #GstRTSPMedia
1886  * @state: the target state of the media
1887  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1888  *
1889  * Set the state of @media to @state and for the transports in @transports.
1890  *
1891  * Returns: %TRUE on success.
1892  */
1893 gboolean
1894 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1895     GArray * transports)
1896 {
1897   gint i;
1898   gboolean add, remove, do_state;
1899   gint old_active;
1900
1901   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1902   g_return_val_if_fail (transports != NULL, FALSE);
1903
1904   /* NULL and READY are the same */
1905   if (state == GST_STATE_READY)
1906     state = GST_STATE_NULL;
1907
1908   add = remove = FALSE;
1909
1910   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1911       media);
1912
1913   switch (state) {
1914     case GST_STATE_NULL:
1915       /* unlock the streams so that they follow the state changes from now on */
1916       unlock_streams (media);
1917       /* fallthrough */
1918     case GST_STATE_PAUSED:
1919       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1920       if (media->target_state == GST_STATE_PLAYING)
1921         remove = TRUE;
1922       break;
1923     case GST_STATE_PLAYING:
1924       /* we're going to PLAYING, add */
1925       add = TRUE;
1926       break;
1927     default:
1928       break;
1929   }
1930   old_active = media->active;
1931
1932   for (i = 0; i < transports->len; i++) {
1933     GstRTSPMediaTrans *tr;
1934     GstRTSPMediaStream *stream;
1935     GstRTSPTransport *trans;
1936
1937     /* we need a non-NULL entry in the array */
1938     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1939     if (tr == NULL)
1940       continue;
1941
1942     /* we need a transport */
1943     if (!(trans = tr->transport))
1944       continue;
1945
1946     /* get the stream and add the destinations */
1947     stream = gst_rtsp_media_get_stream (media, tr->idx);
1948     switch (trans->lower_transport) {
1949       case GST_RTSP_LOWER_TRANS_UDP:
1950       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1951       {
1952         gchar *dest;
1953         gint min, max;
1954
1955         dest = trans->destination;
1956         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1957           min = trans->port.min;
1958           max = trans->port.max;
1959         } else {
1960           min = trans->client_port.min;
1961           max = trans->client_port.max;
1962         }
1963
1964         if (add && !tr->active) {
1965           add_udp_destination (media, stream, dest, min, max);
1966           stream->transports = g_list_prepend (stream->transports, tr);
1967           tr->active = TRUE;
1968           media->active++;
1969         } else if (remove && tr->active) {
1970           remove_udp_destination (media, stream, dest, min, max);
1971           stream->transports = g_list_remove (stream->transports, tr);
1972           tr->active = FALSE;
1973           media->active--;
1974         }
1975         break;
1976       }
1977       case GST_RTSP_LOWER_TRANS_TCP:
1978         if (add && !tr->active) {
1979           GST_INFO ("adding TCP %s", trans->destination);
1980           stream->transports = g_list_prepend (stream->transports, tr);
1981           tr->active = TRUE;
1982           media->active++;
1983         } else if (remove && tr->active) {
1984           GST_INFO ("removing TCP %s", trans->destination);
1985           stream->transports = g_list_remove (stream->transports, tr);
1986           tr->active = FALSE;
1987           media->active--;
1988         }
1989         break;
1990       default:
1991         GST_INFO ("Unknown transport %d", trans->lower_transport);
1992         break;
1993     }
1994   }
1995
1996   /* we just added the first media, do the playing state change */
1997   if (old_active == 0 && add)
1998     do_state = TRUE;
1999   /* if we have no more active media, do the downward state changes */
2000   else if (media->active == 0)
2001     do_state = TRUE;
2002   else
2003     do_state = FALSE;
2004
2005   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2006       media, do_state);
2007
2008   if (media->target_state != state) {
2009     if (do_state) {
2010       if (state == GST_STATE_NULL) {
2011         gst_rtsp_media_unprepare (media);
2012       } else {
2013         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2014             media);
2015         media->target_state = state;
2016         gst_element_set_state (media->pipeline, state);
2017       }
2018     }
2019     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2020         NULL);
2021   }
2022
2023   /* remember where we are */
2024   if (state == GST_STATE_PAUSED || old_active != media->active)
2025     collect_media_stats (media);
2026
2027   return TRUE;
2028 }
2029
2030 /**
2031  * gst_rtsp_media_remove_elements:
2032  * @media: a #GstRTSPMedia
2033  *
2034  * Remove all elements and the pipeline controlled by @media.
2035  */
2036 void
2037 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2038 {
2039   gint i, j;
2040
2041   unlock_streams (media);
2042
2043   for (i = 0; i < media->streams->len; i++) {
2044     GstRTSPMediaStream *stream;
2045
2046     GST_INFO ("Removing elements of stream %d from pipeline", i);
2047
2048     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2049
2050     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2051
2052     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2053
2054     for (j = 0; j < 2; j++) {
2055       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2056       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2057       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2058       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2059       gst_element_set_state (stream->appqueue[j], GST_STATE_NULL);
2060       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2061       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2062
2063       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2064       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2065       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2066       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2067       gst_bin_remove (GST_BIN (media->pipeline), stream->appqueue[j]);
2068       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2069       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2070     }
2071     if (stream->caps)
2072       gst_caps_unref (stream->caps);
2073     stream->caps = NULL;
2074     gst_rtsp_media_stream_free (stream);
2075   }
2076   g_array_remove_range (media->streams, 0, media->streams->len);
2077
2078   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2079   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2080
2081   gst_object_unref (media->pipeline);
2082   media->pipeline = NULL;
2083 }