media: don't leak session pads
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
24
25 #include "rtsp-media.h"
26
27 #define DEFAULT_SHARED         FALSE
28 #define DEFAULT_REUSABLE       FALSE
29
30 /* define to dump received RTCP packets */
31 #undef DUMP_STATS
32
33 enum
34 {
35   PROP_0,
36   PROP_SHARED,
37   PROP_REUSABLE,
38   PROP_LAST
39 };
40
41 enum
42 {
43   SIGNAL_UNPREPARED,
44   SIGNAL_LAST
45 };
46
47 static GQuark ssrc_stream_map_key;
48
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50     GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52     const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
54
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static void unlock_streams (GstRTSPMedia *media);
58
59 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
60
61 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
62
63 static void
64 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
65 {
66   GObjectClass *gobject_class;
67   GError *error = NULL;
68
69   gobject_class = G_OBJECT_CLASS (klass);
70
71   gobject_class->get_property = gst_rtsp_media_get_property;
72   gobject_class->set_property = gst_rtsp_media_set_property;
73   gobject_class->finalize = gst_rtsp_media_finalize;
74
75   g_object_class_install_property (gobject_class, PROP_SHARED,
76       g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
77           DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
78
79   g_object_class_install_property (gobject_class, PROP_REUSABLE,
80       g_param_spec_boolean ("reusable", "Reusable",
81           "If this media pipeline can be reused after an unprepare",
82           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
83
84   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
85       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
86       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
87       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
88
89   klass->context = g_main_context_new ();
90   klass->loop = g_main_loop_new (klass->context, TRUE);
91
92   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
93   if (error != NULL) {
94     g_critical ("could not start bus thread: %s", error->message);
95   }
96   klass->handle_message = default_handle_message;
97
98   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
99 }
100
101 static void
102 gst_rtsp_media_init (GstRTSPMedia * media)
103 {
104   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
105   media->is_live = FALSE;
106   media->buffering = FALSE;
107 }
108
109 static void
110 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
111 {
112   if (stream->session)
113     g_object_unref (stream->session);
114
115   if (stream->caps)
116     gst_caps_unref (stream->caps);
117
118   if (stream->send_rtp_sink)
119     gst_object_unref (stream->send_rtp_sink);
120   if (stream->send_rtp_src)
121     gst_object_unref (stream->send_rtp_src);
122   if (stream->send_rtcp_src)
123     gst_object_unref (stream->send_rtcp_src);
124   if (stream->recv_rtcp_sink)
125     gst_object_unref (stream->recv_rtcp_sink);
126   if (stream->recv_rtp_sink)
127     gst_object_unref (stream->recv_rtp_sink);
128
129   g_list_free (stream->transports);
130
131   g_free (stream);
132 }
133
134 static void
135 gst_rtsp_media_finalize (GObject * obj)
136 {
137   GstRTSPMedia *media;
138   guint i;
139
140   media = GST_RTSP_MEDIA (obj);
141
142   g_message ("finalize media %p", media);
143
144   if (media->pipeline) {
145     unlock_streams (media);
146     gst_element_set_state (media->pipeline, GST_STATE_NULL);
147     gst_object_unref (media->pipeline);
148   }
149
150   for (i = 0; i < media->streams->len; i++) {
151     GstRTSPMediaStream *stream;
152
153     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
154
155     gst_rtsp_media_stream_free (stream);
156   }
157   g_array_free (media->streams, TRUE);
158
159   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
160   g_list_free (media->dynamic);
161
162   if (media->source) {
163     g_source_destroy (media->source);
164     g_source_unref (media->source);
165   }
166
167   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
168 }
169
170 static void
171 gst_rtsp_media_get_property (GObject *object, guint propid,
172     GValue *value, GParamSpec *pspec)
173 {
174   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
175
176   switch (propid) {
177     case PROP_SHARED:
178       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
179       break;
180     case PROP_REUSABLE:
181       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
182       break;
183     default:
184       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
185   }
186 }
187
188 static void
189 gst_rtsp_media_set_property (GObject *object, guint propid,
190     const GValue *value, GParamSpec *pspec)
191 {
192   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
193
194   switch (propid) {
195     case PROP_SHARED:
196       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
197       break;
198     case PROP_REUSABLE:
199       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
200       break;
201     default:
202       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
203   }
204 }
205
206 static gpointer
207 do_loop (GstRTSPMediaClass *klass)
208 {
209   g_message ("enter mainloop");
210   g_main_loop_run (klass->loop);
211   g_message ("exit mainloop");
212
213   return NULL;
214 }
215
216 static void
217 collect_media_stats (GstRTSPMedia *media)
218 {
219   GstFormat format;
220   gint64 position, duration;
221
222   media->range.unit = GST_RTSP_RANGE_NPT;
223
224   if (media->is_live) {
225     media->range.min.type = GST_RTSP_TIME_NOW;
226     media->range.min.seconds = -1;
227     media->range.max.type = GST_RTSP_TIME_END;
228     media->range.max.seconds = -1;
229   }
230   else {
231     /* get the position */
232     format = GST_FORMAT_TIME;
233     if (!gst_element_query_position (media->pipeline, &format, &position)) {
234       g_message ("position query failed");
235       position = 0;
236     }
237
238     /* get the duration */
239     format = GST_FORMAT_TIME;
240     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
241       g_message ("duration query failed");
242       duration = -1;
243     }
244
245     g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
246         GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
247
248     if (position == -1) {
249       media->range.min.type = GST_RTSP_TIME_NOW;
250       media->range.min.seconds = -1;
251     }
252     else {
253       media->range.min.type = GST_RTSP_TIME_SECONDS;
254       media->range.min.seconds = ((gdouble)position) / GST_SECOND;
255     }
256     if (duration == -1) {
257       media->range.max.type = GST_RTSP_TIME_END;
258       media->range.max.seconds = -1;
259     }
260     else {
261       media->range.max.type = GST_RTSP_TIME_SECONDS;
262       media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
263     }
264   }
265 }
266
267 /**
268  * gst_rtsp_media_new:
269  *
270  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
271  * element to produde RTP data for one or more related (audio/video/..) 
272  * streams.
273  *
274  * Returns: a new #GstRTSPMedia object.
275  */
276 GstRTSPMedia *
277 gst_rtsp_media_new (void)
278 {
279   GstRTSPMedia *result;
280
281   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
282
283   return result;
284 }
285
286 /**
287  * gst_rtsp_media_set_shared:
288  * @media: a #GstRTSPMedia
289  * @shared: the new value
290  *
291  * Set or unset if the pipeline for @media can be shared will multiple clients.
292  * When @shared is %TRUE, client requests for this media will share the media
293  * pipeline.
294  */
295 void
296 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
297 {
298   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
299
300   media->shared = shared;
301 }
302
303 /**
304  * gst_rtsp_media_is_shared:
305  * @media: a #GstRTSPMedia
306  *
307  * Check if the pipeline for @media can be shared between multiple clients.
308  *
309  * Returns: %TRUE if the media can be shared between clients.
310  */
311 gboolean
312 gst_rtsp_media_is_shared (GstRTSPMedia *media)
313 {
314   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
315
316   return media->shared;
317 }
318
319 /**
320  * gst_rtsp_media_set_reusable:
321  * @media: a #GstRTSPMedia
322  * @reusable: the new value
323  *
324  * Set or unset if the pipeline for @media can be reused after the pipeline has
325  * been unprepared.
326  */
327 void
328 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
329 {
330   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
331
332   media->reusable = reusable;
333 }
334
335 /**
336  * gst_rtsp_media_is_reusable:
337  * @media: a #GstRTSPMedia
338  *
339  * Check if the pipeline for @media can be reused after an unprepare.
340  *
341  * Returns: %TRUE if the media can be reused
342  */
343 gboolean
344 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
345 {
346   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
347
348   return media->reusable;
349 }
350
351 /**
352  * gst_rtsp_media_n_streams:
353  * @media: a #GstRTSPMedia
354  *
355  * Get the number of streams in this media.
356  *
357  * Returns: The number of streams.
358  */
359 guint
360 gst_rtsp_media_n_streams (GstRTSPMedia *media)
361 {
362   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
363
364   return media->streams->len;
365 }
366
367 /**
368  * gst_rtsp_media_get_stream:
369  * @media: a #GstRTSPMedia
370  * @idx: the stream index
371  *
372  * Retrieve the stream with index @idx from @media.
373  *
374  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
375  * that index did not exist.
376  */
377 GstRTSPMediaStream *
378 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
379 {
380   GstRTSPMediaStream *res;
381   
382   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
383
384   if (idx < media->streams->len)
385     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
386   else
387     res = NULL;
388
389   return res;
390 }
391
392 /**
393  * gst_rtsp_media_seek:
394  * @stream: a #GstRTSPMediaStream
395  * @range: a #GstRTSPTimeRange
396  *
397  * Seek the pipeline to @range.
398  *
399  * Returns: %TRUE on success.
400  */
401 gboolean
402 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
403 {
404   GstSeekFlags flags;
405   gboolean res;
406   gint64 start, stop;
407   GstSeekType start_type, stop_type;
408
409   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
410   g_return_val_if_fail (range != NULL, FALSE);
411
412   if (range->unit != GST_RTSP_RANGE_NPT)
413     goto not_supported;
414
415   /* depends on the current playing state of the pipeline. We might need to
416    * queue this until we get EOS. */
417   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
418
419   start_type = stop_type = GST_SEEK_TYPE_NONE;
420
421   switch (range->min.type) {
422     case GST_RTSP_TIME_NOW:
423       start = -1;
424       break;
425     case GST_RTSP_TIME_SECONDS:
426       /* only seek when something changed */
427       if (media->range.min.seconds == range->min.seconds) {
428         start = -1;
429       } else {
430         start = range->min.seconds * GST_SECOND;
431         start_type = GST_SEEK_TYPE_SET;
432       }
433       break;
434     case GST_RTSP_TIME_END:
435     default:
436       goto weird_type;
437   }
438   switch (range->max.type) {
439     case GST_RTSP_TIME_SECONDS:
440       /* only seek when something changed */
441       if (media->range.max.seconds == range->max.seconds) {
442         stop = -1;
443       } else {
444         stop = range->max.seconds * GST_SECOND;
445         stop_type = GST_SEEK_TYPE_SET;
446       }
447       break;
448     case GST_RTSP_TIME_END:
449       stop = -1;
450       stop_type = GST_SEEK_TYPE_SET;
451       break;
452     case GST_RTSP_TIME_NOW:
453     default:
454       goto weird_type;
455   }
456   
457   if (start != -1 || stop != -1) {
458     g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
459                 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
460
461     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
462         flags, start_type, start, stop_type, stop);
463
464     /* and block for the seek to complete */
465     g_message ("done seeking %d", res);
466     gst_element_get_state (media->pipeline, NULL, NULL, -1);
467     g_message ("prerolled again");
468
469     collect_media_stats (media);
470   }
471   else {
472     g_message ("no seek needed");
473     res = TRUE;
474   }
475
476   return res;
477
478   /* ERRORS */
479 not_supported:
480   {
481     g_warning ("seek unit %d not supported", range->unit);
482     return FALSE;
483   }
484 weird_type:
485   {
486     g_warning ("weird range type %d not supported", range->min.type);
487     return FALSE;
488   }
489 }
490
491 /**
492  * gst_rtsp_media_stream_rtp:
493  * @stream: a #GstRTSPMediaStream
494  * @buffer: a #GstBuffer
495  *
496  * Handle an RTP buffer for the stream. This method is usually called when a
497  * message has been received from a client using the TCP transport.
498  *
499  * This function takes ownership of @buffer.
500  *
501  * Returns: a GstFlowReturn.
502  */
503 GstFlowReturn
504 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
505 {
506   GstFlowReturn ret;
507
508   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
509
510   return ret;
511 }
512
513 /**
514  * gst_rtsp_media_stream_rtcp:
515  * @stream: a #GstRTSPMediaStream
516  * @buffer: a #GstBuffer
517  *
518  * Handle an RTCP buffer for the stream. This method is usually called when a
519  * message has been received from a client using the TCP transport.
520  *
521  * This function takes ownership of @buffer.
522  *
523  * Returns: a GstFlowReturn.
524  */
525 GstFlowReturn
526 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
527 {
528   GstFlowReturn ret;
529
530   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
531
532   return ret;
533 }
534
535 /* Allocate the udp ports and sockets */
536 static gboolean
537 alloc_udp_ports (GstRTSPMediaStream * stream)
538 {
539   GstStateChangeReturn ret;
540   GstElement *udpsrc0, *udpsrc1;
541   GstElement *udpsink0, *udpsink1;
542   gint tmp_rtp, tmp_rtcp;
543   guint count;
544   gint rtpport, rtcpport, sockfd;
545
546   udpsrc0 = NULL;
547   udpsrc1 = NULL;
548   udpsink0 = NULL;
549   udpsink1 = NULL;
550   count = 0;
551
552   /* Start with random port */
553   tmp_rtp = 0;
554
555   /* try to allocate 2 UDP ports, the RTP port should be an even
556    * number and the RTCP port should be the next (uneven) port */
557 again:
558   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
559   if (udpsrc0 == NULL)
560     goto no_udp_protocol;
561   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
562
563   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
564   if (ret == GST_STATE_CHANGE_FAILURE) {
565     if (tmp_rtp != 0) {
566       tmp_rtp += 2;
567       if (++count > 20)
568         goto no_ports;
569
570       gst_element_set_state (udpsrc0, GST_STATE_NULL);
571       gst_object_unref (udpsrc0);
572
573       goto again;
574     }
575     goto no_udp_protocol;
576   }
577
578   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
579
580   /* check if port is even */
581   if ((tmp_rtp & 1) != 0) {
582     /* port not even, close and allocate another */
583     if (++count > 20)
584       goto no_ports;
585
586     gst_element_set_state (udpsrc0, GST_STATE_NULL);
587     gst_object_unref (udpsrc0);
588
589     tmp_rtp++;
590     goto again;
591   }
592
593   /* allocate port+1 for RTCP now */
594   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
595   if (udpsrc1 == NULL)
596     goto no_udp_rtcp_protocol;
597
598   /* set port */
599   tmp_rtcp = tmp_rtp + 1;
600   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
601
602   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
603   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
604   if (ret == GST_STATE_CHANGE_FAILURE) {
605
606     if (++count > 20)
607       goto no_ports;
608
609     gst_element_set_state (udpsrc0, GST_STATE_NULL);
610     gst_object_unref (udpsrc0);
611
612     gst_element_set_state (udpsrc1, GST_STATE_NULL);
613     gst_object_unref (udpsrc1);
614
615     tmp_rtp += 2;
616     goto again;
617   }
618
619   /* all fine, do port check */
620   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
621   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
622
623   /* this should not happen... */
624   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
625     goto port_error;
626
627   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
628   if (!udpsink0)
629     goto no_udp_protocol;
630
631   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
632   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
633   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
634
635   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
636   if (!udpsink1)
637     goto no_udp_protocol;
638
639   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
640   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
641   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
642   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
643   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
644
645   /* we keep these elements, we configure all in configure_transport when the
646    * server told us to really use the UDP ports. */
647   stream->udpsrc[0] = udpsrc0;
648   stream->udpsrc[1] = udpsrc1;
649   stream->udpsink[0] = udpsink0;
650   stream->udpsink[1] = udpsink1;
651   stream->server_port.min = rtpport;
652   stream->server_port.max = rtcpport;
653
654   return TRUE;
655
656   /* ERRORS */
657 no_udp_protocol:
658   {
659     goto cleanup;
660   }
661 no_ports:
662   {
663     goto cleanup;
664   }
665 no_udp_rtcp_protocol:
666   {
667     goto cleanup;
668   }
669 port_error:
670   {
671     goto cleanup;
672   }
673 cleanup:
674   {
675     if (udpsrc0) {
676       gst_element_set_state (udpsrc0, GST_STATE_NULL);
677       gst_object_unref (udpsrc0);
678     }
679     if (udpsrc1) {
680       gst_element_set_state (udpsrc1, GST_STATE_NULL);
681       gst_object_unref (udpsrc1);
682     }
683     if (udpsink0) {
684       gst_element_set_state (udpsink0, GST_STATE_NULL);
685       gst_object_unref (udpsink0);
686     }
687     if (udpsink1) {
688       gst_element_set_state (udpsink1, GST_STATE_NULL);
689       gst_object_unref (udpsink1);
690     }
691     return FALSE;
692   }
693 }
694
695 static void
696 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
697 {
698   gchar *capsstr;
699   GstCaps *newcaps, *oldcaps;
700
701   if ((newcaps = GST_PAD_CAPS (pad)))
702     gst_caps_ref (newcaps);
703
704   oldcaps = stream->caps;
705   stream->caps = newcaps;
706
707   if (oldcaps)
708     gst_caps_unref (oldcaps);
709
710   capsstr = gst_caps_to_string (newcaps);
711   g_message ("stream %p received caps %p, %s", stream, newcaps, capsstr);
712   g_free (capsstr);
713 }
714
715 static void
716 dump_structure (const GstStructure *s)
717 {
718   gchar *sstr;
719
720   sstr = gst_structure_to_string (s);
721   g_message ("structure: %s", sstr);
722   g_free (sstr);
723 }
724
725 static GstRTSPMediaTrans *
726 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
727 {
728   GList *walk;
729   GstRTSPMediaTrans *result = NULL;
730   const gchar *tmp;
731   gchar *dest;
732   guint port;
733
734   if (rtcp_from == NULL)
735     return NULL;
736
737   tmp = g_strrstr (rtcp_from, ":");
738   if (tmp == NULL)
739     return NULL;
740
741   port = atoi (tmp + 1);
742   dest = g_strndup (rtcp_from, tmp - rtcp_from);
743
744   g_message ("finding %s:%d", dest, port);
745
746   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
747     GstRTSPMediaTrans *trans = walk->data;
748     gint min, max;
749
750     min = trans->transport->client_port.min;
751     max = trans->transport->client_port.max;
752
753     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
754       result = trans;
755       break;
756     }
757   }
758   g_free (dest);
759
760   return result;
761 }
762
763 static void
764 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
765 {
766   GstStructure *stats;
767   GstRTSPMediaTrans *trans;
768
769   g_message ("%p: new source %p", stream, source);
770
771   /* see if we have a stream to match with the origin of the RTCP packet */
772   trans = g_object_get_qdata (source, ssrc_stream_map_key);
773   if (trans == NULL) {
774     g_object_get (source, "stats", &stats, NULL);
775     if (stats) {
776       const gchar *rtcp_from;
777
778       dump_structure (stats);
779
780       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
781       if ((trans = find_transport (stream, rtcp_from))) {
782         g_message ("%p: found transport %p for source  %p", stream, trans, source);
783
784         /* keep ref to the source */
785         trans->rtpsource = source;
786
787         g_object_set_qdata (source, ssrc_stream_map_key, trans);
788       }
789       gst_structure_free (stats);
790     }
791   } else {
792     g_message ("%p: source %p for transport %p", stream, source, trans);
793   }
794 }
795
796 static void
797 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
798 {
799   g_message ("%p: new SDES %p", stream, source);
800 }
801
802 static void
803 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
804 {
805   GstRTSPMediaTrans *trans;
806
807   trans = g_object_get_qdata (source, ssrc_stream_map_key);
808
809   g_message ("%p: source %p in transport %p is active", stream, source, trans);
810
811   if (trans && trans->keep_alive)
812     trans->keep_alive (trans->ka_user_data);
813
814 #ifdef DUMP_STATS
815   {
816     GstStructure *stats;
817     g_object_get (source, "stats", &stats, NULL);
818     if (stats) {
819       dump_structure (stats);
820       gst_structure_free (stats);
821     }
822   }
823 #endif
824 }
825
826 static void
827 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
828 {
829   g_message ("%p: source %p bye", stream, source);
830 }
831
832 static void
833 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
834 {
835   GstRTSPMediaTrans *trans;
836
837   g_message ("%p: source %p bye timeout", stream, source);
838
839   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
840     trans->rtpsource = NULL;
841     trans->timeout = TRUE;
842   }
843 }
844
845 static void
846 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
847 {
848   GstRTSPMediaTrans *trans;
849
850   g_message ("%p: source %p timeout", stream, source);
851
852   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
853     trans->rtpsource = NULL;
854     trans->timeout = TRUE;
855   }
856 }
857
858 static GstFlowReturn
859 handle_new_buffer (GstAppSink *sink, gpointer user_data)
860 {
861   GList *walk;
862   GstBuffer *buffer;
863   GstRTSPMediaStream *stream;
864
865   buffer = gst_app_sink_pull_buffer (sink);
866   if (!buffer)
867     return GST_FLOW_OK;
868
869   stream = (GstRTSPMediaStream *) user_data;
870
871   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
872     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
873
874     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
875       if (tr->send_rtp) 
876         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
877     }
878     else {
879       if (tr->send_rtcp) 
880         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
881     }
882   }
883   gst_buffer_unref (buffer);
884
885   return GST_FLOW_OK;
886 }
887
888 static GstAppSinkCallbacks sink_cb = {
889   NULL,  /* not interested in EOS */
890   NULL,  /* not interested in preroll buffers */
891   handle_new_buffer
892 };
893
894 /* prepare the pipeline objects to handle @stream in @media */
895 static gboolean
896 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
897 {
898   gchar *name;
899   GstPad *pad, *teepad, *selpad;
900   GstPadLinkReturn ret;
901   gint i;
902
903   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
904    * for sending RTP/RTCP. The sender and receiver ports are shared between the
905    * elements */
906   if (!alloc_udp_ports (stream))
907     return FALSE;
908
909   /* add the ports to the pipeline */
910   for (i = 0; i < 2; i++) {
911     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
912     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
913   }
914
915   /* create elements for the TCP transfer */
916   for (i = 0; i < 2; i++) {
917     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
918     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
919     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
920     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
921     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
922     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
923     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
924     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
925                   &sink_cb, stream, NULL);
926   }
927
928   /* hook up the stream to the RTP session elements. */
929   name = g_strdup_printf ("send_rtp_sink_%d", idx);
930   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
931   g_free (name);
932   name = g_strdup_printf ("send_rtp_src_%d", idx);
933   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
934   g_free (name);
935   name = g_strdup_printf ("send_rtcp_src_%d", idx);
936   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
937   g_free (name);
938   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
939   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
940   g_free (name);
941   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
942   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
943   g_free (name);
944
945   /* get the session */
946   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
947                   &stream->session);
948
949   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
950       stream);
951   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
952       stream);
953   g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
954       stream);
955   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
956       stream);
957   g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
958       stream);
959   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
960       stream);
961
962   /* link the RTP pad to the session manager */
963   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
964   if (ret != GST_PAD_LINK_OK)
965     goto link_failed;
966
967   /* make tee for RTP and link to stream */
968   stream->tee[0] = gst_element_factory_make ("tee", NULL);
969   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
970
971   pad = gst_element_get_static_pad (stream->tee[0], "sink");
972   gst_pad_link (stream->send_rtp_src, pad);
973   gst_object_unref (pad);
974
975   /* link RTP sink, we're pretty sure this will work. */
976   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
977   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
978   gst_pad_link (teepad, pad);
979   gst_object_unref (pad);
980   gst_object_unref (teepad);
981
982   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
983   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
984   gst_pad_link (teepad, pad);
985   gst_object_unref (pad);
986   gst_object_unref (teepad);
987
988   /* make tee for RTCP */
989   stream->tee[1] = gst_element_factory_make ("tee", NULL);
990   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
991
992   pad = gst_element_get_static_pad (stream->tee[1], "sink");
993   gst_pad_link (stream->send_rtcp_src, pad);
994   gst_object_unref (pad);
995
996   /* link RTCP elements */
997   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
998   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
999   gst_pad_link (teepad, pad);
1000   gst_object_unref (pad);
1001   gst_object_unref (teepad);
1002
1003   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1004   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1005   gst_pad_link (teepad, pad);
1006   gst_object_unref (pad);
1007   gst_object_unref (teepad);
1008
1009   /* make selector for the RTP receivers */
1010   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1011   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1012   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1013
1014   pad = gst_element_get_static_pad (stream->selector[0], "src");
1015   gst_pad_link (pad, stream->recv_rtp_sink);
1016   gst_object_unref (pad);
1017
1018   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1019   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1020   gst_pad_link (pad, selpad);
1021   gst_object_unref (pad);
1022   gst_object_unref (selpad);
1023
1024   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1025   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1026   gst_pad_link (pad, selpad);
1027   gst_object_unref (pad);
1028   gst_object_unref (selpad);
1029
1030   /* make selector for the RTCP receivers */
1031   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1032   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1033   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1034
1035   pad = gst_element_get_static_pad (stream->selector[1], "src");
1036   gst_pad_link (pad, stream->recv_rtcp_sink);
1037   gst_object_unref (pad);
1038
1039   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1040   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1041   gst_pad_link (pad, selpad);
1042   gst_object_unref (pad);
1043   gst_object_unref (selpad);
1044
1045   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1046   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1047   gst_pad_link (pad, selpad);
1048   gst_object_unref (pad);
1049   gst_object_unref (selpad);
1050
1051   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1052    * values */
1053   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1054   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1055   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1056   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1057  
1058   /* be notified of caps changes */
1059   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1060                   (GCallback) caps_notify, stream);
1061
1062   stream->prepared = TRUE;
1063
1064   return TRUE;
1065
1066   /* ERRORS */
1067 link_failed:
1068   {
1069     g_warning ("failed to link stream %d", idx);
1070     return FALSE;
1071   }
1072 }
1073
1074 static void
1075 unlock_streams (GstRTSPMedia *media)
1076 {
1077   guint i, n_streams;
1078
1079   /* unlock the udp src elements */
1080   n_streams = gst_rtsp_media_n_streams (media);
1081   for (i = 0; i < n_streams; i++) {
1082     GstRTSPMediaStream *stream;
1083
1084     stream = gst_rtsp_media_get_stream (media, i);
1085
1086     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1087     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1088   }
1089 }
1090
1091 static gboolean
1092 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1093 {
1094   GstMessageType type;
1095
1096   type = GST_MESSAGE_TYPE (message);
1097
1098   switch (type) {
1099     case GST_MESSAGE_STATE_CHANGED:
1100       break;
1101     case GST_MESSAGE_BUFFERING:
1102     {
1103       gint percent;
1104
1105       gst_message_parse_buffering (message, &percent);
1106
1107       /* no state management needed for live pipelines */
1108       if (media->is_live)
1109         break;
1110
1111       if (percent == 100) {
1112         /* a 100% message means buffering is done */
1113         media->buffering = FALSE;
1114         /* if the desired state is playing, go back */
1115         if (media->target_state == GST_STATE_PLAYING) {
1116           g_message ("Buffering done, setting pipeline to PLAYING");
1117           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1118         }
1119         else {
1120           g_message ("Buffering done");
1121         }
1122       } else {
1123         /* buffering busy */
1124         if (media->buffering == FALSE) {
1125           if (media->target_state == GST_STATE_PLAYING) {
1126             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1127             g_message ("Buffering, setting pipeline to PAUSED ...");
1128             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1129           }
1130           else {
1131             g_message ("Buffering ...");
1132           }
1133         }
1134         media->buffering = TRUE;
1135       }
1136       break;
1137     }
1138     case GST_MESSAGE_LATENCY:
1139     {
1140       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1141       break;
1142     }
1143     case GST_MESSAGE_ERROR:
1144     {
1145       GError *gerror;
1146       gchar *debug;
1147
1148       gst_message_parse_error (message, &gerror, &debug);
1149       g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1150       g_error_free (gerror);
1151       g_free (debug);
1152       break;
1153     }
1154     case GST_MESSAGE_WARNING:
1155     {
1156       GError *gerror;
1157       gchar *debug;
1158
1159       gst_message_parse_warning (message, &gerror, &debug);
1160       g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1161       g_error_free (gerror);
1162       g_free (debug);
1163       break;
1164     }
1165     case GST_MESSAGE_ELEMENT:
1166       break;
1167     case GST_MESSAGE_STREAM_STATUS:
1168       break;
1169     default:
1170       g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
1171       break;
1172   }
1173   return TRUE;
1174 }
1175
1176 static gboolean
1177 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1178 {
1179   GstRTSPMediaClass *klass;
1180   gboolean ret;
1181   
1182   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1183
1184   if (klass->handle_message)
1185     ret = klass->handle_message (media, message);
1186   else
1187     ret = FALSE;
1188
1189   return ret;
1190 }
1191
1192 static void
1193 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1194 {
1195   GstRTSPMediaStream *stream;
1196   gchar *name;
1197   gint i;
1198
1199   i = media->streams->len + 1;
1200
1201   g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1202
1203   stream = g_new0 (GstRTSPMediaStream, 1);
1204   stream->payloader = element;
1205
1206   name = g_strdup_printf ("dynpay%d", i);
1207
1208   /* ghost the pad of the payloader to the element */
1209   stream->srcpad = gst_ghost_pad_new (name, pad);
1210   gst_pad_set_active (stream->srcpad, TRUE);
1211   gst_element_add_pad (media->element, stream->srcpad);
1212   g_free (name);
1213
1214   /* add stream now */
1215   g_array_append_val (media->streams, stream);
1216
1217   setup_stream (stream, i, media);
1218
1219   for (i = 0; i < 2; i++) {
1220     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1221     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1222     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1223     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1224     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1225   }
1226 }
1227
1228 static void
1229 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1230 {
1231   g_message ("no more pads");
1232   if (media->fakesink) {
1233     gst_object_ref (media->fakesink);
1234     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1235     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1236     gst_object_unref (media->fakesink);
1237     media->fakesink = NULL;
1238     g_message ("removed fakesink");
1239   }
1240 }
1241
1242 /**
1243  * gst_rtsp_media_prepare:
1244  * @obj: a #GstRTSPMedia
1245  *
1246  * Prepare @media for streaming. This function will create the pipeline and
1247  * other objects to manage the streaming.
1248  *
1249  * It will preroll the pipeline and collect vital information about the streams
1250  * such as the duration.
1251  *
1252  * Returns: %TRUE on success.
1253  */
1254 gboolean
1255 gst_rtsp_media_prepare (GstRTSPMedia *media)
1256 {
1257   GstStateChangeReturn ret;
1258   guint i, n_streams;
1259   GstRTSPMediaClass *klass;
1260   GstBus *bus;
1261   GList *walk;
1262
1263   if (media->prepared)
1264     goto was_prepared;
1265
1266   if (!media->reusable && media->reused)
1267     goto is_reused;
1268
1269   g_message ("preparing media %p", media);
1270
1271   media->pipeline = gst_pipeline_new ("media-pipeline");
1272   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1273
1274   /* add the pipeline bus to our custom mainloop */
1275   media->source = gst_bus_create_watch (bus);
1276   gst_object_unref (bus);
1277
1278   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1279
1280   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1281   media->id = g_source_attach (media->source, klass->context);
1282
1283   gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
1284
1285   media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1286
1287   /* add stuff to the bin */
1288   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1289
1290   /* link streams we already have, other streams might appear when we have
1291    * dynamic elements */
1292   n_streams = gst_rtsp_media_n_streams (media);
1293   for (i = 0; i < n_streams; i++) {
1294     GstRTSPMediaStream *stream;
1295
1296     stream = gst_rtsp_media_get_stream (media, i);
1297
1298     setup_stream (stream, i, media);
1299   }
1300
1301   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1302     GstElement *elem = walk->data;
1303
1304     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1305     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1306
1307     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1308     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1309   }
1310
1311   /* first go to PAUSED */
1312   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1313   media->target_state = GST_STATE_PAUSED;
1314
1315   switch (ret) {
1316     case GST_STATE_CHANGE_SUCCESS:
1317       break;
1318     case GST_STATE_CHANGE_ASYNC:
1319       break;
1320     case GST_STATE_CHANGE_NO_PREROLL:
1321       /* we need to go to PLAYING */
1322       g_message ("live media %p", media);
1323       media->is_live = TRUE;
1324       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1325       if (ret == GST_STATE_CHANGE_FAILURE)
1326         goto state_failed;
1327       break;
1328     case GST_STATE_CHANGE_FAILURE:
1329       goto state_failed;
1330   }
1331
1332   /* now wait for all pads to be prerolled */
1333   ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1334   if (ret == GST_STATE_CHANGE_FAILURE)
1335     goto state_failed;
1336
1337   /* collect stats about the media */
1338   collect_media_stats (media);
1339
1340   g_message ("object %p is prerolled", media);
1341
1342   media->prepared = TRUE;
1343
1344   return TRUE;
1345
1346   /* OK */
1347 was_prepared:
1348   {
1349     return TRUE;
1350   }
1351   /* ERRORS */
1352 state_failed:
1353   {
1354     g_warning ("failed to preroll pipeline");
1355     unlock_streams (media);
1356     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1357     return FALSE;
1358   }
1359 is_reused:
1360   {
1361     g_warning ("can not reuse media %p", media);
1362     return FALSE;
1363   }
1364 }
1365
1366 /**
1367  * gst_rtsp_media_unprepare:
1368  * @obj: a #GstRTSPMedia
1369  *
1370  * Unprepare @media. After this call, the media should be prepared again before
1371  * it can be used again. If the media is set to be non-reusable, a new instance
1372  * must be created.
1373  *
1374  * Returns: %TRUE on success.
1375  */
1376 gboolean
1377 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1378 {
1379   if (!media->prepared)
1380     return TRUE;
1381
1382   g_message ("unprepare media %p", media);
1383   media->target_state = GST_STATE_NULL;
1384   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1385
1386   media->prepared = FALSE;
1387   media->reused = TRUE;
1388
1389   /* when the media is not reusable, this will effectively unref the media and
1390    * recreate it */
1391   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1392
1393   return TRUE;
1394 }
1395
1396 /**
1397  * gst_rtsp_media_set_state:
1398  * @media: a #GstRTSPMedia
1399  * @state: the target state of the media
1400  * @transports: a GArray of #GstRTSPMediaTrans pointers
1401  *
1402  * Set the state of @media to @state and for the transports in @transports.
1403  *
1404  * Returns: %TRUE on success.
1405  */
1406 gboolean
1407 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1408 {
1409   gint i;
1410   GstStateChangeReturn ret;
1411   gboolean add, remove, do_state;
1412   gint old_active;
1413
1414   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1415   g_return_val_if_fail (transports != NULL, FALSE);
1416
1417   /* NULL and READY are the same */
1418   if (state == GST_STATE_READY)
1419     state = GST_STATE_NULL;
1420
1421   add = remove = FALSE;
1422
1423   g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1424
1425   switch (state) {
1426     case GST_STATE_NULL:
1427       /* unlock the streams so that they follow the state changes from now on */
1428       unlock_streams (media);
1429       /* fallthrough */
1430     case GST_STATE_PAUSED:
1431       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1432       if (media->target_state == GST_STATE_PLAYING)
1433         remove = TRUE;
1434       break;
1435     case GST_STATE_PLAYING:
1436       /* we're going to PLAYING, add */
1437       add = TRUE;
1438       break;
1439     default:
1440       break;
1441   }
1442   old_active = media->active;
1443
1444   for (i = 0; i < transports->len; i++) {
1445     GstRTSPMediaTrans *tr;
1446     GstRTSPMediaStream *stream;
1447     GstRTSPTransport *trans;
1448
1449     /* we need a non-NULL entry in the array */
1450     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1451     if (tr == NULL)
1452       continue;
1453
1454     /* we need a transport */
1455     if (!(trans = tr->transport))
1456       continue;
1457
1458     /* get the stream and add the destinations */
1459     stream = gst_rtsp_media_get_stream (media, tr->idx);
1460     switch (trans->lower_transport) {
1461       case GST_RTSP_LOWER_TRANS_UDP:
1462       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1463       {
1464         gchar *dest;
1465         gint min, max;
1466
1467         dest = trans->destination;
1468         min = trans->client_port.min;
1469         max = trans->client_port.max;
1470
1471         if (add && !tr->active) {
1472           g_message ("adding %s:%d-%d", dest, min, max);
1473           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1474           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1475           stream->transports = g_list_prepend (stream->transports, tr);
1476           tr->active = TRUE;
1477           media->active++;
1478         } else if (remove && tr->active) {
1479           g_message ("removing %s:%d-%d", dest, min, max);
1480           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1481           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1482           stream->transports = g_list_remove (stream->transports, tr);
1483           tr->active = FALSE;
1484           media->active--;
1485         }
1486         break;
1487       }
1488       case GST_RTSP_LOWER_TRANS_TCP:
1489         if (add && !tr->active) {
1490           g_message ("adding TCP %s", trans->destination);
1491           stream->transports = g_list_prepend (stream->transports, tr);
1492           tr->active = TRUE;
1493           media->active++;
1494         } else if (remove && tr->active) {
1495           g_message ("removing TCP %s", trans->destination);
1496           stream->transports = g_list_remove (stream->transports, tr);
1497           tr->active = FALSE;
1498           media->active--;
1499         }
1500         break;
1501       default:
1502         g_message ("Unknown transport %d", trans->lower_transport);
1503         break;
1504     }
1505   }
1506
1507   /* we just added the first media, do the playing state change */
1508   if (old_active == 0 && add)
1509     do_state = TRUE;
1510   /* if we have no more active media, do the downward state changes */
1511   else if (media->active == 0)
1512     do_state = TRUE;
1513   else
1514     do_state = FALSE;
1515
1516   g_message ("active %d media %p", media->active, media);
1517
1518   if (do_state && media->target_state != state) {
1519     if (state == GST_STATE_NULL) {
1520       gst_rtsp_media_unprepare (media);
1521     } else {
1522       g_message ("state %s media %p", gst_element_state_get_name (state), media);
1523       media->target_state = state;
1524       ret = gst_element_set_state (media->pipeline, state);
1525     }
1526   }
1527
1528   /* remember where we are */
1529   if (state == GST_STATE_PAUSED)
1530     collect_media_stats (media);
1531
1532   return TRUE;
1533 }
1534