rtsp-server: port some more to 0.11
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED          FALSE
29 #define DEFAULT_REUSABLE        FALSE
30 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN    FALSE
33 #define DEFAULT_BUFFER_SIZE     0x80000
34 #define DEFAULT_MULTICAST_GROUP "224.2.0.1"
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_MULTICAST_GROUP,
48   PROP_LAST
49 };
50
51 enum
52 {
53   SIGNAL_PREPARED,
54   SIGNAL_UNPREPARED,
55   SIGNAL_NEW_STATE,
56   SIGNAL_LAST
57 };
58
59 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
60 #define GST_CAT_DEFAULT rtsp_media_debug
61
62 static GQuark ssrc_stream_map_key;
63
64 static void gst_rtsp_media_get_property (GObject * object, guint propid,
65     GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_set_property (GObject * object, guint propid,
67     const GValue * value, GParamSpec * pspec);
68 static void gst_rtsp_media_finalize (GObject * obj);
69
70 static gpointer do_loop (GstRTSPMediaClass * klass);
71 static gboolean default_handle_message (GstRTSPMedia * media,
72     GstMessage * message);
73 static gboolean default_unprepare (GstRTSPMedia * media);
74 static void unlock_streams (GstRTSPMedia * media);
75
76 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
77
78 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
79
80 static void
81 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
82 {
83   GObjectClass *gobject_class;
84   GError *error = NULL;
85
86   gobject_class = G_OBJECT_CLASS (klass);
87
88   gobject_class->get_property = gst_rtsp_media_get_property;
89   gobject_class->set_property = gst_rtsp_media_set_property;
90   gobject_class->finalize = gst_rtsp_media_finalize;
91
92   g_object_class_install_property (gobject_class, PROP_SHARED,
93       g_param_spec_boolean ("shared", "Shared",
94           "If this media pipeline can be shared", DEFAULT_SHARED,
95           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96
97   g_object_class_install_property (gobject_class, PROP_REUSABLE,
98       g_param_spec_boolean ("reusable", "Reusable",
99           "If this media pipeline can be reused after an unprepare",
100           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
101
102   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
103       g_param_spec_flags ("protocols", "Protocols",
104           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
105           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
106
107   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
108       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
109           "Send an EOS event to the pipeline before unpreparing",
110           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111
112   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
113       g_param_spec_uint ("buffer-size", "Buffer Size",
114           "The kernel UDP buffer size to use", 0, G_MAXUINT,
115           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116
117   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
118       g_param_spec_string ("multicast-group", "Multicast Group",
119           "The Multicast group to send media to",
120           DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121
122   gst_rtsp_media_signals[SIGNAL_PREPARED] =
123       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
124       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
125       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
126
127   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
128       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
129       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
130       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
131
132   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
133       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
134       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
135       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
136
137   klass->context = g_main_context_new ();
138   klass->loop = g_main_loop_new (klass->context, TRUE);
139
140   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
141
142   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
143   if (error != NULL) {
144     g_critical ("could not start bus thread: %s", error->message);
145   }
146   klass->handle_message = default_handle_message;
147   klass->unprepare = default_unprepare;
148
149   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
150 }
151
152 static void
153 gst_rtsp_media_init (GstRTSPMedia * media)
154 {
155   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
156   media->lock = g_mutex_new ();
157   media->cond = g_cond_new ();
158
159   media->shared = DEFAULT_SHARED;
160   media->reusable = DEFAULT_REUSABLE;
161   media->protocols = DEFAULT_PROTOCOLS;
162   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
163   media->buffer_size = DEFAULT_BUFFER_SIZE;
164   media->multicast_group = g_strdup (DEFAULT_MULTICAST_GROUP);
165 }
166
167 void
168 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
169 {
170   if (trans->transport) {
171     gst_rtsp_transport_free (trans->transport);
172     trans->transport = NULL;
173   }
174   if (trans->rtpsource) {
175     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
176     trans->rtpsource = NULL;
177   }
178 }
179
180 static void
181 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
182 {
183   if (stream->session)
184     g_object_unref (stream->session);
185
186   if (stream->caps)
187     gst_caps_unref (stream->caps);
188
189   if (stream->send_rtp_sink)
190     gst_object_unref (stream->send_rtp_sink);
191   if (stream->send_rtp_src)
192     gst_object_unref (stream->send_rtp_src);
193   if (stream->send_rtcp_src)
194     gst_object_unref (stream->send_rtcp_src);
195   if (stream->recv_rtcp_sink)
196     gst_object_unref (stream->recv_rtcp_sink);
197   if (stream->recv_rtp_sink)
198     gst_object_unref (stream->recv_rtp_sink);
199
200   g_list_free (stream->transports);
201
202   g_free (stream);
203 }
204
205 static void
206 gst_rtsp_media_finalize (GObject * obj)
207 {
208   GstRTSPMedia *media;
209   guint i;
210
211   media = GST_RTSP_MEDIA (obj);
212
213   GST_INFO ("finalize media %p", media);
214
215   if (media->pipeline) {
216     unlock_streams (media);
217     gst_element_set_state (media->pipeline, GST_STATE_NULL);
218     gst_object_unref (media->pipeline);
219   }
220
221   for (i = 0; i < media->streams->len; i++) {
222     GstRTSPMediaStream *stream;
223
224     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
225
226     gst_rtsp_media_stream_free (stream);
227   }
228   g_array_free (media->streams, TRUE);
229
230   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
231   g_list_free (media->dynamic);
232
233   if (media->source) {
234     g_source_destroy (media->source);
235     g_source_unref (media->source);
236   }
237   g_free (media->multicast_group);
238   g_mutex_free (media->lock);
239   g_cond_free (media->cond);
240
241   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
242 }
243
244 static void
245 gst_rtsp_media_get_property (GObject * object, guint propid,
246     GValue * value, GParamSpec * pspec)
247 {
248   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
249
250   switch (propid) {
251     case PROP_SHARED:
252       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
253       break;
254     case PROP_REUSABLE:
255       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
256       break;
257     case PROP_PROTOCOLS:
258       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
259       break;
260     case PROP_EOS_SHUTDOWN:
261       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
262       break;
263     case PROP_BUFFER_SIZE:
264       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
265       break;
266     case PROP_MULTICAST_GROUP:
267       g_value_take_string (value, gst_rtsp_media_get_multicast_group (media));
268       break;
269     default:
270       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
271   }
272 }
273
274 static void
275 gst_rtsp_media_set_property (GObject * object, guint propid,
276     const GValue * value, GParamSpec * pspec)
277 {
278   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
279
280   switch (propid) {
281     case PROP_SHARED:
282       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
283       break;
284     case PROP_REUSABLE:
285       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
286       break;
287     case PROP_PROTOCOLS:
288       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
289       break;
290     case PROP_EOS_SHUTDOWN:
291       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
292       break;
293     case PROP_BUFFER_SIZE:
294       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
295       break;
296     case PROP_MULTICAST_GROUP:
297       gst_rtsp_media_set_multicast_group (media, g_value_get_string (value));
298       break;
299     default:
300       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
301   }
302 }
303
304 static gpointer
305 do_loop (GstRTSPMediaClass * klass)
306 {
307   GST_INFO ("enter mainloop");
308   g_main_loop_run (klass->loop);
309   GST_INFO ("exit mainloop");
310
311   return NULL;
312 }
313
314 static void
315 collect_media_stats (GstRTSPMedia * media)
316 {
317   gint64 position, duration;
318
319   media->range.unit = GST_RTSP_RANGE_NPT;
320
321   if (media->is_live) {
322     media->range.min.type = GST_RTSP_TIME_NOW;
323     media->range.min.seconds = -1;
324     media->range.max.type = GST_RTSP_TIME_END;
325     media->range.max.seconds = -1;
326   } else {
327     /* get the position */
328     if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME,
329             &position)) {
330       GST_INFO ("position query failed");
331       position = 0;
332     }
333
334     /* get the duration */
335     if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME,
336             &duration)) {
337       GST_INFO ("duration query failed");
338       duration = -1;
339     }
340
341     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
342         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
343
344     if (position == -1) {
345       media->range.min.type = GST_RTSP_TIME_NOW;
346       media->range.min.seconds = -1;
347     } else {
348       media->range.min.type = GST_RTSP_TIME_SECONDS;
349       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
350     }
351     if (duration == -1) {
352       media->range.max.type = GST_RTSP_TIME_END;
353       media->range.max.seconds = -1;
354     } else {
355       media->range.max.type = GST_RTSP_TIME_SECONDS;
356       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
357     }
358   }
359 }
360
361 /**
362  * gst_rtsp_media_new:
363  *
364  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
365  * element to produde RTP data for one or more related (audio/video/..) 
366  * streams.
367  *
368  * Returns: a new #GstRTSPMedia object.
369  */
370 GstRTSPMedia *
371 gst_rtsp_media_new (void)
372 {
373   GstRTSPMedia *result;
374
375   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
376
377   return result;
378 }
379
380 /**
381  * gst_rtsp_media_set_shared:
382  * @media: a #GstRTSPMedia
383  * @shared: the new value
384  *
385  * Set or unset if the pipeline for @media can be shared will multiple clients.
386  * When @shared is %TRUE, client requests for this media will share the media
387  * pipeline.
388  */
389 void
390 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
391 {
392   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
393
394   media->shared = shared;
395 }
396
397 /**
398  * gst_rtsp_media_is_shared:
399  * @media: a #GstRTSPMedia
400  *
401  * Check if the pipeline for @media can be shared between multiple clients.
402  *
403  * Returns: %TRUE if the media can be shared between clients.
404  */
405 gboolean
406 gst_rtsp_media_is_shared (GstRTSPMedia * media)
407 {
408   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
409
410   return media->shared;
411 }
412
413 /**
414  * gst_rtsp_media_set_reusable:
415  * @media: a #GstRTSPMedia
416  * @reusable: the new value
417  *
418  * Set or unset if the pipeline for @media can be reused after the pipeline has
419  * been unprepared.
420  */
421 void
422 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
423 {
424   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
425
426   media->reusable = reusable;
427 }
428
429 /**
430  * gst_rtsp_media_is_reusable:
431  * @media: a #GstRTSPMedia
432  *
433  * Check if the pipeline for @media can be reused after an unprepare.
434  *
435  * Returns: %TRUE if the media can be reused
436  */
437 gboolean
438 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
439 {
440   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
441
442   return media->reusable;
443 }
444
445 /**
446  * gst_rtsp_media_set_protocols:
447  * @media: a #GstRTSPMedia
448  * @protocols: the new flags
449  *
450  * Configure the allowed lower transport for @media.
451  */
452 void
453 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
454 {
455   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
456
457   media->protocols = protocols;
458 }
459
460 /**
461  * gst_rtsp_media_get_protocols:
462  * @media: a #GstRTSPMedia
463  *
464  * Get the allowed protocols of @media.
465  *
466  * Returns: a #GstRTSPLowerTrans
467  */
468 GstRTSPLowerTrans
469 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
470 {
471   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
472       GST_RTSP_LOWER_TRANS_UNKNOWN);
473
474   return media->protocols;
475 }
476
477 /**
478  * gst_rtsp_media_set_eos_shutdown:
479  * @media: a #GstRTSPMedia
480  * @eos_shutdown: the new value
481  *
482  * Set or unset if an EOS event will be sent to the pipeline for @media before
483  * it is unprepared.
484  */
485 void
486 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
487 {
488   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
489
490   media->eos_shutdown = eos_shutdown;
491 }
492
493 /**
494  * gst_rtsp_media_is_eos_shutdown:
495  * @media: a #GstRTSPMedia
496  *
497  * Check if the pipeline for @media will send an EOS down the pipeline before
498  * unpreparing.
499  *
500  * Returns: %TRUE if the media will send EOS before unpreparing.
501  */
502 gboolean
503 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
504 {
505   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
506
507   return media->eos_shutdown;
508 }
509
510 /**
511  * gst_rtsp_media_set_buffer_size:
512  * @media: a #GstRTSPMedia
513  * @size: the new value
514  *
515  * Set the kernel UDP buffer size.
516  */
517 void
518 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
519 {
520   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
521
522   media->buffer_size = size;
523 }
524
525 /**
526  * gst_rtsp_media_get_buffer_size:
527  * @media: a #GstRTSPMedia
528  *
529  * Get the kernel UDP buffer size.
530  *
531  * Returns: the kernel UDP buffer size.
532  */
533 guint
534 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
535 {
536   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
537
538   return media->buffer_size;
539 }
540
541 /**
542  * gst_rtsp_media_set_multicast_group:
543  * @media: a #GstRTSPMedia
544  * @mc: the new multicast group
545  *
546  * Set the multicast group that media from @media will be streamed to.
547  */
548 void
549 gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
550 {
551   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
552
553   g_mutex_lock (media->lock);
554   g_free (media->multicast_group);
555   media->multicast_group = g_strdup (mc);
556   g_mutex_unlock (media->lock);
557 }
558
559 /**
560  * gst_rtsp_media_get_multicast_group:
561  * @media: a #GstRTSPMedia
562  *
563  * Get the multicast group that media from @media will be streamed to.
564  *
565  * Returns: the multicast group
566  */
567 gchar *
568 gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
569 {
570   gchar *result;
571
572   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
573
574   g_mutex_lock (media->lock);
575   result = g_strdup (media->multicast_group);
576   g_mutex_unlock (media->lock);
577
578   return result;
579 }
580
581 /**
582  * gst_rtsp_media_set_auth:
583  * @media: a #GstRTSPMedia
584  * @auth: a #GstRTSPAuth
585  *
586  * configure @auth to be used as the authentication manager of @media.
587  */
588 void
589 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
590 {
591   GstRTSPAuth *old;
592
593   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
594
595   old = media->auth;
596
597   if (old != auth) {
598     if (auth)
599       g_object_ref (auth);
600     media->auth = auth;
601     if (old)
602       g_object_unref (old);
603   }
604 }
605
606 /**
607  * gst_rtsp_media_get_auth:
608  * @media: a #GstRTSPMedia
609  *
610  * Get the #GstRTSPAuth used as the authentication manager of @media.
611  *
612  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
613  * usage.
614  */
615 GstRTSPAuth *
616 gst_rtsp_media_get_auth (GstRTSPMedia * media)
617 {
618   GstRTSPAuth *result;
619
620   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
621
622   if ((result = media->auth))
623     g_object_ref (result);
624
625   return result;
626 }
627
628
629 /**
630  * gst_rtsp_media_n_streams:
631  * @media: a #GstRTSPMedia
632  *
633  * Get the number of streams in this media.
634  *
635  * Returns: The number of streams.
636  */
637 guint
638 gst_rtsp_media_n_streams (GstRTSPMedia * media)
639 {
640   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
641
642   return media->streams->len;
643 }
644
645 /**
646  * gst_rtsp_media_get_stream:
647  * @media: a #GstRTSPMedia
648  * @idx: the stream index
649  *
650  * Retrieve the stream with index @idx from @media.
651  *
652  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
653  * that index did not exist.
654  */
655 GstRTSPMediaStream *
656 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
657 {
658   GstRTSPMediaStream *res;
659
660   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
661
662   if (idx < media->streams->len)
663     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
664   else
665     res = NULL;
666
667   return res;
668 }
669
670 /**
671  * gst_rtsp_media_get_range_string:
672  * @media: a #GstRTSPMedia
673  * @play: for the PLAY request
674  *
675  * Get the current range as a string.
676  *
677  * Returns: The range as a string, g_free() after usage.
678  */
679 gchar *
680 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
681 {
682   gchar *result;
683   GstRTSPTimeRange range;
684
685   /* make copy */
686   range = media->range;
687
688   if (!play && media->active > 0) {
689     range.min.type = GST_RTSP_TIME_NOW;
690     range.min.seconds = -1;
691   }
692
693   result = gst_rtsp_range_to_string (&range);
694
695   return result;
696 }
697
698 /**
699  * gst_rtsp_media_seek:
700  * @media: a #GstRTSPMedia
701  * @range: a #GstRTSPTimeRange
702  *
703  * Seek the pipeline to @range.
704  *
705  * Returns: %TRUE on success.
706  */
707 gboolean
708 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
709 {
710   GstSeekFlags flags;
711   gboolean res;
712   gint64 start, stop;
713   GstSeekType start_type, stop_type;
714
715   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
716   g_return_val_if_fail (range != NULL, FALSE);
717
718   if (media->seekable) {
719     GST_INFO ("pipeline is not seekable");
720     return TRUE;
721   }
722
723   if (range->unit != GST_RTSP_RANGE_NPT)
724     goto not_supported;
725
726   /* depends on the current playing state of the pipeline. We might need to
727    * queue this until we get EOS. */
728   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
729
730   start_type = stop_type = GST_SEEK_TYPE_NONE;
731
732   switch (range->min.type) {
733     case GST_RTSP_TIME_NOW:
734       start = -1;
735       break;
736     case GST_RTSP_TIME_SECONDS:
737       /* only seek when something changed */
738       if (media->range.min.seconds == range->min.seconds) {
739         start = -1;
740       } else {
741         start = range->min.seconds * GST_SECOND;
742         start_type = GST_SEEK_TYPE_SET;
743       }
744       break;
745     case GST_RTSP_TIME_END:
746     default:
747       goto weird_type;
748   }
749   switch (range->max.type) {
750     case GST_RTSP_TIME_SECONDS:
751       /* only seek when something changed */
752       if (media->range.max.seconds == range->max.seconds) {
753         stop = -1;
754       } else {
755         stop = range->max.seconds * GST_SECOND;
756         stop_type = GST_SEEK_TYPE_SET;
757       }
758       break;
759     case GST_RTSP_TIME_END:
760       stop = -1;
761       stop_type = GST_SEEK_TYPE_SET;
762       break;
763     case GST_RTSP_TIME_NOW:
764     default:
765       goto weird_type;
766   }
767
768   if (start != -1 || stop != -1) {
769     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
770         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
771
772     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
773         flags, start_type, start, stop_type, stop);
774
775     /* and block for the seek to complete */
776     GST_INFO ("done seeking %d", res);
777     gst_element_get_state (media->pipeline, NULL, NULL, -1);
778     GST_INFO ("prerolled again");
779
780     collect_media_stats (media);
781   } else {
782     GST_INFO ("no seek needed");
783     res = TRUE;
784   }
785
786   return res;
787
788   /* ERRORS */
789 not_supported:
790   {
791     GST_WARNING ("seek unit %d not supported", range->unit);
792     return FALSE;
793   }
794 weird_type:
795   {
796     GST_WARNING ("weird range type %d not supported", range->min.type);
797     return FALSE;
798   }
799 }
800
801 /**
802  * gst_rtsp_media_stream_rtp:
803  * @stream: a #GstRTSPMediaStream
804  * @buffer: a #GstBuffer
805  *
806  * Handle an RTP buffer for the stream. This method is usually called when a
807  * message has been received from a client using the TCP transport.
808  *
809  * This function takes ownership of @buffer.
810  *
811  * Returns: a GstFlowReturn.
812  */
813 GstFlowReturn
814 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
815 {
816   GstFlowReturn ret;
817
818   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
819
820   return ret;
821 }
822
823 /**
824  * gst_rtsp_media_stream_rtcp:
825  * @stream: a #GstRTSPMediaStream
826  * @buffer: a #GstBuffer
827  *
828  * Handle an RTCP buffer for the stream. This method is usually called when a
829  * message has been received from a client using the TCP transport.
830  *
831  * This function takes ownership of @buffer.
832  *
833  * Returns: a GstFlowReturn.
834  */
835 GstFlowReturn
836 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
837 {
838   GstFlowReturn ret;
839
840   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
841
842   return ret;
843 }
844
845 /* Allocate the udp ports and sockets */
846 static gboolean
847 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
848 {
849   GstStateChangeReturn ret;
850   GstElement *udpsrc0, *udpsrc1;
851   GstElement *udpsink0, *udpsink1;
852   gint tmp_rtp, tmp_rtcp;
853   guint count;
854   gint rtpport, rtcpport, sockfd;
855   const gchar *host;
856
857   udpsrc0 = NULL;
858   udpsrc1 = NULL;
859   udpsink0 = NULL;
860   udpsink1 = NULL;
861   count = 0;
862
863   /* Start with random port */
864   tmp_rtp = 0;
865
866   if (media->is_ipv6)
867     host = "udp://[::0]";
868   else
869     host = "udp://0.0.0.0";
870
871   /* try to allocate 2 UDP ports, the RTP port should be an even
872    * number and the RTCP port should be the next (uneven) port */
873 again:
874   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
875   if (udpsrc0 == NULL)
876     goto no_udp_protocol;
877   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
878
879   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
880   if (ret == GST_STATE_CHANGE_FAILURE) {
881     if (tmp_rtp != 0) {
882       tmp_rtp += 2;
883       if (++count > 20)
884         goto no_ports;
885
886       gst_element_set_state (udpsrc0, GST_STATE_NULL);
887       gst_object_unref (udpsrc0);
888
889       goto again;
890     }
891     goto no_udp_protocol;
892   }
893
894   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
895
896   /* check if port is even */
897   if ((tmp_rtp & 1) != 0) {
898     /* port not even, close and allocate another */
899     if (++count > 20)
900       goto no_ports;
901
902     gst_element_set_state (udpsrc0, GST_STATE_NULL);
903     gst_object_unref (udpsrc0);
904
905     tmp_rtp++;
906     goto again;
907   }
908
909   /* allocate port+1 for RTCP now */
910   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
911   if (udpsrc1 == NULL)
912     goto no_udp_rtcp_protocol;
913
914   /* set port */
915   tmp_rtcp = tmp_rtp + 1;
916   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
917
918   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
919   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
920   if (ret == GST_STATE_CHANGE_FAILURE) {
921
922     if (++count > 20)
923       goto no_ports;
924
925     gst_element_set_state (udpsrc0, GST_STATE_NULL);
926     gst_object_unref (udpsrc0);
927
928     gst_element_set_state (udpsrc1, GST_STATE_NULL);
929     gst_object_unref (udpsrc1);
930
931     tmp_rtp += 2;
932     goto again;
933   }
934
935   /* all fine, do port check */
936   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
937   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
938
939   /* this should not happen... */
940   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
941     goto port_error;
942
943   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
944   if (!udpsink0)
945     goto no_udp_protocol;
946
947   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
948   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
949   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
950
951   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
952   if (!udpsink1)
953     goto no_udp_protocol;
954
955   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
956           "send-duplicates")) {
957     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
958     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
959   } else {
960     g_warning
961         ("old multiudpsink version found without send-duplicates property");
962   }
963
964   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
965           "buffer-size")) {
966     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
967   } else {
968     GST_WARNING ("multiudpsink version found without buffer-size property");
969   }
970
971   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
972   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
973   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
974   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
976
977   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
978   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
979   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
980   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
981
982   /* we keep these elements, we configure all in configure_transport when the
983    * server told us to really use the UDP ports. */
984   stream->udpsrc[0] = udpsrc0;
985   stream->udpsrc[1] = udpsrc1;
986   stream->udpsink[0] = udpsink0;
987   stream->udpsink[1] = udpsink1;
988   stream->server_port.min = rtpport;
989   stream->server_port.max = rtcpport;
990
991   return TRUE;
992
993   /* ERRORS */
994 no_udp_protocol:
995   {
996     goto cleanup;
997   }
998 no_ports:
999   {
1000     goto cleanup;
1001   }
1002 no_udp_rtcp_protocol:
1003   {
1004     goto cleanup;
1005   }
1006 port_error:
1007   {
1008     goto cleanup;
1009   }
1010 cleanup:
1011   {
1012     if (udpsrc0) {
1013       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1014       gst_object_unref (udpsrc0);
1015     }
1016     if (udpsrc1) {
1017       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1018       gst_object_unref (udpsrc1);
1019     }
1020     if (udpsink0) {
1021       gst_element_set_state (udpsink0, GST_STATE_NULL);
1022       gst_object_unref (udpsink0);
1023     }
1024     if (udpsink1) {
1025       gst_element_set_state (udpsink1, GST_STATE_NULL);
1026       gst_object_unref (udpsink1);
1027     }
1028     return FALSE;
1029   }
1030 }
1031
1032 /* executed from streaming thread */
1033 static void
1034 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1035 {
1036   gchar *capsstr;
1037   GstCaps *newcaps, *oldcaps;
1038
1039   newcaps = gst_pad_get_current_caps (pad);
1040
1041   oldcaps = stream->caps;
1042   stream->caps = newcaps;
1043
1044   if (oldcaps)
1045     gst_caps_unref (oldcaps);
1046
1047   capsstr = gst_caps_to_string (newcaps);
1048   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1049   g_free (capsstr);
1050 }
1051
1052 static void
1053 dump_structure (const GstStructure * s)
1054 {
1055   gchar *sstr;
1056
1057   sstr = gst_structure_to_string (s);
1058   GST_INFO ("structure: %s", sstr);
1059   g_free (sstr);
1060 }
1061
1062 static GstRTSPMediaTrans *
1063 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1064 {
1065   GList *walk;
1066   GstRTSPMediaTrans *result = NULL;
1067   const gchar *tmp;
1068   gchar *dest;
1069   guint port;
1070
1071   if (rtcp_from == NULL)
1072     return NULL;
1073
1074   tmp = g_strrstr (rtcp_from, ":");
1075   if (tmp == NULL)
1076     return NULL;
1077
1078   port = atoi (tmp + 1);
1079   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1080
1081   GST_INFO ("finding %s:%d", dest, port);
1082
1083   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1084     GstRTSPMediaTrans *trans = walk->data;
1085     gint min, max;
1086
1087     min = trans->transport->client_port.min;
1088     max = trans->transport->client_port.max;
1089
1090     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1091             || max == port)) {
1092       result = trans;
1093       break;
1094     }
1095   }
1096   g_free (dest);
1097
1098   return result;
1099 }
1100
1101 static void
1102 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1103 {
1104   GstStructure *stats;
1105   GstRTSPMediaTrans *trans;
1106
1107   GST_INFO ("%p: new source %p", stream, source);
1108
1109   /* see if we have a stream to match with the origin of the RTCP packet */
1110   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1111   if (trans == NULL) {
1112     g_object_get (source, "stats", &stats, NULL);
1113     if (stats) {
1114       const gchar *rtcp_from;
1115
1116       dump_structure (stats);
1117
1118       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1119       if ((trans = find_transport (stream, rtcp_from))) {
1120         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1121             source);
1122
1123         /* keep ref to the source */
1124         trans->rtpsource = source;
1125
1126         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1127       }
1128       gst_structure_free (stats);
1129     }
1130   } else {
1131     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1132   }
1133 }
1134
1135 static void
1136 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1137 {
1138   GST_INFO ("%p: new SDES %p", stream, source);
1139 }
1140
1141 static void
1142 on_ssrc_active (GObject * session, GObject * source,
1143     GstRTSPMediaStream * stream)
1144 {
1145   GstRTSPMediaTrans *trans;
1146
1147   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1148
1149   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1150
1151   if (trans && trans->keep_alive)
1152     trans->keep_alive (trans->ka_user_data);
1153
1154 #ifdef DUMP_STATS
1155   {
1156     GstStructure *stats;
1157     g_object_get (source, "stats", &stats, NULL);
1158     if (stats) {
1159       dump_structure (stats);
1160       gst_structure_free (stats);
1161     }
1162   }
1163 #endif
1164 }
1165
1166 static void
1167 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1168 {
1169   GST_INFO ("%p: source %p bye", stream, source);
1170 }
1171
1172 static void
1173 on_bye_timeout (GObject * session, GObject * source,
1174     GstRTSPMediaStream * stream)
1175 {
1176   GstRTSPMediaTrans *trans;
1177
1178   GST_INFO ("%p: source %p bye timeout", stream, source);
1179
1180   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1181     trans->rtpsource = NULL;
1182     trans->timeout = TRUE;
1183   }
1184 }
1185
1186 static void
1187 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1188 {
1189   GstRTSPMediaTrans *trans;
1190
1191   GST_INFO ("%p: source %p timeout", stream, source);
1192
1193   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1194     trans->rtpsource = NULL;
1195     trans->timeout = TRUE;
1196   }
1197 }
1198
1199 static GstFlowReturn
1200 handle_new_sample (GstAppSink * sink, gpointer user_data)
1201 {
1202   GList *walk;
1203   GstSample *sample;
1204   GstBuffer *buffer;
1205   GstRTSPMediaStream *stream;
1206
1207   sample = gst_app_sink_pull_sample (sink);
1208   if (!sample)
1209     return GST_FLOW_OK;
1210
1211   stream = (GstRTSPMediaStream *) user_data;
1212   buffer = gst_sample_get_buffer (sample);
1213
1214   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1215     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1216
1217     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1218       if (tr->send_rtp)
1219         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1220     } else {
1221       if (tr->send_rtcp)
1222         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1223     }
1224   }
1225   gst_sample_unref (sample);
1226
1227   return GST_FLOW_OK;
1228 }
1229
1230 static GstAppSinkCallbacks sink_cb = {
1231   NULL,                         /* not interested in EOS */
1232   NULL,                         /* not interested in preroll samples */
1233   handle_new_sample,
1234 };
1235
1236 /* prepare the pipeline objects to handle @stream in @media */
1237 static gboolean
1238 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1239 {
1240   gchar *name;
1241   GstPad *pad, *teepad, *queuepad, *selpad;
1242   GstPadLinkReturn ret;
1243   gint i;
1244
1245   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1246    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1247    * elements */
1248   if (!alloc_udp_ports (media, stream))
1249     return FALSE;
1250
1251   /* add the ports to the pipeline */
1252   for (i = 0; i < 2; i++) {
1253     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1254     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1255   }
1256
1257   /* create elements for the TCP transfer */
1258   for (i = 0; i < 2; i++) {
1259     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1260     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
1261     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1262     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1263     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1264     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appqueue[i]);
1265     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1266     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1267     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1268         &sink_cb, stream, NULL);
1269   }
1270
1271   /* hook up the stream to the RTP session elements. */
1272   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1273   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1274   g_free (name);
1275   name = g_strdup_printf ("send_rtp_src_%u", idx);
1276   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1277   g_free (name);
1278   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1279   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1280   g_free (name);
1281   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1282   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1283   g_free (name);
1284   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1285   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1286   g_free (name);
1287
1288   /* get the session */
1289   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1290       &stream->session);
1291
1292   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1293       stream);
1294   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1295       stream);
1296   g_signal_connect (stream->session, "on-ssrc-active",
1297       (GCallback) on_ssrc_active, stream);
1298   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1299       stream);
1300   g_signal_connect (stream->session, "on-bye-timeout",
1301       (GCallback) on_bye_timeout, stream);
1302   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1303       stream);
1304
1305   /* link the RTP pad to the session manager */
1306   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1307   if (ret != GST_PAD_LINK_OK)
1308     goto link_failed;
1309
1310   /* make tee for RTP and link to stream */
1311   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1312   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1313
1314   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1315   gst_pad_link (stream->send_rtp_src, pad);
1316   gst_object_unref (pad);
1317
1318   /* link RTP sink, we're pretty sure this will work. */
1319   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1320   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1321   gst_pad_link (teepad, pad);
1322   gst_object_unref (pad);
1323   gst_object_unref (teepad);
1324
1325   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1326   pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
1327   gst_pad_link (teepad, pad);
1328   gst_object_unref (pad);
1329   gst_object_unref (teepad);
1330
1331   queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
1332   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1333   gst_pad_link (queuepad, pad);
1334   gst_object_unref (pad);
1335   gst_object_unref (queuepad);
1336
1337   /* make tee for RTCP */
1338   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1339   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1340
1341   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1342   gst_pad_link (stream->send_rtcp_src, pad);
1343   gst_object_unref (pad);
1344
1345   /* link RTCP elements */
1346   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1347   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1348   gst_pad_link (teepad, pad);
1349   gst_object_unref (pad);
1350   gst_object_unref (teepad);
1351
1352   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1353   pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
1354   gst_pad_link (teepad, pad);
1355   gst_object_unref (pad);
1356   gst_object_unref (teepad);
1357
1358   queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
1359   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1360   gst_pad_link (queuepad, pad);
1361   gst_object_unref (pad);
1362   gst_object_unref (queuepad);
1363
1364   /* make selector for the RTP receivers */
1365   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1366   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1367
1368   pad = gst_element_get_static_pad (stream->selector[0], "src");
1369   gst_pad_link (pad, stream->recv_rtp_sink);
1370   gst_object_unref (pad);
1371
1372   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1373   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1374   gst_pad_link (pad, selpad);
1375   gst_object_unref (pad);
1376   gst_object_unref (selpad);
1377
1378   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1379   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1380   gst_pad_link (pad, selpad);
1381   gst_object_unref (pad);
1382   gst_object_unref (selpad);
1383
1384   /* make selector for the RTCP receivers */
1385   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1386   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1387
1388   pad = gst_element_get_static_pad (stream->selector[1], "src");
1389   gst_pad_link (pad, stream->recv_rtcp_sink);
1390   gst_object_unref (pad);
1391
1392   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1393   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1394   gst_pad_link (pad, selpad);
1395   gst_object_unref (pad);
1396   gst_object_unref (selpad);
1397
1398   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1399   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1400   gst_pad_link (pad, selpad);
1401   gst_object_unref (pad);
1402   gst_object_unref (selpad);
1403
1404   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1405    * values */
1406   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1407   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1408   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1409   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1410
1411   /* be notified of caps changes */
1412   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1413       (GCallback) caps_notify, stream);
1414
1415   stream->prepared = TRUE;
1416
1417   return TRUE;
1418
1419   /* ERRORS */
1420 link_failed:
1421   {
1422     GST_WARNING ("failed to link stream %d", idx);
1423     return FALSE;
1424   }
1425 }
1426
1427 static void
1428 unlock_streams (GstRTSPMedia * media)
1429 {
1430   guint i, n_streams;
1431
1432   /* unlock the udp src elements */
1433   n_streams = gst_rtsp_media_n_streams (media);
1434   for (i = 0; i < n_streams; i++) {
1435     GstRTSPMediaStream *stream;
1436
1437     stream = gst_rtsp_media_get_stream (media, i);
1438
1439     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1440     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1441   }
1442 }
1443
1444 static void
1445 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1446 {
1447   g_mutex_lock (media->lock);
1448   /* never overwrite the error status */
1449   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1450     media->status = status;
1451   GST_DEBUG ("setting new status to %d", status);
1452   g_cond_broadcast (media->cond);
1453   g_mutex_unlock (media->lock);
1454 }
1455
1456 static GstRTSPMediaStatus
1457 gst_rtsp_media_get_status (GstRTSPMedia * media)
1458 {
1459   GstRTSPMediaStatus result;
1460   GTimeVal timeout;
1461
1462   g_mutex_lock (media->lock);
1463   g_get_current_time (&timeout);
1464   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1465   /* while we are preparing, wait */
1466   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1467     GST_DEBUG ("waiting for status change");
1468     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1469       GST_DEBUG ("timeout, assuming error status");
1470       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1471     }
1472   }
1473   /* could be success or error */
1474   result = media->status;
1475   GST_DEBUG ("got status %d", result);
1476   g_mutex_unlock (media->lock);
1477
1478   return result;
1479 }
1480
1481 static gboolean
1482 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1483 {
1484   GstMessageType type;
1485
1486   type = GST_MESSAGE_TYPE (message);
1487
1488   switch (type) {
1489     case GST_MESSAGE_STATE_CHANGED:
1490       break;
1491     case GST_MESSAGE_BUFFERING:
1492     {
1493       gint percent;
1494
1495       gst_message_parse_buffering (message, &percent);
1496
1497       /* no state management needed for live pipelines */
1498       if (media->is_live)
1499         break;
1500
1501       if (percent == 100) {
1502         /* a 100% message means buffering is done */
1503         media->buffering = FALSE;
1504         /* if the desired state is playing, go back */
1505         if (media->target_state == GST_STATE_PLAYING) {
1506           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1507           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1508         } else {
1509           GST_INFO ("Buffering done");
1510         }
1511       } else {
1512         /* buffering busy */
1513         if (media->buffering == FALSE) {
1514           if (media->target_state == GST_STATE_PLAYING) {
1515             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1516             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1517             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1518           } else {
1519             GST_INFO ("Buffering ...");
1520           }
1521         }
1522         media->buffering = TRUE;
1523       }
1524       break;
1525     }
1526     case GST_MESSAGE_LATENCY:
1527     {
1528       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1529       break;
1530     }
1531     case GST_MESSAGE_ERROR:
1532     {
1533       GError *gerror;
1534       gchar *debug;
1535
1536       gst_message_parse_error (message, &gerror, &debug);
1537       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1538       g_error_free (gerror);
1539       g_free (debug);
1540
1541       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1542       break;
1543     }
1544     case GST_MESSAGE_WARNING:
1545     {
1546       GError *gerror;
1547       gchar *debug;
1548
1549       gst_message_parse_warning (message, &gerror, &debug);
1550       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1551       g_error_free (gerror);
1552       g_free (debug);
1553       break;
1554     }
1555     case GST_MESSAGE_ELEMENT:
1556       break;
1557     case GST_MESSAGE_STREAM_STATUS:
1558       break;
1559     case GST_MESSAGE_ASYNC_DONE:
1560       if (!media->adding) {
1561         /* when we are dynamically adding pads, the addition of the udpsrc will
1562          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1563          * wait for the final ASYNC_DONE after everything prerolled */
1564         GST_INFO ("%p: got ASYNC_DONE", media);
1565         collect_media_stats (media);
1566
1567         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1568       } else {
1569         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1570       }
1571       break;
1572     case GST_MESSAGE_EOS:
1573       GST_INFO ("%p: got EOS", media);
1574       if (media->eos_pending) {
1575         GST_DEBUG ("shutting down after EOS");
1576         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1577         media->eos_pending = FALSE;
1578         g_object_unref (media);
1579       }
1580       break;
1581     default:
1582       GST_INFO ("%p: got message type %s", media,
1583           gst_message_type_get_name (type));
1584       break;
1585   }
1586   return TRUE;
1587 }
1588
1589 static gboolean
1590 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1591 {
1592   GstRTSPMediaClass *klass;
1593   gboolean ret;
1594
1595   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1596
1597   if (klass->handle_message)
1598     ret = klass->handle_message (media, message);
1599   else
1600     ret = FALSE;
1601
1602   return ret;
1603 }
1604
1605 /* called from streaming threads */
1606 static void
1607 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1608 {
1609   GstRTSPMediaStream *stream;
1610   gchar *name;
1611   gint i;
1612
1613   i = media->streams->len + 1;
1614
1615   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1616
1617   stream = g_new0 (GstRTSPMediaStream, 1);
1618   stream->payloader = element;
1619
1620   name = g_strdup_printf ("dynpay%d", i);
1621
1622   media->adding = TRUE;
1623
1624   /* ghost the pad of the payloader to the element */
1625   stream->srcpad = gst_ghost_pad_new (name, pad);
1626   gst_pad_set_active (stream->srcpad, TRUE);
1627   gst_element_add_pad (media->element, stream->srcpad);
1628   g_free (name);
1629
1630   /* add stream now */
1631   g_array_append_val (media->streams, stream);
1632
1633   setup_stream (stream, i, media);
1634
1635   for (i = 0; i < 2; i++) {
1636     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1637     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1638     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1639     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1640     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1641   }
1642   media->adding = FALSE;
1643 }
1644
1645 static void
1646 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1647 {
1648   GST_INFO ("no more pads");
1649   if (media->fakesink) {
1650     gst_object_ref (media->fakesink);
1651     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1652     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1653     gst_object_unref (media->fakesink);
1654     media->fakesink = NULL;
1655     GST_INFO ("removed fakesink");
1656   }
1657 }
1658
1659 /**
1660  * gst_rtsp_media_prepare:
1661  * @media: a #GstRTSPMedia
1662  *
1663  * Prepare @media for streaming. This function will create the pipeline and
1664  * other objects to manage the streaming.
1665  *
1666  * It will preroll the pipeline and collect vital information about the streams
1667  * such as the duration.
1668  *
1669  * Returns: %TRUE on success.
1670  */
1671 gboolean
1672 gst_rtsp_media_prepare (GstRTSPMedia * media)
1673 {
1674   GstStateChangeReturn ret;
1675   GstRTSPMediaStatus status;
1676   guint i, n_streams;
1677   GstRTSPMediaClass *klass;
1678   GstBus *bus;
1679   GList *walk;
1680
1681   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1682     goto was_prepared;
1683
1684   if (!media->reusable && media->reused)
1685     goto is_reused;
1686
1687   media->rtpbin = gst_element_factory_make ("rtpbin", NULL);
1688   if (media->rtpbin == NULL)
1689     goto no_rtpbin;
1690
1691   GST_INFO ("preparing media %p", media);
1692
1693   /* reset some variables */
1694   media->is_live = FALSE;
1695   media->seekable = FALSE;
1696   media->buffering = FALSE;
1697   /* we're preparing now */
1698   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1699
1700   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1701
1702   /* add the pipeline bus to our custom mainloop */
1703   media->source = gst_bus_create_watch (bus);
1704   gst_object_unref (bus);
1705
1706   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1707
1708   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1709   media->id = g_source_attach (media->source, klass->context);
1710
1711   /* add stuff to the bin */
1712   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1713
1714   /* link streams we already have, other streams might appear when we have
1715    * dynamic elements */
1716   n_streams = gst_rtsp_media_n_streams (media);
1717   for (i = 0; i < n_streams; i++) {
1718     GstRTSPMediaStream *stream;
1719
1720     stream = gst_rtsp_media_get_stream (media, i);
1721
1722     setup_stream (stream, i, media);
1723   }
1724
1725   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1726     GstElement *elem = walk->data;
1727
1728     GST_INFO ("adding callbacks for dynamic element %p", elem);
1729
1730     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1731     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1732
1733     /* we add a fakesink here in order to make the state change async. We remove
1734      * the fakesink again in the no-more-pads callback. */
1735     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1736     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1737   }
1738
1739   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1740   /* first go to PAUSED */
1741   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1742   media->target_state = GST_STATE_PAUSED;
1743
1744   switch (ret) {
1745     case GST_STATE_CHANGE_SUCCESS:
1746       GST_INFO ("SUCCESS state change for media %p", media);
1747       media->seekable = TRUE;
1748       break;
1749     case GST_STATE_CHANGE_ASYNC:
1750       GST_INFO ("ASYNC state change for media %p", media);
1751       media->seekable = TRUE;
1752       break;
1753     case GST_STATE_CHANGE_NO_PREROLL:
1754       /* we need to go to PLAYING */
1755       GST_INFO ("NO_PREROLL state change: live media %p", media);
1756       /* FIXME we disable seeking for live streams for now. We should perform a
1757        * seeking query in preroll instead and do a seeking query. */
1758       media->seekable = FALSE;
1759       media->is_live = TRUE;
1760       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1761       if (ret == GST_STATE_CHANGE_FAILURE)
1762         goto state_failed;
1763       break;
1764     case GST_STATE_CHANGE_FAILURE:
1765       goto state_failed;
1766   }
1767
1768   /* now wait for all pads to be prerolled */
1769   status = gst_rtsp_media_get_status (media);
1770   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1771     goto state_failed;
1772
1773   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1774
1775   GST_INFO ("object %p is prerolled", media);
1776
1777   return TRUE;
1778
1779   /* OK */
1780 was_prepared:
1781   {
1782     return TRUE;
1783   }
1784   /* ERRORS */
1785 is_reused:
1786   {
1787     GST_WARNING ("can not reuse media %p", media);
1788     return FALSE;
1789   }
1790 no_rtpbin:
1791   {
1792     GST_WARNING ("no rtpbin element");
1793     g_warning ("failed to create element 'rtpbin', check your installation");
1794     return FALSE;
1795   }
1796 state_failed:
1797   {
1798     GST_WARNING ("failed to preroll pipeline");
1799     unlock_streams (media);
1800     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1801     gst_rtsp_media_unprepare (media);
1802     return FALSE;
1803   }
1804 }
1805
1806 /**
1807  * gst_rtsp_media_unprepare:
1808  * @media: a #GstRTSPMedia
1809  *
1810  * Unprepare @media. After this call, the media should be prepared again before
1811  * it can be used again. If the media is set to be non-reusable, a new instance
1812  * must be created.
1813  *
1814  * Returns: %TRUE on success.
1815  */
1816 gboolean
1817 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1818 {
1819   GstRTSPMediaClass *klass;
1820   gboolean success;
1821
1822   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1823     return TRUE;
1824
1825   GST_INFO ("unprepare media %p", media);
1826   media->target_state = GST_STATE_NULL;
1827
1828   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1829   if (klass->unprepare)
1830     success = klass->unprepare (media);
1831   else
1832     success = TRUE;
1833
1834   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1835   media->reused = TRUE;
1836
1837   /* when the media is not reusable, this will effectively unref the media and
1838    * recreate it */
1839   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1840
1841   return success;
1842 }
1843
1844 static gboolean
1845 default_unprepare (GstRTSPMedia * media)
1846 {
1847   if (media->eos_shutdown) {
1848     GST_DEBUG ("sending EOS for shutdown");
1849     /* ref so that we don't disappear */
1850     g_object_ref (media);
1851     media->eos_pending = TRUE;
1852     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1853     /* we need to go to playing again for the EOS to propagate, normally in this
1854      * state, nothing is receiving data from us anymore so this is ok. */
1855     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1856   } else {
1857     GST_DEBUG ("shutting down");
1858     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1859   }
1860   return TRUE;
1861 }
1862
1863 static void
1864 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1865     gchar * dest, gint min, gint max)
1866 {
1867   GST_INFO ("adding %s:%d-%d", dest, min, max);
1868   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1869   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1870 }
1871
1872 static void
1873 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1874     gchar * dest, gint min, gint max)
1875 {
1876   GST_INFO ("removing %s:%d-%d", dest, min, max);
1877   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1878   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1879 }
1880
1881 /**
1882  * gst_rtsp_media_set_state:
1883  * @media: a #GstRTSPMedia
1884  * @state: the target state of the media
1885  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1886  *
1887  * Set the state of @media to @state and for the transports in @transports.
1888  *
1889  * Returns: %TRUE on success.
1890  */
1891 gboolean
1892 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1893     GArray * transports)
1894 {
1895   gint i;
1896   gboolean add, remove, do_state;
1897   gint old_active;
1898
1899   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1900   g_return_val_if_fail (transports != NULL, FALSE);
1901
1902   /* NULL and READY are the same */
1903   if (state == GST_STATE_READY)
1904     state = GST_STATE_NULL;
1905
1906   add = remove = FALSE;
1907
1908   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1909       media);
1910
1911   switch (state) {
1912     case GST_STATE_NULL:
1913       /* unlock the streams so that they follow the state changes from now on */
1914       unlock_streams (media);
1915       /* fallthrough */
1916     case GST_STATE_PAUSED:
1917       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1918       if (media->target_state == GST_STATE_PLAYING)
1919         remove = TRUE;
1920       break;
1921     case GST_STATE_PLAYING:
1922       /* we're going to PLAYING, add */
1923       add = TRUE;
1924       break;
1925     default:
1926       break;
1927   }
1928   old_active = media->active;
1929
1930   for (i = 0; i < transports->len; i++) {
1931     GstRTSPMediaTrans *tr;
1932     GstRTSPMediaStream *stream;
1933     GstRTSPTransport *trans;
1934
1935     /* we need a non-NULL entry in the array */
1936     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1937     if (tr == NULL)
1938       continue;
1939
1940     /* we need a transport */
1941     if (!(trans = tr->transport))
1942       continue;
1943
1944     /* get the stream and add the destinations */
1945     stream = gst_rtsp_media_get_stream (media, tr->idx);
1946     switch (trans->lower_transport) {
1947       case GST_RTSP_LOWER_TRANS_UDP:
1948       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1949       {
1950         gchar *dest;
1951         gint min, max;
1952
1953         dest = trans->destination;
1954         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1955           min = trans->port.min;
1956           max = trans->port.max;
1957         } else {
1958           min = trans->client_port.min;
1959           max = trans->client_port.max;
1960         }
1961
1962         if (add && !tr->active) {
1963           add_udp_destination (media, stream, dest, min, max);
1964           stream->transports = g_list_prepend (stream->transports, tr);
1965           tr->active = TRUE;
1966           media->active++;
1967         } else if (remove && tr->active) {
1968           remove_udp_destination (media, stream, dest, min, max);
1969           stream->transports = g_list_remove (stream->transports, tr);
1970           tr->active = FALSE;
1971           media->active--;
1972         }
1973         break;
1974       }
1975       case GST_RTSP_LOWER_TRANS_TCP:
1976         if (add && !tr->active) {
1977           GST_INFO ("adding TCP %s", trans->destination);
1978           stream->transports = g_list_prepend (stream->transports, tr);
1979           tr->active = TRUE;
1980           media->active++;
1981         } else if (remove && tr->active) {
1982           GST_INFO ("removing TCP %s", trans->destination);
1983           stream->transports = g_list_remove (stream->transports, tr);
1984           tr->active = FALSE;
1985           media->active--;
1986         }
1987         break;
1988       default:
1989         GST_INFO ("Unknown transport %d", trans->lower_transport);
1990         break;
1991     }
1992   }
1993
1994   /* we just added the first media, do the playing state change */
1995   if (old_active == 0 && add)
1996     do_state = TRUE;
1997   /* if we have no more active media, do the downward state changes */
1998   else if (media->active == 0)
1999     do_state = TRUE;
2000   else
2001     do_state = FALSE;
2002
2003   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2004       media, do_state);
2005
2006   if (media->target_state != state) {
2007     if (do_state) {
2008       if (state == GST_STATE_NULL) {
2009         gst_rtsp_media_unprepare (media);
2010       } else {
2011         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2012             media);
2013         media->target_state = state;
2014         gst_element_set_state (media->pipeline, state);
2015       }
2016     }
2017     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2018         NULL);
2019   }
2020
2021   /* remember where we are */
2022   if (state == GST_STATE_PAUSED || old_active != media->active)
2023     collect_media_stats (media);
2024
2025   return TRUE;
2026 }
2027
2028 /**
2029  * gst_rtsp_media_remove_elements:
2030  * @media: a #GstRTSPMedia
2031  *
2032  * Remove all elements and the pipeline controlled by @media.
2033  */
2034 void
2035 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2036 {
2037   gint i, j;
2038
2039   unlock_streams (media);
2040
2041   for (i = 0; i < media->streams->len; i++) {
2042     GstRTSPMediaStream *stream;
2043
2044     GST_INFO ("Removing elements of stream %d from pipeline", i);
2045
2046     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2047
2048     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2049
2050     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2051
2052     for (j = 0; j < 2; j++) {
2053       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2054       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2055       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2056       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2057       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2058       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2059
2060       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2061       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2062       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2063       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2064       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2065       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2066     }
2067     if (stream->caps)
2068       gst_caps_unref (stream->caps);
2069     stream->caps = NULL;
2070     gst_rtsp_media_stream_free (stream);
2071   }
2072   g_array_remove_range (media->streams, 0, media->streams->len);
2073
2074   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2075   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2076
2077   gst_object_unref (media->pipeline);
2078   media->pipeline = NULL;
2079 }