a2fe5d3f5dba0a8ff6980bc97c59b5561a46bca2
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
24
25 #include "rtsp-media.h"
26
27 #define DEFAULT_SHARED         FALSE
28 #define DEFAULT_REUSABLE       FALSE
29
30 /* define to dump received RTCP packets */
31 #undef DUMP_STATS
32
33 enum
34 {
35   PROP_0,
36   PROP_SHARED,
37   PROP_REUSABLE,
38   PROP_LAST
39 };
40
41 enum
42 {
43   SIGNAL_UNPREPARED,
44   SIGNAL_LAST
45 };
46
47 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
48 #define GST_CAT_DEFAULT rtsp_media_debug
49
50 static GQuark ssrc_stream_map_key;
51
52 static void gst_rtsp_media_get_property (GObject * object, guint propid,
53     GValue * value, GParamSpec * pspec);
54 static void gst_rtsp_media_set_property (GObject * object, guint propid,
55     const GValue * value, GParamSpec * pspec);
56 static void gst_rtsp_media_finalize (GObject * obj);
57
58 static gpointer do_loop (GstRTSPMediaClass * klass);
59 static gboolean default_handle_message (GstRTSPMedia * media,
60     GstMessage * message);
61 static gboolean default_unprepare (GstRTSPMedia * media);
62 static void unlock_streams (GstRTSPMedia * media);
63
64 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
65
66 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
67
68 static void
69 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
70 {
71   GObjectClass *gobject_class;
72   GError *error = NULL;
73
74   gobject_class = G_OBJECT_CLASS (klass);
75
76   gobject_class->get_property = gst_rtsp_media_get_property;
77   gobject_class->set_property = gst_rtsp_media_set_property;
78   gobject_class->finalize = gst_rtsp_media_finalize;
79
80   g_object_class_install_property (gobject_class, PROP_SHARED,
81       g_param_spec_boolean ("shared", "Shared",
82           "If this media pipeline can be shared", DEFAULT_SHARED,
83           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
84
85   g_object_class_install_property (gobject_class, PROP_REUSABLE,
86       g_param_spec_boolean ("reusable", "Reusable",
87           "If this media pipeline can be reused after an unprepare",
88           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
89
90   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
91       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
92       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
93       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
94
95   klass->context = g_main_context_new ();
96   klass->loop = g_main_loop_new (klass->context, TRUE);
97
98   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
99   if (error != NULL) {
100     g_critical ("could not start bus thread: %s", error->message);
101   }
102   klass->handle_message = default_handle_message;
103   klass->unprepare = default_unprepare;
104
105   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
106 }
107
108 static void
109 gst_rtsp_media_init (GstRTSPMedia * media)
110 {
111   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
112   media->lock = g_mutex_new ();
113   media->cond = g_cond_new ();
114 }
115
116 static void
117 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
118 {
119   if (stream->session)
120     g_object_unref (stream->session);
121
122   if (stream->caps)
123     gst_caps_unref (stream->caps);
124
125   if (stream->send_rtp_sink)
126     gst_object_unref (stream->send_rtp_sink);
127   if (stream->send_rtp_src)
128     gst_object_unref (stream->send_rtp_src);
129   if (stream->send_rtcp_src)
130     gst_object_unref (stream->send_rtcp_src);
131   if (stream->recv_rtcp_sink)
132     gst_object_unref (stream->recv_rtcp_sink);
133   if (stream->recv_rtp_sink)
134     gst_object_unref (stream->recv_rtp_sink);
135
136   g_list_free (stream->transports);
137
138   g_free (stream);
139 }
140
141 static void
142 gst_rtsp_media_finalize (GObject * obj)
143 {
144   GstRTSPMedia *media;
145   guint i;
146
147   media = GST_RTSP_MEDIA (obj);
148
149   GST_INFO ("finalize media %p", media);
150
151   if (media->pipeline) {
152     unlock_streams (media);
153     gst_element_set_state (media->pipeline, GST_STATE_NULL);
154     gst_object_unref (media->pipeline);
155   }
156
157   for (i = 0; i < media->streams->len; i++) {
158     GstRTSPMediaStream *stream;
159
160     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
161
162     gst_rtsp_media_stream_free (stream);
163   }
164   g_array_free (media->streams, TRUE);
165
166   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
167   g_list_free (media->dynamic);
168
169   if (media->source) {
170     g_source_destroy (media->source);
171     g_source_unref (media->source);
172   }
173   g_mutex_free (media->lock);
174   g_cond_free (media->cond);
175
176   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
177 }
178
179 static void
180 gst_rtsp_media_get_property (GObject * object, guint propid,
181     GValue * value, GParamSpec * pspec)
182 {
183   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
184
185   switch (propid) {
186     case PROP_SHARED:
187       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
188       break;
189     case PROP_REUSABLE:
190       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
191       break;
192     default:
193       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
194   }
195 }
196
197 static void
198 gst_rtsp_media_set_property (GObject * object, guint propid,
199     const GValue * value, GParamSpec * pspec)
200 {
201   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
202
203   switch (propid) {
204     case PROP_SHARED:
205       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
206       break;
207     case PROP_REUSABLE:
208       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
209       break;
210     default:
211       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
212   }
213 }
214
215 static gpointer
216 do_loop (GstRTSPMediaClass * klass)
217 {
218   GST_INFO ("enter mainloop");
219   g_main_loop_run (klass->loop);
220   GST_INFO ("exit mainloop");
221
222   return NULL;
223 }
224
225 static void
226 collect_media_stats (GstRTSPMedia * media)
227 {
228   GstFormat format;
229   gint64 position, duration;
230
231   media->range.unit = GST_RTSP_RANGE_NPT;
232
233   if (media->is_live) {
234     media->range.min.type = GST_RTSP_TIME_NOW;
235     media->range.min.seconds = -1;
236     media->range.max.type = GST_RTSP_TIME_END;
237     media->range.max.seconds = -1;
238   } else {
239     /* get the position */
240     format = GST_FORMAT_TIME;
241     if (!gst_element_query_position (media->pipeline, &format, &position)) {
242       GST_INFO ("position query failed");
243       position = 0;
244     }
245
246     /* get the duration */
247     format = GST_FORMAT_TIME;
248     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
249       GST_INFO ("duration query failed");
250       duration = -1;
251     }
252
253     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
254         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
255
256     if (position == -1) {
257       media->range.min.type = GST_RTSP_TIME_NOW;
258       media->range.min.seconds = -1;
259     } else {
260       media->range.min.type = GST_RTSP_TIME_SECONDS;
261       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
262     }
263     if (duration == -1) {
264       media->range.max.type = GST_RTSP_TIME_END;
265       media->range.max.seconds = -1;
266     } else {
267       media->range.max.type = GST_RTSP_TIME_SECONDS;
268       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
269     }
270   }
271 }
272
273 /**
274  * gst_rtsp_media_new:
275  *
276  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
277  * element to produde RTP data for one or more related (audio/video/..) 
278  * streams.
279  *
280  * Returns: a new #GstRTSPMedia object.
281  */
282 GstRTSPMedia *
283 gst_rtsp_media_new (void)
284 {
285   GstRTSPMedia *result;
286
287   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
288
289   return result;
290 }
291
292 /**
293  * gst_rtsp_media_set_shared:
294  * @media: a #GstRTSPMedia
295  * @shared: the new value
296  *
297  * Set or unset if the pipeline for @media can be shared will multiple clients.
298  * When @shared is %TRUE, client requests for this media will share the media
299  * pipeline.
300  */
301 void
302 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
303 {
304   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
305
306   media->shared = shared;
307 }
308
309 /**
310  * gst_rtsp_media_is_shared:
311  * @media: a #GstRTSPMedia
312  *
313  * Check if the pipeline for @media can be shared between multiple clients.
314  *
315  * Returns: %TRUE if the media can be shared between clients.
316  */
317 gboolean
318 gst_rtsp_media_is_shared (GstRTSPMedia * media)
319 {
320   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
321
322   return media->shared;
323 }
324
325 /**
326  * gst_rtsp_media_set_reusable:
327  * @media: a #GstRTSPMedia
328  * @reusable: the new value
329  *
330  * Set or unset if the pipeline for @media can be reused after the pipeline has
331  * been unprepared.
332  */
333 void
334 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
335 {
336   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
337
338   media->reusable = reusable;
339 }
340
341 /**
342  * gst_rtsp_media_is_reusable:
343  * @media: a #GstRTSPMedia
344  *
345  * Check if the pipeline for @media can be reused after an unprepare.
346  *
347  * Returns: %TRUE if the media can be reused
348  */
349 gboolean
350 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
351 {
352   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
353
354   return media->reusable;
355 }
356
357 /**
358  * gst_rtsp_media_n_streams:
359  * @media: a #GstRTSPMedia
360  *
361  * Get the number of streams in this media.
362  *
363  * Returns: The number of streams.
364  */
365 guint
366 gst_rtsp_media_n_streams (GstRTSPMedia * media)
367 {
368   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
369
370   return media->streams->len;
371 }
372
373 /**
374  * gst_rtsp_media_get_stream:
375  * @media: a #GstRTSPMedia
376  * @idx: the stream index
377  *
378  * Retrieve the stream with index @idx from @media.
379  *
380  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
381  * that index did not exist.
382  */
383 GstRTSPMediaStream *
384 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
385 {
386   GstRTSPMediaStream *res;
387
388   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
389
390   if (idx < media->streams->len)
391     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
392   else
393     res = NULL;
394
395   return res;
396 }
397
398 /**
399  * gst_rtsp_media_seek:
400  * @stream: a #GstRTSPMediaStream
401  * @range: a #GstRTSPTimeRange
402  *
403  * Seek the pipeline to @range.
404  *
405  * Returns: %TRUE on success.
406  */
407 gboolean
408 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
409 {
410   GstSeekFlags flags;
411   gboolean res;
412   gint64 start, stop;
413   GstSeekType start_type, stop_type;
414
415   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
416   g_return_val_if_fail (range != NULL, FALSE);
417
418   if (range->unit != GST_RTSP_RANGE_NPT)
419     goto not_supported;
420
421   /* depends on the current playing state of the pipeline. We might need to
422    * queue this until we get EOS. */
423   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
424
425   start_type = stop_type = GST_SEEK_TYPE_NONE;
426
427   switch (range->min.type) {
428     case GST_RTSP_TIME_NOW:
429       start = -1;
430       break;
431     case GST_RTSP_TIME_SECONDS:
432       /* only seek when something changed */
433       if (media->range.min.seconds == range->min.seconds) {
434         start = -1;
435       } else {
436         start = range->min.seconds * GST_SECOND;
437         start_type = GST_SEEK_TYPE_SET;
438       }
439       break;
440     case GST_RTSP_TIME_END:
441     default:
442       goto weird_type;
443   }
444   switch (range->max.type) {
445     case GST_RTSP_TIME_SECONDS:
446       /* only seek when something changed */
447       if (media->range.max.seconds == range->max.seconds) {
448         stop = -1;
449       } else {
450         stop = range->max.seconds * GST_SECOND;
451         stop_type = GST_SEEK_TYPE_SET;
452       }
453       break;
454     case GST_RTSP_TIME_END:
455       stop = -1;
456       stop_type = GST_SEEK_TYPE_SET;
457       break;
458     case GST_RTSP_TIME_NOW:
459     default:
460       goto weird_type;
461   }
462
463   if (start != -1 || stop != -1) {
464     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
465         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
466
467     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
468         flags, start_type, start, stop_type, stop);
469
470     /* and block for the seek to complete */
471     GST_INFO ("done seeking %d", res);
472     gst_element_get_state (media->pipeline, NULL, NULL, -1);
473     GST_INFO ("prerolled again");
474   } else {
475     GST_INFO ("no seek needed");
476     res = TRUE;
477   }
478
479   return res;
480
481   /* ERRORS */
482 not_supported:
483   {
484     GST_WARNING ("seek unit %d not supported", range->unit);
485     return FALSE;
486   }
487 weird_type:
488   {
489     GST_WARNING ("weird range type %d not supported", range->min.type);
490     return FALSE;
491   }
492 }
493
494 /**
495  * gst_rtsp_media_stream_rtp:
496  * @stream: a #GstRTSPMediaStream
497  * @buffer: a #GstBuffer
498  *
499  * Handle an RTP buffer for the stream. This method is usually called when a
500  * message has been received from a client using the TCP transport.
501  *
502  * This function takes ownership of @buffer.
503  *
504  * Returns: a GstFlowReturn.
505  */
506 GstFlowReturn
507 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
508 {
509   GstFlowReturn ret;
510
511   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
512
513   return ret;
514 }
515
516 /**
517  * gst_rtsp_media_stream_rtcp:
518  * @stream: a #GstRTSPMediaStream
519  * @buffer: a #GstBuffer
520  *
521  * Handle an RTCP buffer for the stream. This method is usually called when a
522  * message has been received from a client using the TCP transport.
523  *
524  * This function takes ownership of @buffer.
525  *
526  * Returns: a GstFlowReturn.
527  */
528 GstFlowReturn
529 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
530 {
531   GstFlowReturn ret;
532
533   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
534
535   return ret;
536 }
537
538 /* Allocate the udp ports and sockets */
539 static gboolean
540 alloc_udp_ports (GstRTSPMediaStream * stream)
541 {
542   GstStateChangeReturn ret;
543   GstElement *udpsrc0, *udpsrc1;
544   GstElement *udpsink0, *udpsink1;
545   gint tmp_rtp, tmp_rtcp;
546   guint count;
547   gint rtpport, rtcpport, sockfd;
548
549   udpsrc0 = NULL;
550   udpsrc1 = NULL;
551   udpsink0 = NULL;
552   udpsink1 = NULL;
553   count = 0;
554
555   /* Start with random port */
556   tmp_rtp = 0;
557
558   /* try to allocate 2 UDP ports, the RTP port should be an even
559    * number and the RTCP port should be the next (uneven) port */
560 again:
561   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
562   if (udpsrc0 == NULL)
563     goto no_udp_protocol;
564   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
565
566   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
567   if (ret == GST_STATE_CHANGE_FAILURE) {
568     if (tmp_rtp != 0) {
569       tmp_rtp += 2;
570       if (++count > 20)
571         goto no_ports;
572
573       gst_element_set_state (udpsrc0, GST_STATE_NULL);
574       gst_object_unref (udpsrc0);
575
576       goto again;
577     }
578     goto no_udp_protocol;
579   }
580
581   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
582
583   /* check if port is even */
584   if ((tmp_rtp & 1) != 0) {
585     /* port not even, close and allocate another */
586     if (++count > 20)
587       goto no_ports;
588
589     gst_element_set_state (udpsrc0, GST_STATE_NULL);
590     gst_object_unref (udpsrc0);
591
592     tmp_rtp++;
593     goto again;
594   }
595
596   /* allocate port+1 for RTCP now */
597   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
598   if (udpsrc1 == NULL)
599     goto no_udp_rtcp_protocol;
600
601   /* set port */
602   tmp_rtcp = tmp_rtp + 1;
603   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
604
605   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
606   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
607   if (ret == GST_STATE_CHANGE_FAILURE) {
608
609     if (++count > 20)
610       goto no_ports;
611
612     gst_element_set_state (udpsrc0, GST_STATE_NULL);
613     gst_object_unref (udpsrc0);
614
615     gst_element_set_state (udpsrc1, GST_STATE_NULL);
616     gst_object_unref (udpsrc1);
617
618     tmp_rtp += 2;
619     goto again;
620   }
621
622   /* all fine, do port check */
623   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
624   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
625
626   /* this should not happen... */
627   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
628     goto port_error;
629
630   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
631   if (!udpsink0)
632     goto no_udp_protocol;
633
634   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
635   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
636   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
637
638   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
639   if (!udpsink1)
640     goto no_udp_protocol;
641
642   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
643   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
644   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
645   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
646   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
647
648   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
649   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
650   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
651   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
652
653   /* we keep these elements, we configure all in configure_transport when the
654    * server told us to really use the UDP ports. */
655   stream->udpsrc[0] = udpsrc0;
656   stream->udpsrc[1] = udpsrc1;
657   stream->udpsink[0] = udpsink0;
658   stream->udpsink[1] = udpsink1;
659   stream->server_port.min = rtpport;
660   stream->server_port.max = rtcpport;
661
662   return TRUE;
663
664   /* ERRORS */
665 no_udp_protocol:
666   {
667     goto cleanup;
668   }
669 no_ports:
670   {
671     goto cleanup;
672   }
673 no_udp_rtcp_protocol:
674   {
675     goto cleanup;
676   }
677 port_error:
678   {
679     goto cleanup;
680   }
681 cleanup:
682   {
683     if (udpsrc0) {
684       gst_element_set_state (udpsrc0, GST_STATE_NULL);
685       gst_object_unref (udpsrc0);
686     }
687     if (udpsrc1) {
688       gst_element_set_state (udpsrc1, GST_STATE_NULL);
689       gst_object_unref (udpsrc1);
690     }
691     if (udpsink0) {
692       gst_element_set_state (udpsink0, GST_STATE_NULL);
693       gst_object_unref (udpsink0);
694     }
695     if (udpsink1) {
696       gst_element_set_state (udpsink1, GST_STATE_NULL);
697       gst_object_unref (udpsink1);
698     }
699     return FALSE;
700   }
701 }
702
703 static void
704 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
705 {
706   gchar *capsstr;
707   GstCaps *newcaps, *oldcaps;
708
709   if ((newcaps = GST_PAD_CAPS (pad)))
710     gst_caps_ref (newcaps);
711
712   oldcaps = stream->caps;
713   stream->caps = newcaps;
714
715   if (oldcaps)
716     gst_caps_unref (oldcaps);
717
718   capsstr = gst_caps_to_string (newcaps);
719   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
720   g_free (capsstr);
721 }
722
723 static void
724 dump_structure (const GstStructure * s)
725 {
726   gchar *sstr;
727
728   sstr = gst_structure_to_string (s);
729   GST_INFO ("structure: %s", sstr);
730   g_free (sstr);
731 }
732
733 static GstRTSPMediaTrans *
734 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
735 {
736   GList *walk;
737   GstRTSPMediaTrans *result = NULL;
738   const gchar *tmp;
739   gchar *dest;
740   guint port;
741
742   if (rtcp_from == NULL)
743     return NULL;
744
745   tmp = g_strrstr (rtcp_from, ":");
746   if (tmp == NULL)
747     return NULL;
748
749   port = atoi (tmp + 1);
750   dest = g_strndup (rtcp_from, tmp - rtcp_from);
751
752   GST_INFO ("finding %s:%d", dest, port);
753
754   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
755     GstRTSPMediaTrans *trans = walk->data;
756     gint min, max;
757
758     min = trans->transport->client_port.min;
759     max = trans->transport->client_port.max;
760
761     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
762             || max == port)) {
763       result = trans;
764       break;
765     }
766   }
767   g_free (dest);
768
769   return result;
770 }
771
772 static void
773 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
774 {
775   GstStructure *stats;
776   GstRTSPMediaTrans *trans;
777
778   GST_INFO ("%p: new source %p", stream, source);
779
780   /* see if we have a stream to match with the origin of the RTCP packet */
781   trans = g_object_get_qdata (source, ssrc_stream_map_key);
782   if (trans == NULL) {
783     g_object_get (source, "stats", &stats, NULL);
784     if (stats) {
785       const gchar *rtcp_from;
786
787       dump_structure (stats);
788
789       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
790       if ((trans = find_transport (stream, rtcp_from))) {
791         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
792             source);
793
794         /* keep ref to the source */
795         trans->rtpsource = source;
796
797         g_object_set_qdata (source, ssrc_stream_map_key, trans);
798       }
799       gst_structure_free (stats);
800     }
801   } else {
802     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
803   }
804 }
805
806 static void
807 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
808 {
809   GST_INFO ("%p: new SDES %p", stream, source);
810 }
811
812 static void
813 on_ssrc_active (GObject * session, GObject * source,
814     GstRTSPMediaStream * stream)
815 {
816   GstRTSPMediaTrans *trans;
817
818   trans = g_object_get_qdata (source, ssrc_stream_map_key);
819
820   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
821
822   if (trans && trans->keep_alive)
823     trans->keep_alive (trans->ka_user_data);
824
825 #ifdef DUMP_STATS
826   {
827     GstStructure *stats;
828     g_object_get (source, "stats", &stats, NULL);
829     if (stats) {
830       dump_structure (stats);
831       gst_structure_free (stats);
832     }
833   }
834 #endif
835 }
836
837 static void
838 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
839 {
840   GST_INFO ("%p: source %p bye", stream, source);
841 }
842
843 static void
844 on_bye_timeout (GObject * session, GObject * source,
845     GstRTSPMediaStream * stream)
846 {
847   GstRTSPMediaTrans *trans;
848
849   GST_INFO ("%p: source %p bye timeout", stream, source);
850
851   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
852     trans->rtpsource = NULL;
853     trans->timeout = TRUE;
854   }
855 }
856
857 static void
858 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
859 {
860   GstRTSPMediaTrans *trans;
861
862   GST_INFO ("%p: source %p timeout", stream, source);
863
864   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
865     trans->rtpsource = NULL;
866     trans->timeout = TRUE;
867   }
868 }
869
870 static GstFlowReturn
871 handle_new_buffer (GstAppSink * sink, gpointer user_data)
872 {
873   GList *walk;
874   GstBuffer *buffer;
875   GstRTSPMediaStream *stream;
876
877   buffer = gst_app_sink_pull_buffer (sink);
878   if (!buffer)
879     return GST_FLOW_OK;
880
881   stream = (GstRTSPMediaStream *) user_data;
882
883   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
884     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
885
886     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
887       if (tr->send_rtp)
888         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
889     } else {
890       if (tr->send_rtcp)
891         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
892     }
893   }
894   gst_buffer_unref (buffer);
895
896   return GST_FLOW_OK;
897 }
898
899 static GstAppSinkCallbacks sink_cb = {
900   NULL,                         /* not interested in EOS */
901   NULL,                         /* not interested in preroll buffers */
902   handle_new_buffer
903 };
904
905 /* prepare the pipeline objects to handle @stream in @media */
906 static gboolean
907 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
908 {
909   gchar *name;
910   GstPad *pad, *teepad, *selpad;
911   GstPadLinkReturn ret;
912   gint i;
913
914   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
915    * for sending RTP/RTCP. The sender and receiver ports are shared between the
916    * elements */
917   if (!alloc_udp_ports (stream))
918     return FALSE;
919
920   /* add the ports to the pipeline */
921   for (i = 0; i < 2; i++) {
922     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
923     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
924   }
925
926   /* create elements for the TCP transfer */
927   for (i = 0; i < 2; i++) {
928     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
929     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
930     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
931     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
932     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
933     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
934     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
935     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
936         &sink_cb, stream, NULL);
937   }
938
939   /* hook up the stream to the RTP session elements. */
940   name = g_strdup_printf ("send_rtp_sink_%d", idx);
941   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
942   g_free (name);
943   name = g_strdup_printf ("send_rtp_src_%d", idx);
944   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
945   g_free (name);
946   name = g_strdup_printf ("send_rtcp_src_%d", idx);
947   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
948   g_free (name);
949   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
950   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
951   g_free (name);
952   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
953   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
954   g_free (name);
955
956   /* get the session */
957   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
958       &stream->session);
959
960   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
961       stream);
962   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
963       stream);
964   g_signal_connect (stream->session, "on-ssrc-active",
965       (GCallback) on_ssrc_active, stream);
966   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
967       stream);
968   g_signal_connect (stream->session, "on-bye-timeout",
969       (GCallback) on_bye_timeout, stream);
970   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
971       stream);
972
973   /* link the RTP pad to the session manager */
974   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
975   if (ret != GST_PAD_LINK_OK)
976     goto link_failed;
977
978   /* make tee for RTP and link to stream */
979   stream->tee[0] = gst_element_factory_make ("tee", NULL);
980   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
981
982   pad = gst_element_get_static_pad (stream->tee[0], "sink");
983   gst_pad_link (stream->send_rtp_src, pad);
984   gst_object_unref (pad);
985
986   /* link RTP sink, we're pretty sure this will work. */
987   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
988   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
989   gst_pad_link (teepad, pad);
990   gst_object_unref (pad);
991   gst_object_unref (teepad);
992
993   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
994   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
995   gst_pad_link (teepad, pad);
996   gst_object_unref (pad);
997   gst_object_unref (teepad);
998
999   /* make tee for RTCP */
1000   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1001   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1002
1003   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1004   gst_pad_link (stream->send_rtcp_src, pad);
1005   gst_object_unref (pad);
1006
1007   /* link RTCP elements */
1008   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1009   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1010   gst_pad_link (teepad, pad);
1011   gst_object_unref (pad);
1012   gst_object_unref (teepad);
1013
1014   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1015   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1016   gst_pad_link (teepad, pad);
1017   gst_object_unref (pad);
1018   gst_object_unref (teepad);
1019
1020   /* make selector for the RTP receivers */
1021   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1022   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1023   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1024
1025   pad = gst_element_get_static_pad (stream->selector[0], "src");
1026   gst_pad_link (pad, stream->recv_rtp_sink);
1027   gst_object_unref (pad);
1028
1029   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1030   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1031   gst_pad_link (pad, selpad);
1032   gst_object_unref (pad);
1033   gst_object_unref (selpad);
1034
1035   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1036   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1037   gst_pad_link (pad, selpad);
1038   gst_object_unref (pad);
1039   gst_object_unref (selpad);
1040
1041   /* make selector for the RTCP receivers */
1042   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1043   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1044   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1045
1046   pad = gst_element_get_static_pad (stream->selector[1], "src");
1047   gst_pad_link (pad, stream->recv_rtcp_sink);
1048   gst_object_unref (pad);
1049
1050   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1051   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1052   gst_pad_link (pad, selpad);
1053   gst_object_unref (pad);
1054   gst_object_unref (selpad);
1055
1056   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1057   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1058   gst_pad_link (pad, selpad);
1059   gst_object_unref (pad);
1060   gst_object_unref (selpad);
1061
1062   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1063    * values */
1064   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1065   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1066   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1067   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1068
1069   /* be notified of caps changes */
1070   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1071       (GCallback) caps_notify, stream);
1072
1073   stream->prepared = TRUE;
1074
1075   return TRUE;
1076
1077   /* ERRORS */
1078 link_failed:
1079   {
1080     GST_WARNING ("failed to link stream %d", idx);
1081     return FALSE;
1082   }
1083 }
1084
1085 static void
1086 unlock_streams (GstRTSPMedia * media)
1087 {
1088   guint i, n_streams;
1089
1090   /* unlock the udp src elements */
1091   n_streams = gst_rtsp_media_n_streams (media);
1092   for (i = 0; i < n_streams; i++) {
1093     GstRTSPMediaStream *stream;
1094
1095     stream = gst_rtsp_media_get_stream (media, i);
1096
1097     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1098     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1099   }
1100 }
1101
1102 static void
1103 gst_rtsp_media_set_status (GstRTSPMedia *media, GstRTSPMediaStatus status)
1104 {
1105   g_mutex_lock (media->lock);
1106   /* never overwrite the error status */
1107   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1108     media->status = status;
1109   GST_DEBUG ("setting new status to %d", status);
1110   g_cond_broadcast (media->cond);
1111   g_mutex_unlock (media->lock);
1112 }
1113
1114 static GstRTSPMediaStatus
1115 gst_rtsp_media_get_status (GstRTSPMedia *media)
1116 {
1117   GstRTSPMediaStatus result;
1118
1119   g_mutex_lock (media->lock);
1120   /* while we are preparing, wait */
1121   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1122     GST_DEBUG ("waiting for status change");
1123     g_cond_wait (media->cond, media->lock);
1124   }
1125   /* could be success or error */
1126   result = media->status;
1127   GST_DEBUG ("got status %d", result);
1128   g_mutex_unlock (media->lock);
1129
1130   return result;
1131 }
1132
1133 static gboolean
1134 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1135 {
1136   GstMessageType type;
1137
1138   type = GST_MESSAGE_TYPE (message);
1139
1140   switch (type) {
1141     case GST_MESSAGE_STATE_CHANGED:
1142       break;
1143     case GST_MESSAGE_BUFFERING:
1144     {
1145       gint percent;
1146
1147       gst_message_parse_buffering (message, &percent);
1148
1149       /* no state management needed for live pipelines */
1150       if (media->is_live)
1151         break;
1152
1153       if (percent == 100) {
1154         /* a 100% message means buffering is done */
1155         media->buffering = FALSE;
1156         /* if the desired state is playing, go back */
1157         if (media->target_state == GST_STATE_PLAYING) {
1158           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1159           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1160         } else {
1161           GST_INFO ("Buffering done");
1162         }
1163       } else {
1164         /* buffering busy */
1165         if (media->buffering == FALSE) {
1166           if (media->target_state == GST_STATE_PLAYING) {
1167             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1168             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1169             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1170           } else {
1171             GST_INFO ("Buffering ...");
1172           }
1173         }
1174         media->buffering = TRUE;
1175       }
1176       break;
1177     }
1178     case GST_MESSAGE_LATENCY:
1179     {
1180       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1181       break;
1182     }
1183     case GST_MESSAGE_ERROR:
1184     {
1185       GError *gerror;
1186       gchar *debug;
1187
1188       gst_message_parse_error (message, &gerror, &debug);
1189       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1190       g_error_free (gerror);
1191       g_free (debug);
1192
1193       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1194       break;
1195     }
1196     case GST_MESSAGE_WARNING:
1197     {
1198       GError *gerror;
1199       gchar *debug;
1200
1201       gst_message_parse_warning (message, &gerror, &debug);
1202       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1203       g_error_free (gerror);
1204       g_free (debug);
1205       break;
1206     }
1207     case GST_MESSAGE_ELEMENT:
1208       break;
1209     case GST_MESSAGE_STREAM_STATUS:
1210       break;
1211     case GST_MESSAGE_ASYNC_DONE:
1212       GST_INFO ("%p: got ASYNC_DONE", media);
1213       collect_media_stats (media);
1214
1215       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1216       break;
1217     default:
1218       GST_INFO ("%p: got message type %s", media,
1219           gst_message_type_get_name (type));
1220       break;
1221   }
1222   return TRUE;
1223 }
1224
1225 static gboolean
1226 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1227 {
1228   GstRTSPMediaClass *klass;
1229   gboolean ret;
1230
1231   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1232
1233   if (klass->handle_message)
1234     ret = klass->handle_message (media, message);
1235   else
1236     ret = FALSE;
1237
1238   return ret;
1239 }
1240
1241 static void
1242 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1243 {
1244   GstRTSPMediaStream *stream;
1245   gchar *name;
1246   gint i;
1247
1248   i = media->streams->len + 1;
1249
1250   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1251
1252   stream = g_new0 (GstRTSPMediaStream, 1);
1253   stream->payloader = element;
1254
1255   name = g_strdup_printf ("dynpay%d", i);
1256
1257   /* ghost the pad of the payloader to the element */
1258   stream->srcpad = gst_ghost_pad_new (name, pad);
1259   gst_pad_set_active (stream->srcpad, TRUE);
1260   gst_element_add_pad (media->element, stream->srcpad);
1261   g_free (name);
1262
1263   /* add stream now */
1264   g_array_append_val (media->streams, stream);
1265
1266   setup_stream (stream, i, media);
1267
1268   for (i = 0; i < 2; i++) {
1269     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1270     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1271     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1272     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1273     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1274   }
1275 }
1276
1277 static void
1278 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1279 {
1280   GST_INFO ("no more pads");
1281   if (media->fakesink) {
1282     gst_object_ref (media->fakesink);
1283     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1284     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1285     gst_object_unref (media->fakesink);
1286     media->fakesink = NULL;
1287     GST_INFO ("removed fakesink");
1288   }
1289 }
1290
1291 /**
1292  * gst_rtsp_media_prepare:
1293  * @obj: a #GstRTSPMedia
1294  *
1295  * Prepare @media for streaming. This function will create the pipeline and
1296  * other objects to manage the streaming.
1297  *
1298  * It will preroll the pipeline and collect vital information about the streams
1299  * such as the duration.
1300  *
1301  * Returns: %TRUE on success.
1302  */
1303 gboolean
1304 gst_rtsp_media_prepare (GstRTSPMedia * media)
1305 {
1306   GstStateChangeReturn ret;
1307   GstRTSPMediaStatus status;
1308   guint i, n_streams;
1309   GstRTSPMediaClass *klass;
1310   GstBus *bus;
1311   GList *walk;
1312
1313   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1314     goto was_prepared;
1315
1316   if (!media->reusable && media->reused)
1317     goto is_reused;
1318
1319   GST_INFO ("preparing media %p", media);
1320
1321   /* reset some variables */
1322   media->is_live = FALSE;
1323   media->buffering = FALSE;
1324   /* we're preparing now */
1325   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1326
1327   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1328
1329   /* add the pipeline bus to our custom mainloop */
1330   media->source = gst_bus_create_watch (bus);
1331   gst_object_unref (bus);
1332
1333   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1334
1335   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1336   media->id = g_source_attach (media->source, klass->context);
1337
1338   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1339
1340   /* add stuff to the bin */
1341   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1342
1343   /* link streams we already have, other streams might appear when we have
1344    * dynamic elements */
1345   n_streams = gst_rtsp_media_n_streams (media);
1346   for (i = 0; i < n_streams; i++) {
1347     GstRTSPMediaStream *stream;
1348
1349     stream = gst_rtsp_media_get_stream (media, i);
1350
1351     setup_stream (stream, i, media);
1352   }
1353
1354   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1355     GstElement *elem = walk->data;
1356
1357     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1358     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1359
1360     /* we add a fakesink here in order to make the state change async. We remove
1361      * the fakesink again in the no-more-pads callback. */
1362     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1363     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1364   }
1365
1366   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1367   /* first go to PAUSED */
1368   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1369   media->target_state = GST_STATE_PAUSED;
1370
1371   switch (ret) {
1372     case GST_STATE_CHANGE_SUCCESS:
1373       GST_INFO ("SUCCESS state change for media %p", media);
1374       break;
1375     case GST_STATE_CHANGE_ASYNC:
1376       GST_INFO ("ASYNC state change for media %p", media);
1377       break;
1378     case GST_STATE_CHANGE_NO_PREROLL:
1379       /* we need to go to PLAYING */
1380       GST_INFO ("NO_PREROLL state change: live media %p", media);
1381       media->is_live = TRUE;
1382       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1383       if (ret == GST_STATE_CHANGE_FAILURE)
1384         goto state_failed;
1385       break;
1386     case GST_STATE_CHANGE_FAILURE:
1387       goto state_failed;
1388   }
1389
1390   /* now wait for all pads to be prerolled */
1391   status = gst_rtsp_media_get_status (media);
1392   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1393     goto state_failed;
1394
1395   GST_INFO ("object %p is prerolled", media);
1396
1397   return TRUE;
1398
1399   /* OK */
1400 was_prepared:
1401   {
1402     return TRUE;
1403   }
1404   /* ERRORS */
1405 state_failed:
1406   {
1407     GST_WARNING ("failed to preroll pipeline");
1408     unlock_streams (media);
1409     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1410     return FALSE;
1411   }
1412 is_reused:
1413   {
1414     GST_WARNING ("can not reuse media %p", media);
1415     return FALSE;
1416   }
1417 }
1418
1419 /**
1420  * gst_rtsp_media_unprepare:
1421  * @obj: a #GstRTSPMedia
1422  *
1423  * Unprepare @media. After this call, the media should be prepared again before
1424  * it can be used again. If the media is set to be non-reusable, a new instance
1425  * must be created.
1426  *
1427  * Returns: %TRUE on success.
1428  */
1429 gboolean
1430 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1431 {
1432   GstRTSPMediaClass *klass;
1433   gboolean success;
1434
1435   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1436     return TRUE;
1437
1438   GST_INFO ("unprepare media %p", media);
1439   media->target_state = GST_STATE_NULL;
1440
1441   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1442   if (klass->unprepare)
1443     success = klass->unprepare (media);
1444   else
1445     success = TRUE;
1446
1447   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1448   media->reused = TRUE;
1449
1450   /* when the media is not reusable, this will effectively unref the media and
1451    * recreate it */
1452   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1453
1454   return success;
1455 }
1456
1457 static gboolean
1458 default_unprepare (GstRTSPMedia * media)
1459 {
1460   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1461
1462   return TRUE;
1463 }
1464
1465 /**
1466  * gst_rtsp_media_set_state:
1467  * @media: a #GstRTSPMedia
1468  * @state: the target state of the media
1469  * @transports: a GArray of #GstRTSPMediaTrans pointers
1470  *
1471  * Set the state of @media to @state and for the transports in @transports.
1472  *
1473  * Returns: %TRUE on success.
1474  */
1475 gboolean
1476 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1477     GArray * transports)
1478 {
1479   gint i;
1480   GstStateChangeReturn ret;
1481   gboolean add, remove, do_state;
1482   gint old_active;
1483
1484   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1485   g_return_val_if_fail (transports != NULL, FALSE);
1486
1487   /* NULL and READY are the same */
1488   if (state == GST_STATE_READY)
1489     state = GST_STATE_NULL;
1490
1491   add = remove = FALSE;
1492
1493   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1494       media);
1495
1496   switch (state) {
1497     case GST_STATE_NULL:
1498       /* unlock the streams so that they follow the state changes from now on */
1499       unlock_streams (media);
1500       /* fallthrough */
1501     case GST_STATE_PAUSED:
1502       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1503       if (media->target_state == GST_STATE_PLAYING)
1504         remove = TRUE;
1505       break;
1506     case GST_STATE_PLAYING:
1507       /* we're going to PLAYING, add */
1508       add = TRUE;
1509       break;
1510     default:
1511       break;
1512   }
1513   old_active = media->active;
1514
1515   for (i = 0; i < transports->len; i++) {
1516     GstRTSPMediaTrans *tr;
1517     GstRTSPMediaStream *stream;
1518     GstRTSPTransport *trans;
1519
1520     /* we need a non-NULL entry in the array */
1521     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1522     if (tr == NULL)
1523       continue;
1524
1525     /* we need a transport */
1526     if (!(trans = tr->transport))
1527       continue;
1528
1529     /* get the stream and add the destinations */
1530     stream = gst_rtsp_media_get_stream (media, tr->idx);
1531     switch (trans->lower_transport) {
1532       case GST_RTSP_LOWER_TRANS_UDP:
1533       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1534       {
1535         gchar *dest;
1536         gint min, max;
1537
1538         dest = trans->destination;
1539         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1540           min = trans->port.min;
1541           max = trans->port.max;
1542         } else {
1543           min = trans->client_port.min;
1544           max = trans->client_port.max;
1545         }
1546
1547         if (add && !tr->active) {
1548           GST_INFO ("adding %s:%d-%d", dest, min, max);
1549           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1550           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1551           stream->transports = g_list_prepend (stream->transports, tr);
1552           tr->active = TRUE;
1553           media->active++;
1554         } else if (remove && tr->active) {
1555           GST_INFO ("removing %s:%d-%d", dest, min, max);
1556           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1557           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1558           stream->transports = g_list_remove (stream->transports, tr);
1559           tr->active = FALSE;
1560           media->active--;
1561         }
1562         break;
1563       }
1564       case GST_RTSP_LOWER_TRANS_TCP:
1565         if (add && !tr->active) {
1566           GST_INFO ("adding TCP %s", trans->destination);
1567           stream->transports = g_list_prepend (stream->transports, tr);
1568           tr->active = TRUE;
1569           media->active++;
1570         } else if (remove && tr->active) {
1571           GST_INFO ("removing TCP %s", trans->destination);
1572           stream->transports = g_list_remove (stream->transports, tr);
1573           tr->active = FALSE;
1574           media->active--;
1575         }
1576         break;
1577       default:
1578         GST_INFO ("Unknown transport %d", trans->lower_transport);
1579         break;
1580     }
1581   }
1582
1583   /* we just added the first media, do the playing state change */
1584   if (old_active == 0 && add)
1585     do_state = TRUE;
1586   /* if we have no more active media, do the downward state changes */
1587   else if (media->active == 0)
1588     do_state = TRUE;
1589   else
1590     do_state = FALSE;
1591
1592   GST_INFO ("active %d media %p", media->active, media);
1593
1594   if (do_state && media->target_state != state) {
1595     if (state == GST_STATE_NULL) {
1596       gst_rtsp_media_unprepare (media);
1597     } else {
1598       GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
1599       media->target_state = state;
1600       ret = gst_element_set_state (media->pipeline, state);
1601     }
1602   }
1603
1604   /* remember where we are */
1605   if (state == GST_STATE_PAUSED)
1606     collect_media_stats (media);
1607
1608   return TRUE;
1609 }
1610
1611 /**
1612  * gst_rtsp_media_remove_elements:
1613  * @media: a #GstRTSPMedia
1614  *
1615  * Remove all elements and the pipeline controlled by @media.
1616  */
1617 void
1618 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1619 {
1620   gint i, j;
1621
1622   unlock_streams (media);
1623
1624   for (i = 0; i < media->streams->len; i++) {
1625     GstRTSPMediaStream *stream;
1626
1627     GST_INFO ("Removing elements of stream %d from pipeline", i);
1628
1629     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1630
1631     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1632
1633     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
1634
1635     for (j = 0; j < 2; j++) {
1636       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
1637       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
1638       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
1639       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
1640       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
1641       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
1642
1643       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
1644       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
1645       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
1646       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
1647       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
1648       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
1649     }
1650     if (stream->caps)
1651       gst_caps_unref (stream->caps);
1652     stream->caps = NULL;
1653     gst_rtsp_media_stream_free (stream);
1654   }
1655   g_array_remove_range (media->streams, 0, media->streams->len);
1656
1657   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
1658   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
1659
1660   gst_object_unref (media->pipeline);
1661   media->pipeline = NULL;
1662 }