rtsp-media: don't collect media stats when going to NULL
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED          FALSE
29 #define DEFAULT_REUSABLE        FALSE
30 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN    FALSE
33 #define DEFAULT_BUFFER_SIZE     0x80000
34 #define DEFAULT_MULTICAST_GROUP "224.2.0.1"
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_MULTICAST_GROUP,
48   PROP_LAST
49 };
50
51 enum
52 {
53   SIGNAL_PREPARED,
54   SIGNAL_UNPREPARED,
55   SIGNAL_NEW_STATE,
56   SIGNAL_LAST
57 };
58
59 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
60 #define GST_CAT_DEFAULT rtsp_media_debug
61
62 static GQuark ssrc_stream_map_key;
63
64 static void gst_rtsp_media_get_property (GObject * object, guint propid,
65     GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_set_property (GObject * object, guint propid,
67     const GValue * value, GParamSpec * pspec);
68 static void gst_rtsp_media_finalize (GObject * obj);
69
70 static gpointer do_loop (GstRTSPMediaClass * klass);
71 static gboolean default_handle_message (GstRTSPMedia * media,
72     GstMessage * message);
73 static gboolean default_unprepare (GstRTSPMedia * media);
74 static void unlock_streams (GstRTSPMedia * media);
75
76 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
77
78 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
79
80 static void
81 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
82 {
83   GObjectClass *gobject_class;
84
85   gobject_class = G_OBJECT_CLASS (klass);
86
87   gobject_class->get_property = gst_rtsp_media_get_property;
88   gobject_class->set_property = gst_rtsp_media_set_property;
89   gobject_class->finalize = gst_rtsp_media_finalize;
90
91   g_object_class_install_property (gobject_class, PROP_SHARED,
92       g_param_spec_boolean ("shared", "Shared",
93           "If this media pipeline can be shared", DEFAULT_SHARED,
94           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
95
96   g_object_class_install_property (gobject_class, PROP_REUSABLE,
97       g_param_spec_boolean ("reusable", "Reusable",
98           "If this media pipeline can be reused after an unprepare",
99           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100
101   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
102       g_param_spec_flags ("protocols", "Protocols",
103           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
104           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
107       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
108           "Send an EOS event to the pipeline before unpreparing",
109           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
110
111   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
112       g_param_spec_uint ("buffer-size", "Buffer Size",
113           "The kernel UDP buffer size to use", 0, G_MAXUINT,
114           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115
116   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
117       g_param_spec_string ("multicast-group", "Multicast Group",
118           "The Multicast group to send media to",
119           DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
120
121   gst_rtsp_media_signals[SIGNAL_PREPARED] =
122       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
123       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
124       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
125
126   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
127       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
128       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
129       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
130
131   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
132       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
133       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
134       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
135
136   klass->context = g_main_context_new ();
137   klass->loop = g_main_loop_new (klass->context, TRUE);
138
139   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
140
141   klass->thread = g_thread_new ("Bus Thread", (GThreadFunc) do_loop, klass);
142
143   klass->handle_message = default_handle_message;
144   klass->unprepare = default_unprepare;
145
146   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
147 }
148
149 static void
150 gst_rtsp_media_init (GstRTSPMedia * media)
151 {
152   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
153   g_mutex_init (&media->lock);
154   g_cond_init (&media->cond);
155
156   media->shared = DEFAULT_SHARED;
157   media->reusable = DEFAULT_REUSABLE;
158   media->protocols = DEFAULT_PROTOCOLS;
159   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
160   media->buffer_size = DEFAULT_BUFFER_SIZE;
161   media->multicast_group = g_strdup (DEFAULT_MULTICAST_GROUP);
162 }
163
164 void
165 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
166 {
167   if (trans->transport) {
168     gst_rtsp_transport_free (trans->transport);
169     trans->transport = NULL;
170   }
171   if (trans->rtpsource) {
172     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
173     trans->rtpsource = NULL;
174   }
175 }
176
177 static void
178 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
179 {
180   if (stream->session)
181     g_object_unref (stream->session);
182
183   if (stream->caps)
184     gst_caps_unref (stream->caps);
185
186   if (stream->send_rtp_sink)
187     gst_object_unref (stream->send_rtp_sink);
188   if (stream->send_rtp_src)
189     gst_object_unref (stream->send_rtp_src);
190   if (stream->send_rtcp_src)
191     gst_object_unref (stream->send_rtcp_src);
192   if (stream->recv_rtcp_sink)
193     gst_object_unref (stream->recv_rtcp_sink);
194   if (stream->recv_rtp_sink)
195     gst_object_unref (stream->recv_rtp_sink);
196
197   g_list_free (stream->transports);
198
199   g_free (stream);
200 }
201
202 static void
203 gst_rtsp_media_finalize (GObject * obj)
204 {
205   GstRTSPMedia *media;
206   guint i;
207
208   media = GST_RTSP_MEDIA (obj);
209
210   GST_INFO ("finalize media %p", media);
211
212   if (media->pipeline) {
213     unlock_streams (media);
214     gst_element_set_state (media->pipeline, GST_STATE_NULL);
215     gst_object_unref (media->pipeline);
216   }
217
218   for (i = 0; i < media->streams->len; i++) {
219     GstRTSPMediaStream *stream;
220
221     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
222
223     gst_rtsp_media_stream_free (stream);
224   }
225   g_array_free (media->streams, TRUE);
226
227   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
228   g_list_free (media->dynamic);
229
230   if (media->source) {
231     g_source_destroy (media->source);
232     g_source_unref (media->source);
233   }
234   g_free (media->multicast_group);
235   g_mutex_clear (&media->lock);
236   g_cond_clear (&media->cond);
237
238   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
239 }
240
241 static void
242 gst_rtsp_media_get_property (GObject * object, guint propid,
243     GValue * value, GParamSpec * pspec)
244 {
245   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
246
247   switch (propid) {
248     case PROP_SHARED:
249       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
250       break;
251     case PROP_REUSABLE:
252       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
253       break;
254     case PROP_PROTOCOLS:
255       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
256       break;
257     case PROP_EOS_SHUTDOWN:
258       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
259       break;
260     case PROP_BUFFER_SIZE:
261       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
262       break;
263     case PROP_MULTICAST_GROUP:
264       g_value_take_string (value, gst_rtsp_media_get_multicast_group (media));
265       break;
266     default:
267       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
268   }
269 }
270
271 static void
272 gst_rtsp_media_set_property (GObject * object, guint propid,
273     const GValue * value, GParamSpec * pspec)
274 {
275   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
276
277   switch (propid) {
278     case PROP_SHARED:
279       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
280       break;
281     case PROP_REUSABLE:
282       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
283       break;
284     case PROP_PROTOCOLS:
285       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
286       break;
287     case PROP_EOS_SHUTDOWN:
288       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
289       break;
290     case PROP_BUFFER_SIZE:
291       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
292       break;
293     case PROP_MULTICAST_GROUP:
294       gst_rtsp_media_set_multicast_group (media, g_value_get_string (value));
295       break;
296     default:
297       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
298   }
299 }
300
301 static gpointer
302 do_loop (GstRTSPMediaClass * klass)
303 {
304   GST_INFO ("enter mainloop");
305   g_main_loop_run (klass->loop);
306   GST_INFO ("exit mainloop");
307
308   return NULL;
309 }
310
311 static void
312 collect_media_stats (GstRTSPMedia * media)
313 {
314   gint64 position, duration;
315
316   media->range.unit = GST_RTSP_RANGE_NPT;
317
318   if (media->is_live) {
319     media->range.min.type = GST_RTSP_TIME_NOW;
320     media->range.min.seconds = -1;
321     media->range.max.type = GST_RTSP_TIME_END;
322     media->range.max.seconds = -1;
323   } else {
324     /* get the position */
325     if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME,
326             &position)) {
327       GST_INFO ("position query failed");
328       position = 0;
329     }
330
331     /* get the duration */
332     if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME,
333             &duration)) {
334       GST_INFO ("duration query failed");
335       duration = -1;
336     }
337
338     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
339         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
340
341     if (position == -1) {
342       media->range.min.type = GST_RTSP_TIME_NOW;
343       media->range.min.seconds = -1;
344     } else {
345       media->range.min.type = GST_RTSP_TIME_SECONDS;
346       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
347     }
348     if (duration == -1) {
349       media->range.max.type = GST_RTSP_TIME_END;
350       media->range.max.seconds = -1;
351     } else {
352       media->range.max.type = GST_RTSP_TIME_SECONDS;
353       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
354     }
355   }
356 }
357
358 /**
359  * gst_rtsp_media_new:
360  *
361  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
362  * element to produde RTP data for one or more related (audio/video/..) 
363  * streams.
364  *
365  * Returns: a new #GstRTSPMedia object.
366  */
367 GstRTSPMedia *
368 gst_rtsp_media_new (void)
369 {
370   GstRTSPMedia *result;
371
372   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
373
374   return result;
375 }
376
377 /**
378  * gst_rtsp_media_set_shared:
379  * @media: a #GstRTSPMedia
380  * @shared: the new value
381  *
382  * Set or unset if the pipeline for @media can be shared will multiple clients.
383  * When @shared is %TRUE, client requests for this media will share the media
384  * pipeline.
385  */
386 void
387 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
388 {
389   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
390
391   media->shared = shared;
392 }
393
394 /**
395  * gst_rtsp_media_is_shared:
396  * @media: a #GstRTSPMedia
397  *
398  * Check if the pipeline for @media can be shared between multiple clients.
399  *
400  * Returns: %TRUE if the media can be shared between clients.
401  */
402 gboolean
403 gst_rtsp_media_is_shared (GstRTSPMedia * media)
404 {
405   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
406
407   return media->shared;
408 }
409
410 /**
411  * gst_rtsp_media_set_reusable:
412  * @media: a #GstRTSPMedia
413  * @reusable: the new value
414  *
415  * Set or unset if the pipeline for @media can be reused after the pipeline has
416  * been unprepared.
417  */
418 void
419 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
420 {
421   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
422
423   media->reusable = reusable;
424 }
425
426 /**
427  * gst_rtsp_media_is_reusable:
428  * @media: a #GstRTSPMedia
429  *
430  * Check if the pipeline for @media can be reused after an unprepare.
431  *
432  * Returns: %TRUE if the media can be reused
433  */
434 gboolean
435 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
436 {
437   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
438
439   return media->reusable;
440 }
441
442 /**
443  * gst_rtsp_media_set_protocols:
444  * @media: a #GstRTSPMedia
445  * @protocols: the new flags
446  *
447  * Configure the allowed lower transport for @media.
448  */
449 void
450 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
451 {
452   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
453
454   media->protocols = protocols;
455 }
456
457 /**
458  * gst_rtsp_media_get_protocols:
459  * @media: a #GstRTSPMedia
460  *
461  * Get the allowed protocols of @media.
462  *
463  * Returns: a #GstRTSPLowerTrans
464  */
465 GstRTSPLowerTrans
466 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
467 {
468   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
469       GST_RTSP_LOWER_TRANS_UNKNOWN);
470
471   return media->protocols;
472 }
473
474 /**
475  * gst_rtsp_media_set_eos_shutdown:
476  * @media: a #GstRTSPMedia
477  * @eos_shutdown: the new value
478  *
479  * Set or unset if an EOS event will be sent to the pipeline for @media before
480  * it is unprepared.
481  */
482 void
483 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
484 {
485   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
486
487   media->eos_shutdown = eos_shutdown;
488 }
489
490 /**
491  * gst_rtsp_media_is_eos_shutdown:
492  * @media: a #GstRTSPMedia
493  *
494  * Check if the pipeline for @media will send an EOS down the pipeline before
495  * unpreparing.
496  *
497  * Returns: %TRUE if the media will send EOS before unpreparing.
498  */
499 gboolean
500 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
501 {
502   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
503
504   return media->eos_shutdown;
505 }
506
507 /**
508  * gst_rtsp_media_set_buffer_size:
509  * @media: a #GstRTSPMedia
510  * @size: the new value
511  *
512  * Set the kernel UDP buffer size.
513  */
514 void
515 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
516 {
517   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
518
519   media->buffer_size = size;
520 }
521
522 /**
523  * gst_rtsp_media_get_buffer_size:
524  * @media: a #GstRTSPMedia
525  *
526  * Get the kernel UDP buffer size.
527  *
528  * Returns: the kernel UDP buffer size.
529  */
530 guint
531 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
532 {
533   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
534
535   return media->buffer_size;
536 }
537
538 /**
539  * gst_rtsp_media_set_multicast_group:
540  * @media: a #GstRTSPMedia
541  * @mc: the new multicast group
542  *
543  * Set the multicast group that media from @media will be streamed to.
544  */
545 void
546 gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
547 {
548   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
549
550   g_mutex_lock (&media->lock);
551   g_free (media->multicast_group);
552   media->multicast_group = g_strdup (mc);
553   g_mutex_unlock (&media->lock);
554 }
555
556 /**
557  * gst_rtsp_media_get_multicast_group:
558  * @media: a #GstRTSPMedia
559  *
560  * Get the multicast group that media from @media will be streamed to.
561  *
562  * Returns: the multicast group
563  */
564 gchar *
565 gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
566 {
567   gchar *result;
568
569   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
570
571   g_mutex_lock (&media->lock);
572   result = g_strdup (media->multicast_group);
573   g_mutex_unlock (&media->lock);
574
575   return result;
576 }
577
578 /**
579  * gst_rtsp_media_set_auth:
580  * @media: a #GstRTSPMedia
581  * @auth: a #GstRTSPAuth
582  *
583  * configure @auth to be used as the authentication manager of @media.
584  */
585 void
586 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
587 {
588   GstRTSPAuth *old;
589
590   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
591
592   old = media->auth;
593
594   if (old != auth) {
595     if (auth)
596       g_object_ref (auth);
597     media->auth = auth;
598     if (old)
599       g_object_unref (old);
600   }
601 }
602
603 /**
604  * gst_rtsp_media_get_auth:
605  * @media: a #GstRTSPMedia
606  *
607  * Get the #GstRTSPAuth used as the authentication manager of @media.
608  *
609  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
610  * usage.
611  */
612 GstRTSPAuth *
613 gst_rtsp_media_get_auth (GstRTSPMedia * media)
614 {
615   GstRTSPAuth *result;
616
617   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
618
619   if ((result = media->auth))
620     g_object_ref (result);
621
622   return result;
623 }
624
625
626 /**
627  * gst_rtsp_media_n_streams:
628  * @media: a #GstRTSPMedia
629  *
630  * Get the number of streams in this media.
631  *
632  * Returns: The number of streams.
633  */
634 guint
635 gst_rtsp_media_n_streams (GstRTSPMedia * media)
636 {
637   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
638
639   return media->streams->len;
640 }
641
642 /**
643  * gst_rtsp_media_get_stream:
644  * @media: a #GstRTSPMedia
645  * @idx: the stream index
646  *
647  * Retrieve the stream with index @idx from @media.
648  *
649  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
650  * that index did not exist.
651  */
652 GstRTSPMediaStream *
653 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
654 {
655   GstRTSPMediaStream *res;
656
657   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
658
659   if (idx < media->streams->len)
660     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
661   else
662     res = NULL;
663
664   return res;
665 }
666
667 /**
668  * gst_rtsp_media_get_range_string:
669  * @media: a #GstRTSPMedia
670  * @play: for the PLAY request
671  *
672  * Get the current range as a string.
673  *
674  * Returns: The range as a string, g_free() after usage.
675  */
676 gchar *
677 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
678 {
679   gchar *result;
680   GstRTSPTimeRange range;
681
682   /* make copy */
683   range = media->range;
684
685   if (!play && media->active > 0) {
686     range.min.type = GST_RTSP_TIME_NOW;
687     range.min.seconds = -1;
688   }
689
690   result = gst_rtsp_range_to_string (&range);
691
692   return result;
693 }
694
695 /**
696  * gst_rtsp_media_seek:
697  * @media: a #GstRTSPMedia
698  * @range: a #GstRTSPTimeRange
699  *
700  * Seek the pipeline to @range.
701  *
702  * Returns: %TRUE on success.
703  */
704 gboolean
705 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
706 {
707   GstSeekFlags flags;
708   gboolean res;
709   gint64 start, stop;
710   GstSeekType start_type, stop_type;
711
712   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
713   g_return_val_if_fail (range != NULL, FALSE);
714
715   if (media->seekable) {
716     GST_INFO ("pipeline is not seekable");
717     return TRUE;
718   }
719
720   if (range->unit != GST_RTSP_RANGE_NPT)
721     goto not_supported;
722
723   /* depends on the current playing state of the pipeline. We might need to
724    * queue this until we get EOS. */
725   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
726
727   start_type = stop_type = GST_SEEK_TYPE_NONE;
728
729   switch (range->min.type) {
730     case GST_RTSP_TIME_NOW:
731       start = -1;
732       break;
733     case GST_RTSP_TIME_SECONDS:
734       /* only seek when something changed */
735       if (media->range.min.seconds == range->min.seconds) {
736         start = -1;
737       } else {
738         start = range->min.seconds * GST_SECOND;
739         start_type = GST_SEEK_TYPE_SET;
740       }
741       break;
742     case GST_RTSP_TIME_END:
743     default:
744       goto weird_type;
745   }
746   switch (range->max.type) {
747     case GST_RTSP_TIME_SECONDS:
748       /* only seek when something changed */
749       if (media->range.max.seconds == range->max.seconds) {
750         stop = -1;
751       } else {
752         stop = range->max.seconds * GST_SECOND;
753         stop_type = GST_SEEK_TYPE_SET;
754       }
755       break;
756     case GST_RTSP_TIME_END:
757       stop = -1;
758       stop_type = GST_SEEK_TYPE_SET;
759       break;
760     case GST_RTSP_TIME_NOW:
761     default:
762       goto weird_type;
763   }
764
765   if (start != -1 || stop != -1) {
766     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
767         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
768
769     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
770         flags, start_type, start, stop_type, stop);
771
772     /* and block for the seek to complete */
773     GST_INFO ("done seeking %d", res);
774     gst_element_get_state (media->pipeline, NULL, NULL, -1);
775     GST_INFO ("prerolled again");
776
777     collect_media_stats (media);
778   } else {
779     GST_INFO ("no seek needed");
780     res = TRUE;
781   }
782
783   return res;
784
785   /* ERRORS */
786 not_supported:
787   {
788     GST_WARNING ("seek unit %d not supported", range->unit);
789     return FALSE;
790   }
791 weird_type:
792   {
793     GST_WARNING ("weird range type %d not supported", range->min.type);
794     return FALSE;
795   }
796 }
797
798 /**
799  * gst_rtsp_media_stream_rtp:
800  * @stream: a #GstRTSPMediaStream
801  * @buffer: a #GstBuffer
802  *
803  * Handle an RTP buffer for the stream. This method is usually called when a
804  * message has been received from a client using the TCP transport.
805  *
806  * This function takes ownership of @buffer.
807  *
808  * Returns: a GstFlowReturn.
809  */
810 GstFlowReturn
811 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
812 {
813   GstFlowReturn ret;
814
815   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
816
817   return ret;
818 }
819
820 /**
821  * gst_rtsp_media_stream_rtcp:
822  * @stream: a #GstRTSPMediaStream
823  * @buffer: a #GstBuffer
824  *
825  * Handle an RTCP buffer for the stream. This method is usually called when a
826  * message has been received from a client using the TCP transport.
827  *
828  * This function takes ownership of @buffer.
829  *
830  * Returns: a GstFlowReturn.
831  */
832 GstFlowReturn
833 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
834 {
835   GstFlowReturn ret;
836
837   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
838
839   return ret;
840 }
841
842 /* Allocate the udp ports and sockets */
843 static gboolean
844 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
845 {
846   GstStateChangeReturn ret;
847   GstElement *udpsrc0, *udpsrc1;
848   GstElement *udpsink0, *udpsink1;
849   gint tmp_rtp, tmp_rtcp;
850   guint count;
851   gint rtpport, rtcpport;
852   GSocket *socket;
853   const gchar *host;
854
855   udpsrc0 = NULL;
856   udpsrc1 = NULL;
857   udpsink0 = NULL;
858   udpsink1 = NULL;
859   count = 0;
860
861   /* Start with random port */
862   tmp_rtp = 0;
863
864   if (media->is_ipv6)
865     host = "udp://[::0]";
866   else
867     host = "udp://0.0.0.0";
868
869   /* try to allocate 2 UDP ports, the RTP port should be an even
870    * number and the RTCP port should be the next (uneven) port */
871 again:
872   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
873   if (udpsrc0 == NULL)
874     goto no_udp_protocol;
875   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
876
877   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
878   if (ret == GST_STATE_CHANGE_FAILURE) {
879     if (tmp_rtp != 0) {
880       tmp_rtp += 2;
881       if (++count > 20)
882         goto no_ports;
883
884       gst_element_set_state (udpsrc0, GST_STATE_NULL);
885       gst_object_unref (udpsrc0);
886
887       goto again;
888     }
889     goto no_udp_protocol;
890   }
891
892   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
893
894   /* check if port is even */
895   if ((tmp_rtp & 1) != 0) {
896     /* port not even, close and allocate another */
897     if (++count > 20)
898       goto no_ports;
899
900     gst_element_set_state (udpsrc0, GST_STATE_NULL);
901     gst_object_unref (udpsrc0);
902
903     tmp_rtp++;
904     goto again;
905   }
906
907   /* allocate port+1 for RTCP now */
908   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
909   if (udpsrc1 == NULL)
910     goto no_udp_rtcp_protocol;
911
912   /* set port */
913   tmp_rtcp = tmp_rtp + 1;
914   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
915
916   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
917   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
918   if (ret == GST_STATE_CHANGE_FAILURE) {
919
920     if (++count > 20)
921       goto no_ports;
922
923     gst_element_set_state (udpsrc0, GST_STATE_NULL);
924     gst_object_unref (udpsrc0);
925
926     gst_element_set_state (udpsrc1, GST_STATE_NULL);
927     gst_object_unref (udpsrc1);
928
929     tmp_rtp += 2;
930     goto again;
931   }
932
933   /* all fine, do port check */
934   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
935   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
936
937   /* this should not happen... */
938   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
939     goto port_error;
940
941   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
942   if (!udpsink0)
943     goto no_udp_protocol;
944
945   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
946   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
947   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
948
949   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
950   if (!udpsink1)
951     goto no_udp_protocol;
952
953   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
954           "send-duplicates")) {
955     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
956     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
957   } else {
958     g_warning
959         ("old multiudpsink version found without send-duplicates property");
960   }
961
962   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
963           "buffer-size")) {
964     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
965   } else {
966     GST_WARNING ("multiudpsink version found without buffer-size property");
967   }
968
969   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
970   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
971   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
972   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
973   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
974
975   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
976   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
977   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
978   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
979
980   /* we keep these elements, we configure all in configure_transport when the
981    * server told us to really use the UDP ports. */
982   stream->udpsrc[0] = udpsrc0;
983   stream->udpsrc[1] = udpsrc1;
984   stream->udpsink[0] = udpsink0;
985   stream->udpsink[1] = udpsink1;
986   stream->server_port.min = rtpport;
987   stream->server_port.max = rtcpport;
988
989   return TRUE;
990
991   /* ERRORS */
992 no_udp_protocol:
993   {
994     goto cleanup;
995   }
996 no_ports:
997   {
998     goto cleanup;
999   }
1000 no_udp_rtcp_protocol:
1001   {
1002     goto cleanup;
1003   }
1004 port_error:
1005   {
1006     goto cleanup;
1007   }
1008 cleanup:
1009   {
1010     if (udpsrc0) {
1011       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1012       gst_object_unref (udpsrc0);
1013     }
1014     if (udpsrc1) {
1015       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1016       gst_object_unref (udpsrc1);
1017     }
1018     if (udpsink0) {
1019       gst_element_set_state (udpsink0, GST_STATE_NULL);
1020       gst_object_unref (udpsink0);
1021     }
1022     if (udpsink1) {
1023       gst_element_set_state (udpsink1, GST_STATE_NULL);
1024       gst_object_unref (udpsink1);
1025     }
1026     return FALSE;
1027   }
1028 }
1029
1030 /* executed from streaming thread */
1031 static void
1032 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1033 {
1034   gchar *capsstr;
1035   GstCaps *newcaps, *oldcaps;
1036
1037   newcaps = gst_pad_get_current_caps (pad);
1038
1039   oldcaps = stream->caps;
1040   stream->caps = newcaps;
1041
1042   if (oldcaps)
1043     gst_caps_unref (oldcaps);
1044
1045   capsstr = gst_caps_to_string (newcaps);
1046   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1047   g_free (capsstr);
1048 }
1049
1050 static void
1051 dump_structure (const GstStructure * s)
1052 {
1053   gchar *sstr;
1054
1055   sstr = gst_structure_to_string (s);
1056   GST_INFO ("structure: %s", sstr);
1057   g_free (sstr);
1058 }
1059
1060 static GstRTSPMediaTrans *
1061 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1062 {
1063   GList *walk;
1064   GstRTSPMediaTrans *result = NULL;
1065   const gchar *tmp;
1066   gchar *dest;
1067   guint port;
1068
1069   if (rtcp_from == NULL)
1070     return NULL;
1071
1072   tmp = g_strrstr (rtcp_from, ":");
1073   if (tmp == NULL)
1074     return NULL;
1075
1076   port = atoi (tmp + 1);
1077   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1078
1079   GST_INFO ("finding %s:%d", dest, port);
1080
1081   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1082     GstRTSPMediaTrans *trans = walk->data;
1083     gint min, max;
1084
1085     min = trans->transport->client_port.min;
1086     max = trans->transport->client_port.max;
1087
1088     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1089             || max == port)) {
1090       result = trans;
1091       break;
1092     }
1093   }
1094   g_free (dest);
1095
1096   return result;
1097 }
1098
1099 static void
1100 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1101 {
1102   GstStructure *stats;
1103   GstRTSPMediaTrans *trans;
1104
1105   GST_INFO ("%p: new source %p", stream, source);
1106
1107   /* see if we have a stream to match with the origin of the RTCP packet */
1108   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1109   if (trans == NULL) {
1110     g_object_get (source, "stats", &stats, NULL);
1111     if (stats) {
1112       const gchar *rtcp_from;
1113
1114       dump_structure (stats);
1115
1116       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1117       if ((trans = find_transport (stream, rtcp_from))) {
1118         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1119             source);
1120
1121         /* keep ref to the source */
1122         trans->rtpsource = source;
1123
1124         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1125       }
1126       gst_structure_free (stats);
1127     }
1128   } else {
1129     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1130   }
1131 }
1132
1133 static void
1134 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1135 {
1136   GST_INFO ("%p: new SDES %p", stream, source);
1137 }
1138
1139 static void
1140 on_ssrc_active (GObject * session, GObject * source,
1141     GstRTSPMediaStream * stream)
1142 {
1143   GstRTSPMediaTrans *trans;
1144
1145   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1146
1147   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1148
1149   if (trans && trans->keep_alive)
1150     trans->keep_alive (trans->ka_user_data);
1151
1152 #ifdef DUMP_STATS
1153   {
1154     GstStructure *stats;
1155     g_object_get (source, "stats", &stats, NULL);
1156     if (stats) {
1157       dump_structure (stats);
1158       gst_structure_free (stats);
1159     }
1160   }
1161 #endif
1162 }
1163
1164 static void
1165 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1166 {
1167   GST_INFO ("%p: source %p bye", stream, source);
1168 }
1169
1170 static void
1171 on_bye_timeout (GObject * session, GObject * source,
1172     GstRTSPMediaStream * stream)
1173 {
1174   GstRTSPMediaTrans *trans;
1175
1176   GST_INFO ("%p: source %p bye timeout", stream, source);
1177
1178   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1179     trans->rtpsource = NULL;
1180     trans->timeout = TRUE;
1181   }
1182 }
1183
1184 static void
1185 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1186 {
1187   GstRTSPMediaTrans *trans;
1188
1189   GST_INFO ("%p: source %p timeout", stream, source);
1190
1191   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1192     trans->rtpsource = NULL;
1193     trans->timeout = TRUE;
1194   }
1195 }
1196
1197 static GstFlowReturn
1198 handle_new_sample (GstAppSink * sink, gpointer user_data)
1199 {
1200   GList *walk;
1201   GstSample *sample;
1202   GstBuffer *buffer;
1203   GstRTSPMediaStream *stream;
1204
1205   sample = gst_app_sink_pull_sample (sink);
1206   if (!sample)
1207     return GST_FLOW_OK;
1208
1209   stream = (GstRTSPMediaStream *) user_data;
1210   buffer = gst_sample_get_buffer (sample);
1211
1212   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1213     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1214
1215     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1216       if (tr->send_rtp)
1217         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1218     } else {
1219       if (tr->send_rtcp)
1220         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1221     }
1222   }
1223   gst_sample_unref (sample);
1224
1225   return GST_FLOW_OK;
1226 }
1227
1228 static GstAppSinkCallbacks sink_cb = {
1229   NULL,                         /* not interested in EOS */
1230   NULL,                         /* not interested in preroll samples */
1231   handle_new_sample,
1232 };
1233
1234 /* prepare the pipeline objects to handle @stream in @media */
1235 static gboolean
1236 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1237 {
1238   gchar *name;
1239   GstPad *pad, *teepad, *queuepad, *selpad;
1240   GstPadLinkReturn ret;
1241   gint i;
1242
1243   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1244    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1245    * elements */
1246   if (!alloc_udp_ports (media, stream))
1247     return FALSE;
1248
1249   /* add the ports to the pipeline */
1250   for (i = 0; i < 2; i++) {
1251     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1252     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1253   }
1254
1255   /* create elements for the TCP transfer */
1256   for (i = 0; i < 2; i++) {
1257     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1258     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
1259     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1260     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1261     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1262     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appqueue[i]);
1263     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1264     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1265     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1266         &sink_cb, stream, NULL);
1267   }
1268
1269   /* hook up the stream to the RTP session elements. */
1270   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1271   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1272   g_free (name);
1273   name = g_strdup_printf ("send_rtp_src_%u", idx);
1274   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1275   g_free (name);
1276   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1277   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1278   g_free (name);
1279   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1280   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1281   g_free (name);
1282   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1283   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1284   g_free (name);
1285
1286   /* get the session */
1287   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1288       &stream->session);
1289
1290   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1291       stream);
1292   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1293       stream);
1294   g_signal_connect (stream->session, "on-ssrc-active",
1295       (GCallback) on_ssrc_active, stream);
1296   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1297       stream);
1298   g_signal_connect (stream->session, "on-bye-timeout",
1299       (GCallback) on_bye_timeout, stream);
1300   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1301       stream);
1302
1303   /* link the RTP pad to the session manager */
1304   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1305   if (ret != GST_PAD_LINK_OK)
1306     goto link_failed;
1307
1308   /* make tee for RTP and link to stream */
1309   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1310   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1311
1312   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1313   gst_pad_link (stream->send_rtp_src, pad);
1314   gst_object_unref (pad);
1315
1316   /* link RTP sink, we're pretty sure this will work. */
1317   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1318   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1319   gst_pad_link (teepad, pad);
1320   gst_object_unref (pad);
1321   gst_object_unref (teepad);
1322
1323   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1324   pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
1325   gst_pad_link (teepad, pad);
1326   gst_object_unref (pad);
1327   gst_object_unref (teepad);
1328
1329   queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
1330   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1331   gst_pad_link (queuepad, pad);
1332   gst_object_unref (pad);
1333   gst_object_unref (queuepad);
1334
1335   /* make tee for RTCP */
1336   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1337   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1338
1339   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1340   gst_pad_link (stream->send_rtcp_src, pad);
1341   gst_object_unref (pad);
1342
1343   /* link RTCP elements */
1344   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1345   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1346   gst_pad_link (teepad, pad);
1347   gst_object_unref (pad);
1348   gst_object_unref (teepad);
1349
1350   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1351   pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
1352   gst_pad_link (teepad, pad);
1353   gst_object_unref (pad);
1354   gst_object_unref (teepad);
1355
1356   queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
1357   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1358   gst_pad_link (queuepad, pad);
1359   gst_object_unref (pad);
1360   gst_object_unref (queuepad);
1361
1362   /* make selector for the RTP receivers */
1363   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1364   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1365
1366   pad = gst_element_get_static_pad (stream->selector[0], "src");
1367   gst_pad_link (pad, stream->recv_rtp_sink);
1368   gst_object_unref (pad);
1369
1370   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1371   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1372   gst_pad_link (pad, selpad);
1373   gst_object_unref (pad);
1374   gst_object_unref (selpad);
1375
1376   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1377   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1378   gst_pad_link (pad, selpad);
1379   gst_object_unref (pad);
1380   gst_object_unref (selpad);
1381
1382   /* make selector for the RTCP receivers */
1383   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1384   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1385
1386   pad = gst_element_get_static_pad (stream->selector[1], "src");
1387   gst_pad_link (pad, stream->recv_rtcp_sink);
1388   gst_object_unref (pad);
1389
1390   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1391   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1392   gst_pad_link (pad, selpad);
1393   gst_object_unref (pad);
1394   gst_object_unref (selpad);
1395
1396   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1397   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1398   gst_pad_link (pad, selpad);
1399   gst_object_unref (pad);
1400   gst_object_unref (selpad);
1401
1402   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1403    * values */
1404   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1405   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1406   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1407   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1408
1409   /* be notified of caps changes */
1410   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1411       (GCallback) caps_notify, stream);
1412
1413   stream->prepared = TRUE;
1414
1415   return TRUE;
1416
1417   /* ERRORS */
1418 link_failed:
1419   {
1420     GST_WARNING ("failed to link stream %d", idx);
1421     return FALSE;
1422   }
1423 }
1424
1425 static void
1426 unlock_streams (GstRTSPMedia * media)
1427 {
1428   guint i, n_streams;
1429
1430   /* unlock the udp src elements */
1431   n_streams = gst_rtsp_media_n_streams (media);
1432   for (i = 0; i < n_streams; i++) {
1433     GstRTSPMediaStream *stream;
1434
1435     stream = gst_rtsp_media_get_stream (media, i);
1436
1437     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1438     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1439   }
1440 }
1441
1442 static void
1443 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1444 {
1445   g_mutex_lock (&media->lock);
1446   /* never overwrite the error status */
1447   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1448     media->status = status;
1449   GST_DEBUG ("setting new status to %d", status);
1450   g_cond_broadcast (&media->cond);
1451   g_mutex_unlock (&media->lock);
1452 }
1453
1454 static GstRTSPMediaStatus
1455 gst_rtsp_media_get_status (GstRTSPMedia * media)
1456 {
1457   GstRTSPMediaStatus result;
1458   gint64 end_time;
1459
1460   g_mutex_lock (&media->lock);
1461   end_time = g_get_monotonic_time () + 20 * G_TIME_SPAN_SECOND;
1462   /* while we are preparing, wait */
1463   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1464     GST_DEBUG ("waiting for status change");
1465     if (!g_cond_wait_until (&media->cond, &media->lock, end_time)) {
1466       GST_DEBUG ("timeout, assuming error status");
1467       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1468     }
1469   }
1470   /* could be success or error */
1471   result = media->status;
1472   GST_DEBUG ("got status %d", result);
1473   g_mutex_unlock (&media->lock);
1474
1475   return result;
1476 }
1477
1478 static gboolean
1479 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1480 {
1481   GstMessageType type;
1482
1483   type = GST_MESSAGE_TYPE (message);
1484
1485   switch (type) {
1486     case GST_MESSAGE_STATE_CHANGED:
1487       break;
1488     case GST_MESSAGE_BUFFERING:
1489     {
1490       gint percent;
1491
1492       gst_message_parse_buffering (message, &percent);
1493
1494       /* no state management needed for live pipelines */
1495       if (media->is_live)
1496         break;
1497
1498       if (percent == 100) {
1499         /* a 100% message means buffering is done */
1500         media->buffering = FALSE;
1501         /* if the desired state is playing, go back */
1502         if (media->target_state == GST_STATE_PLAYING) {
1503           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1504           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1505         } else {
1506           GST_INFO ("Buffering done");
1507         }
1508       } else {
1509         /* buffering busy */
1510         if (media->buffering == FALSE) {
1511           if (media->target_state == GST_STATE_PLAYING) {
1512             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1513             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1514             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1515           } else {
1516             GST_INFO ("Buffering ...");
1517           }
1518         }
1519         media->buffering = TRUE;
1520       }
1521       break;
1522     }
1523     case GST_MESSAGE_LATENCY:
1524     {
1525       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1526       break;
1527     }
1528     case GST_MESSAGE_ERROR:
1529     {
1530       GError *gerror;
1531       gchar *debug;
1532
1533       gst_message_parse_error (message, &gerror, &debug);
1534       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1535       g_error_free (gerror);
1536       g_free (debug);
1537
1538       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1539       break;
1540     }
1541     case GST_MESSAGE_WARNING:
1542     {
1543       GError *gerror;
1544       gchar *debug;
1545
1546       gst_message_parse_warning (message, &gerror, &debug);
1547       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1548       g_error_free (gerror);
1549       g_free (debug);
1550       break;
1551     }
1552     case GST_MESSAGE_ELEMENT:
1553       break;
1554     case GST_MESSAGE_STREAM_STATUS:
1555       break;
1556     case GST_MESSAGE_ASYNC_DONE:
1557       if (!media->adding) {
1558         /* when we are dynamically adding pads, the addition of the udpsrc will
1559          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1560          * wait for the final ASYNC_DONE after everything prerolled */
1561         GST_INFO ("%p: got ASYNC_DONE", media);
1562         collect_media_stats (media);
1563
1564         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1565       } else {
1566         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1567       }
1568       break;
1569     case GST_MESSAGE_EOS:
1570       GST_INFO ("%p: got EOS", media);
1571       if (media->eos_pending) {
1572         GST_DEBUG ("shutting down after EOS");
1573         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1574         media->eos_pending = FALSE;
1575         g_object_unref (media);
1576       }
1577       break;
1578     default:
1579       GST_INFO ("%p: got message type %s", media,
1580           gst_message_type_get_name (type));
1581       break;
1582   }
1583   return TRUE;
1584 }
1585
1586 static gboolean
1587 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1588 {
1589   GstRTSPMediaClass *klass;
1590   gboolean ret;
1591
1592   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1593
1594   if (klass->handle_message)
1595     ret = klass->handle_message (media, message);
1596   else
1597     ret = FALSE;
1598
1599   return ret;
1600 }
1601
1602 /* called from streaming threads */
1603 static void
1604 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1605 {
1606   GstRTSPMediaStream *stream;
1607   gchar *name;
1608   gint i;
1609
1610   i = media->streams->len + 1;
1611
1612   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1613
1614   stream = g_new0 (GstRTSPMediaStream, 1);
1615   stream->payloader = element;
1616
1617   name = g_strdup_printf ("dynpay%d", i);
1618
1619   media->adding = TRUE;
1620
1621   /* ghost the pad of the payloader to the element */
1622   stream->srcpad = gst_ghost_pad_new (name, pad);
1623   gst_pad_set_active (stream->srcpad, TRUE);
1624   gst_element_add_pad (media->element, stream->srcpad);
1625   g_free (name);
1626
1627   /* add stream now */
1628   g_array_append_val (media->streams, stream);
1629
1630   setup_stream (stream, i, media);
1631
1632   for (i = 0; i < 2; i++) {
1633     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1634     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1635     gst_element_set_state (stream->appqueue[i], GST_STATE_PAUSED);
1636     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1637     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1638     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1639   }
1640   media->adding = FALSE;
1641 }
1642
1643 static void
1644 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1645 {
1646   GST_INFO ("no more pads");
1647   if (media->fakesink) {
1648     gst_object_ref (media->fakesink);
1649     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1650     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1651     gst_object_unref (media->fakesink);
1652     media->fakesink = NULL;
1653     GST_INFO ("removed fakesink");
1654   }
1655 }
1656
1657 /**
1658  * gst_rtsp_media_prepare:
1659  * @media: a #GstRTSPMedia
1660  *
1661  * Prepare @media for streaming. This function will create the pipeline and
1662  * other objects to manage the streaming.
1663  *
1664  * It will preroll the pipeline and collect vital information about the streams
1665  * such as the duration.
1666  *
1667  * Returns: %TRUE on success.
1668  */
1669 gboolean
1670 gst_rtsp_media_prepare (GstRTSPMedia * media)
1671 {
1672   GstStateChangeReturn ret;
1673   GstRTSPMediaStatus status;
1674   guint i, n_streams;
1675   GstRTSPMediaClass *klass;
1676   GstBus *bus;
1677   GList *walk;
1678
1679   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1680     goto was_prepared;
1681
1682   if (!media->reusable && media->reused)
1683     goto is_reused;
1684
1685   media->rtpbin = gst_element_factory_make ("rtpbin", NULL);
1686   if (media->rtpbin == NULL)
1687     goto no_rtpbin;
1688
1689   GST_INFO ("preparing media %p", media);
1690
1691   /* reset some variables */
1692   media->is_live = FALSE;
1693   media->seekable = FALSE;
1694   media->buffering = FALSE;
1695   /* we're preparing now */
1696   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1697
1698   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1699
1700   /* add the pipeline bus to our custom mainloop */
1701   media->source = gst_bus_create_watch (bus);
1702   gst_object_unref (bus);
1703
1704   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1705
1706   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1707   media->id = g_source_attach (media->source, klass->context);
1708
1709   /* add stuff to the bin */
1710   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1711
1712   /* link streams we already have, other streams might appear when we have
1713    * dynamic elements */
1714   n_streams = gst_rtsp_media_n_streams (media);
1715   for (i = 0; i < n_streams; i++) {
1716     GstRTSPMediaStream *stream;
1717
1718     stream = gst_rtsp_media_get_stream (media, i);
1719
1720     setup_stream (stream, i, media);
1721   }
1722
1723   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1724     GstElement *elem = walk->data;
1725
1726     GST_INFO ("adding callbacks for dynamic element %p", elem);
1727
1728     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1729     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1730
1731     /* we add a fakesink here in order to make the state change async. We remove
1732      * the fakesink again in the no-more-pads callback. */
1733     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1734     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1735   }
1736
1737   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1738   /* first go to PAUSED */
1739   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1740   media->target_state = GST_STATE_PAUSED;
1741
1742   switch (ret) {
1743     case GST_STATE_CHANGE_SUCCESS:
1744       GST_INFO ("SUCCESS state change for media %p", media);
1745       media->seekable = TRUE;
1746       break;
1747     case GST_STATE_CHANGE_ASYNC:
1748       GST_INFO ("ASYNC state change for media %p", media);
1749       media->seekable = TRUE;
1750       break;
1751     case GST_STATE_CHANGE_NO_PREROLL:
1752       /* we need to go to PLAYING */
1753       GST_INFO ("NO_PREROLL state change: live media %p", media);
1754       /* FIXME we disable seeking for live streams for now. We should perform a
1755        * seeking query in preroll instead and do a seeking query. */
1756       media->seekable = FALSE;
1757       media->is_live = TRUE;
1758       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1759       if (ret == GST_STATE_CHANGE_FAILURE)
1760         goto state_failed;
1761       break;
1762     case GST_STATE_CHANGE_FAILURE:
1763       goto state_failed;
1764   }
1765
1766   /* now wait for all pads to be prerolled */
1767   status = gst_rtsp_media_get_status (media);
1768   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1769     goto state_failed;
1770
1771   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1772
1773   GST_INFO ("object %p is prerolled", media);
1774
1775   return TRUE;
1776
1777   /* OK */
1778 was_prepared:
1779   {
1780     return TRUE;
1781   }
1782   /* ERRORS */
1783 is_reused:
1784   {
1785     GST_WARNING ("can not reuse media %p", media);
1786     return FALSE;
1787   }
1788 no_rtpbin:
1789   {
1790     GST_WARNING ("no rtpbin element");
1791     g_warning ("failed to create element 'rtpbin', check your installation");
1792     return FALSE;
1793   }
1794 state_failed:
1795   {
1796     GST_WARNING ("failed to preroll pipeline");
1797     unlock_streams (media);
1798     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1799     gst_rtsp_media_unprepare (media);
1800     return FALSE;
1801   }
1802 }
1803
1804 /**
1805  * gst_rtsp_media_unprepare:
1806  * @media: a #GstRTSPMedia
1807  *
1808  * Unprepare @media. After this call, the media should be prepared again before
1809  * it can be used again. If the media is set to be non-reusable, a new instance
1810  * must be created.
1811  *
1812  * Returns: %TRUE on success.
1813  */
1814 gboolean
1815 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1816 {
1817   GstRTSPMediaClass *klass;
1818   gboolean success;
1819
1820   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1821     return TRUE;
1822
1823   GST_INFO ("unprepare media %p", media);
1824   media->target_state = GST_STATE_NULL;
1825
1826   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1827   if (klass->unprepare)
1828     success = klass->unprepare (media);
1829   else
1830     success = TRUE;
1831
1832   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1833   media->reused = TRUE;
1834
1835   /* when the media is not reusable, this will effectively unref the media and
1836    * recreate it */
1837   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1838
1839   return success;
1840 }
1841
1842 static gboolean
1843 default_unprepare (GstRTSPMedia * media)
1844 {
1845   if (media->eos_shutdown) {
1846     GST_DEBUG ("sending EOS for shutdown");
1847     /* ref so that we don't disappear */
1848     g_object_ref (media);
1849     media->eos_pending = TRUE;
1850     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1851     /* we need to go to playing again for the EOS to propagate, normally in this
1852      * state, nothing is receiving data from us anymore so this is ok. */
1853     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1854   } else {
1855     GST_DEBUG ("shutting down");
1856     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1857   }
1858   return TRUE;
1859 }
1860
1861 static void
1862 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1863     gchar * dest, gint min, gint max)
1864 {
1865   GST_INFO ("adding %s:%d-%d", dest, min, max);
1866   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1867   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1868 }
1869
1870 static void
1871 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1872     gchar * dest, gint min, gint max)
1873 {
1874   GST_INFO ("removing %s:%d-%d", dest, min, max);
1875   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1876   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1877 }
1878
1879 /**
1880  * gst_rtsp_media_set_state:
1881  * @media: a #GstRTSPMedia
1882  * @state: the target state of the media
1883  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1884  *
1885  * Set the state of @media to @state and for the transports in @transports.
1886  *
1887  * Returns: %TRUE on success.
1888  */
1889 gboolean
1890 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1891     GArray * transports)
1892 {
1893   gint i;
1894   gboolean add, remove, do_state;
1895   gint old_active;
1896
1897   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1898   g_return_val_if_fail (transports != NULL, FALSE);
1899
1900   /* NULL and READY are the same */
1901   if (state == GST_STATE_READY)
1902     state = GST_STATE_NULL;
1903
1904   add = remove = FALSE;
1905
1906   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1907       media);
1908
1909   switch (state) {
1910     case GST_STATE_NULL:
1911       /* unlock the streams so that they follow the state changes from now on */
1912       unlock_streams (media);
1913       /* fallthrough */
1914     case GST_STATE_PAUSED:
1915       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1916       if (media->target_state == GST_STATE_PLAYING)
1917         remove = TRUE;
1918       break;
1919     case GST_STATE_PLAYING:
1920       /* we're going to PLAYING, add */
1921       add = TRUE;
1922       break;
1923     default:
1924       break;
1925   }
1926   old_active = media->active;
1927
1928   for (i = 0; i < transports->len; i++) {
1929     GstRTSPMediaTrans *tr;
1930     GstRTSPMediaStream *stream;
1931     GstRTSPTransport *trans;
1932
1933     /* we need a non-NULL entry in the array */
1934     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1935     if (tr == NULL)
1936       continue;
1937
1938     /* we need a transport */
1939     if (!(trans = tr->transport))
1940       continue;
1941
1942     /* get the stream and add the destinations */
1943     stream = gst_rtsp_media_get_stream (media, tr->idx);
1944     switch (trans->lower_transport) {
1945       case GST_RTSP_LOWER_TRANS_UDP:
1946       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1947       {
1948         gchar *dest;
1949         gint min, max;
1950
1951         dest = trans->destination;
1952         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1953           min = trans->port.min;
1954           max = trans->port.max;
1955         } else {
1956           min = trans->client_port.min;
1957           max = trans->client_port.max;
1958         }
1959
1960         if (add && !tr->active) {
1961           add_udp_destination (media, stream, dest, min, max);
1962           stream->transports = g_list_prepend (stream->transports, tr);
1963           tr->active = TRUE;
1964           media->active++;
1965         } else if (remove && tr->active) {
1966           remove_udp_destination (media, stream, dest, min, max);
1967           stream->transports = g_list_remove (stream->transports, tr);
1968           tr->active = FALSE;
1969           media->active--;
1970         }
1971         break;
1972       }
1973       case GST_RTSP_LOWER_TRANS_TCP:
1974         if (add && !tr->active) {
1975           GST_INFO ("adding TCP %s", trans->destination);
1976           stream->transports = g_list_prepend (stream->transports, tr);
1977           tr->active = TRUE;
1978           media->active++;
1979         } else if (remove && tr->active) {
1980           GST_INFO ("removing TCP %s", trans->destination);
1981           stream->transports = g_list_remove (stream->transports, tr);
1982           tr->active = FALSE;
1983           media->active--;
1984         }
1985         break;
1986       default:
1987         GST_INFO ("Unknown transport %d", trans->lower_transport);
1988         break;
1989     }
1990   }
1991
1992   /* we just added the first media, do the playing state change */
1993   if (old_active == 0 && add)
1994     do_state = TRUE;
1995   /* if we have no more active media, do the downward state changes */
1996   else if (media->active == 0)
1997     do_state = TRUE;
1998   else
1999     do_state = FALSE;
2000
2001   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2002       media, do_state);
2003
2004   if (media->target_state != state) {
2005     if (do_state) {
2006       if (state == GST_STATE_NULL) {
2007         gst_rtsp_media_unprepare (media);
2008       } else {
2009         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2010             media);
2011         media->target_state = state;
2012         gst_element_set_state (media->pipeline, state);
2013       }
2014     }
2015     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2016         NULL);
2017   }
2018
2019   /* remember where we are */
2020   if (state != GST_STATE_NULL && (state == GST_STATE_PAUSED ||
2021           old_active != media->active))
2022     collect_media_stats (media);
2023
2024   return TRUE;
2025 }
2026
2027 /**
2028  * gst_rtsp_media_remove_elements:
2029  * @media: a #GstRTSPMedia
2030  *
2031  * Remove all elements and the pipeline controlled by @media.
2032  */
2033 void
2034 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2035 {
2036   gint i, j;
2037
2038   unlock_streams (media);
2039
2040   for (i = 0; i < media->streams->len; i++) {
2041     GstRTSPMediaStream *stream;
2042
2043     GST_INFO ("Removing elements of stream %d from pipeline", i);
2044
2045     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2046
2047     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2048
2049     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2050
2051     for (j = 0; j < 2; j++) {
2052       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2053       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2054       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2055       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2056       gst_element_set_state (stream->appqueue[j], GST_STATE_NULL);
2057       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2058       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2059
2060       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2061       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2062       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2063       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2064       gst_bin_remove (GST_BIN (media->pipeline), stream->appqueue[j]);
2065       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2066       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2067     }
2068     if (stream->caps)
2069       gst_caps_unref (stream->caps);
2070     stream->caps = NULL;
2071     gst_rtsp_media_stream_free (stream);
2072   }
2073   g_array_remove_range (media->streams, 0, media->streams->len);
2074
2075   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2076   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2077
2078   gst_object_unref (media->pipeline);
2079   media->pipeline = NULL;
2080 }