38a37d710f4ad5657cc0a054f8743e5e056fc465
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-funnel.h"
27 #include "rtsp-media.h"
28
29 #define DEFAULT_SHARED         FALSE
30 #define DEFAULT_REUSABLE       FALSE
31 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
32 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
33 #define DEFAULT_EOS_SHUTDOWN   FALSE
34 #define DEFAULT_BUFFER_SIZE    0x80000
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_LAST
48 };
49
50 enum
51 {
52   SIGNAL_PREPARED,
53   SIGNAL_UNPREPARED,
54   SIGNAL_NEW_STATE,
55   SIGNAL_LAST
56 };
57
58 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
59 #define GST_CAT_DEFAULT rtsp_media_debug
60
61 static GQuark ssrc_stream_map_key;
62
63 static void gst_rtsp_media_get_property (GObject * object, guint propid,
64     GValue * value, GParamSpec * pspec);
65 static void gst_rtsp_media_set_property (GObject * object, guint propid,
66     const GValue * value, GParamSpec * pspec);
67 static void gst_rtsp_media_finalize (GObject * obj);
68
69 static gpointer do_loop (GstRTSPMediaClass * klass);
70 static gboolean default_handle_message (GstRTSPMedia * media,
71     GstMessage * message);
72 static gboolean default_unprepare (GstRTSPMedia * media);
73 static void unlock_streams (GstRTSPMedia * media);
74
75 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
76
77 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
78
79 static void
80 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
81 {
82   GObjectClass *gobject_class;
83   GError *error = NULL;
84
85   gobject_class = G_OBJECT_CLASS (klass);
86
87   gobject_class->get_property = gst_rtsp_media_get_property;
88   gobject_class->set_property = gst_rtsp_media_set_property;
89   gobject_class->finalize = gst_rtsp_media_finalize;
90
91   g_object_class_install_property (gobject_class, PROP_SHARED,
92       g_param_spec_boolean ("shared", "Shared",
93           "If this media pipeline can be shared", DEFAULT_SHARED,
94           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
95
96   g_object_class_install_property (gobject_class, PROP_REUSABLE,
97       g_param_spec_boolean ("reusable", "Reusable",
98           "If this media pipeline can be reused after an unprepare",
99           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100
101   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
102       g_param_spec_flags ("protocols", "Protocols",
103           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
104           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
107       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
108           "Send an EOS event to the pipeline before unpreparing",
109           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
110
111   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
112       g_param_spec_uint ("buffer-size", "Buffer Size",
113           "The kernel UDP buffer size to use", 0, G_MAXUINT,
114           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115
116   gst_rtsp_media_signals[SIGNAL_PREPARED] =
117       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
118       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
119       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
120
121   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
122       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
123       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
124       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
125
126   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
127       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
128       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
129       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
130
131   klass->context = g_main_context_new ();
132   klass->loop = g_main_loop_new (klass->context, TRUE);
133
134   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
135
136   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
137   if (error != NULL) {
138     g_critical ("could not start bus thread: %s", error->message);
139   }
140   klass->handle_message = default_handle_message;
141   klass->unprepare = default_unprepare;
142
143   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
144
145   gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
146
147 }
148
149 static void
150 gst_rtsp_media_init (GstRTSPMedia * media)
151 {
152   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
153   media->lock = g_mutex_new ();
154   media->cond = g_cond_new ();
155
156   media->shared = DEFAULT_SHARED;
157   media->reusable = DEFAULT_REUSABLE;
158   media->protocols = DEFAULT_PROTOCOLS;
159   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
160   media->buffer_size = DEFAULT_BUFFER_SIZE;
161 }
162
163 /* FIXME. this should be done in multiudpsink */
164 typedef struct
165 {
166   gint count;
167   gchar *dest;
168   gint min, max;
169 } RTSPDestination;
170
171 static gint
172 dest_compare (RTSPDestination * a, RTSPDestination * b)
173 {
174   if ((a->min == b->min) && (a->max == b->max)
175       && (strcmp (a->dest, b->dest) == 0))
176     return 0;
177
178   return 1;
179 }
180
181 static RTSPDestination *
182 create_destination (const gchar * dest, gint min, gint max)
183 {
184   RTSPDestination *res;
185
186   res = g_slice_new (RTSPDestination);
187   res->count = 1;
188   res->dest = g_strdup (dest);
189   res->min = min;
190   res->max = max;
191
192   return res;
193 }
194
195 static void
196 free_destination (RTSPDestination * dest)
197 {
198   g_free (dest->dest);
199   g_slice_free (RTSPDestination, dest);
200 }
201
202 void
203 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
204 {
205   if (trans->transport) {
206     gst_rtsp_transport_free (trans->transport);
207     trans->transport = NULL;
208   }
209   if (trans->rtpsource) {
210     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
211     trans->rtpsource = NULL;
212   }
213 }
214
215 static void
216 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
217 {
218   if (stream->session)
219     g_object_unref (stream->session);
220
221   if (stream->caps)
222     gst_caps_unref (stream->caps);
223
224   if (stream->send_rtp_sink)
225     gst_object_unref (stream->send_rtp_sink);
226   if (stream->send_rtp_src)
227     gst_object_unref (stream->send_rtp_src);
228   if (stream->send_rtcp_src)
229     gst_object_unref (stream->send_rtcp_src);
230   if (stream->recv_rtcp_sink)
231     gst_object_unref (stream->recv_rtcp_sink);
232   if (stream->recv_rtp_sink)
233     gst_object_unref (stream->recv_rtp_sink);
234
235   g_list_free (stream->transports);
236
237   g_free (stream);
238 }
239
240 static void
241 gst_rtsp_media_finalize (GObject * obj)
242 {
243   GstRTSPMedia *media;
244   guint i;
245
246   media = GST_RTSP_MEDIA (obj);
247
248   GST_INFO ("finalize media %p", media);
249
250   if (media->pipeline) {
251     unlock_streams (media);
252     gst_element_set_state (media->pipeline, GST_STATE_NULL);
253     gst_object_unref (media->pipeline);
254   }
255
256   for (i = 0; i < media->streams->len; i++) {
257     GstRTSPMediaStream *stream;
258
259     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
260
261     gst_rtsp_media_stream_free (stream);
262   }
263   g_array_free (media->streams, TRUE);
264
265   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
266   g_list_free (media->dynamic);
267
268   if (media->source) {
269     g_source_destroy (media->source);
270     g_source_unref (media->source);
271   }
272   g_mutex_free (media->lock);
273   g_cond_free (media->cond);
274
275   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
276 }
277
278 static void
279 gst_rtsp_media_get_property (GObject * object, guint propid,
280     GValue * value, GParamSpec * pspec)
281 {
282   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
283
284   switch (propid) {
285     case PROP_SHARED:
286       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
287       break;
288     case PROP_REUSABLE:
289       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
290       break;
291     case PROP_PROTOCOLS:
292       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
293       break;
294     case PROP_EOS_SHUTDOWN:
295       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
296       break;
297     case PROP_BUFFER_SIZE:
298       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
299       break;
300     default:
301       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
302   }
303 }
304
305 static void
306 gst_rtsp_media_set_property (GObject * object, guint propid,
307     const GValue * value, GParamSpec * pspec)
308 {
309   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
310
311   switch (propid) {
312     case PROP_SHARED:
313       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
314       break;
315     case PROP_REUSABLE:
316       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
317       break;
318     case PROP_PROTOCOLS:
319       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
320       break;
321     case PROP_EOS_SHUTDOWN:
322       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
323       break;
324     case PROP_BUFFER_SIZE:
325       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
326       break;
327     default:
328       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
329   }
330 }
331
332 static gpointer
333 do_loop (GstRTSPMediaClass * klass)
334 {
335   GST_INFO ("enter mainloop");
336   g_main_loop_run (klass->loop);
337   GST_INFO ("exit mainloop");
338
339   return NULL;
340 }
341
342 static void
343 collect_media_stats (GstRTSPMedia * media)
344 {
345   GstFormat format;
346   gint64 position, duration;
347
348   media->range.unit = GST_RTSP_RANGE_NPT;
349
350   if (media->is_live) {
351     media->range.min.type = GST_RTSP_TIME_NOW;
352     media->range.min.seconds = -1;
353     media->range.max.type = GST_RTSP_TIME_END;
354     media->range.max.seconds = -1;
355   } else {
356     /* get the position */
357     format = GST_FORMAT_TIME;
358     if (!gst_element_query_position (media->pipeline, &format, &position)) {
359       GST_INFO ("position query failed");
360       position = 0;
361     }
362
363     /* get the duration */
364     format = GST_FORMAT_TIME;
365     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
366       GST_INFO ("duration query failed");
367       duration = -1;
368     }
369
370     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
371         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
372
373     if (position == -1) {
374       media->range.min.type = GST_RTSP_TIME_NOW;
375       media->range.min.seconds = -1;
376     } else {
377       media->range.min.type = GST_RTSP_TIME_SECONDS;
378       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
379     }
380     if (duration == -1) {
381       media->range.max.type = GST_RTSP_TIME_END;
382       media->range.max.seconds = -1;
383     } else {
384       media->range.max.type = GST_RTSP_TIME_SECONDS;
385       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
386     }
387   }
388 }
389
390 /**
391  * gst_rtsp_media_new:
392  *
393  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
394  * element to produde RTP data for one or more related (audio/video/..) 
395  * streams.
396  *
397  * Returns: a new #GstRTSPMedia object.
398  */
399 GstRTSPMedia *
400 gst_rtsp_media_new (void)
401 {
402   GstRTSPMedia *result;
403
404   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
405
406   return result;
407 }
408
409 /**
410  * gst_rtsp_media_set_shared:
411  * @media: a #GstRTSPMedia
412  * @shared: the new value
413  *
414  * Set or unset if the pipeline for @media can be shared will multiple clients.
415  * When @shared is %TRUE, client requests for this media will share the media
416  * pipeline.
417  */
418 void
419 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
420 {
421   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
422
423   media->shared = shared;
424 }
425
426 /**
427  * gst_rtsp_media_is_shared:
428  * @media: a #GstRTSPMedia
429  *
430  * Check if the pipeline for @media can be shared between multiple clients.
431  *
432  * Returns: %TRUE if the media can be shared between clients.
433  */
434 gboolean
435 gst_rtsp_media_is_shared (GstRTSPMedia * media)
436 {
437   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
438
439   return media->shared;
440 }
441
442 /**
443  * gst_rtsp_media_set_reusable:
444  * @media: a #GstRTSPMedia
445  * @reusable: the new value
446  *
447  * Set or unset if the pipeline for @media can be reused after the pipeline has
448  * been unprepared.
449  */
450 void
451 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
452 {
453   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
454
455   media->reusable = reusable;
456 }
457
458 /**
459  * gst_rtsp_media_is_reusable:
460  * @media: a #GstRTSPMedia
461  *
462  * Check if the pipeline for @media can be reused after an unprepare.
463  *
464  * Returns: %TRUE if the media can be reused
465  */
466 gboolean
467 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
468 {
469   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
470
471   return media->reusable;
472 }
473
474 /**
475  * gst_rtsp_media_set_protocols:
476  * @media: a #GstRTSPMedia
477  * @protocols: the new flags
478  *
479  * Configure the allowed lower transport for @media.
480  */
481 void
482 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
483 {
484   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
485
486   media->protocols = protocols;
487 }
488
489 /**
490  * gst_rtsp_media_get_protocols:
491  * @media: a #GstRTSPMedia
492  *
493  * Get the allowed protocols of @media.
494  *
495  * Returns: a #GstRTSPLowerTrans
496  */
497 GstRTSPLowerTrans
498 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
499 {
500   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
501       GST_RTSP_LOWER_TRANS_UNKNOWN);
502
503   return media->protocols;
504 }
505
506 /**
507  * gst_rtsp_media_set_eos_shutdown:
508  * @media: a #GstRTSPMedia
509  * @eos_shutdown: the new value
510  *
511  * Set or unset if an EOS event will be sent to the pipeline for @media before
512  * it is unprepared.
513  */
514 void
515 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
516 {
517   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
518
519   media->eos_shutdown = eos_shutdown;
520 }
521
522 /**
523  * gst_rtsp_media_is_eos_shutdown:
524  * @media: a #GstRTSPMedia
525  *
526  * Check if the pipeline for @media will send an EOS down the pipeline before
527  * unpreparing.
528  *
529  * Returns: %TRUE if the media will send EOS before unpreparing.
530  */
531 gboolean
532 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
533 {
534   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
535
536   return media->eos_shutdown;
537 }
538
539 /**
540  * gst_rtsp_media_set_buffer_size:
541  * @media: a #GstRTSPMedia
542  * @size: the new value
543  *
544  * Set the kernel UDP buffer size.
545  */
546 void
547 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
548 {
549   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
550
551   media->buffer_size = size;
552 }
553
554 /**
555  * gst_rtsp_media_get_buffer_size:
556  * @media: a #GstRTSPMedia
557  *
558  * Get the kernel UDP buffer size.
559  *
560  * Returns: the kernel UDP buffer size.
561  */
562 guint
563 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
564 {
565   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
566
567   return media->buffer_size;
568 }
569
570 /**
571  * gst_rtsp_media_set_auth:
572  * @media: a #GstRTSPMedia
573  * @auth: a #GstRTSPAuth
574  *
575  * configure @auth to be used as the authentication manager of @media.
576  */
577 void
578 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
579 {
580   GstRTSPAuth *old;
581
582   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
583
584   old = media->auth;
585
586   if (old != auth) {
587     if (auth)
588       g_object_ref (auth);
589     media->auth = auth;
590     if (old)
591       g_object_unref (old);
592   }
593 }
594
595 /**
596  * gst_rtsp_media_get_auth:
597  * @media: a #GstRTSPMedia
598  *
599  * Get the #GstRTSPAuth used as the authentication manager of @media.
600  *
601  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
602  * usage.
603  */
604 GstRTSPAuth *
605 gst_rtsp_media_get_auth (GstRTSPMedia * media)
606 {
607   GstRTSPAuth *result;
608
609   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
610
611   if ((result = media->auth))
612     g_object_ref (result);
613
614   return result;
615 }
616
617
618 /**
619  * gst_rtsp_media_n_streams:
620  * @media: a #GstRTSPMedia
621  *
622  * Get the number of streams in this media.
623  *
624  * Returns: The number of streams.
625  */
626 guint
627 gst_rtsp_media_n_streams (GstRTSPMedia * media)
628 {
629   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
630
631   return media->streams->len;
632 }
633
634 /**
635  * gst_rtsp_media_get_stream:
636  * @media: a #GstRTSPMedia
637  * @idx: the stream index
638  *
639  * Retrieve the stream with index @idx from @media.
640  *
641  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
642  * that index did not exist.
643  */
644 GstRTSPMediaStream *
645 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
646 {
647   GstRTSPMediaStream *res;
648
649   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
650
651   if (idx < media->streams->len)
652     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
653   else
654     res = NULL;
655
656   return res;
657 }
658
659 /**
660  * gst_rtsp_media_get_range_string:
661  * @media: a #GstRTSPMedia
662  * @play: for the PLAY request
663  *
664  * Get the current range as a string.
665  *
666  * Returns: The range as a string, g_free() after usage.
667  */
668 gchar *
669 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
670 {
671   gchar *result;
672   GstRTSPTimeRange range;
673
674   /* make copy */
675   range = media->range;
676
677   if (!play && media->active > 0) {
678     range.min.type = GST_RTSP_TIME_NOW;
679     range.min.seconds = -1;
680   }
681
682   result = gst_rtsp_range_to_string (&range);
683
684   return result;
685 }
686
687 /**
688  * gst_rtsp_media_seek:
689  * @media: a #GstRTSPMedia
690  * @range: a #GstRTSPTimeRange
691  *
692  * Seek the pipeline to @range.
693  *
694  * Returns: %TRUE on success.
695  */
696 gboolean
697 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
698 {
699   GstSeekFlags flags;
700   gboolean res;
701   gint64 start, stop;
702   GstSeekType start_type, stop_type;
703
704   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
705   g_return_val_if_fail (range != NULL, FALSE);
706
707   if (range->unit != GST_RTSP_RANGE_NPT)
708     goto not_supported;
709
710   /* depends on the current playing state of the pipeline. We might need to
711    * queue this until we get EOS. */
712   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
713
714   start_type = stop_type = GST_SEEK_TYPE_NONE;
715
716   switch (range->min.type) {
717     case GST_RTSP_TIME_NOW:
718       start = -1;
719       break;
720     case GST_RTSP_TIME_SECONDS:
721       /* only seek when something changed */
722       if (media->range.min.seconds == range->min.seconds) {
723         start = -1;
724       } else {
725         start = range->min.seconds * GST_SECOND;
726         start_type = GST_SEEK_TYPE_SET;
727       }
728       break;
729     case GST_RTSP_TIME_END:
730     default:
731       goto weird_type;
732   }
733   switch (range->max.type) {
734     case GST_RTSP_TIME_SECONDS:
735       /* only seek when something changed */
736       if (media->range.max.seconds == range->max.seconds) {
737         stop = -1;
738       } else {
739         stop = range->max.seconds * GST_SECOND;
740         stop_type = GST_SEEK_TYPE_SET;
741       }
742       break;
743     case GST_RTSP_TIME_END:
744       stop = -1;
745       stop_type = GST_SEEK_TYPE_SET;
746       break;
747     case GST_RTSP_TIME_NOW:
748     default:
749       goto weird_type;
750   }
751
752   if (start != -1 || stop != -1) {
753     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
754         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
755
756     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
757         flags, start_type, start, stop_type, stop);
758
759     /* and block for the seek to complete */
760     GST_INFO ("done seeking %d", res);
761     gst_element_get_state (media->pipeline, NULL, NULL, -1);
762     GST_INFO ("prerolled again");
763
764     collect_media_stats (media);
765   } else {
766     GST_INFO ("no seek needed");
767     res = TRUE;
768   }
769
770   return res;
771
772   /* ERRORS */
773 not_supported:
774   {
775     GST_WARNING ("seek unit %d not supported", range->unit);
776     return FALSE;
777   }
778 weird_type:
779   {
780     GST_WARNING ("weird range type %d not supported", range->min.type);
781     return FALSE;
782   }
783 }
784
785 /**
786  * gst_rtsp_media_stream_rtp:
787  * @stream: a #GstRTSPMediaStream
788  * @buffer: a #GstBuffer
789  *
790  * Handle an RTP buffer for the stream. This method is usually called when a
791  * message has been received from a client using the TCP transport.
792  *
793  * This function takes ownership of @buffer.
794  *
795  * Returns: a GstFlowReturn.
796  */
797 GstFlowReturn
798 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
799 {
800   GstFlowReturn ret;
801
802   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
803
804   return ret;
805 }
806
807 /**
808  * gst_rtsp_media_stream_rtcp:
809  * @stream: a #GstRTSPMediaStream
810  * @buffer: a #GstBuffer
811  *
812  * Handle an RTCP buffer for the stream. This method is usually called when a
813  * message has been received from a client using the TCP transport.
814  *
815  * This function takes ownership of @buffer.
816  *
817  * Returns: a GstFlowReturn.
818  */
819 GstFlowReturn
820 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
821 {
822   GstFlowReturn ret;
823
824   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
825
826   return ret;
827 }
828
829 /* Allocate the udp ports and sockets */
830 static gboolean
831 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
832 {
833   GstStateChangeReturn ret;
834   GstElement *udpsrc0, *udpsrc1;
835   GstElement *udpsink0, *udpsink1;
836   gint tmp_rtp, tmp_rtcp;
837   guint count;
838   gint rtpport, rtcpport, sockfd;
839   const gchar *host;
840
841   udpsrc0 = NULL;
842   udpsrc1 = NULL;
843   udpsink0 = NULL;
844   udpsink1 = NULL;
845   count = 0;
846
847   /* Start with random port */
848   tmp_rtp = 0;
849
850   if (media->is_ipv6)
851     host = "udp://[::0]";
852   else
853     host = "udp://0.0.0.0";
854
855   /* try to allocate 2 UDP ports, the RTP port should be an even
856    * number and the RTCP port should be the next (uneven) port */
857 again:
858   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
859   if (udpsrc0 == NULL)
860     goto no_udp_protocol;
861   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
862
863   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
864   if (ret == GST_STATE_CHANGE_FAILURE) {
865     if (tmp_rtp != 0) {
866       tmp_rtp += 2;
867       if (++count > 20)
868         goto no_ports;
869
870       gst_element_set_state (udpsrc0, GST_STATE_NULL);
871       gst_object_unref (udpsrc0);
872
873       goto again;
874     }
875     goto no_udp_protocol;
876   }
877
878   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
879
880   /* check if port is even */
881   if ((tmp_rtp & 1) != 0) {
882     /* port not even, close and allocate another */
883     if (++count > 20)
884       goto no_ports;
885
886     gst_element_set_state (udpsrc0, GST_STATE_NULL);
887     gst_object_unref (udpsrc0);
888
889     tmp_rtp++;
890     goto again;
891   }
892
893   /* allocate port+1 for RTCP now */
894   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
895   if (udpsrc1 == NULL)
896     goto no_udp_rtcp_protocol;
897
898   /* set port */
899   tmp_rtcp = tmp_rtp + 1;
900   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
901
902   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
903   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
904   if (ret == GST_STATE_CHANGE_FAILURE) {
905
906     if (++count > 20)
907       goto no_ports;
908
909     gst_element_set_state (udpsrc0, GST_STATE_NULL);
910     gst_object_unref (udpsrc0);
911
912     gst_element_set_state (udpsrc1, GST_STATE_NULL);
913     gst_object_unref (udpsrc1);
914
915     tmp_rtp += 2;
916     goto again;
917   }
918
919   /* all fine, do port check */
920   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
921   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
922
923   /* this should not happen... */
924   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
925     goto port_error;
926
927   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
928   if (!udpsink0)
929     goto no_udp_protocol;
930
931   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
932   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
933   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
934
935   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
936   if (!udpsink1)
937     goto no_udp_protocol;
938
939   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
940           "send-duplicates")) {
941     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
942     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
943   } else {
944     g_warning
945         ("old multiudpsink version found without send-duplicates property");
946   }
947
948   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
949           "buffer-size")) {
950     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
951   } else {
952     GST_WARNING ("multiudpsink version found without buffer-size property");
953   }
954
955   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
956   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
957   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
958   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
959   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
960
961   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
962   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
963   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
964   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
965
966   /* we keep these elements, we configure all in configure_transport when the
967    * server told us to really use the UDP ports. */
968   stream->udpsrc[0] = udpsrc0;
969   stream->udpsrc[1] = udpsrc1;
970   stream->udpsink[0] = udpsink0;
971   stream->udpsink[1] = udpsink1;
972   stream->server_port.min = rtpport;
973   stream->server_port.max = rtcpport;
974
975   return TRUE;
976
977   /* ERRORS */
978 no_udp_protocol:
979   {
980     goto cleanup;
981   }
982 no_ports:
983   {
984     goto cleanup;
985   }
986 no_udp_rtcp_protocol:
987   {
988     goto cleanup;
989   }
990 port_error:
991   {
992     goto cleanup;
993   }
994 cleanup:
995   {
996     if (udpsrc0) {
997       gst_element_set_state (udpsrc0, GST_STATE_NULL);
998       gst_object_unref (udpsrc0);
999     }
1000     if (udpsrc1) {
1001       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1002       gst_object_unref (udpsrc1);
1003     }
1004     if (udpsink0) {
1005       gst_element_set_state (udpsink0, GST_STATE_NULL);
1006       gst_object_unref (udpsink0);
1007     }
1008     if (udpsink1) {
1009       gst_element_set_state (udpsink1, GST_STATE_NULL);
1010       gst_object_unref (udpsink1);
1011     }
1012     return FALSE;
1013   }
1014 }
1015
1016 /* executed from streaming thread */
1017 static void
1018 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1019 {
1020   gchar *capsstr;
1021   GstCaps *newcaps, *oldcaps;
1022
1023   if ((newcaps = GST_PAD_CAPS (pad)))
1024     gst_caps_ref (newcaps);
1025
1026   oldcaps = stream->caps;
1027   stream->caps = newcaps;
1028
1029   if (oldcaps)
1030     gst_caps_unref (oldcaps);
1031
1032   capsstr = gst_caps_to_string (newcaps);
1033   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1034   g_free (capsstr);
1035 }
1036
1037 static void
1038 dump_structure (const GstStructure * s)
1039 {
1040   gchar *sstr;
1041
1042   sstr = gst_structure_to_string (s);
1043   GST_INFO ("structure: %s", sstr);
1044   g_free (sstr);
1045 }
1046
1047 static GstRTSPMediaTrans *
1048 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1049 {
1050   GList *walk;
1051   GstRTSPMediaTrans *result = NULL;
1052   const gchar *tmp;
1053   gchar *dest;
1054   guint port;
1055
1056   if (rtcp_from == NULL)
1057     return NULL;
1058
1059   tmp = g_strrstr (rtcp_from, ":");
1060   if (tmp == NULL)
1061     return NULL;
1062
1063   port = atoi (tmp + 1);
1064   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1065
1066   GST_INFO ("finding %s:%d", dest, port);
1067
1068   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1069     GstRTSPMediaTrans *trans = walk->data;
1070     gint min, max;
1071
1072     min = trans->transport->client_port.min;
1073     max = trans->transport->client_port.max;
1074
1075     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1076             || max == port)) {
1077       result = trans;
1078       break;
1079     }
1080   }
1081   g_free (dest);
1082
1083   return result;
1084 }
1085
1086 static void
1087 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1088 {
1089   GstStructure *stats;
1090   GstRTSPMediaTrans *trans;
1091
1092   GST_INFO ("%p: new source %p", stream, source);
1093
1094   /* see if we have a stream to match with the origin of the RTCP packet */
1095   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1096   if (trans == NULL) {
1097     g_object_get (source, "stats", &stats, NULL);
1098     if (stats) {
1099       const gchar *rtcp_from;
1100
1101       dump_structure (stats);
1102
1103       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1104       if ((trans = find_transport (stream, rtcp_from))) {
1105         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1106             source);
1107
1108         /* keep ref to the source */
1109         trans->rtpsource = source;
1110
1111         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1112       }
1113       gst_structure_free (stats);
1114     }
1115   } else {
1116     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1117   }
1118 }
1119
1120 static void
1121 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1122 {
1123   GST_INFO ("%p: new SDES %p", stream, source);
1124 }
1125
1126 static void
1127 on_ssrc_active (GObject * session, GObject * source,
1128     GstRTSPMediaStream * stream)
1129 {
1130   GstRTSPMediaTrans *trans;
1131
1132   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1133
1134   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1135
1136   if (trans && trans->keep_alive)
1137     trans->keep_alive (trans->ka_user_data);
1138
1139 #ifdef DUMP_STATS
1140   {
1141     GstStructure *stats;
1142     g_object_get (source, "stats", &stats, NULL);
1143     if (stats) {
1144       dump_structure (stats);
1145       gst_structure_free (stats);
1146     }
1147   }
1148 #endif
1149 }
1150
1151 static void
1152 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1153 {
1154   GST_INFO ("%p: source %p bye", stream, source);
1155 }
1156
1157 static void
1158 on_bye_timeout (GObject * session, GObject * source,
1159     GstRTSPMediaStream * stream)
1160 {
1161   GstRTSPMediaTrans *trans;
1162
1163   GST_INFO ("%p: source %p bye timeout", stream, source);
1164
1165   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1166     trans->rtpsource = NULL;
1167     trans->timeout = TRUE;
1168   }
1169 }
1170
1171 static void
1172 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1173 {
1174   GstRTSPMediaTrans *trans;
1175
1176   GST_INFO ("%p: source %p timeout", stream, source);
1177
1178   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1179     trans->rtpsource = NULL;
1180     trans->timeout = TRUE;
1181   }
1182 }
1183
1184 static GstFlowReturn
1185 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1186 {
1187   GList *walk;
1188   GstBuffer *buffer;
1189   GstRTSPMediaStream *stream;
1190
1191   buffer = gst_app_sink_pull_buffer (sink);
1192   if (!buffer)
1193     return GST_FLOW_OK;
1194
1195   stream = (GstRTSPMediaStream *) user_data;
1196
1197   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1198     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1199
1200     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1201       if (tr->send_rtp)
1202         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1203     } else {
1204       if (tr->send_rtcp)
1205         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1206     }
1207   }
1208   gst_buffer_unref (buffer);
1209
1210   return GST_FLOW_OK;
1211 }
1212
1213 static GstFlowReturn
1214 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1215 {
1216   GList *walk;
1217   GstBufferList *blist;
1218   GstRTSPMediaStream *stream;
1219
1220   blist = gst_app_sink_pull_buffer_list (sink);
1221   if (!blist)
1222     return GST_FLOW_OK;
1223
1224   stream = (GstRTSPMediaStream *) user_data;
1225
1226   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1227     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1228
1229     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1230       if (tr->send_rtp_list)
1231         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1232             tr->user_data);
1233     } else {
1234       if (tr->send_rtcp_list)
1235         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1236             tr->user_data);
1237     }
1238   }
1239   gst_buffer_list_unref (blist);
1240
1241   return GST_FLOW_OK;
1242 }
1243
1244 static GstAppSinkCallbacks sink_cb = {
1245   NULL,                         /* not interested in EOS */
1246   NULL,                         /* not interested in preroll buffers */
1247   handle_new_buffer,
1248   handle_new_buffer_list
1249 };
1250
1251 /* prepare the pipeline objects to handle @stream in @media */
1252 static gboolean
1253 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1254 {
1255   gchar *name;
1256   GstPad *pad, *teepad, *selpad;
1257   GstPadLinkReturn ret;
1258   gint i;
1259
1260   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1261    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1262    * elements */
1263   if (!alloc_udp_ports (media, stream))
1264     return FALSE;
1265
1266   /* add the ports to the pipeline */
1267   for (i = 0; i < 2; i++) {
1268     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1269     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1270   }
1271
1272   /* create elements for the TCP transfer */
1273   for (i = 0; i < 2; i++) {
1274     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1275     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1276     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1277     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1278     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1279     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1280     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1281     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1282         &sink_cb, stream, NULL);
1283   }
1284
1285   /* hook up the stream to the RTP session elements. */
1286   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1287   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1288   g_free (name);
1289   name = g_strdup_printf ("send_rtp_src_%d", idx);
1290   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1291   g_free (name);
1292   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1293   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1294   g_free (name);
1295   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1296   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1297   g_free (name);
1298   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1299   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1300   g_free (name);
1301
1302   /* get the session */
1303   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1304       &stream->session);
1305
1306   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1307       stream);
1308   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1309       stream);
1310   g_signal_connect (stream->session, "on-ssrc-active",
1311       (GCallback) on_ssrc_active, stream);
1312   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1313       stream);
1314   g_signal_connect (stream->session, "on-bye-timeout",
1315       (GCallback) on_bye_timeout, stream);
1316   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1317       stream);
1318
1319   /* link the RTP pad to the session manager */
1320   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1321   if (ret != GST_PAD_LINK_OK)
1322     goto link_failed;
1323
1324   /* make tee for RTP and link to stream */
1325   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1326   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1327
1328   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1329   gst_pad_link (stream->send_rtp_src, pad);
1330   gst_object_unref (pad);
1331
1332   /* link RTP sink, we're pretty sure this will work. */
1333   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1334   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1335   gst_pad_link (teepad, pad);
1336   gst_object_unref (pad);
1337   gst_object_unref (teepad);
1338
1339   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1340   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1341   gst_pad_link (teepad, pad);
1342   gst_object_unref (pad);
1343   gst_object_unref (teepad);
1344
1345   /* make tee for RTCP */
1346   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1347   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1348
1349   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1350   gst_pad_link (stream->send_rtcp_src, pad);
1351   gst_object_unref (pad);
1352
1353   /* link RTCP elements */
1354   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1355   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1356   gst_pad_link (teepad, pad);
1357   gst_object_unref (pad);
1358   gst_object_unref (teepad);
1359
1360   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1361   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1362   gst_pad_link (teepad, pad);
1363   gst_object_unref (pad);
1364   gst_object_unref (teepad);
1365
1366   /* make selector for the RTP receivers */
1367   stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
1368   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1369
1370   pad = gst_element_get_static_pad (stream->selector[0], "src");
1371   gst_pad_link (pad, stream->recv_rtp_sink);
1372   gst_object_unref (pad);
1373
1374   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1375   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1376   gst_pad_link (pad, selpad);
1377   gst_object_unref (pad);
1378   gst_object_unref (selpad);
1379
1380   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1381   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1382   gst_pad_link (pad, selpad);
1383   gst_object_unref (pad);
1384   gst_object_unref (selpad);
1385
1386   /* make selector for the RTCP receivers */
1387   stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
1388   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1389
1390   pad = gst_element_get_static_pad (stream->selector[1], "src");
1391   gst_pad_link (pad, stream->recv_rtcp_sink);
1392   gst_object_unref (pad);
1393
1394   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1395   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1396   gst_pad_link (pad, selpad);
1397   gst_object_unref (pad);
1398   gst_object_unref (selpad);
1399
1400   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1401   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1402   gst_pad_link (pad, selpad);
1403   gst_object_unref (pad);
1404   gst_object_unref (selpad);
1405
1406   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1407    * values */
1408   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1409   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1410   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1411   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1412
1413   /* be notified of caps changes */
1414   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1415       (GCallback) caps_notify, stream);
1416
1417   stream->prepared = TRUE;
1418
1419   return TRUE;
1420
1421   /* ERRORS */
1422 link_failed:
1423   {
1424     GST_WARNING ("failed to link stream %d", idx);
1425     return FALSE;
1426   }
1427 }
1428
1429 static void
1430 unlock_streams (GstRTSPMedia * media)
1431 {
1432   guint i, n_streams;
1433
1434   /* unlock the udp src elements */
1435   n_streams = gst_rtsp_media_n_streams (media);
1436   for (i = 0; i < n_streams; i++) {
1437     GstRTSPMediaStream *stream;
1438
1439     stream = gst_rtsp_media_get_stream (media, i);
1440
1441     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1442     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1443   }
1444 }
1445
1446 static void
1447 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1448 {
1449   g_mutex_lock (media->lock);
1450   /* never overwrite the error status */
1451   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1452     media->status = status;
1453   GST_DEBUG ("setting new status to %d", status);
1454   g_cond_broadcast (media->cond);
1455   g_mutex_unlock (media->lock);
1456 }
1457
1458 static GstRTSPMediaStatus
1459 gst_rtsp_media_get_status (GstRTSPMedia * media)
1460 {
1461   GstRTSPMediaStatus result;
1462   GTimeVal timeout;
1463
1464   g_mutex_lock (media->lock);
1465   g_get_current_time (&timeout);
1466   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1467   /* while we are preparing, wait */
1468   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1469     GST_DEBUG ("waiting for status change");
1470     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1471       GST_DEBUG ("timeout, assuming error status");
1472       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1473     }
1474   }
1475   /* could be success or error */
1476   result = media->status;
1477   GST_DEBUG ("got status %d", result);
1478   g_mutex_unlock (media->lock);
1479
1480   return result;
1481 }
1482
1483 static gboolean
1484 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1485 {
1486   GstMessageType type;
1487
1488   type = GST_MESSAGE_TYPE (message);
1489
1490   switch (type) {
1491     case GST_MESSAGE_STATE_CHANGED:
1492       break;
1493     case GST_MESSAGE_BUFFERING:
1494     {
1495       gint percent;
1496
1497       gst_message_parse_buffering (message, &percent);
1498
1499       /* no state management needed for live pipelines */
1500       if (media->is_live)
1501         break;
1502
1503       if (percent == 100) {
1504         /* a 100% message means buffering is done */
1505         media->buffering = FALSE;
1506         /* if the desired state is playing, go back */
1507         if (media->target_state == GST_STATE_PLAYING) {
1508           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1509           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1510         } else {
1511           GST_INFO ("Buffering done");
1512         }
1513       } else {
1514         /* buffering busy */
1515         if (media->buffering == FALSE) {
1516           if (media->target_state == GST_STATE_PLAYING) {
1517             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1518             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1519             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1520           } else {
1521             GST_INFO ("Buffering ...");
1522           }
1523         }
1524         media->buffering = TRUE;
1525       }
1526       break;
1527     }
1528     case GST_MESSAGE_LATENCY:
1529     {
1530       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1531       break;
1532     }
1533     case GST_MESSAGE_ERROR:
1534     {
1535       GError *gerror;
1536       gchar *debug;
1537
1538       gst_message_parse_error (message, &gerror, &debug);
1539       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1540       g_error_free (gerror);
1541       g_free (debug);
1542
1543       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1544       break;
1545     }
1546     case GST_MESSAGE_WARNING:
1547     {
1548       GError *gerror;
1549       gchar *debug;
1550
1551       gst_message_parse_warning (message, &gerror, &debug);
1552       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1553       g_error_free (gerror);
1554       g_free (debug);
1555       break;
1556     }
1557     case GST_MESSAGE_ELEMENT:
1558       break;
1559     case GST_MESSAGE_STREAM_STATUS:
1560       break;
1561     case GST_MESSAGE_ASYNC_DONE:
1562       if (!media->adding) {
1563         /* when we are dynamically adding pads, the addition of the udpsrc will
1564          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1565          * wait for the final ASYNC_DONE after everything prerolled */
1566         GST_INFO ("%p: got ASYNC_DONE", media);
1567         collect_media_stats (media);
1568
1569         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1570       } else {
1571         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1572       }
1573       break;
1574     case GST_MESSAGE_EOS:
1575       GST_INFO ("%p: got EOS", media);
1576       if (media->eos_pending) {
1577         GST_DEBUG ("shutting down after EOS");
1578         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1579         media->eos_pending = FALSE;
1580         g_object_unref (media);
1581       }
1582       break;
1583     default:
1584       GST_INFO ("%p: got message type %s", media,
1585           gst_message_type_get_name (type));
1586       break;
1587   }
1588   return TRUE;
1589 }
1590
1591 static gboolean
1592 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1593 {
1594   GstRTSPMediaClass *klass;
1595   gboolean ret;
1596
1597   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1598
1599   if (klass->handle_message)
1600     ret = klass->handle_message (media, message);
1601   else
1602     ret = FALSE;
1603
1604   return ret;
1605 }
1606
1607 /* called from streaming threads */
1608 static void
1609 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1610 {
1611   GstRTSPMediaStream *stream;
1612   gchar *name;
1613   gint i;
1614
1615   i = media->streams->len + 1;
1616
1617   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1618
1619   stream = g_new0 (GstRTSPMediaStream, 1);
1620   stream->payloader = element;
1621
1622   name = g_strdup_printf ("dynpay%d", i);
1623
1624   media->adding = TRUE;
1625
1626   /* ghost the pad of the payloader to the element */
1627   stream->srcpad = gst_ghost_pad_new (name, pad);
1628   gst_pad_set_active (stream->srcpad, TRUE);
1629   gst_element_add_pad (media->element, stream->srcpad);
1630   g_free (name);
1631
1632   /* add stream now */
1633   g_array_append_val (media->streams, stream);
1634
1635   setup_stream (stream, i, media);
1636
1637   for (i = 0; i < 2; i++) {
1638     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1639     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1640     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1641     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1642     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1643   }
1644   media->adding = FALSE;
1645 }
1646
1647 static void
1648 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1649 {
1650   GST_INFO ("no more pads");
1651   if (media->fakesink) {
1652     gst_object_ref (media->fakesink);
1653     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1654     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1655     gst_object_unref (media->fakesink);
1656     media->fakesink = NULL;
1657     GST_INFO ("removed fakesink");
1658   }
1659 }
1660
1661 /**
1662  * gst_rtsp_media_prepare:
1663  * @media: a #GstRTSPMedia
1664  *
1665  * Prepare @media for streaming. This function will create the pipeline and
1666  * other objects to manage the streaming.
1667  *
1668  * It will preroll the pipeline and collect vital information about the streams
1669  * such as the duration.
1670  *
1671  * Returns: %TRUE on success.
1672  */
1673 gboolean
1674 gst_rtsp_media_prepare (GstRTSPMedia * media)
1675 {
1676   GstStateChangeReturn ret;
1677   GstRTSPMediaStatus status;
1678   guint i, n_streams;
1679   GstRTSPMediaClass *klass;
1680   GstBus *bus;
1681   GList *walk;
1682
1683   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1684     goto was_prepared;
1685
1686   if (!media->reusable && media->reused)
1687     goto is_reused;
1688
1689   GST_INFO ("preparing media %p", media);
1690
1691   /* reset some variables */
1692   media->is_live = FALSE;
1693   media->buffering = FALSE;
1694   /* we're preparing now */
1695   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1696
1697   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1698
1699   /* add the pipeline bus to our custom mainloop */
1700   media->source = gst_bus_create_watch (bus);
1701   gst_object_unref (bus);
1702
1703   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1704
1705   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1706   media->id = g_source_attach (media->source, klass->context);
1707
1708   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1709
1710   /* add stuff to the bin */
1711   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1712
1713   /* link streams we already have, other streams might appear when we have
1714    * dynamic elements */
1715   n_streams = gst_rtsp_media_n_streams (media);
1716   for (i = 0; i < n_streams; i++) {
1717     GstRTSPMediaStream *stream;
1718
1719     stream = gst_rtsp_media_get_stream (media, i);
1720
1721     setup_stream (stream, i, media);
1722   }
1723
1724   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1725     GstElement *elem = walk->data;
1726
1727     GST_INFO ("adding callbacks for dynamic element %p", elem);
1728
1729     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1730     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1731
1732     /* we add a fakesink here in order to make the state change async. We remove
1733      * the fakesink again in the no-more-pads callback. */
1734     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1735     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1736   }
1737
1738   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1739   /* first go to PAUSED */
1740   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1741   media->target_state = GST_STATE_PAUSED;
1742
1743   switch (ret) {
1744     case GST_STATE_CHANGE_SUCCESS:
1745       GST_INFO ("SUCCESS state change for media %p", media);
1746       break;
1747     case GST_STATE_CHANGE_ASYNC:
1748       GST_INFO ("ASYNC state change for media %p", media);
1749       break;
1750     case GST_STATE_CHANGE_NO_PREROLL:
1751       /* we need to go to PLAYING */
1752       GST_INFO ("NO_PREROLL state change: live media %p", media);
1753       media->is_live = TRUE;
1754       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1755       if (ret == GST_STATE_CHANGE_FAILURE)
1756         goto state_failed;
1757       break;
1758     case GST_STATE_CHANGE_FAILURE:
1759       goto state_failed;
1760   }
1761
1762   /* now wait for all pads to be prerolled */
1763   status = gst_rtsp_media_get_status (media);
1764   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1765     goto state_failed;
1766
1767   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1768
1769   GST_INFO ("object %p is prerolled", media);
1770
1771   return TRUE;
1772
1773   /* OK */
1774 was_prepared:
1775   {
1776     return TRUE;
1777   }
1778   /* ERRORS */
1779 is_reused:
1780   {
1781     GST_WARNING ("can not reuse media %p", media);
1782     return FALSE;
1783   }
1784 state_failed:
1785   {
1786     GST_WARNING ("failed to preroll pipeline");
1787     unlock_streams (media);
1788     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1789     gst_rtsp_media_unprepare (media);
1790     return FALSE;
1791   }
1792 }
1793
1794 /**
1795  * gst_rtsp_media_unprepare:
1796  * @media: a #GstRTSPMedia
1797  *
1798  * Unprepare @media. After this call, the media should be prepared again before
1799  * it can be used again. If the media is set to be non-reusable, a new instance
1800  * must be created.
1801  *
1802  * Returns: %TRUE on success.
1803  */
1804 gboolean
1805 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1806 {
1807   GstRTSPMediaClass *klass;
1808   gboolean success;
1809
1810   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1811     return TRUE;
1812
1813   GST_INFO ("unprepare media %p", media);
1814   media->target_state = GST_STATE_NULL;
1815
1816   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1817   if (klass->unprepare)
1818     success = klass->unprepare (media);
1819   else
1820     success = TRUE;
1821
1822   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1823   media->reused = TRUE;
1824
1825   /* when the media is not reusable, this will effectively unref the media and
1826    * recreate it */
1827   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1828
1829   return success;
1830 }
1831
1832 static gboolean
1833 default_unprepare (GstRTSPMedia * media)
1834 {
1835   if (media->eos_shutdown) {
1836     GST_DEBUG ("sending EOS for shutdown");
1837     /* ref so that we don't disappear */
1838     g_object_ref (media);
1839     media->eos_pending = TRUE;
1840     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1841     /* we need to go to playing again for the EOS to propagate, normally in this
1842      * state, nothing is receiving data from us anymore so this is ok. */
1843     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1844   } else {
1845     GST_DEBUG ("shutting down");
1846     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1847   }
1848   return TRUE;
1849 }
1850
1851 static void
1852 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1853     gchar * dest, gint min, gint max)
1854 {
1855   gboolean do_add = TRUE;
1856   RTSPDestination *ndest;
1857
1858   GST_INFO ("adding %s:%d-%d", dest, min, max);
1859   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1860   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1861 }
1862
1863 static void
1864 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1865     gchar * dest, gint min, gint max)
1866 {
1867   gboolean do_remove = TRUE;
1868   RTSPDestination *ndest = NULL;
1869   GList *find = NULL;
1870
1871   GST_INFO ("removing %s:%d-%d", dest, min, max);
1872   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1873   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1874 }
1875
1876 /**
1877  * gst_rtsp_media_set_state:
1878  * @media: a #GstRTSPMedia
1879  * @state: the target state of the media
1880  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1881  *
1882  * Set the state of @media to @state and for the transports in @transports.
1883  *
1884  * Returns: %TRUE on success.
1885  */
1886 gboolean
1887 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1888     GArray * transports)
1889 {
1890   gint i;
1891   GstStateChangeReturn ret;
1892   gboolean add, remove, do_state;
1893   gint old_active;
1894
1895   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1896   g_return_val_if_fail (transports != NULL, FALSE);
1897
1898   /* NULL and READY are the same */
1899   if (state == GST_STATE_READY)
1900     state = GST_STATE_NULL;
1901
1902   add = remove = FALSE;
1903
1904   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1905       media);
1906
1907   switch (state) {
1908     case GST_STATE_NULL:
1909       /* unlock the streams so that they follow the state changes from now on */
1910       unlock_streams (media);
1911       /* fallthrough */
1912     case GST_STATE_PAUSED:
1913       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1914       if (media->target_state == GST_STATE_PLAYING)
1915         remove = TRUE;
1916       break;
1917     case GST_STATE_PLAYING:
1918       /* we're going to PLAYING, add */
1919       add = TRUE;
1920       break;
1921     default:
1922       break;
1923   }
1924   old_active = media->active;
1925
1926   for (i = 0; i < transports->len; i++) {
1927     GstRTSPMediaTrans *tr;
1928     GstRTSPMediaStream *stream;
1929     GstRTSPTransport *trans;
1930
1931     /* we need a non-NULL entry in the array */
1932     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1933     if (tr == NULL)
1934       continue;
1935
1936     /* we need a transport */
1937     if (!(trans = tr->transport))
1938       continue;
1939
1940     /* get the stream and add the destinations */
1941     stream = gst_rtsp_media_get_stream (media, tr->idx);
1942     switch (trans->lower_transport) {
1943       case GST_RTSP_LOWER_TRANS_UDP:
1944       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1945       {
1946         gchar *dest;
1947         gint min, max;
1948
1949         dest = trans->destination;
1950         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1951           min = trans->port.min;
1952           max = trans->port.max;
1953         } else {
1954           min = trans->client_port.min;
1955           max = trans->client_port.max;
1956         }
1957
1958         if (add && !tr->active) {
1959           add_udp_destination (media, stream, dest, min, max);
1960           stream->transports = g_list_prepend (stream->transports, tr);
1961           tr->active = TRUE;
1962           media->active++;
1963         } else if (remove && tr->active) {
1964           remove_udp_destination (media, stream, dest, min, max);
1965           stream->transports = g_list_remove (stream->transports, tr);
1966           tr->active = FALSE;
1967           media->active--;
1968         }
1969         break;
1970       }
1971       case GST_RTSP_LOWER_TRANS_TCP:
1972         if (add && !tr->active) {
1973           GST_INFO ("adding TCP %s", trans->destination);
1974           stream->transports = g_list_prepend (stream->transports, tr);
1975           tr->active = TRUE;
1976           media->active++;
1977         } else if (remove && tr->active) {
1978           GST_INFO ("removing TCP %s", trans->destination);
1979           stream->transports = g_list_remove (stream->transports, tr);
1980           tr->active = FALSE;
1981           media->active--;
1982         }
1983         break;
1984       default:
1985         GST_INFO ("Unknown transport %d", trans->lower_transport);
1986         break;
1987     }
1988   }
1989
1990   /* we just added the first media, do the playing state change */
1991   if (old_active == 0 && add)
1992     do_state = TRUE;
1993   /* if we have no more active media, do the downward state changes */
1994   else if (media->active == 0)
1995     do_state = TRUE;
1996   else
1997     do_state = FALSE;
1998
1999   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2000       media, do_state);
2001
2002   if (media->target_state != state) {
2003     if (do_state) {
2004       if (state == GST_STATE_NULL) {
2005         gst_rtsp_media_unprepare (media);
2006       } else {
2007         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2008             media);
2009         media->target_state = state;
2010         ret = gst_element_set_state (media->pipeline, state);
2011       }
2012     }
2013     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2014         NULL);
2015   }
2016
2017   /* remember where we are */
2018   if (state == GST_STATE_PAUSED || old_active != media->active)
2019     collect_media_stats (media);
2020
2021   return TRUE;
2022 }
2023
2024 /**
2025  * gst_rtsp_media_remove_elements:
2026  * @media: a #GstRTSPMedia
2027  *
2028  * Remove all elements and the pipeline controlled by @media.
2029  */
2030 void
2031 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2032 {
2033   gint i, j;
2034
2035   unlock_streams (media);
2036
2037   for (i = 0; i < media->streams->len; i++) {
2038     GstRTSPMediaStream *stream;
2039
2040     GST_INFO ("Removing elements of stream %d from pipeline", i);
2041
2042     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2043
2044     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2045
2046     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2047
2048     for (j = 0; j < 2; j++) {
2049       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2050       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2051       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2052       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2053       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2054       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2055
2056       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2057       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2058       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2059       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2060       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2061       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2062     }
2063     if (stream->caps)
2064       gst_caps_unref (stream->caps);
2065     stream->caps = NULL;
2066     gst_rtsp_media_stream_free (stream);
2067   }
2068   g_array_remove_range (media->streams, 0, media->streams->len);
2069
2070   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2071   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2072
2073   gst_object_unref (media->pipeline);
2074   media->pipeline = NULL;
2075 }