client: Use client transport settings for multicast if allowed.
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED          FALSE
29 #define DEFAULT_REUSABLE        FALSE
30 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN    FALSE
33 #define DEFAULT_BUFFER_SIZE     0x80000
34 #define DEFAULT_MULTICAST_GROUP "224.2.0.1"
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_MULTICAST_GROUP,
48   PROP_LAST
49 };
50
51 enum
52 {
53   SIGNAL_PREPARED,
54   SIGNAL_UNPREPARED,
55   SIGNAL_NEW_STATE,
56   SIGNAL_LAST
57 };
58
59 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
60 #define GST_CAT_DEFAULT rtsp_media_debug
61
62 static GQuark ssrc_stream_map_key;
63
64 static void gst_rtsp_media_get_property (GObject * object, guint propid,
65     GValue * value, GParamSpec * pspec);
66 static void gst_rtsp_media_set_property (GObject * object, guint propid,
67     const GValue * value, GParamSpec * pspec);
68 static void gst_rtsp_media_finalize (GObject * obj);
69
70 static gpointer do_loop (GstRTSPMediaClass * klass);
71 static gboolean default_handle_message (GstRTSPMedia * media,
72     GstMessage * message);
73 static gboolean default_unprepare (GstRTSPMedia * media);
74 static void unlock_streams (GstRTSPMedia * media);
75 static void default_handle_mtu (GstRTSPMedia * media, guint mtu);
76
77 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
78
79 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
80
81 static void
82 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
83 {
84   GObjectClass *gobject_class;
85
86   gobject_class = G_OBJECT_CLASS (klass);
87
88   gobject_class->get_property = gst_rtsp_media_get_property;
89   gobject_class->set_property = gst_rtsp_media_set_property;
90   gobject_class->finalize = gst_rtsp_media_finalize;
91
92   g_object_class_install_property (gobject_class, PROP_SHARED,
93       g_param_spec_boolean ("shared", "Shared",
94           "If this media pipeline can be shared", DEFAULT_SHARED,
95           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96
97   g_object_class_install_property (gobject_class, PROP_REUSABLE,
98       g_param_spec_boolean ("reusable", "Reusable",
99           "If this media pipeline can be reused after an unprepare",
100           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
101
102   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
103       g_param_spec_flags ("protocols", "Protocols",
104           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
105           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
106
107   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
108       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
109           "Send an EOS event to the pipeline before unpreparing",
110           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111
112   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
113       g_param_spec_uint ("buffer-size", "Buffer Size",
114           "The kernel UDP buffer size to use", 0, G_MAXUINT,
115           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116
117   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
118       g_param_spec_string ("multicast-group", "Multicast Group",
119           "The Multicast group to send media to",
120           DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121
122   gst_rtsp_media_signals[SIGNAL_PREPARED] =
123       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
124       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
125       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
126
127   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
128       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
129       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
130       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
131
132   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
133       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
134       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
135       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
136
137   klass->context = g_main_context_new ();
138   klass->loop = g_main_loop_new (klass->context, TRUE);
139
140   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
141
142   klass->thread = g_thread_new ("Bus Thread", (GThreadFunc) do_loop, klass);
143
144   klass->handle_message = default_handle_message;
145   klass->unprepare = default_unprepare;
146   klass->handle_mtu = default_handle_mtu;
147
148   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
149 }
150
151 static void
152 gst_rtsp_media_init (GstRTSPMedia * media)
153 {
154   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
155   g_mutex_init (&media->lock);
156   g_cond_init (&media->cond);
157
158   media->shared = DEFAULT_SHARED;
159   media->reusable = DEFAULT_REUSABLE;
160   media->protocols = DEFAULT_PROTOCOLS;
161   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
162   media->buffer_size = DEFAULT_BUFFER_SIZE;
163   media->multicast_group = g_strdup (DEFAULT_MULTICAST_GROUP);
164 }
165
166 void
167 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
168 {
169   if (trans->transport) {
170     gst_rtsp_transport_free (trans->transport);
171     trans->transport = NULL;
172   }
173   if (trans->rtpsource) {
174     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
175     trans->rtpsource = NULL;
176   }
177 }
178
179 static void
180 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
181 {
182   if (stream->session)
183     g_object_unref (stream->session);
184
185   if (stream->caps)
186     gst_caps_unref (stream->caps);
187
188   if (stream->send_rtp_sink)
189     gst_object_unref (stream->send_rtp_sink);
190   if (stream->send_rtp_src)
191     gst_object_unref (stream->send_rtp_src);
192   if (stream->send_rtcp_src)
193     gst_object_unref (stream->send_rtcp_src);
194   if (stream->recv_rtcp_sink)
195     gst_object_unref (stream->recv_rtcp_sink);
196   if (stream->recv_rtp_sink)
197     gst_object_unref (stream->recv_rtp_sink);
198
199   g_list_free (stream->transports);
200
201   g_free (stream);
202 }
203
204 static void
205 gst_rtsp_media_finalize (GObject * obj)
206 {
207   GstRTSPMedia *media;
208   guint i;
209
210   media = GST_RTSP_MEDIA (obj);
211
212   GST_INFO ("finalize media %p", media);
213
214   if (media->pipeline) {
215     unlock_streams (media);
216     gst_element_set_state (media->pipeline, GST_STATE_NULL);
217     gst_object_unref (media->pipeline);
218   }
219
220   for (i = 0; i < media->streams->len; i++) {
221     GstRTSPMediaStream *stream;
222
223     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
224
225     gst_rtsp_media_stream_free (stream);
226   }
227   g_array_free (media->streams, TRUE);
228
229   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
230   g_list_free (media->dynamic);
231
232   if (media->source) {
233     g_source_destroy (media->source);
234     g_source_unref (media->source);
235   }
236   g_free (media->multicast_group);
237   g_mutex_clear (&media->lock);
238   g_cond_clear (&media->cond);
239
240   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
241 }
242
243 static void
244 gst_rtsp_media_get_property (GObject * object, guint propid,
245     GValue * value, GParamSpec * pspec)
246 {
247   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
248
249   switch (propid) {
250     case PROP_SHARED:
251       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
252       break;
253     case PROP_REUSABLE:
254       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
255       break;
256     case PROP_PROTOCOLS:
257       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
258       break;
259     case PROP_EOS_SHUTDOWN:
260       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
261       break;
262     case PROP_BUFFER_SIZE:
263       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
264       break;
265     case PROP_MULTICAST_GROUP:
266       g_value_take_string (value, gst_rtsp_media_get_multicast_group (media));
267       break;
268     default:
269       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
270   }
271 }
272
273 static void
274 gst_rtsp_media_set_property (GObject * object, guint propid,
275     const GValue * value, GParamSpec * pspec)
276 {
277   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
278
279   switch (propid) {
280     case PROP_SHARED:
281       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
282       break;
283     case PROP_REUSABLE:
284       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
285       break;
286     case PROP_PROTOCOLS:
287       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
288       break;
289     case PROP_EOS_SHUTDOWN:
290       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
291       break;
292     case PROP_BUFFER_SIZE:
293       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
294       break;
295     case PROP_MULTICAST_GROUP:
296       gst_rtsp_media_set_multicast_group (media, g_value_get_string (value));
297       break;
298     default:
299       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
300   }
301 }
302
303 static gpointer
304 do_loop (GstRTSPMediaClass * klass)
305 {
306   GST_INFO ("enter mainloop");
307   g_main_loop_run (klass->loop);
308   GST_INFO ("exit mainloop");
309
310   return NULL;
311 }
312
313 static void
314 collect_media_stats (GstRTSPMedia * media)
315 {
316   gint64 position, duration;
317
318   media->range.unit = GST_RTSP_RANGE_NPT;
319
320   if (media->is_live) {
321     media->range.min.type = GST_RTSP_TIME_NOW;
322     media->range.min.seconds = -1;
323     media->range.max.type = GST_RTSP_TIME_END;
324     media->range.max.seconds = -1;
325   } else {
326     /* get the position */
327     if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME,
328             &position)) {
329       GST_INFO ("position query failed");
330       position = 0;
331     }
332
333     /* get the duration */
334     if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME,
335             &duration)) {
336       GST_INFO ("duration query failed");
337       duration = -1;
338     }
339
340     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
341         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
342
343     if (position == -1) {
344       media->range.min.type = GST_RTSP_TIME_NOW;
345       media->range.min.seconds = -1;
346     } else {
347       media->range.min.type = GST_RTSP_TIME_SECONDS;
348       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
349     }
350     if (duration == -1) {
351       media->range.max.type = GST_RTSP_TIME_END;
352       media->range.max.seconds = -1;
353     } else {
354       media->range.max.type = GST_RTSP_TIME_SECONDS;
355       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
356     }
357   }
358 }
359
360 /**
361  * gst_rtsp_media_new:
362  *
363  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
364  * element to produce RTP data for one or more related (audio/video/..)
365  * streams.
366  *
367  * Returns: a new #GstRTSPMedia object.
368  */
369 GstRTSPMedia *
370 gst_rtsp_media_new (void)
371 {
372   GstRTSPMedia *result;
373
374   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
375
376   return result;
377 }
378
379 /**
380  * gst_rtsp_media_set_shared:
381  * @media: a #GstRTSPMedia
382  * @shared: the new value
383  *
384  * Set or unset if the pipeline for @media can be shared will multiple clients.
385  * When @shared is %TRUE, client requests for this media will share the media
386  * pipeline.
387  */
388 void
389 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
390 {
391   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
392
393   media->shared = shared;
394 }
395
396 /**
397  * gst_rtsp_media_is_shared:
398  * @media: a #GstRTSPMedia
399  *
400  * Check if the pipeline for @media can be shared between multiple clients.
401  *
402  * Returns: %TRUE if the media can be shared between clients.
403  */
404 gboolean
405 gst_rtsp_media_is_shared (GstRTSPMedia * media)
406 {
407   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
408
409   return media->shared;
410 }
411
412 /**
413  * gst_rtsp_media_set_reusable:
414  * @media: a #GstRTSPMedia
415  * @reusable: the new value
416  *
417  * Set or unset if the pipeline for @media can be reused after the pipeline has
418  * been unprepared.
419  */
420 void
421 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
422 {
423   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
424
425   media->reusable = reusable;
426 }
427
428 /**
429  * gst_rtsp_media_is_reusable:
430  * @media: a #GstRTSPMedia
431  *
432  * Check if the pipeline for @media can be reused after an unprepare.
433  *
434  * Returns: %TRUE if the media can be reused
435  */
436 gboolean
437 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
438 {
439   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
440
441   return media->reusable;
442 }
443
444 /**
445  * gst_rtsp_media_set_protocols:
446  * @media: a #GstRTSPMedia
447  * @protocols: the new flags
448  *
449  * Configure the allowed lower transport for @media.
450  */
451 void
452 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
453 {
454   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
455
456   media->protocols = protocols;
457 }
458
459 /**
460  * gst_rtsp_media_get_protocols:
461  * @media: a #GstRTSPMedia
462  *
463  * Get the allowed protocols of @media.
464  *
465  * Returns: a #GstRTSPLowerTrans
466  */
467 GstRTSPLowerTrans
468 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
469 {
470   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
471       GST_RTSP_LOWER_TRANS_UNKNOWN);
472
473   return media->protocols;
474 }
475
476 /**
477  * gst_rtsp_media_set_eos_shutdown:
478  * @media: a #GstRTSPMedia
479  * @eos_shutdown: the new value
480  *
481  * Set or unset if an EOS event will be sent to the pipeline for @media before
482  * it is unprepared.
483  */
484 void
485 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
486 {
487   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
488
489   media->eos_shutdown = eos_shutdown;
490 }
491
492 /**
493  * gst_rtsp_media_is_eos_shutdown:
494  * @media: a #GstRTSPMedia
495  *
496  * Check if the pipeline for @media will send an EOS down the pipeline before
497  * unpreparing.
498  *
499  * Returns: %TRUE if the media will send EOS before unpreparing.
500  */
501 gboolean
502 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
503 {
504   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
505
506   return media->eos_shutdown;
507 }
508
509 /**
510  * gst_rtsp_media_set_buffer_size:
511  * @media: a #GstRTSPMedia
512  * @size: the new value
513  *
514  * Set the kernel UDP buffer size.
515  */
516 void
517 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
518 {
519   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
520
521   media->buffer_size = size;
522 }
523
524 /**
525  * gst_rtsp_media_get_buffer_size:
526  * @media: a #GstRTSPMedia
527  *
528  * Get the kernel UDP buffer size.
529  *
530  * Returns: the kernel UDP buffer size.
531  */
532 guint
533 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
534 {
535   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
536
537   return media->buffer_size;
538 }
539
540 /**
541  * gst_rtsp_media_set_multicast_group:
542  * @media: a #GstRTSPMedia
543  * @mc: the new multicast group
544  *
545  * Set the multicast group that media from @media will be streamed to.
546  */
547 void
548 gst_rtsp_media_set_multicast_group (GstRTSPMedia * media, const gchar * mc)
549 {
550   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
551
552   g_mutex_lock (&media->lock);
553   g_free (media->multicast_group);
554   media->multicast_group = g_strdup (mc);
555   g_mutex_unlock (&media->lock);
556 }
557
558 /**
559  * gst_rtsp_media_get_multicast_group:
560  * @media: a #GstRTSPMedia
561  *
562  * Get the multicast group that media from @media will be streamed to.
563  *
564  * Returns: the multicast group
565  */
566 gchar *
567 gst_rtsp_media_get_multicast_group (GstRTSPMedia * media)
568 {
569   gchar *result;
570
571   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
572
573   g_mutex_lock (&media->lock);
574   result = g_strdup (media->multicast_group);
575   g_mutex_unlock (&media->lock);
576
577   return result;
578 }
579
580 /**
581  * gst_rtsp_media_set_auth:
582  * @media: a #GstRTSPMedia
583  * @auth: a #GstRTSPAuth
584  *
585  * configure @auth to be used as the authentication manager of @media.
586  */
587 void
588 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
589 {
590   GstRTSPAuth *old;
591
592   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
593
594   old = media->auth;
595
596   if (old != auth) {
597     if (auth)
598       g_object_ref (auth);
599     media->auth = auth;
600     if (old)
601       g_object_unref (old);
602   }
603 }
604
605 /**
606  * gst_rtsp_media_get_auth:
607  * @media: a #GstRTSPMedia
608  *
609  * Get the #GstRTSPAuth used as the authentication manager of @media.
610  *
611  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
612  * usage.
613  */
614 GstRTSPAuth *
615 gst_rtsp_media_get_auth (GstRTSPMedia * media)
616 {
617   GstRTSPAuth *result;
618
619   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
620
621   if ((result = media->auth))
622     g_object_ref (result);
623
624   return result;
625 }
626
627
628 /**
629  * gst_rtsp_media_n_streams:
630  * @media: a #GstRTSPMedia
631  *
632  * Get the number of streams in this media.
633  *
634  * Returns: The number of streams.
635  */
636 guint
637 gst_rtsp_media_n_streams (GstRTSPMedia * media)
638 {
639   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
640
641   return media->streams->len;
642 }
643
644 /**
645  * gst_rtsp_media_get_stream:
646  * @media: a #GstRTSPMedia
647  * @idx: the stream index
648  *
649  * Retrieve the stream with index @idx from @media.
650  *
651  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
652  * that index did not exist.
653  */
654 GstRTSPMediaStream *
655 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
656 {
657   GstRTSPMediaStream *res;
658
659   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
660
661   if (idx < media->streams->len)
662     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
663   else
664     res = NULL;
665
666   return res;
667 }
668
669 /**
670  * gst_rtsp_media_get_range_string:
671  * @media: a #GstRTSPMedia
672  * @play: for the PLAY request
673  *
674  * Get the current range as a string.
675  *
676  * Returns: The range as a string, g_free() after usage.
677  */
678 gchar *
679 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
680 {
681   gchar *result;
682   GstRTSPTimeRange range;
683
684   /* make copy */
685   range = media->range;
686
687   if (!play && media->active > 0) {
688     range.min.type = GST_RTSP_TIME_NOW;
689     range.min.seconds = -1;
690   }
691
692   result = gst_rtsp_range_to_string (&range);
693
694   return result;
695 }
696
697 /**
698  * gst_rtsp_media_seek:
699  * @media: a #GstRTSPMedia
700  * @range: a #GstRTSPTimeRange
701  *
702  * Seek the pipeline to @range.
703  *
704  * Returns: %TRUE on success.
705  */
706 gboolean
707 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
708 {
709   GstSeekFlags flags;
710   gboolean res;
711   gint64 start, stop;
712   GstSeekType start_type, stop_type;
713
714   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
715   g_return_val_if_fail (range != NULL, FALSE);
716
717   if (!media->seekable) {
718     GST_INFO ("pipeline is not seekable");
719     return TRUE;
720   }
721
722   if (range->unit != GST_RTSP_RANGE_NPT)
723     goto not_supported;
724
725   /* depends on the current playing state of the pipeline. We might need to
726    * queue this until we get EOS. */
727   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
728
729   start_type = stop_type = GST_SEEK_TYPE_NONE;
730
731   switch (range->min.type) {
732     case GST_RTSP_TIME_NOW:
733       start = -1;
734       break;
735     case GST_RTSP_TIME_SECONDS:
736       /* only seek when something changed */
737       if (media->range.min.seconds == range->min.seconds) {
738         start = -1;
739       } else {
740         start = range->min.seconds * GST_SECOND;
741         start_type = GST_SEEK_TYPE_SET;
742       }
743       break;
744     case GST_RTSP_TIME_END:
745     default:
746       goto weird_type;
747   }
748   switch (range->max.type) {
749     case GST_RTSP_TIME_SECONDS:
750       /* only seek when something changed */
751       if (media->range.max.seconds == range->max.seconds) {
752         stop = -1;
753       } else {
754         stop = range->max.seconds * GST_SECOND;
755         stop_type = GST_SEEK_TYPE_SET;
756       }
757       break;
758     case GST_RTSP_TIME_END:
759       stop = -1;
760       stop_type = GST_SEEK_TYPE_SET;
761       break;
762     case GST_RTSP_TIME_NOW:
763     default:
764       goto weird_type;
765   }
766
767   if (start != -1 || stop != -1) {
768     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
769         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
770
771     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
772         flags, start_type, start, stop_type, stop);
773
774     /* and block for the seek to complete */
775     GST_INFO ("done seeking %d", res);
776     gst_element_get_state (media->pipeline, NULL, NULL, -1);
777     GST_INFO ("prerolled again");
778
779     collect_media_stats (media);
780   } else {
781     GST_INFO ("no seek needed");
782     res = TRUE;
783   }
784
785   return res;
786
787   /* ERRORS */
788 not_supported:
789   {
790     GST_WARNING ("seek unit %d not supported", range->unit);
791     return FALSE;
792   }
793 weird_type:
794   {
795     GST_WARNING ("weird range type %d not supported", range->min.type);
796     return FALSE;
797   }
798 }
799
800 /**
801  * gst_rtsp_media_stream_rtp:
802  * @stream: a #GstRTSPMediaStream
803  * @buffer: a #GstBuffer
804  *
805  * Handle an RTP buffer for the stream. This method is usually called when a
806  * message has been received from a client using the TCP transport.
807  *
808  * This function takes ownership of @buffer.
809  *
810  * Returns: a GstFlowReturn.
811  */
812 GstFlowReturn
813 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
814 {
815   GstFlowReturn ret;
816
817   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
818
819   return ret;
820 }
821
822 /**
823  * gst_rtsp_media_stream_rtcp:
824  * @stream: a #GstRTSPMediaStream
825  * @buffer: a #GstBuffer
826  *
827  * Handle an RTCP buffer for the stream. This method is usually called when a
828  * message has been received from a client using the TCP transport.
829  *
830  * This function takes ownership of @buffer.
831  *
832  * Returns: a GstFlowReturn.
833  */
834 GstFlowReturn
835 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
836 {
837   GstFlowReturn ret;
838
839   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
840
841   return ret;
842 }
843
844 /* Allocate the udp ports and sockets */
845 static gboolean
846 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
847 {
848   GstStateChangeReturn ret;
849   GstElement *udpsrc0, *udpsrc1;
850   GstElement *udpsink0, *udpsink1;
851   gint tmp_rtp, tmp_rtcp;
852   guint count;
853   gint rtpport, rtcpport;
854   GSocket *socket;
855   const gchar *host;
856
857   udpsrc0 = NULL;
858   udpsrc1 = NULL;
859   udpsink0 = NULL;
860   udpsink1 = NULL;
861   count = 0;
862
863   /* Start with random port */
864   tmp_rtp = 0;
865
866   if (media->is_ipv6)
867     host = "udp://[::0]";
868   else
869     host = "udp://0.0.0.0";
870
871   /* try to allocate 2 UDP ports, the RTP port should be an even
872    * number and the RTCP port should be the next (uneven) port */
873 again:
874   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
875   if (udpsrc0 == NULL)
876     goto no_udp_protocol;
877   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
878
879   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
880   if (ret == GST_STATE_CHANGE_FAILURE) {
881     if (tmp_rtp != 0) {
882       tmp_rtp += 2;
883       if (++count > 20)
884         goto no_ports;
885
886       gst_element_set_state (udpsrc0, GST_STATE_NULL);
887       gst_object_unref (udpsrc0);
888
889       goto again;
890     }
891     goto no_udp_protocol;
892   }
893
894   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
895
896   /* check if port is even */
897   if ((tmp_rtp & 1) != 0) {
898     /* port not even, close and allocate another */
899     if (++count > 20)
900       goto no_ports;
901
902     gst_element_set_state (udpsrc0, GST_STATE_NULL);
903     gst_object_unref (udpsrc0);
904
905     tmp_rtp++;
906     goto again;
907   }
908
909   /* allocate port+1 for RTCP now */
910   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
911   if (udpsrc1 == NULL)
912     goto no_udp_rtcp_protocol;
913
914   /* set port */
915   tmp_rtcp = tmp_rtp + 1;
916   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
917
918   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
919   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
920   if (ret == GST_STATE_CHANGE_FAILURE) {
921
922     if (++count > 20)
923       goto no_ports;
924
925     gst_element_set_state (udpsrc0, GST_STATE_NULL);
926     gst_object_unref (udpsrc0);
927
928     gst_element_set_state (udpsrc1, GST_STATE_NULL);
929     gst_object_unref (udpsrc1);
930
931     tmp_rtp += 2;
932     goto again;
933   }
934
935   /* all fine, do port check */
936   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
937   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
938
939   /* this should not happen... */
940   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
941     goto port_error;
942
943   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
944   if (!udpsink0)
945     goto no_udp_protocol;
946
947   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
948   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
949   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
950
951   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
952   if (!udpsink1)
953     goto no_udp_protocol;
954
955   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
956           "send-duplicates")) {
957     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
958     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
959   } else {
960     g_warning
961         ("old multiudpsink version found without send-duplicates property");
962   }
963
964   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
965           "buffer-size")) {
966     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
967   } else {
968     GST_WARNING ("multiudpsink version found without buffer-size property");
969   }
970
971   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
972   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
973   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
974   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
976
977   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
978   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
979   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
980   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
981
982   /* we keep these elements, we configure all in configure_transport when the
983    * server told us to really use the UDP ports. */
984   stream->udpsrc[0] = udpsrc0;
985   stream->udpsrc[1] = udpsrc1;
986   stream->udpsink[0] = udpsink0;
987   stream->udpsink[1] = udpsink1;
988   stream->server_port.min = rtpport;
989   stream->server_port.max = rtcpport;
990
991   return TRUE;
992
993   /* ERRORS */
994 no_udp_protocol:
995   {
996     goto cleanup;
997   }
998 no_ports:
999   {
1000     goto cleanup;
1001   }
1002 no_udp_rtcp_protocol:
1003   {
1004     goto cleanup;
1005   }
1006 port_error:
1007   {
1008     goto cleanup;
1009   }
1010 cleanup:
1011   {
1012     if (udpsrc0) {
1013       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1014       gst_object_unref (udpsrc0);
1015     }
1016     if (udpsrc1) {
1017       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1018       gst_object_unref (udpsrc1);
1019     }
1020     if (udpsink0) {
1021       gst_element_set_state (udpsink0, GST_STATE_NULL);
1022       gst_object_unref (udpsink0);
1023     }
1024     if (udpsink1) {
1025       gst_element_set_state (udpsink1, GST_STATE_NULL);
1026       gst_object_unref (udpsink1);
1027     }
1028     return FALSE;
1029   }
1030 }
1031
1032 /* executed from streaming thread */
1033 static void
1034 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1035 {
1036   gchar *capsstr;
1037   GstCaps *newcaps, *oldcaps;
1038
1039   newcaps = gst_pad_get_current_caps (pad);
1040
1041   oldcaps = stream->caps;
1042   stream->caps = newcaps;
1043
1044   if (oldcaps)
1045     gst_caps_unref (oldcaps);
1046
1047   capsstr = gst_caps_to_string (newcaps);
1048   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1049   g_free (capsstr);
1050 }
1051
1052 static void
1053 dump_structure (const GstStructure * s)
1054 {
1055   gchar *sstr;
1056
1057   sstr = gst_structure_to_string (s);
1058   GST_INFO ("structure: %s", sstr);
1059   g_free (sstr);
1060 }
1061
1062 static GstRTSPMediaTrans *
1063 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1064 {
1065   GList *walk;
1066   GstRTSPMediaTrans *result = NULL;
1067   const gchar *tmp;
1068   gchar *dest;
1069   guint port;
1070
1071   if (rtcp_from == NULL)
1072     return NULL;
1073
1074   tmp = g_strrstr (rtcp_from, ":");
1075   if (tmp == NULL)
1076     return NULL;
1077
1078   port = atoi (tmp + 1);
1079   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1080
1081   GST_INFO ("finding %s:%d in %d transports", dest, port,
1082       g_list_length (stream->transports));
1083
1084   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1085     GstRTSPMediaTrans *trans = walk->data;
1086     gint min, max;
1087
1088     min = trans->transport->client_port.min;
1089     max = trans->transport->client_port.max;
1090
1091     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1092             || max == port)) {
1093       result = trans;
1094       break;
1095     }
1096   }
1097   g_free (dest);
1098
1099   return result;
1100 }
1101
1102 static GstRTSPMediaTrans *
1103 check_transport (GObject * source, GstRTSPMediaStream * stream)
1104 {
1105   GstStructure *stats;
1106   GstRTSPMediaTrans *trans;
1107
1108   /* see if we have a stream to match with the origin of the RTCP packet */
1109   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1110   if (trans == NULL) {
1111     g_object_get (source, "stats", &stats, NULL);
1112     if (stats) {
1113       const gchar *rtcp_from;
1114
1115       dump_structure (stats);
1116
1117       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1118       if ((trans = find_transport (stream, rtcp_from))) {
1119         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1120             source);
1121
1122         /* keep ref to the source */
1123         trans->rtpsource = source;
1124
1125         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1126       }
1127       gst_structure_free (stats);
1128     }
1129   }
1130
1131   return trans;
1132 }
1133
1134 static void
1135 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1136 {
1137   GstRTSPMediaTrans *trans;
1138
1139   GST_INFO ("%p: new source %p", stream, source);
1140
1141   trans = check_transport (source, stream);
1142
1143   if (trans)
1144     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1145 }
1146
1147 static void
1148 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1149 {
1150   GST_INFO ("%p: new SDES %p", stream, source);
1151 }
1152
1153 static void
1154 on_ssrc_active (GObject * session, GObject * source,
1155     GstRTSPMediaStream * stream)
1156 {
1157   GstRTSPMediaTrans *trans;
1158
1159   trans = check_transport (source, stream);
1160
1161   if (trans)
1162     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1163
1164   if (trans && trans->keep_alive)
1165     trans->keep_alive (trans->ka_user_data);
1166
1167 #ifdef DUMP_STATS
1168   {
1169     GstStructure *stats;
1170     g_object_get (source, "stats", &stats, NULL);
1171     if (stats) {
1172       dump_structure (stats);
1173       gst_structure_free (stats);
1174     }
1175   }
1176 #endif
1177 }
1178
1179 static void
1180 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1181 {
1182   GST_INFO ("%p: source %p bye", stream, source);
1183 }
1184
1185 static void
1186 on_bye_timeout (GObject * session, GObject * source,
1187     GstRTSPMediaStream * stream)
1188 {
1189   GstRTSPMediaTrans *trans;
1190
1191   GST_INFO ("%p: source %p bye timeout", stream, source);
1192
1193   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1194     trans->rtpsource = NULL;
1195     trans->timeout = TRUE;
1196   }
1197 }
1198
1199 static void
1200 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1201 {
1202   GstRTSPMediaTrans *trans;
1203
1204   GST_INFO ("%p: source %p timeout", stream, source);
1205
1206   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1207     trans->rtpsource = NULL;
1208     trans->timeout = TRUE;
1209   }
1210 }
1211
1212 static GstFlowReturn
1213 handle_new_sample (GstAppSink * sink, gpointer user_data)
1214 {
1215   GList *walk;
1216   GstSample *sample;
1217   GstBuffer *buffer;
1218   GstRTSPMediaStream *stream;
1219
1220   sample = gst_app_sink_pull_sample (sink);
1221   if (!sample)
1222     return GST_FLOW_OK;
1223
1224   stream = (GstRTSPMediaStream *) user_data;
1225   buffer = gst_sample_get_buffer (sample);
1226
1227   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1228     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1229
1230     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1231       if (tr->send_rtp)
1232         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1233     } else {
1234       if (tr->send_rtcp)
1235         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1236     }
1237   }
1238   gst_sample_unref (sample);
1239
1240   return GST_FLOW_OK;
1241 }
1242
1243 static GstAppSinkCallbacks sink_cb = {
1244   NULL,                         /* not interested in EOS */
1245   NULL,                         /* not interested in preroll samples */
1246   handle_new_sample,
1247 };
1248
1249 /* prepare the pipeline objects to handle @stream in @media */
1250 static gboolean
1251 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1252 {
1253   gchar *name;
1254   GstPad *pad, *teepad, *queuepad, *selpad;
1255   GstPadLinkReturn ret;
1256   gint i;
1257
1258   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1259    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1260    * elements */
1261   if (!alloc_udp_ports (media, stream))
1262     return FALSE;
1263
1264   /* add the ports to the pipeline */
1265   for (i = 0; i < 2; i++) {
1266     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1267     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1268   }
1269
1270   /* create elements for the TCP transfer */
1271   for (i = 0; i < 2; i++) {
1272     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1273     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
1274     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1275     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1276     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1277     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appqueue[i]);
1278     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1279     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1280     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1281         &sink_cb, stream, NULL);
1282   }
1283
1284   /* hook up the stream to the RTP session elements. */
1285   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1286   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1287   g_free (name);
1288   name = g_strdup_printf ("send_rtp_src_%u", idx);
1289   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1290   g_free (name);
1291   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1292   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1293   g_free (name);
1294   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1295   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1296   g_free (name);
1297   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1298   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1299   g_free (name);
1300
1301   /* get the session */
1302   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1303       &stream->session);
1304
1305   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1306       stream);
1307   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1308       stream);
1309   g_signal_connect (stream->session, "on-ssrc-active",
1310       (GCallback) on_ssrc_active, stream);
1311   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1312       stream);
1313   g_signal_connect (stream->session, "on-bye-timeout",
1314       (GCallback) on_bye_timeout, stream);
1315   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1316       stream);
1317
1318   /* link the RTP pad to the session manager */
1319   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1320   if (ret != GST_PAD_LINK_OK)
1321     goto link_failed;
1322
1323   /* make tee for RTP and link to stream */
1324   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1325   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1326
1327   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1328   gst_pad_link (stream->send_rtp_src, pad);
1329   gst_object_unref (pad);
1330
1331   /* link RTP sink, we're pretty sure this will work. */
1332   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1333   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1334   gst_pad_link (teepad, pad);
1335   gst_object_unref (pad);
1336   gst_object_unref (teepad);
1337
1338   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
1339   pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
1340   gst_pad_link (teepad, pad);
1341   gst_object_unref (pad);
1342   gst_object_unref (teepad);
1343
1344   queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
1345   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1346   gst_pad_link (queuepad, pad);
1347   gst_object_unref (pad);
1348   gst_object_unref (queuepad);
1349
1350   /* make tee for RTCP */
1351   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1352   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1353
1354   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1355   gst_pad_link (stream->send_rtcp_src, pad);
1356   gst_object_unref (pad);
1357
1358   /* link RTCP elements */
1359   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1360   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1361   gst_pad_link (teepad, pad);
1362   gst_object_unref (pad);
1363   gst_object_unref (teepad);
1364
1365   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
1366   pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
1367   gst_pad_link (teepad, pad);
1368   gst_object_unref (pad);
1369   gst_object_unref (teepad);
1370
1371   queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
1372   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1373   gst_pad_link (queuepad, pad);
1374   gst_object_unref (pad);
1375   gst_object_unref (queuepad);
1376
1377   /* make selector for the RTP receivers */
1378   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
1379   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1380
1381   pad = gst_element_get_static_pad (stream->selector[0], "src");
1382   gst_pad_link (pad, stream->recv_rtp_sink);
1383   gst_object_unref (pad);
1384
1385   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1386   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1387   gst_pad_link (pad, selpad);
1388   gst_object_unref (pad);
1389   gst_object_unref (selpad);
1390
1391   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
1392   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1393   gst_pad_link (pad, selpad);
1394   gst_object_unref (pad);
1395   gst_object_unref (selpad);
1396
1397   /* make selector for the RTCP receivers */
1398   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
1399   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1400
1401   pad = gst_element_get_static_pad (stream->selector[1], "src");
1402   gst_pad_link (pad, stream->recv_rtcp_sink);
1403   gst_object_unref (pad);
1404
1405   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1406   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1407   gst_pad_link (pad, selpad);
1408   gst_object_unref (pad);
1409   gst_object_unref (selpad);
1410
1411   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
1412   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1413   gst_pad_link (pad, selpad);
1414   gst_object_unref (pad);
1415   gst_object_unref (selpad);
1416
1417   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1418    * values */
1419   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1420   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1421   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1422   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1423
1424   /* be notified of caps changes */
1425   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1426       (GCallback) caps_notify, stream);
1427
1428   stream->prepared = TRUE;
1429
1430   return TRUE;
1431
1432   /* ERRORS */
1433 link_failed:
1434   {
1435     GST_WARNING ("failed to link stream %d", idx);
1436     return FALSE;
1437   }
1438 }
1439
1440 static void
1441 unlock_streams (GstRTSPMedia * media)
1442 {
1443   guint i, n_streams;
1444
1445   /* unlock the udp src elements */
1446   n_streams = gst_rtsp_media_n_streams (media);
1447   for (i = 0; i < n_streams; i++) {
1448     GstRTSPMediaStream *stream;
1449
1450     stream = gst_rtsp_media_get_stream (media, i);
1451
1452     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1453     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1454   }
1455 }
1456
1457 static void
1458 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1459 {
1460   g_mutex_lock (&media->lock);
1461   /* never overwrite the error status */
1462   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1463     media->status = status;
1464   GST_DEBUG ("setting new status to %d", status);
1465   g_cond_broadcast (&media->cond);
1466   g_mutex_unlock (&media->lock);
1467 }
1468
1469 static GstRTSPMediaStatus
1470 gst_rtsp_media_get_status (GstRTSPMedia * media)
1471 {
1472   GstRTSPMediaStatus result;
1473   gint64 end_time;
1474
1475   g_mutex_lock (&media->lock);
1476   end_time = g_get_monotonic_time () + 20 * G_TIME_SPAN_SECOND;
1477   /* while we are preparing, wait */
1478   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1479     GST_DEBUG ("waiting for status change");
1480     if (!g_cond_wait_until (&media->cond, &media->lock, end_time)) {
1481       GST_DEBUG ("timeout, assuming error status");
1482       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1483     }
1484   }
1485   /* could be success or error */
1486   result = media->status;
1487   GST_DEBUG ("got status %d", result);
1488   g_mutex_unlock (&media->lock);
1489
1490   return result;
1491 }
1492
1493 static gboolean
1494 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1495 {
1496   GstMessageType type;
1497
1498   type = GST_MESSAGE_TYPE (message);
1499
1500   switch (type) {
1501     case GST_MESSAGE_STATE_CHANGED:
1502       break;
1503     case GST_MESSAGE_BUFFERING:
1504     {
1505       gint percent;
1506
1507       gst_message_parse_buffering (message, &percent);
1508
1509       /* no state management needed for live pipelines */
1510       if (media->is_live)
1511         break;
1512
1513       if (percent == 100) {
1514         /* a 100% message means buffering is done */
1515         media->buffering = FALSE;
1516         /* if the desired state is playing, go back */
1517         if (media->target_state == GST_STATE_PLAYING) {
1518           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1519           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1520         } else {
1521           GST_INFO ("Buffering done");
1522         }
1523       } else {
1524         /* buffering busy */
1525         if (media->buffering == FALSE) {
1526           if (media->target_state == GST_STATE_PLAYING) {
1527             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1528             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1529             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1530           } else {
1531             GST_INFO ("Buffering ...");
1532           }
1533         }
1534         media->buffering = TRUE;
1535       }
1536       break;
1537     }
1538     case GST_MESSAGE_LATENCY:
1539     {
1540       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1541       break;
1542     }
1543     case GST_MESSAGE_ERROR:
1544     {
1545       GError *gerror;
1546       gchar *debug;
1547
1548       gst_message_parse_error (message, &gerror, &debug);
1549       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1550       g_error_free (gerror);
1551       g_free (debug);
1552
1553       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1554       break;
1555     }
1556     case GST_MESSAGE_WARNING:
1557     {
1558       GError *gerror;
1559       gchar *debug;
1560
1561       gst_message_parse_warning (message, &gerror, &debug);
1562       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1563       g_error_free (gerror);
1564       g_free (debug);
1565       break;
1566     }
1567     case GST_MESSAGE_ELEMENT:
1568       break;
1569     case GST_MESSAGE_STREAM_STATUS:
1570       break;
1571     case GST_MESSAGE_ASYNC_DONE:
1572       if (!media->adding) {
1573         /* when we are dynamically adding pads, the addition of the udpsrc will
1574          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1575          * wait for the final ASYNC_DONE after everything prerolled */
1576         GST_INFO ("%p: got ASYNC_DONE", media);
1577         collect_media_stats (media);
1578
1579         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1580       } else {
1581         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1582       }
1583       break;
1584     case GST_MESSAGE_EOS:
1585       GST_INFO ("%p: got EOS", media);
1586       if (media->eos_pending) {
1587         GST_DEBUG ("shutting down after EOS");
1588         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1589         media->eos_pending = FALSE;
1590         g_object_unref (media);
1591       }
1592       break;
1593     default:
1594       GST_INFO ("%p: got message type %s", media,
1595           gst_message_type_get_name (type));
1596       break;
1597   }
1598   return TRUE;
1599 }
1600
1601 static gboolean
1602 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1603 {
1604   GstRTSPMediaClass *klass;
1605   gboolean ret;
1606
1607   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1608
1609   if (klass->handle_message)
1610     ret = klass->handle_message (media, message);
1611   else
1612     ret = FALSE;
1613
1614   return ret;
1615 }
1616
1617 /* called from streaming threads */
1618 static void
1619 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1620 {
1621   GstRTSPMediaStream *stream;
1622   gchar *name;
1623   gint i;
1624
1625   i = media->streams->len + 1;
1626
1627   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1628
1629   stream = g_new0 (GstRTSPMediaStream, 1);
1630   stream->payloader = element;
1631
1632   name = g_strdup_printf ("dynpay%d", i);
1633
1634   media->adding = TRUE;
1635
1636   /* ghost the pad of the payloader to the element */
1637   stream->srcpad = gst_ghost_pad_new (name, pad);
1638   gst_pad_set_active (stream->srcpad, TRUE);
1639   gst_element_add_pad (media->element, stream->srcpad);
1640   g_free (name);
1641
1642   /* add stream now */
1643   g_array_append_val (media->streams, stream);
1644
1645   setup_stream (stream, i, media);
1646
1647   for (i = 0; i < 2; i++) {
1648     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1649     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1650     gst_element_set_state (stream->appqueue[i], GST_STATE_PAUSED);
1651     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1652     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1653     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1654   }
1655   media->adding = FALSE;
1656 }
1657
1658 static void
1659 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1660 {
1661   GST_INFO ("no more pads");
1662   if (media->fakesink) {
1663     gst_object_ref (media->fakesink);
1664     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1665     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1666     gst_object_unref (media->fakesink);
1667     media->fakesink = NULL;
1668     GST_INFO ("removed fakesink");
1669   }
1670 }
1671
1672 /**
1673  * gst_rtsp_media_prepare:
1674  * @media: a #GstRTSPMedia
1675  *
1676  * Prepare @media for streaming. This function will create the pipeline and
1677  * other objects to manage the streaming.
1678  *
1679  * It will preroll the pipeline and collect vital information about the streams
1680  * such as the duration.
1681  *
1682  * Returns: %TRUE on success.
1683  */
1684 gboolean
1685 gst_rtsp_media_prepare (GstRTSPMedia * media)
1686 {
1687   GstStateChangeReturn ret;
1688   GstRTSPMediaStatus status;
1689   guint i, n_streams;
1690   GstRTSPMediaClass *klass;
1691   GstBus *bus;
1692   GList *walk;
1693
1694   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1695     goto was_prepared;
1696
1697   if (!media->reusable && media->reused)
1698     goto is_reused;
1699
1700   media->rtpbin = gst_element_factory_make ("rtpbin", NULL);
1701   if (media->rtpbin == NULL)
1702     goto no_rtpbin;
1703
1704   GST_INFO ("preparing media %p", media);
1705
1706   /* reset some variables */
1707   media->is_live = FALSE;
1708   media->seekable = FALSE;
1709   media->buffering = FALSE;
1710   /* we're preparing now */
1711   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1712
1713   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1714
1715   /* add the pipeline bus to our custom mainloop */
1716   media->source = gst_bus_create_watch (bus);
1717   gst_object_unref (bus);
1718
1719   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1720
1721   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1722   media->id = g_source_attach (media->source, klass->context);
1723
1724   /* add stuff to the bin */
1725   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1726
1727   /* link streams we already have, other streams might appear when we have
1728    * dynamic elements */
1729   n_streams = gst_rtsp_media_n_streams (media);
1730   for (i = 0; i < n_streams; i++) {
1731     GstRTSPMediaStream *stream;
1732
1733     stream = gst_rtsp_media_get_stream (media, i);
1734
1735     setup_stream (stream, i, media);
1736   }
1737
1738   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1739     GstElement *elem = walk->data;
1740
1741     GST_INFO ("adding callbacks for dynamic element %p", elem);
1742
1743     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1744     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1745
1746     /* we add a fakesink here in order to make the state change async. We remove
1747      * the fakesink again in the no-more-pads callback. */
1748     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1749     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1750   }
1751
1752   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1753   /* first go to PAUSED */
1754   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1755   media->target_state = GST_STATE_PAUSED;
1756
1757   switch (ret) {
1758     case GST_STATE_CHANGE_SUCCESS:
1759       GST_INFO ("SUCCESS state change for media %p", media);
1760       media->seekable = TRUE;
1761       break;
1762     case GST_STATE_CHANGE_ASYNC:
1763       GST_INFO ("ASYNC state change for media %p", media);
1764       media->seekable = TRUE;
1765       break;
1766     case GST_STATE_CHANGE_NO_PREROLL:
1767       /* we need to go to PLAYING */
1768       GST_INFO ("NO_PREROLL state change: live media %p", media);
1769       /* FIXME we disable seeking for live streams for now. We should perform a
1770        * seeking query in preroll instead and do a seeking query. */
1771       media->seekable = FALSE;
1772       media->is_live = TRUE;
1773       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1774       if (ret == GST_STATE_CHANGE_FAILURE)
1775         goto state_failed;
1776       break;
1777     case GST_STATE_CHANGE_FAILURE:
1778       goto state_failed;
1779   }
1780
1781   /* now wait for all pads to be prerolled */
1782   status = gst_rtsp_media_get_status (media);
1783   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1784     goto state_failed;
1785
1786   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1787
1788   GST_INFO ("object %p is prerolled", media);
1789
1790   return TRUE;
1791
1792   /* OK */
1793 was_prepared:
1794   {
1795     return TRUE;
1796   }
1797   /* ERRORS */
1798 is_reused:
1799   {
1800     GST_WARNING ("can not reuse media %p", media);
1801     return FALSE;
1802   }
1803 no_rtpbin:
1804   {
1805     GST_WARNING ("no rtpbin element");
1806     g_warning ("failed to create element 'rtpbin', check your installation");
1807     return FALSE;
1808   }
1809 state_failed:
1810   {
1811     GST_WARNING ("failed to preroll pipeline");
1812     unlock_streams (media);
1813     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1814     gst_rtsp_media_unprepare (media);
1815     return FALSE;
1816   }
1817 }
1818
1819 /**
1820  * gst_rtsp_media_unprepare:
1821  * @media: a #GstRTSPMedia
1822  *
1823  * Unprepare @media. After this call, the media should be prepared again before
1824  * it can be used again. If the media is set to be non-reusable, a new instance
1825  * must be created.
1826  *
1827  * Returns: %TRUE on success.
1828  */
1829 gboolean
1830 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1831 {
1832   GstRTSPMediaClass *klass;
1833   gboolean success;
1834
1835   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1836     return TRUE;
1837
1838   GST_INFO ("unprepare media %p", media);
1839   media->target_state = GST_STATE_NULL;
1840
1841   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1842   if (klass->unprepare)
1843     success = klass->unprepare (media);
1844   else
1845     success = TRUE;
1846
1847   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1848   media->reused = TRUE;
1849
1850   /* when the media is not reusable, this will effectively unref the media and
1851    * recreate it */
1852   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1853
1854   return success;
1855 }
1856
1857 static gboolean
1858 default_unprepare (GstRTSPMedia * media)
1859 {
1860   if (media->eos_shutdown) {
1861     GST_DEBUG ("sending EOS for shutdown");
1862     /* ref so that we don't disappear */
1863     g_object_ref (media);
1864     media->eos_pending = TRUE;
1865     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1866     /* we need to go to playing again for the EOS to propagate, normally in this
1867      * state, nothing is receiving data from us anymore so this is ok. */
1868     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1869   } else {
1870     GST_DEBUG ("shutting down");
1871     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1872   }
1873   return TRUE;
1874 }
1875
1876 static void
1877 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1878     gchar * dest, gint min, gint max)
1879 {
1880   GST_INFO ("adding %s:%d-%d", dest, min, max);
1881   g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1882   g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1883 }
1884
1885 static void
1886 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1887     gchar * dest, gint min, gint max)
1888 {
1889   GST_INFO ("removing %s:%d-%d", dest, min, max);
1890   g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1891   g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1892 }
1893
1894 static void
1895 set_multicast_ttl (GstRTSPMedia * media, GstRTSPMediaStream * stream, guint ttl)
1896 {
1897   GST_INFO ("setting ttl-mc %d", ttl);
1898   g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
1899   g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
1900 }
1901
1902 /**
1903  * gst_rtsp_media_set_state:
1904  * @media: a #GstRTSPMedia
1905  * @state: the target state of the media
1906  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1907  *
1908  * Set the state of @media to @state and for the transports in @transports.
1909  *
1910  * Returns: %TRUE on success.
1911  */
1912 gboolean
1913 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1914     GArray * transports)
1915 {
1916   gint i;
1917   gboolean add, remove, do_state;
1918   gint old_active;
1919
1920   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1921   g_return_val_if_fail (transports != NULL, FALSE);
1922
1923   /* NULL and READY are the same */
1924   if (state == GST_STATE_READY)
1925     state = GST_STATE_NULL;
1926
1927   add = remove = FALSE;
1928
1929   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1930       media);
1931
1932   switch (state) {
1933     case GST_STATE_NULL:
1934       /* unlock the streams so that they follow the state changes from now on */
1935       unlock_streams (media);
1936       /* fallthrough */
1937     case GST_STATE_PAUSED:
1938       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1939       if (media->target_state == GST_STATE_PLAYING)
1940         remove = TRUE;
1941       break;
1942     case GST_STATE_PLAYING:
1943       /* we're going to PLAYING, add */
1944       add = TRUE;
1945       break;
1946     default:
1947       break;
1948   }
1949   old_active = media->active;
1950
1951   for (i = 0; i < transports->len; i++) {
1952     GstRTSPMediaTrans *tr;
1953     GstRTSPMediaStream *stream;
1954     GstRTSPTransport *trans;
1955
1956     /* we need a non-NULL entry in the array */
1957     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1958     if (tr == NULL)
1959       continue;
1960
1961     /* we need a transport */
1962     if (!(trans = tr->transport))
1963       continue;
1964
1965     /* get the stream and add the destinations */
1966     stream = gst_rtsp_media_get_stream (media, tr->idx);
1967     switch (trans->lower_transport) {
1968       case GST_RTSP_LOWER_TRANS_UDP:
1969       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1970       {
1971         gchar *dest;
1972         gint min, max;
1973         guint ttl = 0;
1974
1975         dest = trans->destination;
1976         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1977           min = trans->port.min;
1978           max = trans->port.max;
1979           ttl = trans->ttl;
1980         } else {
1981           min = trans->client_port.min;
1982           max = trans->client_port.max;
1983         }
1984
1985         if (add && !tr->active) {
1986           add_udp_destination (media, stream, dest, min, max);
1987           if (ttl > 0) {
1988             set_multicast_ttl (media, stream, ttl);
1989           }
1990           stream->transports = g_list_prepend (stream->transports, tr);
1991           tr->active = TRUE;
1992           media->active++;
1993         } else if (remove && tr->active) {
1994           remove_udp_destination (media, stream, dest, min, max);
1995           stream->transports = g_list_remove (stream->transports, tr);
1996           tr->active = FALSE;
1997           media->active--;
1998         }
1999         break;
2000       }
2001       case GST_RTSP_LOWER_TRANS_TCP:
2002         if (add && !tr->active) {
2003           GST_INFO ("adding TCP %s", trans->destination);
2004           stream->transports = g_list_prepend (stream->transports, tr);
2005           tr->active = TRUE;
2006           media->active++;
2007         } else if (remove && tr->active) {
2008           GST_INFO ("removing TCP %s", trans->destination);
2009           stream->transports = g_list_remove (stream->transports, tr);
2010           tr->active = FALSE;
2011           media->active--;
2012         }
2013         break;
2014       default:
2015         GST_INFO ("Unknown transport %d", trans->lower_transport);
2016         break;
2017     }
2018   }
2019
2020   /* we just added the first media, do the playing state change */
2021   if (old_active == 0 && add)
2022     do_state = TRUE;
2023   /* if we have no more active media, do the downward state changes */
2024   else if (media->active == 0)
2025     do_state = TRUE;
2026   else
2027     do_state = FALSE;
2028
2029   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2030       media, do_state);
2031
2032   if (media->target_state != state) {
2033     if (do_state) {
2034       if (state == GST_STATE_NULL) {
2035         gst_rtsp_media_unprepare (media);
2036       } else {
2037         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2038             media);
2039         media->target_state = state;
2040         gst_element_set_state (media->pipeline, state);
2041       }
2042     }
2043     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2044         NULL);
2045   }
2046
2047   /* remember where we are */
2048   if (state != GST_STATE_NULL && (state == GST_STATE_PAUSED ||
2049           old_active != media->active))
2050     collect_media_stats (media);
2051
2052   return TRUE;
2053 }
2054
2055 /**
2056  * gst_rtsp_media_remove_elements:
2057  * @media: a #GstRTSPMedia
2058  *
2059  * Remove all elements and the pipeline controlled by @media.
2060  */
2061 void
2062 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2063 {
2064   gint i, j;
2065
2066   unlock_streams (media);
2067
2068   for (i = 0; i < media->streams->len; i++) {
2069     GstRTSPMediaStream *stream;
2070
2071     GST_INFO ("Removing elements of stream %d from pipeline", i);
2072
2073     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2074
2075     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2076
2077     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2078
2079     for (j = 0; j < 2; j++) {
2080       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2081       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2082       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2083       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2084       gst_element_set_state (stream->appqueue[j], GST_STATE_NULL);
2085       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2086       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2087
2088       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2089       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2090       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2091       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2092       gst_bin_remove (GST_BIN (media->pipeline), stream->appqueue[j]);
2093       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2094       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2095     }
2096     if (stream->caps)
2097       gst_caps_unref (stream->caps);
2098     stream->caps = NULL;
2099     gst_rtsp_media_stream_free (stream);
2100   }
2101   g_array_remove_range (media->streams, 0, media->streams->len);
2102
2103   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2104   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2105
2106   gst_object_unref (media->pipeline);
2107   media->pipeline = NULL;
2108 }
2109
2110 static void
2111 default_handle_mtu (GstRTSPMedia * media, guint mtu)
2112 {
2113   gint i;
2114
2115   for (i = 0; i < media->streams->len; i++) {
2116     GstRTSPMediaStream *stream;
2117
2118     GST_INFO ("Setting mtu %d for stream %d", mtu, i);
2119
2120     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2121
2122     g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
2123   }
2124 }
2125
2126 /**
2127  * gst_rtsp_media_handle_mtu:
2128  * @media: a #GstRTSPMedia
2129  * @mtu: the mtu
2130  *
2131  * Set maximum size of one RTP packet on the payloaders.
2132  */
2133 void
2134 gst_rtsp_media_handle_mtu (GstRTSPMedia * media, guint mtu)
2135 {
2136   GstRTSPMediaClass *klass;
2137
2138   klass = GST_RTSP_MEDIA_GET_CLASS (media);
2139
2140   if (klass->handle_mtu)
2141     klass->handle_mtu (media, mtu);
2142 }