ba0f45f09fc273bd8fa1a19acaa7de3349253b47
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-funnel.h"
27 #include "rtsp-media.h"
28
29 #define DEFAULT_SHARED         FALSE
30 #define DEFAULT_REUSABLE       FALSE
31 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
32 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
33 #define DEFAULT_EOS_SHUTDOWN   FALSE
34
35 /* define to dump received RTCP packets */
36 #undef DUMP_STATS
37
38 enum
39 {
40   PROP_0,
41   PROP_SHARED,
42   PROP_REUSABLE,
43   PROP_PROTOCOLS,
44   PROP_EOS_SHUTDOWN,
45   PROP_LAST
46 };
47
48 enum
49 {
50   SIGNAL_PREPARED,
51   SIGNAL_UNPREPARED,
52   SIGNAL_NEW_STATE,
53   SIGNAL_LAST
54 };
55
56 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
57 #define GST_CAT_DEFAULT rtsp_media_debug
58
59 static GQuark ssrc_stream_map_key;
60
61 static void gst_rtsp_media_get_property (GObject * object, guint propid,
62     GValue * value, GParamSpec * pspec);
63 static void gst_rtsp_media_set_property (GObject * object, guint propid,
64     const GValue * value, GParamSpec * pspec);
65 static void gst_rtsp_media_finalize (GObject * obj);
66
67 static gpointer do_loop (GstRTSPMediaClass * klass);
68 static gboolean default_handle_message (GstRTSPMedia * media,
69     GstMessage * message);
70 static gboolean default_unprepare (GstRTSPMedia * media);
71 static void unlock_streams (GstRTSPMedia * media);
72
73 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
74
75 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
76
77 static void
78 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
79 {
80   GObjectClass *gobject_class;
81   GError *error = NULL;
82
83   gobject_class = G_OBJECT_CLASS (klass);
84
85   gobject_class->get_property = gst_rtsp_media_get_property;
86   gobject_class->set_property = gst_rtsp_media_set_property;
87   gobject_class->finalize = gst_rtsp_media_finalize;
88
89   g_object_class_install_property (gobject_class, PROP_SHARED,
90       g_param_spec_boolean ("shared", "Shared",
91           "If this media pipeline can be shared", DEFAULT_SHARED,
92           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
93
94   g_object_class_install_property (gobject_class, PROP_REUSABLE,
95       g_param_spec_boolean ("reusable", "Reusable",
96           "If this media pipeline can be reused after an unprepare",
97           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98
99   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
100       g_param_spec_flags ("protocols", "Protocols",
101           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
102           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
103
104   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
105       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
106           "Send an EOS event to the pipeline before unpreparing",
107           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
108
109   gst_rtsp_media_signals[SIGNAL_PREPARED] =
110       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
111       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
112       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
113
114   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
115       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
116       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
117       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
118
119   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
120       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
121       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
122       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
123
124   klass->context = g_main_context_new ();
125   klass->loop = g_main_loop_new (klass->context, TRUE);
126
127   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
128
129   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
130   if (error != NULL) {
131     g_critical ("could not start bus thread: %s", error->message);
132   }
133   klass->handle_message = default_handle_message;
134   klass->unprepare = default_unprepare;
135
136   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
137
138   gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
139
140 }
141
142 static void
143 gst_rtsp_media_init (GstRTSPMedia * media)
144 {
145   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
146   media->lock = g_mutex_new ();
147   media->cond = g_cond_new ();
148
149   media->shared = DEFAULT_SHARED;
150   media->reusable = DEFAULT_REUSABLE;
151   media->protocols = DEFAULT_PROTOCOLS;
152   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
153 }
154
155 /* FIXME. this should be done in multiudpsink */
156 typedef struct
157 {
158   gint count;
159   gchar *dest;
160   gint min, max;
161 } RTSPDestination;
162
163 static gint
164 dest_compare (RTSPDestination * a, RTSPDestination * b)
165 {
166   if ((a->min == b->min) && (a->max == b->max)
167       && (strcmp (a->dest, b->dest) == 0))
168     return 0;
169
170   return 1;
171 }
172
173 static RTSPDestination *
174 create_destination (const gchar * dest, gint min, gint max)
175 {
176   RTSPDestination *res;
177
178   res = g_slice_new (RTSPDestination);
179   res->count = 1;
180   res->dest = g_strdup (dest);
181   res->min = min;
182   res->max = max;
183
184   return res;
185 }
186
187 static void
188 free_destination (RTSPDestination * dest)
189 {
190   g_free (dest->dest);
191   g_slice_free (RTSPDestination, dest);
192 }
193
194 void
195 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
196 {
197   if (trans->transport) {
198     gst_rtsp_transport_free (trans->transport);
199     trans->transport = NULL;
200   }
201   if (trans->rtpsource) {
202     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
203     trans->rtpsource = NULL;
204   }
205 }
206
207 static void
208 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
209 {
210   if (stream->session)
211     g_object_unref (stream->session);
212
213   if (stream->caps)
214     gst_caps_unref (stream->caps);
215
216   if (stream->send_rtp_sink)
217     gst_object_unref (stream->send_rtp_sink);
218   if (stream->send_rtp_src)
219     gst_object_unref (stream->send_rtp_src);
220   if (stream->send_rtcp_src)
221     gst_object_unref (stream->send_rtcp_src);
222   if (stream->recv_rtcp_sink)
223     gst_object_unref (stream->recv_rtcp_sink);
224   if (stream->recv_rtp_sink)
225     gst_object_unref (stream->recv_rtp_sink);
226
227   g_list_free (stream->transports);
228
229   g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
230   g_list_free (stream->destinations);
231
232   g_free (stream);
233 }
234
235 static void
236 gst_rtsp_media_finalize (GObject * obj)
237 {
238   GstRTSPMedia *media;
239   guint i;
240
241   media = GST_RTSP_MEDIA (obj);
242
243   GST_INFO ("finalize media %p", media);
244
245   if (media->pipeline) {
246     unlock_streams (media);
247     gst_element_set_state (media->pipeline, GST_STATE_NULL);
248     gst_object_unref (media->pipeline);
249   }
250
251   for (i = 0; i < media->streams->len; i++) {
252     GstRTSPMediaStream *stream;
253
254     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
255
256     gst_rtsp_media_stream_free (stream);
257   }
258   g_array_free (media->streams, TRUE);
259
260   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
261   g_list_free (media->dynamic);
262
263   if (media->source) {
264     g_source_destroy (media->source);
265     g_source_unref (media->source);
266   }
267   g_mutex_free (media->lock);
268   g_cond_free (media->cond);
269
270   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
271 }
272
273 static void
274 gst_rtsp_media_get_property (GObject * object, guint propid,
275     GValue * value, GParamSpec * pspec)
276 {
277   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
278
279   switch (propid) {
280     case PROP_SHARED:
281       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
282       break;
283     case PROP_REUSABLE:
284       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
285       break;
286     case PROP_PROTOCOLS:
287       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
288       break;
289     case PROP_EOS_SHUTDOWN:
290       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
291       break;
292     default:
293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
294   }
295 }
296
297 static void
298 gst_rtsp_media_set_property (GObject * object, guint propid,
299     const GValue * value, GParamSpec * pspec)
300 {
301   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
302
303   switch (propid) {
304     case PROP_SHARED:
305       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
306       break;
307     case PROP_REUSABLE:
308       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
309       break;
310     case PROP_PROTOCOLS:
311       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
312       break;
313     case PROP_EOS_SHUTDOWN:
314       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
315       break;
316     default:
317       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
318   }
319 }
320
321 static gpointer
322 do_loop (GstRTSPMediaClass * klass)
323 {
324   GST_INFO ("enter mainloop");
325   g_main_loop_run (klass->loop);
326   GST_INFO ("exit mainloop");
327
328   return NULL;
329 }
330
331 static void
332 collect_media_stats (GstRTSPMedia * media)
333 {
334   GstFormat format;
335   gint64 position, duration;
336
337   media->range.unit = GST_RTSP_RANGE_NPT;
338
339   if (media->is_live) {
340     media->range.min.type = GST_RTSP_TIME_NOW;
341     media->range.min.seconds = -1;
342     media->range.max.type = GST_RTSP_TIME_END;
343     media->range.max.seconds = -1;
344   } else {
345     /* get the position */
346     format = GST_FORMAT_TIME;
347     if (!gst_element_query_position (media->pipeline, &format, &position)) {
348       GST_INFO ("position query failed");
349       position = 0;
350     }
351
352     /* get the duration */
353     format = GST_FORMAT_TIME;
354     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
355       GST_INFO ("duration query failed");
356       duration = -1;
357     }
358
359     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
360         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
361
362     if (position == -1) {
363       media->range.min.type = GST_RTSP_TIME_NOW;
364       media->range.min.seconds = -1;
365     } else {
366       media->range.min.type = GST_RTSP_TIME_SECONDS;
367       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
368     }
369     if (duration == -1) {
370       media->range.max.type = GST_RTSP_TIME_END;
371       media->range.max.seconds = -1;
372     } else {
373       media->range.max.type = GST_RTSP_TIME_SECONDS;
374       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
375     }
376   }
377 }
378
379 /**
380  * gst_rtsp_media_new:
381  *
382  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
383  * element to produde RTP data for one or more related (audio/video/..) 
384  * streams.
385  *
386  * Returns: a new #GstRTSPMedia object.
387  */
388 GstRTSPMedia *
389 gst_rtsp_media_new (void)
390 {
391   GstRTSPMedia *result;
392
393   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
394
395   return result;
396 }
397
398 /**
399  * gst_rtsp_media_set_shared:
400  * @media: a #GstRTSPMedia
401  * @shared: the new value
402  *
403  * Set or unset if the pipeline for @media can be shared will multiple clients.
404  * When @shared is %TRUE, client requests for this media will share the media
405  * pipeline.
406  */
407 void
408 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
409 {
410   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
411
412   media->shared = shared;
413 }
414
415 /**
416  * gst_rtsp_media_is_shared:
417  * @media: a #GstRTSPMedia
418  *
419  * Check if the pipeline for @media can be shared between multiple clients.
420  *
421  * Returns: %TRUE if the media can be shared between clients.
422  */
423 gboolean
424 gst_rtsp_media_is_shared (GstRTSPMedia * media)
425 {
426   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
427
428   return media->shared;
429 }
430
431 /**
432  * gst_rtsp_media_set_reusable:
433  * @media: a #GstRTSPMedia
434  * @reusable: the new value
435  *
436  * Set or unset if the pipeline for @media can be reused after the pipeline has
437  * been unprepared.
438  */
439 void
440 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
441 {
442   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
443
444   media->reusable = reusable;
445 }
446
447 /**
448  * gst_rtsp_media_is_reusable:
449  * @media: a #GstRTSPMedia
450  *
451  * Check if the pipeline for @media can be reused after an unprepare.
452  *
453  * Returns: %TRUE if the media can be reused
454  */
455 gboolean
456 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
457 {
458   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
459
460   return media->reusable;
461 }
462
463 /**
464  * gst_rtsp_media_set_protocols:
465  * @media: a #GstRTSPMedia
466  * @protocols: the new flags
467  *
468  * Configure the allowed lower transport for @media.
469  */
470 void
471 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
472 {
473   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
474
475   media->protocols = protocols;
476 }
477
478 /**
479  * gst_rtsp_media_get_protocols:
480  * @media: a #GstRTSPMedia
481  *
482  * Get the allowed protocols of @media.
483  *
484  * Returns: a #GstRTSPLowerTrans
485  */
486 GstRTSPLowerTrans
487 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
488 {
489   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
490       GST_RTSP_LOWER_TRANS_UNKNOWN);
491
492   return media->protocols;
493 }
494
495 /**
496  * gst_rtsp_media_set_eos_shutdown:
497  * @media: a #GstRTSPMedia
498  * @eos_shutdown: the new value
499  *
500  * Set or unset if an EOS event will be sent to the pipeline for @media before
501  * it is unprepared.
502  */
503 void
504 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
505 {
506   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
507
508   media->eos_shutdown = eos_shutdown;
509 }
510
511 /**
512  * gst_rtsp_media_is_eos_shutdown:
513  * @media: a #GstRTSPMedia
514  *
515  * Check if the pipeline for @media will send an EOS down the pipeline before
516  * unpreparing.
517  *
518  * Returns: %TRUE if the media will send EOS before unpreparing.
519  */
520 gboolean
521 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
522 {
523   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
524
525   return media->eos_shutdown;
526 }
527
528 /**
529  * gst_rtsp_media_set_auth:
530  * @media: a #GstRTSPMedia
531  * @auth: a #GstRTSPAuth
532  *
533  * configure @auth to be used as the authentication manager of @media.
534  */
535 void
536 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
537 {
538   GstRTSPAuth *old;
539
540   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
541
542   old = media->auth;
543
544   if (old != auth) {
545     if (auth)
546       g_object_ref (auth);
547     media->auth = auth;
548     if (old)
549       g_object_unref (old);
550   }
551 }
552
553 /**
554  * gst_rtsp_media_get_auth:
555  * @media: a #GstRTSPMedia
556  *
557  * Get the #GstRTSPAuth used as the authentication manager of @media.
558  *
559  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
560  * usage.
561  */
562 GstRTSPAuth *
563 gst_rtsp_media_get_auth (GstRTSPMedia * media)
564 {
565   GstRTSPAuth *result;
566
567   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
568
569   if ((result = media->auth))
570     g_object_ref (result);
571
572   return result;
573 }
574
575
576 /**
577  * gst_rtsp_media_n_streams:
578  * @media: a #GstRTSPMedia
579  *
580  * Get the number of streams in this media.
581  *
582  * Returns: The number of streams.
583  */
584 guint
585 gst_rtsp_media_n_streams (GstRTSPMedia * media)
586 {
587   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
588
589   return media->streams->len;
590 }
591
592 /**
593  * gst_rtsp_media_get_stream:
594  * @media: a #GstRTSPMedia
595  * @idx: the stream index
596  *
597  * Retrieve the stream with index @idx from @media.
598  *
599  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
600  * that index did not exist.
601  */
602 GstRTSPMediaStream *
603 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
604 {
605   GstRTSPMediaStream *res;
606
607   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
608
609   if (idx < media->streams->len)
610     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
611   else
612     res = NULL;
613
614   return res;
615 }
616
617 /**
618  * gst_rtsp_media_get_range_string:
619  * @media: a #GstRTSPMedia
620  * @play: for the PLAY request
621  *
622  * Get the current range as a string.
623  *
624  * Returns: The range as a string, g_free() after usage.
625  */
626 gchar *
627 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
628 {
629   gchar *result;
630   GstRTSPTimeRange range;
631
632   /* make copy */
633   range = media->range;
634
635   if (!play && media->active > 0) {
636     range.min.type = GST_RTSP_TIME_NOW;
637     range.min.seconds = -1;
638   }
639
640   result = gst_rtsp_range_to_string (&range);
641
642   return result;
643 }
644
645 /**
646  * gst_rtsp_media_seek:
647  * @media: a #GstRTSPMedia
648  * @range: a #GstRTSPTimeRange
649  *
650  * Seek the pipeline to @range.
651  *
652  * Returns: %TRUE on success.
653  */
654 gboolean
655 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
656 {
657   GstSeekFlags flags;
658   gboolean res;
659   gint64 start, stop;
660   GstSeekType start_type, stop_type;
661
662   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
663   g_return_val_if_fail (range != NULL, FALSE);
664
665   if (range->unit != GST_RTSP_RANGE_NPT)
666     goto not_supported;
667
668   /* depends on the current playing state of the pipeline. We might need to
669    * queue this until we get EOS. */
670   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
671
672   start_type = stop_type = GST_SEEK_TYPE_NONE;
673
674   switch (range->min.type) {
675     case GST_RTSP_TIME_NOW:
676       start = -1;
677       break;
678     case GST_RTSP_TIME_SECONDS:
679       /* only seek when something changed */
680       if (media->range.min.seconds == range->min.seconds) {
681         start = -1;
682       } else {
683         start = range->min.seconds * GST_SECOND;
684         start_type = GST_SEEK_TYPE_SET;
685       }
686       break;
687     case GST_RTSP_TIME_END:
688     default:
689       goto weird_type;
690   }
691   switch (range->max.type) {
692     case GST_RTSP_TIME_SECONDS:
693       /* only seek when something changed */
694       if (media->range.max.seconds == range->max.seconds) {
695         stop = -1;
696       } else {
697         stop = range->max.seconds * GST_SECOND;
698         stop_type = GST_SEEK_TYPE_SET;
699       }
700       break;
701     case GST_RTSP_TIME_END:
702       stop = -1;
703       stop_type = GST_SEEK_TYPE_SET;
704       break;
705     case GST_RTSP_TIME_NOW:
706     default:
707       goto weird_type;
708   }
709
710   if (start != -1 || stop != -1) {
711     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
712         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
713
714     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
715         flags, start_type, start, stop_type, stop);
716
717     /* and block for the seek to complete */
718     GST_INFO ("done seeking %d", res);
719     gst_element_get_state (media->pipeline, NULL, NULL, -1);
720     GST_INFO ("prerolled again");
721
722     collect_media_stats (media);
723   } else {
724     GST_INFO ("no seek needed");
725     res = TRUE;
726   }
727
728   return res;
729
730   /* ERRORS */
731 not_supported:
732   {
733     GST_WARNING ("seek unit %d not supported", range->unit);
734     return FALSE;
735   }
736 weird_type:
737   {
738     GST_WARNING ("weird range type %d not supported", range->min.type);
739     return FALSE;
740   }
741 }
742
743 /**
744  * gst_rtsp_media_stream_rtp:
745  * @stream: a #GstRTSPMediaStream
746  * @buffer: a #GstBuffer
747  *
748  * Handle an RTP buffer for the stream. This method is usually called when a
749  * message has been received from a client using the TCP transport.
750  *
751  * This function takes ownership of @buffer.
752  *
753  * Returns: a GstFlowReturn.
754  */
755 GstFlowReturn
756 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
757 {
758   GstFlowReturn ret;
759
760   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
761
762   return ret;
763 }
764
765 /**
766  * gst_rtsp_media_stream_rtcp:
767  * @stream: a #GstRTSPMediaStream
768  * @buffer: a #GstBuffer
769  *
770  * Handle an RTCP buffer for the stream. This method is usually called when a
771  * message has been received from a client using the TCP transport.
772  *
773  * This function takes ownership of @buffer.
774  *
775  * Returns: a GstFlowReturn.
776  */
777 GstFlowReturn
778 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
779 {
780   GstFlowReturn ret;
781
782   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
783
784   return ret;
785 }
786
787 /* Allocate the udp ports and sockets */
788 static gboolean
789 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
790 {
791   GstStateChangeReturn ret;
792   GstElement *udpsrc0, *udpsrc1;
793   GstElement *udpsink0, *udpsink1;
794   gint tmp_rtp, tmp_rtcp;
795   guint count;
796   gint rtpport, rtcpport, sockfd;
797   const gchar *host;
798
799   udpsrc0 = NULL;
800   udpsrc1 = NULL;
801   udpsink0 = NULL;
802   udpsink1 = NULL;
803   count = 0;
804
805   /* Start with random port */
806   tmp_rtp = 0;
807
808   if (media->is_ipv6)
809     host = "udp://[::0]";
810   else
811     host = "udp://0.0.0.0";
812
813   /* try to allocate 2 UDP ports, the RTP port should be an even
814    * number and the RTCP port should be the next (uneven) port */
815 again:
816   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
817   if (udpsrc0 == NULL)
818     goto no_udp_protocol;
819   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
820
821   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
822   if (ret == GST_STATE_CHANGE_FAILURE) {
823     if (tmp_rtp != 0) {
824       tmp_rtp += 2;
825       if (++count > 20)
826         goto no_ports;
827
828       gst_element_set_state (udpsrc0, GST_STATE_NULL);
829       gst_object_unref (udpsrc0);
830
831       goto again;
832     }
833     goto no_udp_protocol;
834   }
835
836   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
837
838   /* check if port is even */
839   if ((tmp_rtp & 1) != 0) {
840     /* port not even, close and allocate another */
841     if (++count > 20)
842       goto no_ports;
843
844     gst_element_set_state (udpsrc0, GST_STATE_NULL);
845     gst_object_unref (udpsrc0);
846
847     tmp_rtp++;
848     goto again;
849   }
850
851   /* allocate port+1 for RTCP now */
852   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
853   if (udpsrc1 == NULL)
854     goto no_udp_rtcp_protocol;
855
856   /* set port */
857   tmp_rtcp = tmp_rtp + 1;
858   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
859
860   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
861   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
862   if (ret == GST_STATE_CHANGE_FAILURE) {
863
864     if (++count > 20)
865       goto no_ports;
866
867     gst_element_set_state (udpsrc0, GST_STATE_NULL);
868     gst_object_unref (udpsrc0);
869
870     gst_element_set_state (udpsrc1, GST_STATE_NULL);
871     gst_object_unref (udpsrc1);
872
873     tmp_rtp += 2;
874     goto again;
875   }
876
877   /* all fine, do port check */
878   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
879   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
880
881   /* this should not happen... */
882   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
883     goto port_error;
884
885   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
886   if (!udpsink0)
887     goto no_udp_protocol;
888
889   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
890   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
891   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
892
893   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
894   if (!udpsink1)
895     goto no_udp_protocol;
896
897   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
898           "send-duplicates")) {
899     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
900     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
901     stream->filter_duplicates = FALSE;
902   } else {
903     GST_WARNING ("multiudpsink version found without send-duplicates property");
904     stream->filter_duplicates = TRUE;
905   }
906
907   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
908           "buffer-size")) {
909     g_object_set (G_OBJECT (udpsink0), "buffer-size", 0x80000, NULL);
910   } else {
911     GST_WARNING ("multiudpsink version found without buffer-size property");
912   }
913
914   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
915   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
916   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
917   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
918   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
919
920   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
921   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
922   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
923   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
924
925   /* we keep these elements, we configure all in configure_transport when the
926    * server told us to really use the UDP ports. */
927   stream->udpsrc[0] = udpsrc0;
928   stream->udpsrc[1] = udpsrc1;
929   stream->udpsink[0] = udpsink0;
930   stream->udpsink[1] = udpsink1;
931   stream->server_port.min = rtpport;
932   stream->server_port.max = rtcpport;
933
934   return TRUE;
935
936   /* ERRORS */
937 no_udp_protocol:
938   {
939     goto cleanup;
940   }
941 no_ports:
942   {
943     goto cleanup;
944   }
945 no_udp_rtcp_protocol:
946   {
947     goto cleanup;
948   }
949 port_error:
950   {
951     goto cleanup;
952   }
953 cleanup:
954   {
955     if (udpsrc0) {
956       gst_element_set_state (udpsrc0, GST_STATE_NULL);
957       gst_object_unref (udpsrc0);
958     }
959     if (udpsrc1) {
960       gst_element_set_state (udpsrc1, GST_STATE_NULL);
961       gst_object_unref (udpsrc1);
962     }
963     if (udpsink0) {
964       gst_element_set_state (udpsink0, GST_STATE_NULL);
965       gst_object_unref (udpsink0);
966     }
967     if (udpsink1) {
968       gst_element_set_state (udpsink1, GST_STATE_NULL);
969       gst_object_unref (udpsink1);
970     }
971     return FALSE;
972   }
973 }
974
975 /* executed from streaming thread */
976 static void
977 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
978 {
979   gchar *capsstr;
980   GstCaps *newcaps, *oldcaps;
981
982   if ((newcaps = GST_PAD_CAPS (pad)))
983     gst_caps_ref (newcaps);
984
985   oldcaps = stream->caps;
986   stream->caps = newcaps;
987
988   if (oldcaps)
989     gst_caps_unref (oldcaps);
990
991   capsstr = gst_caps_to_string (newcaps);
992   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
993   g_free (capsstr);
994 }
995
996 static void
997 dump_structure (const GstStructure * s)
998 {
999   gchar *sstr;
1000
1001   sstr = gst_structure_to_string (s);
1002   GST_INFO ("structure: %s", sstr);
1003   g_free (sstr);
1004 }
1005
1006 static GstRTSPMediaTrans *
1007 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1008 {
1009   GList *walk;
1010   GstRTSPMediaTrans *result = NULL;
1011   const gchar *tmp;
1012   gchar *dest;
1013   guint port;
1014
1015   if (rtcp_from == NULL)
1016     return NULL;
1017
1018   tmp = g_strrstr (rtcp_from, ":");
1019   if (tmp == NULL)
1020     return NULL;
1021
1022   port = atoi (tmp + 1);
1023   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1024
1025   GST_INFO ("finding %s:%d", dest, port);
1026
1027   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1028     GstRTSPMediaTrans *trans = walk->data;
1029     gint min, max;
1030
1031     min = trans->transport->client_port.min;
1032     max = trans->transport->client_port.max;
1033
1034     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1035             || max == port)) {
1036       result = trans;
1037       break;
1038     }
1039   }
1040   g_free (dest);
1041
1042   return result;
1043 }
1044
1045 static void
1046 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1047 {
1048   GstStructure *stats;
1049   GstRTSPMediaTrans *trans;
1050
1051   GST_INFO ("%p: new source %p", stream, source);
1052
1053   /* see if we have a stream to match with the origin of the RTCP packet */
1054   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1055   if (trans == NULL) {
1056     g_object_get (source, "stats", &stats, NULL);
1057     if (stats) {
1058       const gchar *rtcp_from;
1059
1060       dump_structure (stats);
1061
1062       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1063       if ((trans = find_transport (stream, rtcp_from))) {
1064         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1065             source);
1066
1067         /* keep ref to the source */
1068         trans->rtpsource = source;
1069
1070         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1071       }
1072       gst_structure_free (stats);
1073     }
1074   } else {
1075     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1076   }
1077 }
1078
1079 static void
1080 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1081 {
1082   GST_INFO ("%p: new SDES %p", stream, source);
1083 }
1084
1085 static void
1086 on_ssrc_active (GObject * session, GObject * source,
1087     GstRTSPMediaStream * stream)
1088 {
1089   GstRTSPMediaTrans *trans;
1090
1091   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1092
1093   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1094
1095   if (trans && trans->keep_alive)
1096     trans->keep_alive (trans->ka_user_data);
1097
1098 #ifdef DUMP_STATS
1099   {
1100     GstStructure *stats;
1101     g_object_get (source, "stats", &stats, NULL);
1102     if (stats) {
1103       dump_structure (stats);
1104       gst_structure_free (stats);
1105     }
1106   }
1107 #endif
1108 }
1109
1110 static void
1111 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1112 {
1113   GST_INFO ("%p: source %p bye", stream, source);
1114 }
1115
1116 static void
1117 on_bye_timeout (GObject * session, GObject * source,
1118     GstRTSPMediaStream * stream)
1119 {
1120   GstRTSPMediaTrans *trans;
1121
1122   GST_INFO ("%p: source %p bye timeout", stream, source);
1123
1124   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1125     trans->rtpsource = NULL;
1126     trans->timeout = TRUE;
1127   }
1128 }
1129
1130 static void
1131 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1132 {
1133   GstRTSPMediaTrans *trans;
1134
1135   GST_INFO ("%p: source %p timeout", stream, source);
1136
1137   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1138     trans->rtpsource = NULL;
1139     trans->timeout = TRUE;
1140   }
1141 }
1142
1143 static GstFlowReturn
1144 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1145 {
1146   GList *walk;
1147   GstBuffer *buffer;
1148   GstRTSPMediaStream *stream;
1149
1150   buffer = gst_app_sink_pull_buffer (sink);
1151   if (!buffer)
1152     return GST_FLOW_OK;
1153
1154   stream = (GstRTSPMediaStream *) user_data;
1155
1156   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1157     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1158
1159     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1160       if (tr->send_rtp)
1161         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1162     } else {
1163       if (tr->send_rtcp)
1164         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1165     }
1166   }
1167   gst_buffer_unref (buffer);
1168
1169   return GST_FLOW_OK;
1170 }
1171
1172 static GstFlowReturn
1173 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1174 {
1175   GList *walk;
1176   GstBufferList *blist;
1177   GstRTSPMediaStream *stream;
1178
1179   blist = gst_app_sink_pull_buffer_list (sink);
1180   if (!blist)
1181     return GST_FLOW_OK;
1182
1183   stream = (GstRTSPMediaStream *) user_data;
1184
1185   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1186     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1187
1188     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1189       if (tr->send_rtp_list)
1190         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1191             tr->user_data);
1192     } else {
1193       if (tr->send_rtcp_list)
1194         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1195             tr->user_data);
1196     }
1197   }
1198   gst_buffer_list_unref (blist);
1199
1200   return GST_FLOW_OK;
1201 }
1202
1203 static GstAppSinkCallbacks sink_cb = {
1204   NULL,                         /* not interested in EOS */
1205   NULL,                         /* not interested in preroll buffers */
1206   handle_new_buffer,
1207   handle_new_buffer_list
1208 };
1209
1210 /* prepare the pipeline objects to handle @stream in @media */
1211 static gboolean
1212 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1213 {
1214   gchar *name;
1215   GstPad *pad, *teepad, *selpad;
1216   GstPadLinkReturn ret;
1217   gint i;
1218
1219   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1220    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1221    * elements */
1222   if (!alloc_udp_ports (media, stream))
1223     return FALSE;
1224
1225   /* add the ports to the pipeline */
1226   for (i = 0; i < 2; i++) {
1227     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1228     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1229   }
1230
1231   /* create elements for the TCP transfer */
1232   for (i = 0; i < 2; i++) {
1233     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1234     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1235     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1236     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1237     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1238     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1239     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1240     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1241         &sink_cb, stream, NULL);
1242   }
1243
1244   /* hook up the stream to the RTP session elements. */
1245   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1246   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1247   g_free (name);
1248   name = g_strdup_printf ("send_rtp_src_%d", idx);
1249   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1250   g_free (name);
1251   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1252   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1253   g_free (name);
1254   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1255   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1256   g_free (name);
1257   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1258   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1259   g_free (name);
1260
1261   /* get the session */
1262   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1263       &stream->session);
1264
1265   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1266       stream);
1267   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1268       stream);
1269   g_signal_connect (stream->session, "on-ssrc-active",
1270       (GCallback) on_ssrc_active, stream);
1271   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1272       stream);
1273   g_signal_connect (stream->session, "on-bye-timeout",
1274       (GCallback) on_bye_timeout, stream);
1275   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1276       stream);
1277
1278   /* link the RTP pad to the session manager */
1279   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1280   if (ret != GST_PAD_LINK_OK)
1281     goto link_failed;
1282
1283   /* make tee for RTP and link to stream */
1284   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1285   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1286
1287   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1288   gst_pad_link (stream->send_rtp_src, pad);
1289   gst_object_unref (pad);
1290
1291   /* link RTP sink, we're pretty sure this will work. */
1292   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1293   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1294   gst_pad_link (teepad, pad);
1295   gst_object_unref (pad);
1296   gst_object_unref (teepad);
1297
1298   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1299   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1300   gst_pad_link (teepad, pad);
1301   gst_object_unref (pad);
1302   gst_object_unref (teepad);
1303
1304   /* make tee for RTCP */
1305   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1306   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1307
1308   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1309   gst_pad_link (stream->send_rtcp_src, pad);
1310   gst_object_unref (pad);
1311
1312   /* link RTCP elements */
1313   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1314   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1315   gst_pad_link (teepad, pad);
1316   gst_object_unref (pad);
1317   gst_object_unref (teepad);
1318
1319   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1320   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1321   gst_pad_link (teepad, pad);
1322   gst_object_unref (pad);
1323   gst_object_unref (teepad);
1324
1325   /* make selector for the RTP receivers */
1326   stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
1327   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1328
1329   pad = gst_element_get_static_pad (stream->selector[0], "src");
1330   gst_pad_link (pad, stream->recv_rtp_sink);
1331   gst_object_unref (pad);
1332
1333   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1334   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1335   gst_pad_link (pad, selpad);
1336   gst_object_unref (pad);
1337   gst_object_unref (selpad);
1338
1339   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1340   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1341   gst_pad_link (pad, selpad);
1342   gst_object_unref (pad);
1343   gst_object_unref (selpad);
1344
1345   /* make selector for the RTCP receivers */
1346   stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
1347   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1348
1349   pad = gst_element_get_static_pad (stream->selector[1], "src");
1350   gst_pad_link (pad, stream->recv_rtcp_sink);
1351   gst_object_unref (pad);
1352
1353   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1354   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1355   gst_pad_link (pad, selpad);
1356   gst_object_unref (pad);
1357   gst_object_unref (selpad);
1358
1359   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1360   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1361   gst_pad_link (pad, selpad);
1362   gst_object_unref (pad);
1363   gst_object_unref (selpad);
1364
1365   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1366    * values */
1367   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1368   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1369   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1370   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1371
1372   /* be notified of caps changes */
1373   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1374       (GCallback) caps_notify, stream);
1375
1376   stream->prepared = TRUE;
1377
1378   return TRUE;
1379
1380   /* ERRORS */
1381 link_failed:
1382   {
1383     GST_WARNING ("failed to link stream %d", idx);
1384     return FALSE;
1385   }
1386 }
1387
1388 static void
1389 unlock_streams (GstRTSPMedia * media)
1390 {
1391   guint i, n_streams;
1392
1393   /* unlock the udp src elements */
1394   n_streams = gst_rtsp_media_n_streams (media);
1395   for (i = 0; i < n_streams; i++) {
1396     GstRTSPMediaStream *stream;
1397
1398     stream = gst_rtsp_media_get_stream (media, i);
1399
1400     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1401     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1402   }
1403 }
1404
1405 static void
1406 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1407 {
1408   g_mutex_lock (media->lock);
1409   /* never overwrite the error status */
1410   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1411     media->status = status;
1412   GST_DEBUG ("setting new status to %d", status);
1413   g_cond_broadcast (media->cond);
1414   g_mutex_unlock (media->lock);
1415 }
1416
1417 static GstRTSPMediaStatus
1418 gst_rtsp_media_get_status (GstRTSPMedia * media)
1419 {
1420   GstRTSPMediaStatus result;
1421   GTimeVal timeout;
1422
1423   g_mutex_lock (media->lock);
1424   g_get_current_time (&timeout);
1425   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1426   /* while we are preparing, wait */
1427   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1428     GST_DEBUG ("waiting for status change");
1429     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1430       GST_DEBUG ("timeout, assuming error status");
1431       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1432     }
1433   }
1434   /* could be success or error */
1435   result = media->status;
1436   GST_DEBUG ("got status %d", result);
1437   g_mutex_unlock (media->lock);
1438
1439   return result;
1440 }
1441
1442 static gboolean
1443 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1444 {
1445   GstMessageType type;
1446
1447   type = GST_MESSAGE_TYPE (message);
1448
1449   switch (type) {
1450     case GST_MESSAGE_STATE_CHANGED:
1451       break;
1452     case GST_MESSAGE_BUFFERING:
1453     {
1454       gint percent;
1455
1456       gst_message_parse_buffering (message, &percent);
1457
1458       /* no state management needed for live pipelines */
1459       if (media->is_live)
1460         break;
1461
1462       if (percent == 100) {
1463         /* a 100% message means buffering is done */
1464         media->buffering = FALSE;
1465         /* if the desired state is playing, go back */
1466         if (media->target_state == GST_STATE_PLAYING) {
1467           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1468           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1469         } else {
1470           GST_INFO ("Buffering done");
1471         }
1472       } else {
1473         /* buffering busy */
1474         if (media->buffering == FALSE) {
1475           if (media->target_state == GST_STATE_PLAYING) {
1476             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1477             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1478             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1479           } else {
1480             GST_INFO ("Buffering ...");
1481           }
1482         }
1483         media->buffering = TRUE;
1484       }
1485       break;
1486     }
1487     case GST_MESSAGE_LATENCY:
1488     {
1489       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1490       break;
1491     }
1492     case GST_MESSAGE_ERROR:
1493     {
1494       GError *gerror;
1495       gchar *debug;
1496
1497       gst_message_parse_error (message, &gerror, &debug);
1498       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1499       g_error_free (gerror);
1500       g_free (debug);
1501
1502       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1503       break;
1504     }
1505     case GST_MESSAGE_WARNING:
1506     {
1507       GError *gerror;
1508       gchar *debug;
1509
1510       gst_message_parse_warning (message, &gerror, &debug);
1511       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1512       g_error_free (gerror);
1513       g_free (debug);
1514       break;
1515     }
1516     case GST_MESSAGE_ELEMENT:
1517       break;
1518     case GST_MESSAGE_STREAM_STATUS:
1519       break;
1520     case GST_MESSAGE_ASYNC_DONE:
1521       if (!media->adding) {
1522         /* when we are dynamically adding pads, the addition of the udpsrc will
1523          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1524          * wait for the final ASYNC_DONE after everything prerolled */
1525         GST_INFO ("%p: got ASYNC_DONE", media);
1526         collect_media_stats (media);
1527
1528         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1529       } else {
1530         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1531       }
1532       break;
1533     case GST_MESSAGE_EOS:
1534       GST_INFO ("%p: got EOS", media);
1535       if (media->eos_pending) {
1536         GST_DEBUG ("shutting down after EOS");
1537         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1538         media->eos_pending = FALSE;
1539         g_object_unref (media);
1540       }
1541       break;
1542     default:
1543       GST_INFO ("%p: got message type %s", media,
1544           gst_message_type_get_name (type));
1545       break;
1546   }
1547   return TRUE;
1548 }
1549
1550 static gboolean
1551 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1552 {
1553   GstRTSPMediaClass *klass;
1554   gboolean ret;
1555
1556   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1557
1558   if (klass->handle_message)
1559     ret = klass->handle_message (media, message);
1560   else
1561     ret = FALSE;
1562
1563   return ret;
1564 }
1565
1566 /* called from streaming threads */
1567 static void
1568 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1569 {
1570   GstRTSPMediaStream *stream;
1571   gchar *name;
1572   gint i;
1573
1574   i = media->streams->len + 1;
1575
1576   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1577
1578   stream = g_new0 (GstRTSPMediaStream, 1);
1579   stream->payloader = element;
1580
1581   name = g_strdup_printf ("dynpay%d", i);
1582
1583   media->adding = TRUE;
1584
1585   /* ghost the pad of the payloader to the element */
1586   stream->srcpad = gst_ghost_pad_new (name, pad);
1587   gst_pad_set_active (stream->srcpad, TRUE);
1588   gst_element_add_pad (media->element, stream->srcpad);
1589   g_free (name);
1590
1591   /* add stream now */
1592   g_array_append_val (media->streams, stream);
1593
1594   setup_stream (stream, i, media);
1595
1596   for (i = 0; i < 2; i++) {
1597     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1598     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1599     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1600     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1601     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1602   }
1603   media->adding = FALSE;
1604 }
1605
1606 static void
1607 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1608 {
1609   GST_INFO ("no more pads");
1610   if (media->fakesink) {
1611     gst_object_ref (media->fakesink);
1612     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1613     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1614     gst_object_unref (media->fakesink);
1615     media->fakesink = NULL;
1616     GST_INFO ("removed fakesink");
1617   }
1618 }
1619
1620 /**
1621  * gst_rtsp_media_prepare:
1622  * @media: a #GstRTSPMedia
1623  *
1624  * Prepare @media for streaming. This function will create the pipeline and
1625  * other objects to manage the streaming.
1626  *
1627  * It will preroll the pipeline and collect vital information about the streams
1628  * such as the duration.
1629  *
1630  * Returns: %TRUE on success.
1631  */
1632 gboolean
1633 gst_rtsp_media_prepare (GstRTSPMedia * media)
1634 {
1635   GstStateChangeReturn ret;
1636   GstRTSPMediaStatus status;
1637   guint i, n_streams;
1638   GstRTSPMediaClass *klass;
1639   GstBus *bus;
1640   GList *walk;
1641
1642   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1643     goto was_prepared;
1644
1645   if (!media->reusable && media->reused)
1646     goto is_reused;
1647
1648   GST_INFO ("preparing media %p", media);
1649
1650   /* reset some variables */
1651   media->is_live = FALSE;
1652   media->buffering = FALSE;
1653   /* we're preparing now */
1654   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1655
1656   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1657
1658   /* add the pipeline bus to our custom mainloop */
1659   media->source = gst_bus_create_watch (bus);
1660   gst_object_unref (bus);
1661
1662   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1663
1664   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1665   media->id = g_source_attach (media->source, klass->context);
1666
1667   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1668
1669   /* add stuff to the bin */
1670   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1671
1672   /* link streams we already have, other streams might appear when we have
1673    * dynamic elements */
1674   n_streams = gst_rtsp_media_n_streams (media);
1675   for (i = 0; i < n_streams; i++) {
1676     GstRTSPMediaStream *stream;
1677
1678     stream = gst_rtsp_media_get_stream (media, i);
1679
1680     setup_stream (stream, i, media);
1681   }
1682
1683   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1684     GstElement *elem = walk->data;
1685
1686     GST_INFO ("adding callbacks for dynamic element %p", elem);
1687
1688     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1689     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1690
1691     /* we add a fakesink here in order to make the state change async. We remove
1692      * the fakesink again in the no-more-pads callback. */
1693     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1694     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1695   }
1696
1697   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1698   /* first go to PAUSED */
1699   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1700   media->target_state = GST_STATE_PAUSED;
1701
1702   switch (ret) {
1703     case GST_STATE_CHANGE_SUCCESS:
1704       GST_INFO ("SUCCESS state change for media %p", media);
1705       break;
1706     case GST_STATE_CHANGE_ASYNC:
1707       GST_INFO ("ASYNC state change for media %p", media);
1708       break;
1709     case GST_STATE_CHANGE_NO_PREROLL:
1710       /* we need to go to PLAYING */
1711       GST_INFO ("NO_PREROLL state change: live media %p", media);
1712       media->is_live = TRUE;
1713       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1714       if (ret == GST_STATE_CHANGE_FAILURE)
1715         goto state_failed;
1716       break;
1717     case GST_STATE_CHANGE_FAILURE:
1718       goto state_failed;
1719   }
1720
1721   /* now wait for all pads to be prerolled */
1722   status = gst_rtsp_media_get_status (media);
1723   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1724     goto state_failed;
1725
1726   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1727
1728   GST_INFO ("object %p is prerolled", media);
1729
1730   return TRUE;
1731
1732   /* OK */
1733 was_prepared:
1734   {
1735     return TRUE;
1736   }
1737   /* ERRORS */
1738 is_reused:
1739   {
1740     GST_WARNING ("can not reuse media %p", media);
1741     return FALSE;
1742   }
1743 state_failed:
1744   {
1745     GST_WARNING ("failed to preroll pipeline");
1746     unlock_streams (media);
1747     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1748     gst_rtsp_media_unprepare (media);
1749     return FALSE;
1750   }
1751 }
1752
1753 /**
1754  * gst_rtsp_media_unprepare:
1755  * @media: a #GstRTSPMedia
1756  *
1757  * Unprepare @media. After this call, the media should be prepared again before
1758  * it can be used again. If the media is set to be non-reusable, a new instance
1759  * must be created.
1760  *
1761  * Returns: %TRUE on success.
1762  */
1763 gboolean
1764 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1765 {
1766   GstRTSPMediaClass *klass;
1767   gboolean success;
1768
1769   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1770     return TRUE;
1771
1772   GST_INFO ("unprepare media %p", media);
1773   media->target_state = GST_STATE_NULL;
1774
1775   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1776   if (klass->unprepare)
1777     success = klass->unprepare (media);
1778   else
1779     success = TRUE;
1780
1781   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1782   media->reused = TRUE;
1783
1784   /* when the media is not reusable, this will effectively unref the media and
1785    * recreate it */
1786   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1787
1788   return success;
1789 }
1790
1791 static gboolean
1792 default_unprepare (GstRTSPMedia * media)
1793 {
1794   if (media->eos_shutdown) {
1795     GST_DEBUG ("sending EOS for shutdown");
1796     /* ref so that we don't disappear */
1797     g_object_ref (media);
1798     media->eos_pending = TRUE;
1799     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1800     /* we need to go to playing again for the EOS to propagate, normally in this
1801      * state, nothing is receiving data from us anymore so this is ok. */
1802     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1803   } else {
1804     GST_DEBUG ("shutting down");
1805     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1806   }
1807   return TRUE;
1808 }
1809
1810 static void
1811 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1812     gchar * dest, gint min, gint max)
1813 {
1814   gboolean do_add = TRUE;
1815   RTSPDestination *ndest;
1816
1817   if (stream->filter_duplicates) {
1818     RTSPDestination fdest;
1819     GList *find;
1820
1821     fdest.dest = dest;
1822     fdest.min = min;
1823     fdest.max = max;
1824
1825     /* first see if we already added this destination */
1826     find =
1827         g_list_find_custom (stream->destinations, &fdest,
1828         (GCompareFunc) dest_compare);
1829     if (find) {
1830       ndest = (RTSPDestination *) find->data;
1831
1832       GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
1833           ndest->count);
1834       ndest->count++;
1835       do_add = FALSE;
1836     }
1837   }
1838
1839   if (do_add) {
1840     GST_INFO ("adding %s:%d-%d", dest, min, max);
1841     g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1842     g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1843
1844     if (stream->filter_duplicates) {
1845       ndest = create_destination (dest, min, max);
1846       stream->destinations = g_list_prepend (stream->destinations, ndest);
1847     }
1848   }
1849 }
1850
1851 static void
1852 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1853     gchar * dest, gint min, gint max)
1854 {
1855   gboolean do_remove = TRUE;
1856   RTSPDestination *ndest = NULL;
1857   GList *find = NULL;
1858
1859   if (stream->filter_duplicates) {
1860     RTSPDestination fdest;
1861
1862     fdest.dest = dest;
1863     fdest.min = min;
1864     fdest.max = max;
1865
1866     /* first see if we already added this destination */
1867     find =
1868         g_list_find_custom (stream->destinations, &fdest,
1869         (GCompareFunc) dest_compare);
1870     if (!find)
1871       return;
1872
1873     ndest = (RTSPDestination *) find->data;
1874     if (--ndest->count > 0) {
1875       do_remove = FALSE;
1876       GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
1877           ndest->count);
1878     }
1879   }
1880
1881   if (do_remove) {
1882     GST_INFO ("removing %s:%d-%d", dest, min, max);
1883     g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1884     g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1885
1886     if (stream->filter_duplicates) {
1887       stream->destinations = g_list_delete_link (stream->destinations, find);
1888       free_destination (ndest);
1889     }
1890   }
1891 }
1892
1893 /**
1894  * gst_rtsp_media_set_state:
1895  * @media: a #GstRTSPMedia
1896  * @state: the target state of the media
1897  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1898  *
1899  * Set the state of @media to @state and for the transports in @transports.
1900  *
1901  * Returns: %TRUE on success.
1902  */
1903 gboolean
1904 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1905     GArray * transports)
1906 {
1907   gint i;
1908   GstStateChangeReturn ret;
1909   gboolean add, remove, do_state;
1910   gint old_active;
1911
1912   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1913   g_return_val_if_fail (transports != NULL, FALSE);
1914
1915   /* NULL and READY are the same */
1916   if (state == GST_STATE_READY)
1917     state = GST_STATE_NULL;
1918
1919   add = remove = FALSE;
1920
1921   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1922       media);
1923
1924   switch (state) {
1925     case GST_STATE_NULL:
1926       /* unlock the streams so that they follow the state changes from now on */
1927       unlock_streams (media);
1928       /* fallthrough */
1929     case GST_STATE_PAUSED:
1930       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1931       if (media->target_state == GST_STATE_PLAYING)
1932         remove = TRUE;
1933       break;
1934     case GST_STATE_PLAYING:
1935       /* we're going to PLAYING, add */
1936       add = TRUE;
1937       break;
1938     default:
1939       break;
1940   }
1941   old_active = media->active;
1942
1943   for (i = 0; i < transports->len; i++) {
1944     GstRTSPMediaTrans *tr;
1945     GstRTSPMediaStream *stream;
1946     GstRTSPTransport *trans;
1947
1948     /* we need a non-NULL entry in the array */
1949     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1950     if (tr == NULL)
1951       continue;
1952
1953     /* we need a transport */
1954     if (!(trans = tr->transport))
1955       continue;
1956
1957     /* get the stream and add the destinations */
1958     stream = gst_rtsp_media_get_stream (media, tr->idx);
1959     switch (trans->lower_transport) {
1960       case GST_RTSP_LOWER_TRANS_UDP:
1961       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1962       {
1963         gchar *dest;
1964         gint min, max;
1965
1966         dest = trans->destination;
1967         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1968           min = trans->port.min;
1969           max = trans->port.max;
1970         } else {
1971           min = trans->client_port.min;
1972           max = trans->client_port.max;
1973         }
1974
1975         if (add && !tr->active) {
1976           add_udp_destination (media, stream, dest, min, max);
1977           stream->transports = g_list_prepend (stream->transports, tr);
1978           tr->active = TRUE;
1979           media->active++;
1980         } else if (remove && tr->active) {
1981           remove_udp_destination (media, stream, dest, min, max);
1982           stream->transports = g_list_remove (stream->transports, tr);
1983           tr->active = FALSE;
1984           media->active--;
1985         }
1986         break;
1987       }
1988       case GST_RTSP_LOWER_TRANS_TCP:
1989         if (add && !tr->active) {
1990           GST_INFO ("adding TCP %s", trans->destination);
1991           stream->transports = g_list_prepend (stream->transports, tr);
1992           tr->active = TRUE;
1993           media->active++;
1994         } else if (remove && tr->active) {
1995           GST_INFO ("removing TCP %s", trans->destination);
1996           stream->transports = g_list_remove (stream->transports, tr);
1997           tr->active = FALSE;
1998           media->active--;
1999         }
2000         break;
2001       default:
2002         GST_INFO ("Unknown transport %d", trans->lower_transport);
2003         break;
2004     }
2005   }
2006
2007   /* we just added the first media, do the playing state change */
2008   if (old_active == 0 && add)
2009     do_state = TRUE;
2010   /* if we have no more active media, do the downward state changes */
2011   else if (media->active == 0)
2012     do_state = TRUE;
2013   else
2014     do_state = FALSE;
2015
2016   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2017       media, do_state);
2018
2019   if (media->target_state != state) {
2020     if (do_state) {
2021       if (state == GST_STATE_NULL) {
2022         gst_rtsp_media_unprepare (media);
2023       } else {
2024         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2025             media);
2026         media->target_state = state;
2027         ret = gst_element_set_state (media->pipeline, state);
2028       }
2029     }
2030     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2031         NULL);
2032   }
2033
2034   /* remember where we are */
2035   if (state == GST_STATE_PAUSED || old_active != media->active)
2036     collect_media_stats (media);
2037
2038   return TRUE;
2039 }
2040
2041 /**
2042  * gst_rtsp_media_remove_elements:
2043  * @media: a #GstRTSPMedia
2044  *
2045  * Remove all elements and the pipeline controlled by @media.
2046  */
2047 void
2048 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2049 {
2050   gint i, j;
2051
2052   unlock_streams (media);
2053
2054   for (i = 0; i < media->streams->len; i++) {
2055     GstRTSPMediaStream *stream;
2056
2057     GST_INFO ("Removing elements of stream %d from pipeline", i);
2058
2059     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2060
2061     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2062
2063     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2064
2065     for (j = 0; j < 2; j++) {
2066       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2067       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2068       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2069       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2070       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2071       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2072
2073       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2074       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2075       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2076       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2077       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2078       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2079     }
2080     if (stream->caps)
2081       gst_caps_unref (stream->caps);
2082     stream->caps = NULL;
2083     gst_rtsp_media_stream_free (stream);
2084   }
2085   g_array_remove_range (media->streams, 0, media->streams->len);
2086
2087   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2088   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2089
2090   gst_object_unref (media->pipeline);
2091   media->pipeline = NULL;
2092 }