Added vmethod unprepare to GstRTSPMedia
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
24
25 #include "rtsp-media.h"
26
27 #define DEFAULT_SHARED         FALSE
28 #define DEFAULT_REUSABLE       FALSE
29
30 /* define to dump received RTCP packets */
31 #undef DUMP_STATS
32
33 enum
34 {
35   PROP_0,
36   PROP_SHARED,
37   PROP_REUSABLE,
38   PROP_LAST
39 };
40
41 enum
42 {
43   SIGNAL_UNPREPARED,
44   SIGNAL_LAST
45 };
46
47 static GQuark ssrc_stream_map_key;
48
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50     GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52     const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
54
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static gboolean default_unprepare (GstRTSPMedia *media);
58 static void unlock_streams (GstRTSPMedia *media);
59
60 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
61
62 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
63
64 static void
65 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
66 {
67   GObjectClass *gobject_class;
68   GError *error = NULL;
69
70   gobject_class = G_OBJECT_CLASS (klass);
71
72   gobject_class->get_property = gst_rtsp_media_get_property;
73   gobject_class->set_property = gst_rtsp_media_set_property;
74   gobject_class->finalize = gst_rtsp_media_finalize;
75
76   g_object_class_install_property (gobject_class, PROP_SHARED,
77       g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
78           DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
79
80   g_object_class_install_property (gobject_class, PROP_REUSABLE,
81       g_param_spec_boolean ("reusable", "Reusable",
82           "If this media pipeline can be reused after an unprepare",
83           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
84
85   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
86       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
87       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
88       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
89
90   klass->context = g_main_context_new ();
91   klass->loop = g_main_loop_new (klass->context, TRUE);
92
93   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
94   if (error != NULL) {
95     g_critical ("could not start bus thread: %s", error->message);
96   }
97   klass->handle_message = default_handle_message;
98   klass->unprepare = default_unprepare;
99
100   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
101 }
102
103 static void
104 gst_rtsp_media_init (GstRTSPMedia * media)
105 {
106   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
107   media->is_live = FALSE;
108   media->buffering = FALSE;
109 }
110
111 static void
112 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
113 {
114   if (stream->session)
115     g_object_unref (stream->session);
116
117   if (stream->caps)
118     gst_caps_unref (stream->caps);
119
120   if (stream->send_rtp_sink)
121     gst_object_unref (stream->send_rtp_sink);
122   if (stream->send_rtp_src)
123     gst_object_unref (stream->send_rtp_src);
124   if (stream->send_rtcp_src)
125     gst_object_unref (stream->send_rtcp_src);
126   if (stream->recv_rtcp_sink)
127     gst_object_unref (stream->recv_rtcp_sink);
128   if (stream->recv_rtp_sink)
129     gst_object_unref (stream->recv_rtp_sink);
130
131   g_list_free (stream->transports);
132
133   g_free (stream);
134 }
135
136 static void
137 gst_rtsp_media_finalize (GObject * obj)
138 {
139   GstRTSPMedia *media;
140   guint i;
141
142   media = GST_RTSP_MEDIA (obj);
143
144   g_message ("finalize media %p", media);
145
146   if (media->pipeline) {
147     unlock_streams (media);
148     gst_element_set_state (media->pipeline, GST_STATE_NULL);
149     gst_object_unref (media->pipeline);
150   }
151
152   for (i = 0; i < media->streams->len; i++) {
153     GstRTSPMediaStream *stream;
154
155     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
156
157     gst_rtsp_media_stream_free (stream);
158   }
159   g_array_free (media->streams, TRUE);
160
161   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
162   g_list_free (media->dynamic);
163
164   if (media->source) {
165     g_source_destroy (media->source);
166     g_source_unref (media->source);
167   }
168
169   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
170 }
171
172 static void
173 gst_rtsp_media_get_property (GObject *object, guint propid,
174     GValue *value, GParamSpec *pspec)
175 {
176   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
177
178   switch (propid) {
179     case PROP_SHARED:
180       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
181       break;
182     case PROP_REUSABLE:
183       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
184       break;
185     default:
186       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
187   }
188 }
189
190 static void
191 gst_rtsp_media_set_property (GObject *object, guint propid,
192     const GValue *value, GParamSpec *pspec)
193 {
194   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
195
196   switch (propid) {
197     case PROP_SHARED:
198       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
199       break;
200     case PROP_REUSABLE:
201       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
202       break;
203     default:
204       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
205   }
206 }
207
208 static gpointer
209 do_loop (GstRTSPMediaClass *klass)
210 {
211   g_message ("enter mainloop");
212   g_main_loop_run (klass->loop);
213   g_message ("exit mainloop");
214
215   return NULL;
216 }
217
218 static void
219 collect_media_stats (GstRTSPMedia *media)
220 {
221   GstFormat format;
222   gint64 position, duration;
223
224   media->range.unit = GST_RTSP_RANGE_NPT;
225
226   if (media->is_live) {
227     media->range.min.type = GST_RTSP_TIME_NOW;
228     media->range.min.seconds = -1;
229     media->range.max.type = GST_RTSP_TIME_END;
230     media->range.max.seconds = -1;
231   }
232   else {
233     /* get the position */
234     format = GST_FORMAT_TIME;
235     if (!gst_element_query_position (media->pipeline, &format, &position)) {
236       g_message ("position query failed");
237       position = 0;
238     }
239
240     /* get the duration */
241     format = GST_FORMAT_TIME;
242     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
243       g_message ("duration query failed");
244       duration = -1;
245     }
246
247     g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
248         GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
249
250     if (position == -1) {
251       media->range.min.type = GST_RTSP_TIME_NOW;
252       media->range.min.seconds = -1;
253     }
254     else {
255       media->range.min.type = GST_RTSP_TIME_SECONDS;
256       media->range.min.seconds = ((gdouble)position) / GST_SECOND;
257     }
258     if (duration == -1) {
259       media->range.max.type = GST_RTSP_TIME_END;
260       media->range.max.seconds = -1;
261     }
262     else {
263       media->range.max.type = GST_RTSP_TIME_SECONDS;
264       media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
265     }
266   }
267 }
268
269 /**
270  * gst_rtsp_media_new:
271  *
272  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
273  * element to produde RTP data for one or more related (audio/video/..) 
274  * streams.
275  *
276  * Returns: a new #GstRTSPMedia object.
277  */
278 GstRTSPMedia *
279 gst_rtsp_media_new (void)
280 {
281   GstRTSPMedia *result;
282
283   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
284
285   return result;
286 }
287
288 /**
289  * gst_rtsp_media_set_shared:
290  * @media: a #GstRTSPMedia
291  * @shared: the new value
292  *
293  * Set or unset if the pipeline for @media can be shared will multiple clients.
294  * When @shared is %TRUE, client requests for this media will share the media
295  * pipeline.
296  */
297 void
298 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
299 {
300   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
301
302   media->shared = shared;
303 }
304
305 /**
306  * gst_rtsp_media_is_shared:
307  * @media: a #GstRTSPMedia
308  *
309  * Check if the pipeline for @media can be shared between multiple clients.
310  *
311  * Returns: %TRUE if the media can be shared between clients.
312  */
313 gboolean
314 gst_rtsp_media_is_shared (GstRTSPMedia *media)
315 {
316   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
317
318   return media->shared;
319 }
320
321 /**
322  * gst_rtsp_media_set_reusable:
323  * @media: a #GstRTSPMedia
324  * @reusable: the new value
325  *
326  * Set or unset if the pipeline for @media can be reused after the pipeline has
327  * been unprepared.
328  */
329 void
330 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
331 {
332   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
333
334   media->reusable = reusable;
335 }
336
337 /**
338  * gst_rtsp_media_is_reusable:
339  * @media: a #GstRTSPMedia
340  *
341  * Check if the pipeline for @media can be reused after an unprepare.
342  *
343  * Returns: %TRUE if the media can be reused
344  */
345 gboolean
346 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
347 {
348   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
349
350   return media->reusable;
351 }
352
353 /**
354  * gst_rtsp_media_n_streams:
355  * @media: a #GstRTSPMedia
356  *
357  * Get the number of streams in this media.
358  *
359  * Returns: The number of streams.
360  */
361 guint
362 gst_rtsp_media_n_streams (GstRTSPMedia *media)
363 {
364   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
365
366   return media->streams->len;
367 }
368
369 /**
370  * gst_rtsp_media_get_stream:
371  * @media: a #GstRTSPMedia
372  * @idx: the stream index
373  *
374  * Retrieve the stream with index @idx from @media.
375  *
376  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
377  * that index did not exist.
378  */
379 GstRTSPMediaStream *
380 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
381 {
382   GstRTSPMediaStream *res;
383   
384   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
385
386   if (idx < media->streams->len)
387     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
388   else
389     res = NULL;
390
391   return res;
392 }
393
394 /**
395  * gst_rtsp_media_seek:
396  * @stream: a #GstRTSPMediaStream
397  * @range: a #GstRTSPTimeRange
398  *
399  * Seek the pipeline to @range.
400  *
401  * Returns: %TRUE on success.
402  */
403 gboolean
404 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
405 {
406   GstSeekFlags flags;
407   gboolean res;
408   gint64 start, stop;
409   GstSeekType start_type, stop_type;
410
411   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
412   g_return_val_if_fail (range != NULL, FALSE);
413
414   if (range->unit != GST_RTSP_RANGE_NPT)
415     goto not_supported;
416
417   /* depends on the current playing state of the pipeline. We might need to
418    * queue this until we get EOS. */
419   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
420
421   start_type = stop_type = GST_SEEK_TYPE_NONE;
422
423   switch (range->min.type) {
424     case GST_RTSP_TIME_NOW:
425       start = -1;
426       break;
427     case GST_RTSP_TIME_SECONDS:
428       /* only seek when something changed */
429       if (media->range.min.seconds == range->min.seconds) {
430         start = -1;
431       } else {
432         start = range->min.seconds * GST_SECOND;
433         start_type = GST_SEEK_TYPE_SET;
434       }
435       break;
436     case GST_RTSP_TIME_END:
437     default:
438       goto weird_type;
439   }
440   switch (range->max.type) {
441     case GST_RTSP_TIME_SECONDS:
442       /* only seek when something changed */
443       if (media->range.max.seconds == range->max.seconds) {
444         stop = -1;
445       } else {
446         stop = range->max.seconds * GST_SECOND;
447         stop_type = GST_SEEK_TYPE_SET;
448       }
449       break;
450     case GST_RTSP_TIME_END:
451       stop = -1;
452       stop_type = GST_SEEK_TYPE_SET;
453       break;
454     case GST_RTSP_TIME_NOW:
455     default:
456       goto weird_type;
457   }
458   
459   if (start != -1 || stop != -1) {
460     g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
461                 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
462
463     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
464         flags, start_type, start, stop_type, stop);
465
466     /* and block for the seek to complete */
467     g_message ("done seeking %d", res);
468     gst_element_get_state (media->pipeline, NULL, NULL, -1);
469     g_message ("prerolled again");
470
471     collect_media_stats (media);
472   }
473   else {
474     g_message ("no seek needed");
475     res = TRUE;
476   }
477
478   return res;
479
480   /* ERRORS */
481 not_supported:
482   {
483     g_warning ("seek unit %d not supported", range->unit);
484     return FALSE;
485   }
486 weird_type:
487   {
488     g_warning ("weird range type %d not supported", range->min.type);
489     return FALSE;
490   }
491 }
492
493 /**
494  * gst_rtsp_media_stream_rtp:
495  * @stream: a #GstRTSPMediaStream
496  * @buffer: a #GstBuffer
497  *
498  * Handle an RTP buffer for the stream. This method is usually called when a
499  * message has been received from a client using the TCP transport.
500  *
501  * This function takes ownership of @buffer.
502  *
503  * Returns: a GstFlowReturn.
504  */
505 GstFlowReturn
506 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
507 {
508   GstFlowReturn ret;
509
510   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
511
512   return ret;
513 }
514
515 /**
516  * gst_rtsp_media_stream_rtcp:
517  * @stream: a #GstRTSPMediaStream
518  * @buffer: a #GstBuffer
519  *
520  * Handle an RTCP buffer for the stream. This method is usually called when a
521  * message has been received from a client using the TCP transport.
522  *
523  * This function takes ownership of @buffer.
524  *
525  * Returns: a GstFlowReturn.
526  */
527 GstFlowReturn
528 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
529 {
530   GstFlowReturn ret;
531
532   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
533
534   return ret;
535 }
536
537 /* Allocate the udp ports and sockets */
538 static gboolean
539 alloc_udp_ports (GstRTSPMediaStream * stream)
540 {
541   GstStateChangeReturn ret;
542   GstElement *udpsrc0, *udpsrc1;
543   GstElement *udpsink0, *udpsink1;
544   gint tmp_rtp, tmp_rtcp;
545   guint count;
546   gint rtpport, rtcpport, sockfd;
547
548   udpsrc0 = NULL;
549   udpsrc1 = NULL;
550   udpsink0 = NULL;
551   udpsink1 = NULL;
552   count = 0;
553
554   /* Start with random port */
555   tmp_rtp = 0;
556
557   /* try to allocate 2 UDP ports, the RTP port should be an even
558    * number and the RTCP port should be the next (uneven) port */
559 again:
560   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
561   if (udpsrc0 == NULL)
562     goto no_udp_protocol;
563   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
564
565   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
566   if (ret == GST_STATE_CHANGE_FAILURE) {
567     if (tmp_rtp != 0) {
568       tmp_rtp += 2;
569       if (++count > 20)
570         goto no_ports;
571
572       gst_element_set_state (udpsrc0, GST_STATE_NULL);
573       gst_object_unref (udpsrc0);
574
575       goto again;
576     }
577     goto no_udp_protocol;
578   }
579
580   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
581
582   /* check if port is even */
583   if ((tmp_rtp & 1) != 0) {
584     /* port not even, close and allocate another */
585     if (++count > 20)
586       goto no_ports;
587
588     gst_element_set_state (udpsrc0, GST_STATE_NULL);
589     gst_object_unref (udpsrc0);
590
591     tmp_rtp++;
592     goto again;
593   }
594
595   /* allocate port+1 for RTCP now */
596   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
597   if (udpsrc1 == NULL)
598     goto no_udp_rtcp_protocol;
599
600   /* set port */
601   tmp_rtcp = tmp_rtp + 1;
602   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
603
604   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
605   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
606   if (ret == GST_STATE_CHANGE_FAILURE) {
607
608     if (++count > 20)
609       goto no_ports;
610
611     gst_element_set_state (udpsrc0, GST_STATE_NULL);
612     gst_object_unref (udpsrc0);
613
614     gst_element_set_state (udpsrc1, GST_STATE_NULL);
615     gst_object_unref (udpsrc1);
616
617     tmp_rtp += 2;
618     goto again;
619   }
620
621   /* all fine, do port check */
622   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
623   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
624
625   /* this should not happen... */
626   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
627     goto port_error;
628
629   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
630   if (!udpsink0)
631     goto no_udp_protocol;
632
633   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
634   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
635   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
636
637   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
638   if (!udpsink1)
639     goto no_udp_protocol;
640
641   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
642   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
643   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
644   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
645   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
646
647   /* we keep these elements, we configure all in configure_transport when the
648    * server told us to really use the UDP ports. */
649   stream->udpsrc[0] = udpsrc0;
650   stream->udpsrc[1] = udpsrc1;
651   stream->udpsink[0] = udpsink0;
652   stream->udpsink[1] = udpsink1;
653   stream->server_port.min = rtpport;
654   stream->server_port.max = rtcpport;
655
656   return TRUE;
657
658   /* ERRORS */
659 no_udp_protocol:
660   {
661     goto cleanup;
662   }
663 no_ports:
664   {
665     goto cleanup;
666   }
667 no_udp_rtcp_protocol:
668   {
669     goto cleanup;
670   }
671 port_error:
672   {
673     goto cleanup;
674   }
675 cleanup:
676   {
677     if (udpsrc0) {
678       gst_element_set_state (udpsrc0, GST_STATE_NULL);
679       gst_object_unref (udpsrc0);
680     }
681     if (udpsrc1) {
682       gst_element_set_state (udpsrc1, GST_STATE_NULL);
683       gst_object_unref (udpsrc1);
684     }
685     if (udpsink0) {
686       gst_element_set_state (udpsink0, GST_STATE_NULL);
687       gst_object_unref (udpsink0);
688     }
689     if (udpsink1) {
690       gst_element_set_state (udpsink1, GST_STATE_NULL);
691       gst_object_unref (udpsink1);
692     }
693     return FALSE;
694   }
695 }
696
697 static void
698 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
699 {
700   gchar *capsstr;
701   GstCaps *newcaps, *oldcaps;
702
703   if ((newcaps = GST_PAD_CAPS (pad)))
704     gst_caps_ref (newcaps);
705
706   oldcaps = stream->caps;
707   stream->caps = newcaps;
708
709   if (oldcaps)
710     gst_caps_unref (oldcaps);
711
712   capsstr = gst_caps_to_string (newcaps);
713   g_message ("stream %p received caps %p, %s", stream, newcaps, capsstr);
714   g_free (capsstr);
715 }
716
717 static void
718 dump_structure (const GstStructure *s)
719 {
720   gchar *sstr;
721
722   sstr = gst_structure_to_string (s);
723   g_message ("structure: %s", sstr);
724   g_free (sstr);
725 }
726
727 static GstRTSPMediaTrans *
728 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
729 {
730   GList *walk;
731   GstRTSPMediaTrans *result = NULL;
732   const gchar *tmp;
733   gchar *dest;
734   guint port;
735
736   if (rtcp_from == NULL)
737     return NULL;
738
739   tmp = g_strrstr (rtcp_from, ":");
740   if (tmp == NULL)
741     return NULL;
742
743   port = atoi (tmp + 1);
744   dest = g_strndup (rtcp_from, tmp - rtcp_from);
745
746   g_message ("finding %s:%d", dest, port);
747
748   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
749     GstRTSPMediaTrans *trans = walk->data;
750     gint min, max;
751
752     min = trans->transport->client_port.min;
753     max = trans->transport->client_port.max;
754
755     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
756       result = trans;
757       break;
758     }
759   }
760   g_free (dest);
761
762   return result;
763 }
764
765 static void
766 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
767 {
768   GstStructure *stats;
769   GstRTSPMediaTrans *trans;
770
771   g_message ("%p: new source %p", stream, source);
772
773   /* see if we have a stream to match with the origin of the RTCP packet */
774   trans = g_object_get_qdata (source, ssrc_stream_map_key);
775   if (trans == NULL) {
776     g_object_get (source, "stats", &stats, NULL);
777     if (stats) {
778       const gchar *rtcp_from;
779
780       dump_structure (stats);
781
782       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
783       if ((trans = find_transport (stream, rtcp_from))) {
784         g_message ("%p: found transport %p for source  %p", stream, trans, source);
785
786         /* keep ref to the source */
787         trans->rtpsource = source;
788
789         g_object_set_qdata (source, ssrc_stream_map_key, trans);
790       }
791       gst_structure_free (stats);
792     }
793   } else {
794     g_message ("%p: source %p for transport %p", stream, source, trans);
795   }
796 }
797
798 static void
799 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
800 {
801   g_message ("%p: new SDES %p", stream, source);
802 }
803
804 static void
805 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
806 {
807   GstRTSPMediaTrans *trans;
808
809   trans = g_object_get_qdata (source, ssrc_stream_map_key);
810
811   g_message ("%p: source %p in transport %p is active", stream, source, trans);
812
813   if (trans && trans->keep_alive)
814     trans->keep_alive (trans->ka_user_data);
815
816 #ifdef DUMP_STATS
817   {
818     GstStructure *stats;
819     g_object_get (source, "stats", &stats, NULL);
820     if (stats) {
821       dump_structure (stats);
822       gst_structure_free (stats);
823     }
824   }
825 #endif
826 }
827
828 static void
829 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
830 {
831   g_message ("%p: source %p bye", stream, source);
832 }
833
834 static void
835 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
836 {
837   GstRTSPMediaTrans *trans;
838
839   g_message ("%p: source %p bye timeout", stream, source);
840
841   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
842     trans->rtpsource = NULL;
843     trans->timeout = TRUE;
844   }
845 }
846
847 static void
848 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
849 {
850   GstRTSPMediaTrans *trans;
851
852   g_message ("%p: source %p timeout", stream, source);
853
854   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
855     trans->rtpsource = NULL;
856     trans->timeout = TRUE;
857   }
858 }
859
860 static GstFlowReturn
861 handle_new_buffer (GstAppSink *sink, gpointer user_data)
862 {
863   GList *walk;
864   GstBuffer *buffer;
865   GstRTSPMediaStream *stream;
866
867   buffer = gst_app_sink_pull_buffer (sink);
868   if (!buffer)
869     return GST_FLOW_OK;
870
871   stream = (GstRTSPMediaStream *) user_data;
872
873   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
874     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
875
876     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
877       if (tr->send_rtp) 
878         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
879     }
880     else {
881       if (tr->send_rtcp) 
882         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
883     }
884   }
885   gst_buffer_unref (buffer);
886
887   return GST_FLOW_OK;
888 }
889
890 static GstAppSinkCallbacks sink_cb = {
891   NULL,  /* not interested in EOS */
892   NULL,  /* not interested in preroll buffers */
893   handle_new_buffer
894 };
895
896 /* prepare the pipeline objects to handle @stream in @media */
897 static gboolean
898 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
899 {
900   gchar *name;
901   GstPad *pad, *teepad, *selpad;
902   GstPadLinkReturn ret;
903   gint i;
904
905   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
906    * for sending RTP/RTCP. The sender and receiver ports are shared between the
907    * elements */
908   if (!alloc_udp_ports (stream))
909     return FALSE;
910
911   /* add the ports to the pipeline */
912   for (i = 0; i < 2; i++) {
913     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
914     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
915   }
916
917   /* create elements for the TCP transfer */
918   for (i = 0; i < 2; i++) {
919     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
920     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
921     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
922     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
923     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
924     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
925     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
926     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
927                   &sink_cb, stream, NULL);
928   }
929
930   /* hook up the stream to the RTP session elements. */
931   name = g_strdup_printf ("send_rtp_sink_%d", idx);
932   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
933   g_free (name);
934   name = g_strdup_printf ("send_rtp_src_%d", idx);
935   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
936   g_free (name);
937   name = g_strdup_printf ("send_rtcp_src_%d", idx);
938   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
939   g_free (name);
940   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
941   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
942   g_free (name);
943   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
944   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
945   g_free (name);
946
947   /* get the session */
948   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
949                   &stream->session);
950
951   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
952       stream);
953   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
954       stream);
955   g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
956       stream);
957   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
958       stream);
959   g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
960       stream);
961   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
962       stream);
963
964   /* link the RTP pad to the session manager */
965   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
966   if (ret != GST_PAD_LINK_OK)
967     goto link_failed;
968
969   /* make tee for RTP and link to stream */
970   stream->tee[0] = gst_element_factory_make ("tee", NULL);
971   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
972
973   pad = gst_element_get_static_pad (stream->tee[0], "sink");
974   gst_pad_link (stream->send_rtp_src, pad);
975   gst_object_unref (pad);
976
977   /* link RTP sink, we're pretty sure this will work. */
978   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
979   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
980   gst_pad_link (teepad, pad);
981   gst_object_unref (pad);
982   gst_object_unref (teepad);
983
984   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
985   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
986   gst_pad_link (teepad, pad);
987   gst_object_unref (pad);
988   gst_object_unref (teepad);
989
990   /* make tee for RTCP */
991   stream->tee[1] = gst_element_factory_make ("tee", NULL);
992   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
993
994   pad = gst_element_get_static_pad (stream->tee[1], "sink");
995   gst_pad_link (stream->send_rtcp_src, pad);
996   gst_object_unref (pad);
997
998   /* link RTCP elements */
999   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1000   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1001   gst_pad_link (teepad, pad);
1002   gst_object_unref (pad);
1003   gst_object_unref (teepad);
1004
1005   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1006   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1007   gst_pad_link (teepad, pad);
1008   gst_object_unref (pad);
1009   gst_object_unref (teepad);
1010
1011   /* make selector for the RTP receivers */
1012   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1013   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1014   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1015
1016   pad = gst_element_get_static_pad (stream->selector[0], "src");
1017   gst_pad_link (pad, stream->recv_rtp_sink);
1018   gst_object_unref (pad);
1019
1020   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1021   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1022   gst_pad_link (pad, selpad);
1023   gst_object_unref (pad);
1024   gst_object_unref (selpad);
1025
1026   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1027   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1028   gst_pad_link (pad, selpad);
1029   gst_object_unref (pad);
1030   gst_object_unref (selpad);
1031
1032   /* make selector for the RTCP receivers */
1033   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1034   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1035   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1036
1037   pad = gst_element_get_static_pad (stream->selector[1], "src");
1038   gst_pad_link (pad, stream->recv_rtcp_sink);
1039   gst_object_unref (pad);
1040
1041   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1042   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1043   gst_pad_link (pad, selpad);
1044   gst_object_unref (pad);
1045   gst_object_unref (selpad);
1046
1047   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1048   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1049   gst_pad_link (pad, selpad);
1050   gst_object_unref (pad);
1051   gst_object_unref (selpad);
1052
1053   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1054    * values */
1055   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1056   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1057   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1058   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1059  
1060   /* be notified of caps changes */
1061   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1062                   (GCallback) caps_notify, stream);
1063
1064   stream->prepared = TRUE;
1065
1066   return TRUE;
1067
1068   /* ERRORS */
1069 link_failed:
1070   {
1071     g_warning ("failed to link stream %d", idx);
1072     return FALSE;
1073   }
1074 }
1075
1076 static void
1077 unlock_streams (GstRTSPMedia *media)
1078 {
1079   guint i, n_streams;
1080
1081   /* unlock the udp src elements */
1082   n_streams = gst_rtsp_media_n_streams (media);
1083   for (i = 0; i < n_streams; i++) {
1084     GstRTSPMediaStream *stream;
1085
1086     stream = gst_rtsp_media_get_stream (media, i);
1087
1088     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1089     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1090   }
1091 }
1092
1093 static gboolean
1094 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1095 {
1096   GstMessageType type;
1097
1098   type = GST_MESSAGE_TYPE (message);
1099
1100   switch (type) {
1101     case GST_MESSAGE_STATE_CHANGED:
1102       break;
1103     case GST_MESSAGE_BUFFERING:
1104     {
1105       gint percent;
1106
1107       gst_message_parse_buffering (message, &percent);
1108
1109       /* no state management needed for live pipelines */
1110       if (media->is_live)
1111         break;
1112
1113       if (percent == 100) {
1114         /* a 100% message means buffering is done */
1115         media->buffering = FALSE;
1116         /* if the desired state is playing, go back */
1117         if (media->target_state == GST_STATE_PLAYING) {
1118           g_message ("Buffering done, setting pipeline to PLAYING");
1119           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1120         }
1121         else {
1122           g_message ("Buffering done");
1123         }
1124       } else {
1125         /* buffering busy */
1126         if (media->buffering == FALSE) {
1127           if (media->target_state == GST_STATE_PLAYING) {
1128             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1129             g_message ("Buffering, setting pipeline to PAUSED ...");
1130             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1131           }
1132           else {
1133             g_message ("Buffering ...");
1134           }
1135         }
1136         media->buffering = TRUE;
1137       }
1138       break;
1139     }
1140     case GST_MESSAGE_LATENCY:
1141     {
1142       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1143       break;
1144     }
1145     case GST_MESSAGE_ERROR:
1146     {
1147       GError *gerror;
1148       gchar *debug;
1149
1150       gst_message_parse_error (message, &gerror, &debug);
1151       g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1152       g_error_free (gerror);
1153       g_free (debug);
1154       break;
1155     }
1156     case GST_MESSAGE_WARNING:
1157     {
1158       GError *gerror;
1159       gchar *debug;
1160
1161       gst_message_parse_warning (message, &gerror, &debug);
1162       g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1163       g_error_free (gerror);
1164       g_free (debug);
1165       break;
1166     }
1167     case GST_MESSAGE_ELEMENT:
1168       break;
1169     case GST_MESSAGE_STREAM_STATUS:
1170       break;
1171     default:
1172       g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
1173       break;
1174   }
1175   return TRUE;
1176 }
1177
1178 static gboolean
1179 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1180 {
1181   GstRTSPMediaClass *klass;
1182   gboolean ret;
1183   
1184   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1185
1186   if (klass->handle_message)
1187     ret = klass->handle_message (media, message);
1188   else
1189     ret = FALSE;
1190
1191   return ret;
1192 }
1193
1194 static void
1195 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1196 {
1197   GstRTSPMediaStream *stream;
1198   gchar *name;
1199   gint i;
1200
1201   i = media->streams->len + 1;
1202
1203   g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1204
1205   stream = g_new0 (GstRTSPMediaStream, 1);
1206   stream->payloader = element;
1207
1208   name = g_strdup_printf ("dynpay%d", i);
1209
1210   /* ghost the pad of the payloader to the element */
1211   stream->srcpad = gst_ghost_pad_new (name, pad);
1212   gst_pad_set_active (stream->srcpad, TRUE);
1213   gst_element_add_pad (media->element, stream->srcpad);
1214   g_free (name);
1215
1216   /* add stream now */
1217   g_array_append_val (media->streams, stream);
1218
1219   setup_stream (stream, i, media);
1220
1221   for (i = 0; i < 2; i++) {
1222     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1223     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1224     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1225     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1226     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1227   }
1228 }
1229
1230 static void
1231 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1232 {
1233   g_message ("no more pads");
1234   if (media->fakesink) {
1235     gst_object_ref (media->fakesink);
1236     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1237     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1238     gst_object_unref (media->fakesink);
1239     media->fakesink = NULL;
1240     g_message ("removed fakesink");
1241   }
1242 }
1243
1244 /**
1245  * gst_rtsp_media_prepare:
1246  * @obj: a #GstRTSPMedia
1247  *
1248  * Prepare @media for streaming. This function will create the pipeline and
1249  * other objects to manage the streaming.
1250  *
1251  * It will preroll the pipeline and collect vital information about the streams
1252  * such as the duration.
1253  *
1254  * Returns: %TRUE on success.
1255  */
1256 gboolean
1257 gst_rtsp_media_prepare (GstRTSPMedia *media)
1258 {
1259   GstStateChangeReturn ret;
1260   guint i, n_streams;
1261   GstRTSPMediaClass *klass;
1262   GstBus *bus;
1263   GList *walk;
1264
1265   if (media->prepared)
1266     goto was_prepared;
1267
1268   if (!media->reusable && media->reused)
1269     goto is_reused;
1270
1271   g_message ("preparing media %p", media);
1272
1273   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1274
1275   /* add the pipeline bus to our custom mainloop */
1276   media->source = gst_bus_create_watch (bus);
1277   gst_object_unref (bus);
1278
1279   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1280
1281   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1282   media->id = g_source_attach (media->source, klass->context);
1283
1284   media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1285
1286   /* add stuff to the bin */
1287   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1288
1289   /* link streams we already have, other streams might appear when we have
1290    * dynamic elements */
1291   n_streams = gst_rtsp_media_n_streams (media);
1292   for (i = 0; i < n_streams; i++) {
1293     GstRTSPMediaStream *stream;
1294
1295     stream = gst_rtsp_media_get_stream (media, i);
1296
1297     setup_stream (stream, i, media);
1298   }
1299
1300   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1301     GstElement *elem = walk->data;
1302
1303     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1304     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1305
1306     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1307     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1308   }
1309
1310   /* first go to PAUSED */
1311   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1312   media->target_state = GST_STATE_PAUSED;
1313
1314   switch (ret) {
1315     case GST_STATE_CHANGE_SUCCESS:
1316       break;
1317     case GST_STATE_CHANGE_ASYNC:
1318       break;
1319     case GST_STATE_CHANGE_NO_PREROLL:
1320       /* we need to go to PLAYING */
1321       g_message ("live media %p", media);
1322       media->is_live = TRUE;
1323       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1324       if (ret == GST_STATE_CHANGE_FAILURE)
1325         goto state_failed;
1326       break;
1327     case GST_STATE_CHANGE_FAILURE:
1328       goto state_failed;
1329   }
1330
1331   /* now wait for all pads to be prerolled */
1332   ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1333   if (ret == GST_STATE_CHANGE_FAILURE)
1334     goto state_failed;
1335
1336   /* collect stats about the media */
1337   collect_media_stats (media);
1338
1339   g_message ("object %p is prerolled", media);
1340
1341   media->prepared = TRUE;
1342
1343   return TRUE;
1344
1345   /* OK */
1346 was_prepared:
1347   {
1348     return TRUE;
1349   }
1350   /* ERRORS */
1351 state_failed:
1352   {
1353     g_warning ("failed to preroll pipeline");
1354     unlock_streams (media);
1355     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1356     return FALSE;
1357   }
1358 is_reused:
1359   {
1360     g_warning ("can not reuse media %p", media);
1361     return FALSE;
1362   }
1363 }
1364
1365 /**
1366  * gst_rtsp_media_unprepare:
1367  * @obj: a #GstRTSPMedia
1368  *
1369  * Unprepare @media. After this call, the media should be prepared again before
1370  * it can be used again. If the media is set to be non-reusable, a new instance
1371  * must be created.
1372  *
1373  * Returns: %TRUE on success.
1374  */
1375 gboolean
1376 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1377 {
1378   GstRTSPMediaClass *klass;
1379   gboolean success;
1380
1381   if (!media->prepared)
1382     return TRUE;
1383
1384   g_message ("unprepare media %p", media);
1385   media->target_state = GST_STATE_NULL;
1386
1387   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1388   if (klass->unprepare)
1389     success = klass->unprepare (media);
1390   else
1391     success = TRUE;
1392
1393   media->prepared = FALSE;
1394   media->reused = TRUE;
1395
1396   /* when the media is not reusable, this will effectively unref the media and
1397    * recreate it */
1398   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1399
1400   return success;
1401 }
1402
1403 static gboolean
1404 default_unprepare (GstRTSPMedia *media) {
1405   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1406   return TRUE;
1407 }
1408
1409 /**
1410  * gst_rtsp_media_set_state:
1411  * @media: a #GstRTSPMedia
1412  * @state: the target state of the media
1413  * @transports: a GArray of #GstRTSPMediaTrans pointers
1414  *
1415  * Set the state of @media to @state and for the transports in @transports.
1416  *
1417  * Returns: %TRUE on success.
1418  */
1419 gboolean
1420 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1421 {
1422   gint i;
1423   GstStateChangeReturn ret;
1424   gboolean add, remove, do_state;
1425   gint old_active;
1426
1427   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1428   g_return_val_if_fail (transports != NULL, FALSE);
1429
1430   /* NULL and READY are the same */
1431   if (state == GST_STATE_READY)
1432     state = GST_STATE_NULL;
1433
1434   add = remove = FALSE;
1435
1436   g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1437
1438   switch (state) {
1439     case GST_STATE_NULL:
1440       /* unlock the streams so that they follow the state changes from now on */
1441       unlock_streams (media);
1442       /* fallthrough */
1443     case GST_STATE_PAUSED:
1444       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1445       if (media->target_state == GST_STATE_PLAYING)
1446         remove = TRUE;
1447       break;
1448     case GST_STATE_PLAYING:
1449       /* we're going to PLAYING, add */
1450       add = TRUE;
1451       break;
1452     default:
1453       break;
1454   }
1455   old_active = media->active;
1456
1457   for (i = 0; i < transports->len; i++) {
1458     GstRTSPMediaTrans *tr;
1459     GstRTSPMediaStream *stream;
1460     GstRTSPTransport *trans;
1461
1462     /* we need a non-NULL entry in the array */
1463     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1464     if (tr == NULL)
1465       continue;
1466
1467     /* we need a transport */
1468     if (!(trans = tr->transport))
1469       continue;
1470
1471     /* get the stream and add the destinations */
1472     stream = gst_rtsp_media_get_stream (media, tr->idx);
1473     switch (trans->lower_transport) {
1474       case GST_RTSP_LOWER_TRANS_UDP:
1475       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1476       {
1477         gchar *dest;
1478         gint min, max;
1479
1480         dest = trans->destination;
1481         min = trans->client_port.min;
1482         max = trans->client_port.max;
1483
1484         if (add && !tr->active) {
1485           g_message ("adding %s:%d-%d", dest, min, max);
1486           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1487           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1488           stream->transports = g_list_prepend (stream->transports, tr);
1489           tr->active = TRUE;
1490           media->active++;
1491         } else if (remove && tr->active) {
1492           g_message ("removing %s:%d-%d", dest, min, max);
1493           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1494           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1495           stream->transports = g_list_remove (stream->transports, tr);
1496           tr->active = FALSE;
1497           media->active--;
1498         }
1499         break;
1500       }
1501       case GST_RTSP_LOWER_TRANS_TCP:
1502         if (add && !tr->active) {
1503           g_message ("adding TCP %s", trans->destination);
1504           stream->transports = g_list_prepend (stream->transports, tr);
1505           tr->active = TRUE;
1506           media->active++;
1507         } else if (remove && tr->active) {
1508           g_message ("removing TCP %s", trans->destination);
1509           stream->transports = g_list_remove (stream->transports, tr);
1510           tr->active = FALSE;
1511           media->active--;
1512         }
1513         break;
1514       default:
1515         g_message ("Unknown transport %d", trans->lower_transport);
1516         break;
1517     }
1518   }
1519
1520   /* we just added the first media, do the playing state change */
1521   if (old_active == 0 && add)
1522     do_state = TRUE;
1523   /* if we have no more active media, do the downward state changes */
1524   else if (media->active == 0)
1525     do_state = TRUE;
1526   else
1527     do_state = FALSE;
1528
1529   g_message ("active %d media %p", media->active, media);
1530
1531   if (do_state && media->target_state != state) {
1532     if (state == GST_STATE_NULL) {
1533       gst_rtsp_media_unprepare (media);
1534     } else {
1535       g_message ("state %s media %p", gst_element_state_get_name (state), media);
1536       media->target_state = state;
1537       ret = gst_element_set_state (media->pipeline, state);
1538     }
1539   }
1540
1541   /* remember where we are */
1542   if (state == GST_STATE_PAUSED)
1543     collect_media_stats (media);
1544
1545   return TRUE;
1546 }
1547