b7f2d47159619367f2a6c7b26fb7a452d1729e0e
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-funnel.h"
27 #include "rtsp-media.h"
28
29 #define DEFAULT_SHARED         FALSE
30 #define DEFAULT_REUSABLE       FALSE
31 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
32 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
33 #define DEFAULT_EOS_SHUTDOWN   FALSE
34 #define DEFAULT_BUFFER_SIZE    0x800000
35
36 /* define to dump received RTCP packets */
37 #undef DUMP_STATS
38
39 enum
40 {
41   PROP_0,
42   PROP_SHARED,
43   PROP_REUSABLE,
44   PROP_PROTOCOLS,
45   PROP_EOS_SHUTDOWN,
46   PROP_BUFFER_SIZE,
47   PROP_LAST
48 };
49
50 enum
51 {
52   SIGNAL_PREPARED,
53   SIGNAL_UNPREPARED,
54   SIGNAL_NEW_STATE,
55   SIGNAL_LAST
56 };
57
58 GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
59 #define GST_CAT_DEFAULT rtsp_media_debug
60
61 static GQuark ssrc_stream_map_key;
62
63 static void gst_rtsp_media_get_property (GObject * object, guint propid,
64     GValue * value, GParamSpec * pspec);
65 static void gst_rtsp_media_set_property (GObject * object, guint propid,
66     const GValue * value, GParamSpec * pspec);
67 static void gst_rtsp_media_finalize (GObject * obj);
68
69 static gpointer do_loop (GstRTSPMediaClass * klass);
70 static gboolean default_handle_message (GstRTSPMedia * media,
71     GstMessage * message);
72 static gboolean default_unprepare (GstRTSPMedia * media);
73 static void unlock_streams (GstRTSPMedia * media);
74
75 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
76
77 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
78
79 static void
80 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
81 {
82   GObjectClass *gobject_class;
83   GError *error = NULL;
84
85   gobject_class = G_OBJECT_CLASS (klass);
86
87   gobject_class->get_property = gst_rtsp_media_get_property;
88   gobject_class->set_property = gst_rtsp_media_set_property;
89   gobject_class->finalize = gst_rtsp_media_finalize;
90
91   g_object_class_install_property (gobject_class, PROP_SHARED,
92       g_param_spec_boolean ("shared", "Shared",
93           "If this media pipeline can be shared", DEFAULT_SHARED,
94           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
95
96   g_object_class_install_property (gobject_class, PROP_REUSABLE,
97       g_param_spec_boolean ("reusable", "Reusable",
98           "If this media pipeline can be reused after an unprepare",
99           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100
101   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
102       g_param_spec_flags ("protocols", "Protocols",
103           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
104           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
107       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
108           "Send an EOS event to the pipeline before unpreparing",
109           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
110
111   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
112       g_param_spec_uint ("buffer-size", "Buffer Size",
113           "The kernel UDP buffer size to use", 0, G_MAXUINT,
114           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115
116   gst_rtsp_media_signals[SIGNAL_PREPARED] =
117       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
118       G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
119       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
120
121   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
122       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
123       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
124       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
125
126   gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
127       g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
128       G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
129       g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
130
131   klass->context = g_main_context_new ();
132   klass->loop = g_main_loop_new (klass->context, TRUE);
133
134   GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
135
136   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
137   if (error != NULL) {
138     g_critical ("could not start bus thread: %s", error->message);
139   }
140   klass->handle_message = default_handle_message;
141   klass->unprepare = default_unprepare;
142
143   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
144
145   gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
146
147 }
148
149 static void
150 gst_rtsp_media_init (GstRTSPMedia * media)
151 {
152   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
153   media->lock = g_mutex_new ();
154   media->cond = g_cond_new ();
155
156   media->shared = DEFAULT_SHARED;
157   media->reusable = DEFAULT_REUSABLE;
158   media->protocols = DEFAULT_PROTOCOLS;
159   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
160   media->buffer_size = DEFAULT_BUFFER_SIZE;
161 }
162
163 /* FIXME. this should be done in multiudpsink */
164 typedef struct
165 {
166   gint count;
167   gchar *dest;
168   gint min, max;
169 } RTSPDestination;
170
171 static gint
172 dest_compare (RTSPDestination * a, RTSPDestination * b)
173 {
174   if ((a->min == b->min) && (a->max == b->max)
175       && (strcmp (a->dest, b->dest) == 0))
176     return 0;
177
178   return 1;
179 }
180
181 static RTSPDestination *
182 create_destination (const gchar * dest, gint min, gint max)
183 {
184   RTSPDestination *res;
185
186   res = g_slice_new (RTSPDestination);
187   res->count = 1;
188   res->dest = g_strdup (dest);
189   res->min = min;
190   res->max = max;
191
192   return res;
193 }
194
195 static void
196 free_destination (RTSPDestination * dest)
197 {
198   g_free (dest->dest);
199   g_slice_free (RTSPDestination, dest);
200 }
201
202 void
203 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
204 {
205   if (trans->transport) {
206     gst_rtsp_transport_free (trans->transport);
207     trans->transport = NULL;
208   }
209   if (trans->rtpsource) {
210     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
211     trans->rtpsource = NULL;
212   }
213 }
214
215 static void
216 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
217 {
218   if (stream->session)
219     g_object_unref (stream->session);
220
221   if (stream->caps)
222     gst_caps_unref (stream->caps);
223
224   if (stream->send_rtp_sink)
225     gst_object_unref (stream->send_rtp_sink);
226   if (stream->send_rtp_src)
227     gst_object_unref (stream->send_rtp_src);
228   if (stream->send_rtcp_src)
229     gst_object_unref (stream->send_rtcp_src);
230   if (stream->recv_rtcp_sink)
231     gst_object_unref (stream->recv_rtcp_sink);
232   if (stream->recv_rtp_sink)
233     gst_object_unref (stream->recv_rtp_sink);
234
235   g_list_free (stream->transports);
236
237   g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
238   g_list_free (stream->destinations);
239
240   g_free (stream);
241 }
242
243 static void
244 gst_rtsp_media_finalize (GObject * obj)
245 {
246   GstRTSPMedia *media;
247   guint i;
248
249   media = GST_RTSP_MEDIA (obj);
250
251   GST_INFO ("finalize media %p", media);
252
253   if (media->pipeline) {
254     unlock_streams (media);
255     gst_element_set_state (media->pipeline, GST_STATE_NULL);
256     gst_object_unref (media->pipeline);
257   }
258
259   for (i = 0; i < media->streams->len; i++) {
260     GstRTSPMediaStream *stream;
261
262     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
263
264     gst_rtsp_media_stream_free (stream);
265   }
266   g_array_free (media->streams, TRUE);
267
268   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
269   g_list_free (media->dynamic);
270
271   if (media->source) {
272     g_source_destroy (media->source);
273     g_source_unref (media->source);
274   }
275   g_mutex_free (media->lock);
276   g_cond_free (media->cond);
277
278   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
279 }
280
281 static void
282 gst_rtsp_media_get_property (GObject * object, guint propid,
283     GValue * value, GParamSpec * pspec)
284 {
285   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
286
287   switch (propid) {
288     case PROP_SHARED:
289       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
290       break;
291     case PROP_REUSABLE:
292       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
293       break;
294     case PROP_PROTOCOLS:
295       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
296       break;
297     case PROP_EOS_SHUTDOWN:
298       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
299       break;
300     case PROP_BUFFER_SIZE:
301       g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
302       break;
303     default:
304       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
305   }
306 }
307
308 static void
309 gst_rtsp_media_set_property (GObject * object, guint propid,
310     const GValue * value, GParamSpec * pspec)
311 {
312   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
313
314   switch (propid) {
315     case PROP_SHARED:
316       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
317       break;
318     case PROP_REUSABLE:
319       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
320       break;
321     case PROP_PROTOCOLS:
322       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
323       break;
324     case PROP_EOS_SHUTDOWN:
325       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
326       break;
327     case PROP_BUFFER_SIZE:
328       gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
329       break;
330     default:
331       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
332   }
333 }
334
335 static gpointer
336 do_loop (GstRTSPMediaClass * klass)
337 {
338   GST_INFO ("enter mainloop");
339   g_main_loop_run (klass->loop);
340   GST_INFO ("exit mainloop");
341
342   return NULL;
343 }
344
345 static void
346 collect_media_stats (GstRTSPMedia * media)
347 {
348   GstFormat format;
349   gint64 position, duration;
350
351   media->range.unit = GST_RTSP_RANGE_NPT;
352
353   if (media->is_live) {
354     media->range.min.type = GST_RTSP_TIME_NOW;
355     media->range.min.seconds = -1;
356     media->range.max.type = GST_RTSP_TIME_END;
357     media->range.max.seconds = -1;
358   } else {
359     /* get the position */
360     format = GST_FORMAT_TIME;
361     if (!gst_element_query_position (media->pipeline, &format, &position)) {
362       GST_INFO ("position query failed");
363       position = 0;
364     }
365
366     /* get the duration */
367     format = GST_FORMAT_TIME;
368     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
369       GST_INFO ("duration query failed");
370       duration = -1;
371     }
372
373     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
374         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
375
376     if (position == -1) {
377       media->range.min.type = GST_RTSP_TIME_NOW;
378       media->range.min.seconds = -1;
379     } else {
380       media->range.min.type = GST_RTSP_TIME_SECONDS;
381       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
382     }
383     if (duration == -1) {
384       media->range.max.type = GST_RTSP_TIME_END;
385       media->range.max.seconds = -1;
386     } else {
387       media->range.max.type = GST_RTSP_TIME_SECONDS;
388       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
389     }
390   }
391 }
392
393 /**
394  * gst_rtsp_media_new:
395  *
396  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
397  * element to produde RTP data for one or more related (audio/video/..) 
398  * streams.
399  *
400  * Returns: a new #GstRTSPMedia object.
401  */
402 GstRTSPMedia *
403 gst_rtsp_media_new (void)
404 {
405   GstRTSPMedia *result;
406
407   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
408
409   return result;
410 }
411
412 /**
413  * gst_rtsp_media_set_shared:
414  * @media: a #GstRTSPMedia
415  * @shared: the new value
416  *
417  * Set or unset if the pipeline for @media can be shared will multiple clients.
418  * When @shared is %TRUE, client requests for this media will share the media
419  * pipeline.
420  */
421 void
422 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
423 {
424   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
425
426   media->shared = shared;
427 }
428
429 /**
430  * gst_rtsp_media_is_shared:
431  * @media: a #GstRTSPMedia
432  *
433  * Check if the pipeline for @media can be shared between multiple clients.
434  *
435  * Returns: %TRUE if the media can be shared between clients.
436  */
437 gboolean
438 gst_rtsp_media_is_shared (GstRTSPMedia * media)
439 {
440   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
441
442   return media->shared;
443 }
444
445 /**
446  * gst_rtsp_media_set_reusable:
447  * @media: a #GstRTSPMedia
448  * @reusable: the new value
449  *
450  * Set or unset if the pipeline for @media can be reused after the pipeline has
451  * been unprepared.
452  */
453 void
454 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
455 {
456   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
457
458   media->reusable = reusable;
459 }
460
461 /**
462  * gst_rtsp_media_is_reusable:
463  * @media: a #GstRTSPMedia
464  *
465  * Check if the pipeline for @media can be reused after an unprepare.
466  *
467  * Returns: %TRUE if the media can be reused
468  */
469 gboolean
470 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
471 {
472   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
473
474   return media->reusable;
475 }
476
477 /**
478  * gst_rtsp_media_set_protocols:
479  * @media: a #GstRTSPMedia
480  * @protocols: the new flags
481  *
482  * Configure the allowed lower transport for @media.
483  */
484 void
485 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
486 {
487   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
488
489   media->protocols = protocols;
490 }
491
492 /**
493  * gst_rtsp_media_get_protocols:
494  * @media: a #GstRTSPMedia
495  *
496  * Get the allowed protocols of @media.
497  *
498  * Returns: a #GstRTSPLowerTrans
499  */
500 GstRTSPLowerTrans
501 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
502 {
503   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
504       GST_RTSP_LOWER_TRANS_UNKNOWN);
505
506   return media->protocols;
507 }
508
509 /**
510  * gst_rtsp_media_set_eos_shutdown:
511  * @media: a #GstRTSPMedia
512  * @eos_shutdown: the new value
513  *
514  * Set or unset if an EOS event will be sent to the pipeline for @media before
515  * it is unprepared.
516  */
517 void
518 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
519 {
520   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
521
522   media->eos_shutdown = eos_shutdown;
523 }
524
525 /**
526  * gst_rtsp_media_is_eos_shutdown:
527  * @media: a #GstRTSPMedia
528  *
529  * Check if the pipeline for @media will send an EOS down the pipeline before
530  * unpreparing.
531  *
532  * Returns: %TRUE if the media will send EOS before unpreparing.
533  */
534 gboolean
535 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
536 {
537   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
538
539   return media->eos_shutdown;
540 }
541
542 /**
543  * gst_rtsp_media_set_buffer_size:
544  * @media: a #GstRTSPMedia
545  * @size: the new value
546  *
547  * Set the kernel UDP buffer size.
548  */
549 void
550 gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
551 {
552   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
553
554   media->buffer_size = size;
555 }
556
557 /**
558  * gst_rtsp_media_get_buffer_size:
559  * @media: a #GstRTSPMedia
560  *
561  * Get the kernel UDP buffer size.
562  *
563  * Returns: the kernel UDP buffer size.
564  */
565 guint
566 gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
567 {
568   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
569
570   return media->buffer_size;
571 }
572
573 /**
574  * gst_rtsp_media_set_auth:
575  * @media: a #GstRTSPMedia
576  * @auth: a #GstRTSPAuth
577  *
578  * configure @auth to be used as the authentication manager of @media.
579  */
580 void
581 gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
582 {
583   GstRTSPAuth *old;
584
585   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
586
587   old = media->auth;
588
589   if (old != auth) {
590     if (auth)
591       g_object_ref (auth);
592     media->auth = auth;
593     if (old)
594       g_object_unref (old);
595   }
596 }
597
598 /**
599  * gst_rtsp_media_get_auth:
600  * @media: a #GstRTSPMedia
601  *
602  * Get the #GstRTSPAuth used as the authentication manager of @media.
603  *
604  * Returns: the #GstRTSPAuth of @media. g_object_unref() after
605  * usage.
606  */
607 GstRTSPAuth *
608 gst_rtsp_media_get_auth (GstRTSPMedia * media)
609 {
610   GstRTSPAuth *result;
611
612   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
613
614   if ((result = media->auth))
615     g_object_ref (result);
616
617   return result;
618 }
619
620
621 /**
622  * gst_rtsp_media_n_streams:
623  * @media: a #GstRTSPMedia
624  *
625  * Get the number of streams in this media.
626  *
627  * Returns: The number of streams.
628  */
629 guint
630 gst_rtsp_media_n_streams (GstRTSPMedia * media)
631 {
632   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
633
634   return media->streams->len;
635 }
636
637 /**
638  * gst_rtsp_media_get_stream:
639  * @media: a #GstRTSPMedia
640  * @idx: the stream index
641  *
642  * Retrieve the stream with index @idx from @media.
643  *
644  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
645  * that index did not exist.
646  */
647 GstRTSPMediaStream *
648 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
649 {
650   GstRTSPMediaStream *res;
651
652   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
653
654   if (idx < media->streams->len)
655     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
656   else
657     res = NULL;
658
659   return res;
660 }
661
662 /**
663  * gst_rtsp_media_get_range_string:
664  * @media: a #GstRTSPMedia
665  * @play: for the PLAY request
666  *
667  * Get the current range as a string.
668  *
669  * Returns: The range as a string, g_free() after usage.
670  */
671 gchar *
672 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
673 {
674   gchar *result;
675   GstRTSPTimeRange range;
676
677   /* make copy */
678   range = media->range;
679
680   if (!play && media->active > 0) {
681     range.min.type = GST_RTSP_TIME_NOW;
682     range.min.seconds = -1;
683   }
684
685   result = gst_rtsp_range_to_string (&range);
686
687   return result;
688 }
689
690 /**
691  * gst_rtsp_media_seek:
692  * @media: a #GstRTSPMedia
693  * @range: a #GstRTSPTimeRange
694  *
695  * Seek the pipeline to @range.
696  *
697  * Returns: %TRUE on success.
698  */
699 gboolean
700 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
701 {
702   GstSeekFlags flags;
703   gboolean res;
704   gint64 start, stop;
705   GstSeekType start_type, stop_type;
706
707   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
708   g_return_val_if_fail (range != NULL, FALSE);
709
710   if (range->unit != GST_RTSP_RANGE_NPT)
711     goto not_supported;
712
713   /* depends on the current playing state of the pipeline. We might need to
714    * queue this until we get EOS. */
715   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
716
717   start_type = stop_type = GST_SEEK_TYPE_NONE;
718
719   switch (range->min.type) {
720     case GST_RTSP_TIME_NOW:
721       start = -1;
722       break;
723     case GST_RTSP_TIME_SECONDS:
724       /* only seek when something changed */
725       if (media->range.min.seconds == range->min.seconds) {
726         start = -1;
727       } else {
728         start = range->min.seconds * GST_SECOND;
729         start_type = GST_SEEK_TYPE_SET;
730       }
731       break;
732     case GST_RTSP_TIME_END:
733     default:
734       goto weird_type;
735   }
736   switch (range->max.type) {
737     case GST_RTSP_TIME_SECONDS:
738       /* only seek when something changed */
739       if (media->range.max.seconds == range->max.seconds) {
740         stop = -1;
741       } else {
742         stop = range->max.seconds * GST_SECOND;
743         stop_type = GST_SEEK_TYPE_SET;
744       }
745       break;
746     case GST_RTSP_TIME_END:
747       stop = -1;
748       stop_type = GST_SEEK_TYPE_SET;
749       break;
750     case GST_RTSP_TIME_NOW:
751     default:
752       goto weird_type;
753   }
754
755   if (start != -1 || stop != -1) {
756     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
757         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
758
759     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
760         flags, start_type, start, stop_type, stop);
761
762     /* and block for the seek to complete */
763     GST_INFO ("done seeking %d", res);
764     gst_element_get_state (media->pipeline, NULL, NULL, -1);
765     GST_INFO ("prerolled again");
766
767     collect_media_stats (media);
768   } else {
769     GST_INFO ("no seek needed");
770     res = TRUE;
771   }
772
773   return res;
774
775   /* ERRORS */
776 not_supported:
777   {
778     GST_WARNING ("seek unit %d not supported", range->unit);
779     return FALSE;
780   }
781 weird_type:
782   {
783     GST_WARNING ("weird range type %d not supported", range->min.type);
784     return FALSE;
785   }
786 }
787
788 /**
789  * gst_rtsp_media_stream_rtp:
790  * @stream: a #GstRTSPMediaStream
791  * @buffer: a #GstBuffer
792  *
793  * Handle an RTP buffer for the stream. This method is usually called when a
794  * message has been received from a client using the TCP transport.
795  *
796  * This function takes ownership of @buffer.
797  *
798  * Returns: a GstFlowReturn.
799  */
800 GstFlowReturn
801 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
802 {
803   GstFlowReturn ret;
804
805   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
806
807   return ret;
808 }
809
810 /**
811  * gst_rtsp_media_stream_rtcp:
812  * @stream: a #GstRTSPMediaStream
813  * @buffer: a #GstBuffer
814  *
815  * Handle an RTCP buffer for the stream. This method is usually called when a
816  * message has been received from a client using the TCP transport.
817  *
818  * This function takes ownership of @buffer.
819  *
820  * Returns: a GstFlowReturn.
821  */
822 GstFlowReturn
823 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
824 {
825   GstFlowReturn ret;
826
827   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
828
829   return ret;
830 }
831
832 /* Allocate the udp ports and sockets */
833 static gboolean
834 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
835 {
836   GstStateChangeReturn ret;
837   GstElement *udpsrc0, *udpsrc1;
838   GstElement *udpsink0, *udpsink1;
839   gint tmp_rtp, tmp_rtcp;
840   guint count;
841   gint rtpport, rtcpport, sockfd;
842   const gchar *host;
843
844   udpsrc0 = NULL;
845   udpsrc1 = NULL;
846   udpsink0 = NULL;
847   udpsink1 = NULL;
848   count = 0;
849
850   /* Start with random port */
851   tmp_rtp = 0;
852
853   if (media->is_ipv6)
854     host = "udp://[::0]";
855   else
856     host = "udp://0.0.0.0";
857
858   /* try to allocate 2 UDP ports, the RTP port should be an even
859    * number and the RTCP port should be the next (uneven) port */
860 again:
861   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
862   if (udpsrc0 == NULL)
863     goto no_udp_protocol;
864   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
865
866   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
867   if (ret == GST_STATE_CHANGE_FAILURE) {
868     if (tmp_rtp != 0) {
869       tmp_rtp += 2;
870       if (++count > 20)
871         goto no_ports;
872
873       gst_element_set_state (udpsrc0, GST_STATE_NULL);
874       gst_object_unref (udpsrc0);
875
876       goto again;
877     }
878     goto no_udp_protocol;
879   }
880
881   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
882
883   /* check if port is even */
884   if ((tmp_rtp & 1) != 0) {
885     /* port not even, close and allocate another */
886     if (++count > 20)
887       goto no_ports;
888
889     gst_element_set_state (udpsrc0, GST_STATE_NULL);
890     gst_object_unref (udpsrc0);
891
892     tmp_rtp++;
893     goto again;
894   }
895
896   /* allocate port+1 for RTCP now */
897   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
898   if (udpsrc1 == NULL)
899     goto no_udp_rtcp_protocol;
900
901   /* set port */
902   tmp_rtcp = tmp_rtp + 1;
903   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
904
905   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
906   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
907   if (ret == GST_STATE_CHANGE_FAILURE) {
908
909     if (++count > 20)
910       goto no_ports;
911
912     gst_element_set_state (udpsrc0, GST_STATE_NULL);
913     gst_object_unref (udpsrc0);
914
915     gst_element_set_state (udpsrc1, GST_STATE_NULL);
916     gst_object_unref (udpsrc1);
917
918     tmp_rtp += 2;
919     goto again;
920   }
921
922   /* all fine, do port check */
923   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
924   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
925
926   /* this should not happen... */
927   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
928     goto port_error;
929
930   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
931   if (!udpsink0)
932     goto no_udp_protocol;
933
934   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
935   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
936   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
937
938   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
939   if (!udpsink1)
940     goto no_udp_protocol;
941
942   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
943           "send-duplicates")) {
944     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
945     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
946     stream->filter_duplicates = FALSE;
947   } else {
948     GST_WARNING ("multiudpsink version found without send-duplicates property");
949     stream->filter_duplicates = TRUE;
950   }
951
952   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
953           "buffer-size")) {
954     g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
955   } else {
956     GST_WARNING ("multiudpsink version found without buffer-size property");
957   }
958
959   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
960   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
961   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
962   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
963   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
964
965   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
966   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
967   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
968   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
969
970   /* we keep these elements, we configure all in configure_transport when the
971    * server told us to really use the UDP ports. */
972   stream->udpsrc[0] = udpsrc0;
973   stream->udpsrc[1] = udpsrc1;
974   stream->udpsink[0] = udpsink0;
975   stream->udpsink[1] = udpsink1;
976   stream->server_port.min = rtpport;
977   stream->server_port.max = rtcpport;
978
979   return TRUE;
980
981   /* ERRORS */
982 no_udp_protocol:
983   {
984     goto cleanup;
985   }
986 no_ports:
987   {
988     goto cleanup;
989   }
990 no_udp_rtcp_protocol:
991   {
992     goto cleanup;
993   }
994 port_error:
995   {
996     goto cleanup;
997   }
998 cleanup:
999   {
1000     if (udpsrc0) {
1001       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1002       gst_object_unref (udpsrc0);
1003     }
1004     if (udpsrc1) {
1005       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1006       gst_object_unref (udpsrc1);
1007     }
1008     if (udpsink0) {
1009       gst_element_set_state (udpsink0, GST_STATE_NULL);
1010       gst_object_unref (udpsink0);
1011     }
1012     if (udpsink1) {
1013       gst_element_set_state (udpsink1, GST_STATE_NULL);
1014       gst_object_unref (udpsink1);
1015     }
1016     return FALSE;
1017   }
1018 }
1019
1020 /* executed from streaming thread */
1021 static void
1022 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
1023 {
1024   gchar *capsstr;
1025   GstCaps *newcaps, *oldcaps;
1026
1027   if ((newcaps = GST_PAD_CAPS (pad)))
1028     gst_caps_ref (newcaps);
1029
1030   oldcaps = stream->caps;
1031   stream->caps = newcaps;
1032
1033   if (oldcaps)
1034     gst_caps_unref (oldcaps);
1035
1036   capsstr = gst_caps_to_string (newcaps);
1037   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
1038   g_free (capsstr);
1039 }
1040
1041 static void
1042 dump_structure (const GstStructure * s)
1043 {
1044   gchar *sstr;
1045
1046   sstr = gst_structure_to_string (s);
1047   GST_INFO ("structure: %s", sstr);
1048   g_free (sstr);
1049 }
1050
1051 static GstRTSPMediaTrans *
1052 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
1053 {
1054   GList *walk;
1055   GstRTSPMediaTrans *result = NULL;
1056   const gchar *tmp;
1057   gchar *dest;
1058   guint port;
1059
1060   if (rtcp_from == NULL)
1061     return NULL;
1062
1063   tmp = g_strrstr (rtcp_from, ":");
1064   if (tmp == NULL)
1065     return NULL;
1066
1067   port = atoi (tmp + 1);
1068   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1069
1070   GST_INFO ("finding %s:%d", dest, port);
1071
1072   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1073     GstRTSPMediaTrans *trans = walk->data;
1074     gint min, max;
1075
1076     min = trans->transport->client_port.min;
1077     max = trans->transport->client_port.max;
1078
1079     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
1080             || max == port)) {
1081       result = trans;
1082       break;
1083     }
1084   }
1085   g_free (dest);
1086
1087   return result;
1088 }
1089
1090 static void
1091 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1092 {
1093   GstStructure *stats;
1094   GstRTSPMediaTrans *trans;
1095
1096   GST_INFO ("%p: new source %p", stream, source);
1097
1098   /* see if we have a stream to match with the origin of the RTCP packet */
1099   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1100   if (trans == NULL) {
1101     g_object_get (source, "stats", &stats, NULL);
1102     if (stats) {
1103       const gchar *rtcp_from;
1104
1105       dump_structure (stats);
1106
1107       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1108       if ((trans = find_transport (stream, rtcp_from))) {
1109         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1110             source);
1111
1112         /* keep ref to the source */
1113         trans->rtpsource = source;
1114
1115         g_object_set_qdata (source, ssrc_stream_map_key, trans);
1116       }
1117       gst_structure_free (stats);
1118     }
1119   } else {
1120     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1121   }
1122 }
1123
1124 static void
1125 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1126 {
1127   GST_INFO ("%p: new SDES %p", stream, source);
1128 }
1129
1130 static void
1131 on_ssrc_active (GObject * session, GObject * source,
1132     GstRTSPMediaStream * stream)
1133 {
1134   GstRTSPMediaTrans *trans;
1135
1136   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1137
1138   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1139
1140   if (trans && trans->keep_alive)
1141     trans->keep_alive (trans->ka_user_data);
1142
1143 #ifdef DUMP_STATS
1144   {
1145     GstStructure *stats;
1146     g_object_get (source, "stats", &stats, NULL);
1147     if (stats) {
1148       dump_structure (stats);
1149       gst_structure_free (stats);
1150     }
1151   }
1152 #endif
1153 }
1154
1155 static void
1156 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1157 {
1158   GST_INFO ("%p: source %p bye", stream, source);
1159 }
1160
1161 static void
1162 on_bye_timeout (GObject * session, GObject * source,
1163     GstRTSPMediaStream * stream)
1164 {
1165   GstRTSPMediaTrans *trans;
1166
1167   GST_INFO ("%p: source %p bye timeout", stream, source);
1168
1169   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1170     trans->rtpsource = NULL;
1171     trans->timeout = TRUE;
1172   }
1173 }
1174
1175 static void
1176 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1177 {
1178   GstRTSPMediaTrans *trans;
1179
1180   GST_INFO ("%p: source %p timeout", stream, source);
1181
1182   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1183     trans->rtpsource = NULL;
1184     trans->timeout = TRUE;
1185   }
1186 }
1187
1188 static GstFlowReturn
1189 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1190 {
1191   GList *walk;
1192   GstBuffer *buffer;
1193   GstRTSPMediaStream *stream;
1194
1195   buffer = gst_app_sink_pull_buffer (sink);
1196   if (!buffer)
1197     return GST_FLOW_OK;
1198
1199   stream = (GstRTSPMediaStream *) user_data;
1200
1201   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1202     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1203
1204     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1205       if (tr->send_rtp)
1206         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1207     } else {
1208       if (tr->send_rtcp)
1209         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1210     }
1211   }
1212   gst_buffer_unref (buffer);
1213
1214   return GST_FLOW_OK;
1215 }
1216
1217 static GstFlowReturn
1218 handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
1219 {
1220   GList *walk;
1221   GstBufferList *blist;
1222   GstRTSPMediaStream *stream;
1223
1224   blist = gst_app_sink_pull_buffer_list (sink);
1225   if (!blist)
1226     return GST_FLOW_OK;
1227
1228   stream = (GstRTSPMediaStream *) user_data;
1229
1230   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1231     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1232
1233     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1234       if (tr->send_rtp_list)
1235         tr->send_rtp_list (blist, tr->transport->interleaved.min,
1236             tr->user_data);
1237     } else {
1238       if (tr->send_rtcp_list)
1239         tr->send_rtcp_list (blist, tr->transport->interleaved.max,
1240             tr->user_data);
1241     }
1242   }
1243   gst_buffer_list_unref (blist);
1244
1245   return GST_FLOW_OK;
1246 }
1247
1248 static GstAppSinkCallbacks sink_cb = {
1249   NULL,                         /* not interested in EOS */
1250   NULL,                         /* not interested in preroll buffers */
1251   handle_new_buffer,
1252   handle_new_buffer_list
1253 };
1254
1255 /* prepare the pipeline objects to handle @stream in @media */
1256 static gboolean
1257 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1258 {
1259   gchar *name;
1260   GstPad *pad, *teepad, *selpad;
1261   GstPadLinkReturn ret;
1262   gint i;
1263
1264   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1265    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1266    * elements */
1267   if (!alloc_udp_ports (media, stream))
1268     return FALSE;
1269
1270   /* add the ports to the pipeline */
1271   for (i = 0; i < 2; i++) {
1272     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1273     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1274   }
1275
1276   /* create elements for the TCP transfer */
1277   for (i = 0; i < 2; i++) {
1278     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1279     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1280     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1281     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1282     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1283     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1284     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1285     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1286         &sink_cb, stream, NULL);
1287   }
1288
1289   /* hook up the stream to the RTP session elements. */
1290   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1291   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1292   g_free (name);
1293   name = g_strdup_printf ("send_rtp_src_%d", idx);
1294   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1295   g_free (name);
1296   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1297   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1298   g_free (name);
1299   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1300   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1301   g_free (name);
1302   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1303   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1304   g_free (name);
1305
1306   /* get the session */
1307   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1308       &stream->session);
1309
1310   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1311       stream);
1312   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1313       stream);
1314   g_signal_connect (stream->session, "on-ssrc-active",
1315       (GCallback) on_ssrc_active, stream);
1316   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1317       stream);
1318   g_signal_connect (stream->session, "on-bye-timeout",
1319       (GCallback) on_bye_timeout, stream);
1320   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1321       stream);
1322
1323   /* link the RTP pad to the session manager */
1324   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1325   if (ret != GST_PAD_LINK_OK)
1326     goto link_failed;
1327
1328   /* make tee for RTP and link to stream */
1329   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1330   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1331
1332   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1333   gst_pad_link (stream->send_rtp_src, pad);
1334   gst_object_unref (pad);
1335
1336   /* link RTP sink, we're pretty sure this will work. */
1337   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1338   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1339   gst_pad_link (teepad, pad);
1340   gst_object_unref (pad);
1341   gst_object_unref (teepad);
1342
1343   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1344   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1345   gst_pad_link (teepad, pad);
1346   gst_object_unref (pad);
1347   gst_object_unref (teepad);
1348
1349   /* make tee for RTCP */
1350   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1351   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1352
1353   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1354   gst_pad_link (stream->send_rtcp_src, pad);
1355   gst_object_unref (pad);
1356
1357   /* link RTCP elements */
1358   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1359   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1360   gst_pad_link (teepad, pad);
1361   gst_object_unref (pad);
1362   gst_object_unref (teepad);
1363
1364   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1365   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1366   gst_pad_link (teepad, pad);
1367   gst_object_unref (pad);
1368   gst_object_unref (teepad);
1369
1370   /* make selector for the RTP receivers */
1371   stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
1372   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1373
1374   pad = gst_element_get_static_pad (stream->selector[0], "src");
1375   gst_pad_link (pad, stream->recv_rtp_sink);
1376   gst_object_unref (pad);
1377
1378   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1379   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1380   gst_pad_link (pad, selpad);
1381   gst_object_unref (pad);
1382   gst_object_unref (selpad);
1383
1384   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1385   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1386   gst_pad_link (pad, selpad);
1387   gst_object_unref (pad);
1388   gst_object_unref (selpad);
1389
1390   /* make selector for the RTCP receivers */
1391   stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
1392   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1393
1394   pad = gst_element_get_static_pad (stream->selector[1], "src");
1395   gst_pad_link (pad, stream->recv_rtcp_sink);
1396   gst_object_unref (pad);
1397
1398   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1399   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1400   gst_pad_link (pad, selpad);
1401   gst_object_unref (pad);
1402   gst_object_unref (selpad);
1403
1404   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1405   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1406   gst_pad_link (pad, selpad);
1407   gst_object_unref (pad);
1408   gst_object_unref (selpad);
1409
1410   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1411    * values */
1412   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1413   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1414   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1415   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1416
1417   /* be notified of caps changes */
1418   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1419       (GCallback) caps_notify, stream);
1420
1421   stream->prepared = TRUE;
1422
1423   return TRUE;
1424
1425   /* ERRORS */
1426 link_failed:
1427   {
1428     GST_WARNING ("failed to link stream %d", idx);
1429     return FALSE;
1430   }
1431 }
1432
1433 static void
1434 unlock_streams (GstRTSPMedia * media)
1435 {
1436   guint i, n_streams;
1437
1438   /* unlock the udp src elements */
1439   n_streams = gst_rtsp_media_n_streams (media);
1440   for (i = 0; i < n_streams; i++) {
1441     GstRTSPMediaStream *stream;
1442
1443     stream = gst_rtsp_media_get_stream (media, i);
1444
1445     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1446     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1447   }
1448 }
1449
1450 static void
1451 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1452 {
1453   g_mutex_lock (media->lock);
1454   /* never overwrite the error status */
1455   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1456     media->status = status;
1457   GST_DEBUG ("setting new status to %d", status);
1458   g_cond_broadcast (media->cond);
1459   g_mutex_unlock (media->lock);
1460 }
1461
1462 static GstRTSPMediaStatus
1463 gst_rtsp_media_get_status (GstRTSPMedia * media)
1464 {
1465   GstRTSPMediaStatus result;
1466   GTimeVal timeout;
1467
1468   g_mutex_lock (media->lock);
1469   g_get_current_time (&timeout);
1470   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1471   /* while we are preparing, wait */
1472   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1473     GST_DEBUG ("waiting for status change");
1474     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1475       GST_DEBUG ("timeout, assuming error status");
1476       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1477     }
1478   }
1479   /* could be success or error */
1480   result = media->status;
1481   GST_DEBUG ("got status %d", result);
1482   g_mutex_unlock (media->lock);
1483
1484   return result;
1485 }
1486
1487 static gboolean
1488 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1489 {
1490   GstMessageType type;
1491
1492   type = GST_MESSAGE_TYPE (message);
1493
1494   switch (type) {
1495     case GST_MESSAGE_STATE_CHANGED:
1496       break;
1497     case GST_MESSAGE_BUFFERING:
1498     {
1499       gint percent;
1500
1501       gst_message_parse_buffering (message, &percent);
1502
1503       /* no state management needed for live pipelines */
1504       if (media->is_live)
1505         break;
1506
1507       if (percent == 100) {
1508         /* a 100% message means buffering is done */
1509         media->buffering = FALSE;
1510         /* if the desired state is playing, go back */
1511         if (media->target_state == GST_STATE_PLAYING) {
1512           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1513           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1514         } else {
1515           GST_INFO ("Buffering done");
1516         }
1517       } else {
1518         /* buffering busy */
1519         if (media->buffering == FALSE) {
1520           if (media->target_state == GST_STATE_PLAYING) {
1521             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1522             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1523             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1524           } else {
1525             GST_INFO ("Buffering ...");
1526           }
1527         }
1528         media->buffering = TRUE;
1529       }
1530       break;
1531     }
1532     case GST_MESSAGE_LATENCY:
1533     {
1534       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1535       break;
1536     }
1537     case GST_MESSAGE_ERROR:
1538     {
1539       GError *gerror;
1540       gchar *debug;
1541
1542       gst_message_parse_error (message, &gerror, &debug);
1543       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1544       g_error_free (gerror);
1545       g_free (debug);
1546
1547       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1548       break;
1549     }
1550     case GST_MESSAGE_WARNING:
1551     {
1552       GError *gerror;
1553       gchar *debug;
1554
1555       gst_message_parse_warning (message, &gerror, &debug);
1556       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1557       g_error_free (gerror);
1558       g_free (debug);
1559       break;
1560     }
1561     case GST_MESSAGE_ELEMENT:
1562       break;
1563     case GST_MESSAGE_STREAM_STATUS:
1564       break;
1565     case GST_MESSAGE_ASYNC_DONE:
1566       if (!media->adding) {
1567         /* when we are dynamically adding pads, the addition of the udpsrc will
1568          * temporarily produce ASYNC_DONE messages. We have to ignore them and
1569          * wait for the final ASYNC_DONE after everything prerolled */
1570         GST_INFO ("%p: got ASYNC_DONE", media);
1571         collect_media_stats (media);
1572
1573         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1574       } else {
1575         GST_INFO ("%p: ignoring ASYNC_DONE", media);
1576       }
1577       break;
1578     case GST_MESSAGE_EOS:
1579       GST_INFO ("%p: got EOS", media);
1580       if (media->eos_pending) {
1581         GST_DEBUG ("shutting down after EOS");
1582         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1583         media->eos_pending = FALSE;
1584         g_object_unref (media);
1585       }
1586       break;
1587     default:
1588       GST_INFO ("%p: got message type %s", media,
1589           gst_message_type_get_name (type));
1590       break;
1591   }
1592   return TRUE;
1593 }
1594
1595 static gboolean
1596 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1597 {
1598   GstRTSPMediaClass *klass;
1599   gboolean ret;
1600
1601   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1602
1603   if (klass->handle_message)
1604     ret = klass->handle_message (media, message);
1605   else
1606     ret = FALSE;
1607
1608   return ret;
1609 }
1610
1611 /* called from streaming threads */
1612 static void
1613 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1614 {
1615   GstRTSPMediaStream *stream;
1616   gchar *name;
1617   gint i;
1618
1619   i = media->streams->len + 1;
1620
1621   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1622
1623   stream = g_new0 (GstRTSPMediaStream, 1);
1624   stream->payloader = element;
1625
1626   name = g_strdup_printf ("dynpay%d", i);
1627
1628   media->adding = TRUE;
1629
1630   /* ghost the pad of the payloader to the element */
1631   stream->srcpad = gst_ghost_pad_new (name, pad);
1632   gst_pad_set_active (stream->srcpad, TRUE);
1633   gst_element_add_pad (media->element, stream->srcpad);
1634   g_free (name);
1635
1636   /* add stream now */
1637   g_array_append_val (media->streams, stream);
1638
1639   setup_stream (stream, i, media);
1640
1641   for (i = 0; i < 2; i++) {
1642     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1643     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1644     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1645     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1646     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1647   }
1648   media->adding = FALSE;
1649 }
1650
1651 static void
1652 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1653 {
1654   GST_INFO ("no more pads");
1655   if (media->fakesink) {
1656     gst_object_ref (media->fakesink);
1657     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1658     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1659     gst_object_unref (media->fakesink);
1660     media->fakesink = NULL;
1661     GST_INFO ("removed fakesink");
1662   }
1663 }
1664
1665 /**
1666  * gst_rtsp_media_prepare:
1667  * @media: a #GstRTSPMedia
1668  *
1669  * Prepare @media for streaming. This function will create the pipeline and
1670  * other objects to manage the streaming.
1671  *
1672  * It will preroll the pipeline and collect vital information about the streams
1673  * such as the duration.
1674  *
1675  * Returns: %TRUE on success.
1676  */
1677 gboolean
1678 gst_rtsp_media_prepare (GstRTSPMedia * media)
1679 {
1680   GstStateChangeReturn ret;
1681   GstRTSPMediaStatus status;
1682   guint i, n_streams;
1683   GstRTSPMediaClass *klass;
1684   GstBus *bus;
1685   GList *walk;
1686
1687   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1688     goto was_prepared;
1689
1690   if (!media->reusable && media->reused)
1691     goto is_reused;
1692
1693   GST_INFO ("preparing media %p", media);
1694
1695   /* reset some variables */
1696   media->is_live = FALSE;
1697   media->buffering = FALSE;
1698   /* we're preparing now */
1699   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1700
1701   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1702
1703   /* add the pipeline bus to our custom mainloop */
1704   media->source = gst_bus_create_watch (bus);
1705   gst_object_unref (bus);
1706
1707   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1708
1709   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1710   media->id = g_source_attach (media->source, klass->context);
1711
1712   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1713
1714   /* add stuff to the bin */
1715   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1716
1717   /* link streams we already have, other streams might appear when we have
1718    * dynamic elements */
1719   n_streams = gst_rtsp_media_n_streams (media);
1720   for (i = 0; i < n_streams; i++) {
1721     GstRTSPMediaStream *stream;
1722
1723     stream = gst_rtsp_media_get_stream (media, i);
1724
1725     setup_stream (stream, i, media);
1726   }
1727
1728   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1729     GstElement *elem = walk->data;
1730
1731     GST_INFO ("adding callbacks for dynamic element %p", elem);
1732
1733     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1734     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1735
1736     /* we add a fakesink here in order to make the state change async. We remove
1737      * the fakesink again in the no-more-pads callback. */
1738     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1739     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1740   }
1741
1742   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1743   /* first go to PAUSED */
1744   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1745   media->target_state = GST_STATE_PAUSED;
1746
1747   switch (ret) {
1748     case GST_STATE_CHANGE_SUCCESS:
1749       GST_INFO ("SUCCESS state change for media %p", media);
1750       break;
1751     case GST_STATE_CHANGE_ASYNC:
1752       GST_INFO ("ASYNC state change for media %p", media);
1753       break;
1754     case GST_STATE_CHANGE_NO_PREROLL:
1755       /* we need to go to PLAYING */
1756       GST_INFO ("NO_PREROLL state change: live media %p", media);
1757       media->is_live = TRUE;
1758       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1759       if (ret == GST_STATE_CHANGE_FAILURE)
1760         goto state_failed;
1761       break;
1762     case GST_STATE_CHANGE_FAILURE:
1763       goto state_failed;
1764   }
1765
1766   /* now wait for all pads to be prerolled */
1767   status = gst_rtsp_media_get_status (media);
1768   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1769     goto state_failed;
1770
1771   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
1772
1773   GST_INFO ("object %p is prerolled", media);
1774
1775   return TRUE;
1776
1777   /* OK */
1778 was_prepared:
1779   {
1780     return TRUE;
1781   }
1782   /* ERRORS */
1783 is_reused:
1784   {
1785     GST_WARNING ("can not reuse media %p", media);
1786     return FALSE;
1787   }
1788 state_failed:
1789   {
1790     GST_WARNING ("failed to preroll pipeline");
1791     unlock_streams (media);
1792     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1793     gst_rtsp_media_unprepare (media);
1794     return FALSE;
1795   }
1796 }
1797
1798 /**
1799  * gst_rtsp_media_unprepare:
1800  * @media: a #GstRTSPMedia
1801  *
1802  * Unprepare @media. After this call, the media should be prepared again before
1803  * it can be used again. If the media is set to be non-reusable, a new instance
1804  * must be created.
1805  *
1806  * Returns: %TRUE on success.
1807  */
1808 gboolean
1809 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1810 {
1811   GstRTSPMediaClass *klass;
1812   gboolean success;
1813
1814   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1815     return TRUE;
1816
1817   GST_INFO ("unprepare media %p", media);
1818   media->target_state = GST_STATE_NULL;
1819
1820   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1821   if (klass->unprepare)
1822     success = klass->unprepare (media);
1823   else
1824     success = TRUE;
1825
1826   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1827   media->reused = TRUE;
1828
1829   /* when the media is not reusable, this will effectively unref the media and
1830    * recreate it */
1831   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1832
1833   return success;
1834 }
1835
1836 static gboolean
1837 default_unprepare (GstRTSPMedia * media)
1838 {
1839   if (media->eos_shutdown) {
1840     GST_DEBUG ("sending EOS for shutdown");
1841     /* ref so that we don't disappear */
1842     g_object_ref (media);
1843     media->eos_pending = TRUE;
1844     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1845     /* we need to go to playing again for the EOS to propagate, normally in this
1846      * state, nothing is receiving data from us anymore so this is ok. */
1847     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1848   } else {
1849     GST_DEBUG ("shutting down");
1850     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1851   }
1852   return TRUE;
1853 }
1854
1855 static void
1856 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1857     gchar * dest, gint min, gint max)
1858 {
1859   gboolean do_add = TRUE;
1860   RTSPDestination *ndest;
1861
1862   if (stream->filter_duplicates) {
1863     RTSPDestination fdest;
1864     GList *find;
1865
1866     fdest.dest = dest;
1867     fdest.min = min;
1868     fdest.max = max;
1869
1870     /* first see if we already added this destination */
1871     find =
1872         g_list_find_custom (stream->destinations, &fdest,
1873         (GCompareFunc) dest_compare);
1874     if (find) {
1875       ndest = (RTSPDestination *) find->data;
1876
1877       GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
1878           ndest->count);
1879       ndest->count++;
1880       do_add = FALSE;
1881     }
1882   }
1883
1884   if (do_add) {
1885     GST_INFO ("adding %s:%d-%d", dest, min, max);
1886     g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1887     g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1888
1889     if (stream->filter_duplicates) {
1890       ndest = create_destination (dest, min, max);
1891       stream->destinations = g_list_prepend (stream->destinations, ndest);
1892     }
1893   }
1894 }
1895
1896 static void
1897 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1898     gchar * dest, gint min, gint max)
1899 {
1900   gboolean do_remove = TRUE;
1901   RTSPDestination *ndest = NULL;
1902   GList *find = NULL;
1903
1904   if (stream->filter_duplicates) {
1905     RTSPDestination fdest;
1906
1907     fdest.dest = dest;
1908     fdest.min = min;
1909     fdest.max = max;
1910
1911     /* first see if we already added this destination */
1912     find =
1913         g_list_find_custom (stream->destinations, &fdest,
1914         (GCompareFunc) dest_compare);
1915     if (!find)
1916       return;
1917
1918     ndest = (RTSPDestination *) find->data;
1919     if (--ndest->count > 0) {
1920       do_remove = FALSE;
1921       GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
1922           ndest->count);
1923     }
1924   }
1925
1926   if (do_remove) {
1927     GST_INFO ("removing %s:%d-%d", dest, min, max);
1928     g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1929     g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1930
1931     if (stream->filter_duplicates) {
1932       stream->destinations = g_list_delete_link (stream->destinations, find);
1933       free_destination (ndest);
1934     }
1935   }
1936 }
1937
1938 /**
1939  * gst_rtsp_media_set_state:
1940  * @media: a #GstRTSPMedia
1941  * @state: the target state of the media
1942  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1943  *
1944  * Set the state of @media to @state and for the transports in @transports.
1945  *
1946  * Returns: %TRUE on success.
1947  */
1948 gboolean
1949 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1950     GArray * transports)
1951 {
1952   gint i;
1953   GstStateChangeReturn ret;
1954   gboolean add, remove, do_state;
1955   gint old_active;
1956
1957   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1958   g_return_val_if_fail (transports != NULL, FALSE);
1959
1960   /* NULL and READY are the same */
1961   if (state == GST_STATE_READY)
1962     state = GST_STATE_NULL;
1963
1964   add = remove = FALSE;
1965
1966   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1967       media);
1968
1969   switch (state) {
1970     case GST_STATE_NULL:
1971       /* unlock the streams so that they follow the state changes from now on */
1972       unlock_streams (media);
1973       /* fallthrough */
1974     case GST_STATE_PAUSED:
1975       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1976       if (media->target_state == GST_STATE_PLAYING)
1977         remove = TRUE;
1978       break;
1979     case GST_STATE_PLAYING:
1980       /* we're going to PLAYING, add */
1981       add = TRUE;
1982       break;
1983     default:
1984       break;
1985   }
1986   old_active = media->active;
1987
1988   for (i = 0; i < transports->len; i++) {
1989     GstRTSPMediaTrans *tr;
1990     GstRTSPMediaStream *stream;
1991     GstRTSPTransport *trans;
1992
1993     /* we need a non-NULL entry in the array */
1994     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1995     if (tr == NULL)
1996       continue;
1997
1998     /* we need a transport */
1999     if (!(trans = tr->transport))
2000       continue;
2001
2002     /* get the stream and add the destinations */
2003     stream = gst_rtsp_media_get_stream (media, tr->idx);
2004     switch (trans->lower_transport) {
2005       case GST_RTSP_LOWER_TRANS_UDP:
2006       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2007       {
2008         gchar *dest;
2009         gint min, max;
2010
2011         dest = trans->destination;
2012         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2013           min = trans->port.min;
2014           max = trans->port.max;
2015         } else {
2016           min = trans->client_port.min;
2017           max = trans->client_port.max;
2018         }
2019
2020         if (add && !tr->active) {
2021           add_udp_destination (media, stream, dest, min, max);
2022           stream->transports = g_list_prepend (stream->transports, tr);
2023           tr->active = TRUE;
2024           media->active++;
2025         } else if (remove && tr->active) {
2026           remove_udp_destination (media, stream, dest, min, max);
2027           stream->transports = g_list_remove (stream->transports, tr);
2028           tr->active = FALSE;
2029           media->active--;
2030         }
2031         break;
2032       }
2033       case GST_RTSP_LOWER_TRANS_TCP:
2034         if (add && !tr->active) {
2035           GST_INFO ("adding TCP %s", trans->destination);
2036           stream->transports = g_list_prepend (stream->transports, tr);
2037           tr->active = TRUE;
2038           media->active++;
2039         } else if (remove && tr->active) {
2040           GST_INFO ("removing TCP %s", trans->destination);
2041           stream->transports = g_list_remove (stream->transports, tr);
2042           tr->active = FALSE;
2043           media->active--;
2044         }
2045         break;
2046       default:
2047         GST_INFO ("Unknown transport %d", trans->lower_transport);
2048         break;
2049     }
2050   }
2051
2052   /* we just added the first media, do the playing state change */
2053   if (old_active == 0 && add)
2054     do_state = TRUE;
2055   /* if we have no more active media, do the downward state changes */
2056   else if (media->active == 0)
2057     do_state = TRUE;
2058   else
2059     do_state = FALSE;
2060
2061   GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
2062       media, do_state);
2063
2064   if (media->target_state != state) {
2065     if (do_state) {
2066       if (state == GST_STATE_NULL) {
2067         gst_rtsp_media_unprepare (media);
2068       } else {
2069         GST_INFO ("state %s media %p", gst_element_state_get_name (state),
2070             media);
2071         media->target_state = state;
2072         ret = gst_element_set_state (media->pipeline, state);
2073       }
2074     }
2075     g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
2076         NULL);
2077   }
2078
2079   /* remember where we are */
2080   if (state == GST_STATE_PAUSED || old_active != media->active)
2081     collect_media_stats (media);
2082
2083   return TRUE;
2084 }
2085
2086 /**
2087  * gst_rtsp_media_remove_elements:
2088  * @media: a #GstRTSPMedia
2089  *
2090  * Remove all elements and the pipeline controlled by @media.
2091  */
2092 void
2093 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
2094 {
2095   gint i, j;
2096
2097   unlock_streams (media);
2098
2099   for (i = 0; i < media->streams->len; i++) {
2100     GstRTSPMediaStream *stream;
2101
2102     GST_INFO ("Removing elements of stream %d from pipeline", i);
2103
2104     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
2105
2106     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
2107
2108     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
2109
2110     for (j = 0; j < 2; j++) {
2111       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
2112       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
2113       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
2114       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
2115       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
2116       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
2117
2118       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
2119       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
2120       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
2121       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
2122       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
2123       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
2124     }
2125     if (stream->caps)
2126       gst_caps_unref (stream->caps);
2127     stream->caps = NULL;
2128     gst_rtsp_media_stream_free (stream);
2129   }
2130   g_array_remove_range (media->streams, 0, media->streams->len);
2131
2132   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
2133   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
2134
2135   gst_object_unref (media->pipeline);
2136   media->pipeline = NULL;
2137 }