media: reindent
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
24
25 #include "rtsp-media.h"
26
27 #define DEFAULT_SHARED         FALSE
28 #define DEFAULT_REUSABLE       FALSE
29
30 /* define to dump received RTCP packets */
31 #undef DUMP_STATS
32
33 enum
34 {
35   PROP_0,
36   PROP_SHARED,
37   PROP_REUSABLE,
38   PROP_LAST
39 };
40
41 enum
42 {
43   SIGNAL_UNPREPARED,
44   SIGNAL_LAST
45 };
46
47 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
48 #define GST_CAT_DEFAULT rtsp_media_debug
49
50 static GQuark ssrc_stream_map_key;
51
52 static void gst_rtsp_media_get_property (GObject * object, guint propid,
53     GValue * value, GParamSpec * pspec);
54 static void gst_rtsp_media_set_property (GObject * object, guint propid,
55     const GValue * value, GParamSpec * pspec);
56 static void gst_rtsp_media_finalize (GObject * obj);
57
58 static gpointer do_loop (GstRTSPMediaClass * klass);
59 static gboolean default_handle_message (GstRTSPMedia * media,
60     GstMessage * message);
61 static gboolean default_unprepare (GstRTSPMedia * media);
62 static void unlock_streams (GstRTSPMedia * media);
63
64 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
65
66 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
67
68 static void
69 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
70 {
71   GObjectClass *gobject_class;
72   GError *error = NULL;
73
74   gobject_class = G_OBJECT_CLASS (klass);
75
76   gobject_class->get_property = gst_rtsp_media_get_property;
77   gobject_class->set_property = gst_rtsp_media_set_property;
78   gobject_class->finalize = gst_rtsp_media_finalize;
79
80   g_object_class_install_property (gobject_class, PROP_SHARED,
81       g_param_spec_boolean ("shared", "Shared",
82           "If this media pipeline can be shared", DEFAULT_SHARED,
83           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
84
85   g_object_class_install_property (gobject_class, PROP_REUSABLE,
86       g_param_spec_boolean ("reusable", "Reusable",
87           "If this media pipeline can be reused after an unprepare",
88           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
89
90   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
91       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
92       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
93       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
94
95   klass->context = g_main_context_new ();
96   klass->loop = g_main_loop_new (klass->context, TRUE);
97
98   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
99   if (error != NULL) {
100     g_critical ("could not start bus thread: %s", error->message);
101   }
102   klass->handle_message = default_handle_message;
103   klass->unprepare = default_unprepare;
104
105   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
106 }
107
108 static void
109 gst_rtsp_media_init (GstRTSPMedia * media)
110 {
111   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
112 }
113
114 static void
115 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
116 {
117   if (stream->session)
118     g_object_unref (stream->session);
119
120   if (stream->caps)
121     gst_caps_unref (stream->caps);
122
123   if (stream->send_rtp_sink)
124     gst_object_unref (stream->send_rtp_sink);
125   if (stream->send_rtp_src)
126     gst_object_unref (stream->send_rtp_src);
127   if (stream->send_rtcp_src)
128     gst_object_unref (stream->send_rtcp_src);
129   if (stream->recv_rtcp_sink)
130     gst_object_unref (stream->recv_rtcp_sink);
131   if (stream->recv_rtp_sink)
132     gst_object_unref (stream->recv_rtp_sink);
133
134   g_list_free (stream->transports);
135
136   g_free (stream);
137 }
138
139 static void
140 gst_rtsp_media_finalize (GObject * obj)
141 {
142   GstRTSPMedia *media;
143   guint i;
144
145   media = GST_RTSP_MEDIA (obj);
146
147   GST_INFO ("finalize media %p", media);
148
149   if (media->pipeline) {
150     unlock_streams (media);
151     gst_element_set_state (media->pipeline, GST_STATE_NULL);
152     gst_object_unref (media->pipeline);
153   }
154
155   for (i = 0; i < media->streams->len; i++) {
156     GstRTSPMediaStream *stream;
157
158     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
159
160     gst_rtsp_media_stream_free (stream);
161   }
162   g_array_free (media->streams, TRUE);
163
164   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
165   g_list_free (media->dynamic);
166
167   if (media->source) {
168     g_source_destroy (media->source);
169     g_source_unref (media->source);
170   }
171
172   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
173 }
174
175 static void
176 gst_rtsp_media_get_property (GObject * object, guint propid,
177     GValue * value, GParamSpec * pspec)
178 {
179   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
180
181   switch (propid) {
182     case PROP_SHARED:
183       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
184       break;
185     case PROP_REUSABLE:
186       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
187       break;
188     default:
189       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
190   }
191 }
192
193 static void
194 gst_rtsp_media_set_property (GObject * object, guint propid,
195     const GValue * value, GParamSpec * pspec)
196 {
197   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
198
199   switch (propid) {
200     case PROP_SHARED:
201       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
202       break;
203     case PROP_REUSABLE:
204       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
205       break;
206     default:
207       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
208   }
209 }
210
211 static gpointer
212 do_loop (GstRTSPMediaClass * klass)
213 {
214   GST_INFO ("enter mainloop");
215   g_main_loop_run (klass->loop);
216   GST_INFO ("exit mainloop");
217
218   return NULL;
219 }
220
221 static void
222 collect_media_stats (GstRTSPMedia * media)
223 {
224   GstFormat format;
225   gint64 position, duration;
226
227   media->range.unit = GST_RTSP_RANGE_NPT;
228
229   if (media->is_live) {
230     media->range.min.type = GST_RTSP_TIME_NOW;
231     media->range.min.seconds = -1;
232     media->range.max.type = GST_RTSP_TIME_END;
233     media->range.max.seconds = -1;
234   } else {
235     /* get the position */
236     format = GST_FORMAT_TIME;
237     if (!gst_element_query_position (media->pipeline, &format, &position)) {
238       GST_INFO ("position query failed");
239       position = 0;
240     }
241
242     /* get the duration */
243     format = GST_FORMAT_TIME;
244     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
245       GST_INFO ("duration query failed");
246       duration = -1;
247     }
248
249     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
250         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
251
252     if (position == -1) {
253       media->range.min.type = GST_RTSP_TIME_NOW;
254       media->range.min.seconds = -1;
255     } else {
256       media->range.min.type = GST_RTSP_TIME_SECONDS;
257       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
258     }
259     if (duration == -1) {
260       media->range.max.type = GST_RTSP_TIME_END;
261       media->range.max.seconds = -1;
262     } else {
263       media->range.max.type = GST_RTSP_TIME_SECONDS;
264       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
265     }
266   }
267 }
268
269 /**
270  * gst_rtsp_media_new:
271  *
272  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
273  * element to produde RTP data for one or more related (audio/video/..) 
274  * streams.
275  *
276  * Returns: a new #GstRTSPMedia object.
277  */
278 GstRTSPMedia *
279 gst_rtsp_media_new (void)
280 {
281   GstRTSPMedia *result;
282
283   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
284
285   return result;
286 }
287
288 /**
289  * gst_rtsp_media_set_shared:
290  * @media: a #GstRTSPMedia
291  * @shared: the new value
292  *
293  * Set or unset if the pipeline for @media can be shared will multiple clients.
294  * When @shared is %TRUE, client requests for this media will share the media
295  * pipeline.
296  */
297 void
298 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
299 {
300   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
301
302   media->shared = shared;
303 }
304
305 /**
306  * gst_rtsp_media_is_shared:
307  * @media: a #GstRTSPMedia
308  *
309  * Check if the pipeline for @media can be shared between multiple clients.
310  *
311  * Returns: %TRUE if the media can be shared between clients.
312  */
313 gboolean
314 gst_rtsp_media_is_shared (GstRTSPMedia * media)
315 {
316   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
317
318   return media->shared;
319 }
320
321 /**
322  * gst_rtsp_media_set_reusable:
323  * @media: a #GstRTSPMedia
324  * @reusable: the new value
325  *
326  * Set or unset if the pipeline for @media can be reused after the pipeline has
327  * been unprepared.
328  */
329 void
330 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
331 {
332   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
333
334   media->reusable = reusable;
335 }
336
337 /**
338  * gst_rtsp_media_is_reusable:
339  * @media: a #GstRTSPMedia
340  *
341  * Check if the pipeline for @media can be reused after an unprepare.
342  *
343  * Returns: %TRUE if the media can be reused
344  */
345 gboolean
346 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
347 {
348   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
349
350   return media->reusable;
351 }
352
353 /**
354  * gst_rtsp_media_n_streams:
355  * @media: a #GstRTSPMedia
356  *
357  * Get the number of streams in this media.
358  *
359  * Returns: The number of streams.
360  */
361 guint
362 gst_rtsp_media_n_streams (GstRTSPMedia * media)
363 {
364   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
365
366   return media->streams->len;
367 }
368
369 /**
370  * gst_rtsp_media_get_stream:
371  * @media: a #GstRTSPMedia
372  * @idx: the stream index
373  *
374  * Retrieve the stream with index @idx from @media.
375  *
376  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
377  * that index did not exist.
378  */
379 GstRTSPMediaStream *
380 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
381 {
382   GstRTSPMediaStream *res;
383
384   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
385
386   if (idx < media->streams->len)
387     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
388   else
389     res = NULL;
390
391   return res;
392 }
393
394 /**
395  * gst_rtsp_media_seek:
396  * @stream: a #GstRTSPMediaStream
397  * @range: a #GstRTSPTimeRange
398  *
399  * Seek the pipeline to @range.
400  *
401  * Returns: %TRUE on success.
402  */
403 gboolean
404 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
405 {
406   GstSeekFlags flags;
407   gboolean res;
408   gint64 start, stop;
409   GstSeekType start_type, stop_type;
410
411   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
412   g_return_val_if_fail (range != NULL, FALSE);
413
414   if (range->unit != GST_RTSP_RANGE_NPT)
415     goto not_supported;
416
417   /* depends on the current playing state of the pipeline. We might need to
418    * queue this until we get EOS. */
419   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
420
421   start_type = stop_type = GST_SEEK_TYPE_NONE;
422
423   switch (range->min.type) {
424     case GST_RTSP_TIME_NOW:
425       start = -1;
426       break;
427     case GST_RTSP_TIME_SECONDS:
428       /* only seek when something changed */
429       if (media->range.min.seconds == range->min.seconds) {
430         start = -1;
431       } else {
432         start = range->min.seconds * GST_SECOND;
433         start_type = GST_SEEK_TYPE_SET;
434       }
435       break;
436     case GST_RTSP_TIME_END:
437     default:
438       goto weird_type;
439   }
440   switch (range->max.type) {
441     case GST_RTSP_TIME_SECONDS:
442       /* only seek when something changed */
443       if (media->range.max.seconds == range->max.seconds) {
444         stop = -1;
445       } else {
446         stop = range->max.seconds * GST_SECOND;
447         stop_type = GST_SEEK_TYPE_SET;
448       }
449       break;
450     case GST_RTSP_TIME_END:
451       stop = -1;
452       stop_type = GST_SEEK_TYPE_SET;
453       break;
454     case GST_RTSP_TIME_NOW:
455     default:
456       goto weird_type;
457   }
458
459   if (start != -1 || stop != -1) {
460     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
461         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
462
463     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
464         flags, start_type, start, stop_type, stop);
465
466     /* and block for the seek to complete */
467     GST_INFO ("done seeking %d", res);
468     gst_element_get_state (media->pipeline, NULL, NULL, -1);
469     GST_INFO ("prerolled again");
470
471     collect_media_stats (media);
472   } else {
473     GST_INFO ("no seek needed");
474     res = TRUE;
475   }
476
477   return res;
478
479   /* ERRORS */
480 not_supported:
481   {
482     GST_WARNING ("seek unit %d not supported", range->unit);
483     return FALSE;
484   }
485 weird_type:
486   {
487     GST_WARNING ("weird range type %d not supported", range->min.type);
488     return FALSE;
489   }
490 }
491
492 /**
493  * gst_rtsp_media_stream_rtp:
494  * @stream: a #GstRTSPMediaStream
495  * @buffer: a #GstBuffer
496  *
497  * Handle an RTP buffer for the stream. This method is usually called when a
498  * message has been received from a client using the TCP transport.
499  *
500  * This function takes ownership of @buffer.
501  *
502  * Returns: a GstFlowReturn.
503  */
504 GstFlowReturn
505 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
506 {
507   GstFlowReturn ret;
508
509   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
510
511   return ret;
512 }
513
514 /**
515  * gst_rtsp_media_stream_rtcp:
516  * @stream: a #GstRTSPMediaStream
517  * @buffer: a #GstBuffer
518  *
519  * Handle an RTCP buffer for the stream. This method is usually called when a
520  * message has been received from a client using the TCP transport.
521  *
522  * This function takes ownership of @buffer.
523  *
524  * Returns: a GstFlowReturn.
525  */
526 GstFlowReturn
527 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
528 {
529   GstFlowReturn ret;
530
531   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
532
533   return ret;
534 }
535
536 /* Allocate the udp ports and sockets */
537 static gboolean
538 alloc_udp_ports (GstRTSPMediaStream * stream)
539 {
540   GstStateChangeReturn ret;
541   GstElement *udpsrc0, *udpsrc1;
542   GstElement *udpsink0, *udpsink1;
543   gint tmp_rtp, tmp_rtcp;
544   guint count;
545   gint rtpport, rtcpport, sockfd;
546
547   udpsrc0 = NULL;
548   udpsrc1 = NULL;
549   udpsink0 = NULL;
550   udpsink1 = NULL;
551   count = 0;
552
553   /* Start with random port */
554   tmp_rtp = 0;
555
556   /* try to allocate 2 UDP ports, the RTP port should be an even
557    * number and the RTCP port should be the next (uneven) port */
558 again:
559   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
560   if (udpsrc0 == NULL)
561     goto no_udp_protocol;
562   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
563
564   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
565   if (ret == GST_STATE_CHANGE_FAILURE) {
566     if (tmp_rtp != 0) {
567       tmp_rtp += 2;
568       if (++count > 20)
569         goto no_ports;
570
571       gst_element_set_state (udpsrc0, GST_STATE_NULL);
572       gst_object_unref (udpsrc0);
573
574       goto again;
575     }
576     goto no_udp_protocol;
577   }
578
579   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
580
581   /* check if port is even */
582   if ((tmp_rtp & 1) != 0) {
583     /* port not even, close and allocate another */
584     if (++count > 20)
585       goto no_ports;
586
587     gst_element_set_state (udpsrc0, GST_STATE_NULL);
588     gst_object_unref (udpsrc0);
589
590     tmp_rtp++;
591     goto again;
592   }
593
594   /* allocate port+1 for RTCP now */
595   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
596   if (udpsrc1 == NULL)
597     goto no_udp_rtcp_protocol;
598
599   /* set port */
600   tmp_rtcp = tmp_rtp + 1;
601   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
602
603   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
604   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
605   if (ret == GST_STATE_CHANGE_FAILURE) {
606
607     if (++count > 20)
608       goto no_ports;
609
610     gst_element_set_state (udpsrc0, GST_STATE_NULL);
611     gst_object_unref (udpsrc0);
612
613     gst_element_set_state (udpsrc1, GST_STATE_NULL);
614     gst_object_unref (udpsrc1);
615
616     tmp_rtp += 2;
617     goto again;
618   }
619
620   /* all fine, do port check */
621   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
622   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
623
624   /* this should not happen... */
625   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
626     goto port_error;
627
628   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
629   if (!udpsink0)
630     goto no_udp_protocol;
631
632   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
633   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
634   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
635
636   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
637   if (!udpsink1)
638     goto no_udp_protocol;
639
640   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
641   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
642   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
643   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
644   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
645
646   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
647   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
648   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
649   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
650
651   /* we keep these elements, we configure all in configure_transport when the
652    * server told us to really use the UDP ports. */
653   stream->udpsrc[0] = udpsrc0;
654   stream->udpsrc[1] = udpsrc1;
655   stream->udpsink[0] = udpsink0;
656   stream->udpsink[1] = udpsink1;
657   stream->server_port.min = rtpport;
658   stream->server_port.max = rtcpport;
659
660   return TRUE;
661
662   /* ERRORS */
663 no_udp_protocol:
664   {
665     goto cleanup;
666   }
667 no_ports:
668   {
669     goto cleanup;
670   }
671 no_udp_rtcp_protocol:
672   {
673     goto cleanup;
674   }
675 port_error:
676   {
677     goto cleanup;
678   }
679 cleanup:
680   {
681     if (udpsrc0) {
682       gst_element_set_state (udpsrc0, GST_STATE_NULL);
683       gst_object_unref (udpsrc0);
684     }
685     if (udpsrc1) {
686       gst_element_set_state (udpsrc1, GST_STATE_NULL);
687       gst_object_unref (udpsrc1);
688     }
689     if (udpsink0) {
690       gst_element_set_state (udpsink0, GST_STATE_NULL);
691       gst_object_unref (udpsink0);
692     }
693     if (udpsink1) {
694       gst_element_set_state (udpsink1, GST_STATE_NULL);
695       gst_object_unref (udpsink1);
696     }
697     return FALSE;
698   }
699 }
700
701 static void
702 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
703 {
704   gchar *capsstr;
705   GstCaps *newcaps, *oldcaps;
706
707   if ((newcaps = GST_PAD_CAPS (pad)))
708     gst_caps_ref (newcaps);
709
710   oldcaps = stream->caps;
711   stream->caps = newcaps;
712
713   if (oldcaps)
714     gst_caps_unref (oldcaps);
715
716   capsstr = gst_caps_to_string (newcaps);
717   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
718   g_free (capsstr);
719 }
720
721 static void
722 dump_structure (const GstStructure * s)
723 {
724   gchar *sstr;
725
726   sstr = gst_structure_to_string (s);
727   GST_INFO ("structure: %s", sstr);
728   g_free (sstr);
729 }
730
731 static GstRTSPMediaTrans *
732 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
733 {
734   GList *walk;
735   GstRTSPMediaTrans *result = NULL;
736   const gchar *tmp;
737   gchar *dest;
738   guint port;
739
740   if (rtcp_from == NULL)
741     return NULL;
742
743   tmp = g_strrstr (rtcp_from, ":");
744   if (tmp == NULL)
745     return NULL;
746
747   port = atoi (tmp + 1);
748   dest = g_strndup (rtcp_from, tmp - rtcp_from);
749
750   GST_INFO ("finding %s:%d", dest, port);
751
752   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
753     GstRTSPMediaTrans *trans = walk->data;
754     gint min, max;
755
756     min = trans->transport->client_port.min;
757     max = trans->transport->client_port.max;
758
759     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
760             || max == port)) {
761       result = trans;
762       break;
763     }
764   }
765   g_free (dest);
766
767   return result;
768 }
769
770 static void
771 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
772 {
773   GstStructure *stats;
774   GstRTSPMediaTrans *trans;
775
776   GST_INFO ("%p: new source %p", stream, source);
777
778   /* see if we have a stream to match with the origin of the RTCP packet */
779   trans = g_object_get_qdata (source, ssrc_stream_map_key);
780   if (trans == NULL) {
781     g_object_get (source, "stats", &stats, NULL);
782     if (stats) {
783       const gchar *rtcp_from;
784
785       dump_structure (stats);
786
787       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
788       if ((trans = find_transport (stream, rtcp_from))) {
789         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
790             source);
791
792         /* keep ref to the source */
793         trans->rtpsource = source;
794
795         g_object_set_qdata (source, ssrc_stream_map_key, trans);
796       }
797       gst_structure_free (stats);
798     }
799   } else {
800     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
801   }
802 }
803
804 static void
805 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
806 {
807   GST_INFO ("%p: new SDES %p", stream, source);
808 }
809
810 static void
811 on_ssrc_active (GObject * session, GObject * source,
812     GstRTSPMediaStream * stream)
813 {
814   GstRTSPMediaTrans *trans;
815
816   trans = g_object_get_qdata (source, ssrc_stream_map_key);
817
818   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
819
820   if (trans && trans->keep_alive)
821     trans->keep_alive (trans->ka_user_data);
822
823 #ifdef DUMP_STATS
824   {
825     GstStructure *stats;
826     g_object_get (source, "stats", &stats, NULL);
827     if (stats) {
828       dump_structure (stats);
829       gst_structure_free (stats);
830     }
831   }
832 #endif
833 }
834
835 static void
836 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
837 {
838   GST_INFO ("%p: source %p bye", stream, source);
839 }
840
841 static void
842 on_bye_timeout (GObject * session, GObject * source,
843     GstRTSPMediaStream * stream)
844 {
845   GstRTSPMediaTrans *trans;
846
847   GST_INFO ("%p: source %p bye timeout", stream, source);
848
849   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
850     trans->rtpsource = NULL;
851     trans->timeout = TRUE;
852   }
853 }
854
855 static void
856 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
857 {
858   GstRTSPMediaTrans *trans;
859
860   GST_INFO ("%p: source %p timeout", stream, source);
861
862   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
863     trans->rtpsource = NULL;
864     trans->timeout = TRUE;
865   }
866 }
867
868 static GstFlowReturn
869 handle_new_buffer (GstAppSink * sink, gpointer user_data)
870 {
871   GList *walk;
872   GstBuffer *buffer;
873   GstRTSPMediaStream *stream;
874
875   buffer = gst_app_sink_pull_buffer (sink);
876   if (!buffer)
877     return GST_FLOW_OK;
878
879   stream = (GstRTSPMediaStream *) user_data;
880
881   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
882     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
883
884     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
885       if (tr->send_rtp)
886         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
887     } else {
888       if (tr->send_rtcp)
889         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
890     }
891   }
892   gst_buffer_unref (buffer);
893
894   return GST_FLOW_OK;
895 }
896
897 static GstAppSinkCallbacks sink_cb = {
898   NULL,                         /* not interested in EOS */
899   NULL,                         /* not interested in preroll buffers */
900   handle_new_buffer
901 };
902
903 /* prepare the pipeline objects to handle @stream in @media */
904 static gboolean
905 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
906 {
907   gchar *name;
908   GstPad *pad, *teepad, *selpad;
909   GstPadLinkReturn ret;
910   gint i;
911
912   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
913    * for sending RTP/RTCP. The sender and receiver ports are shared between the
914    * elements */
915   if (!alloc_udp_ports (stream))
916     return FALSE;
917
918   /* add the ports to the pipeline */
919   for (i = 0; i < 2; i++) {
920     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
921     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
922   }
923
924   /* create elements for the TCP transfer */
925   for (i = 0; i < 2; i++) {
926     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
927     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
928     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
929     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
930     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
931     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
932     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
933     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
934         &sink_cb, stream, NULL);
935   }
936
937   /* hook up the stream to the RTP session elements. */
938   name = g_strdup_printf ("send_rtp_sink_%d", idx);
939   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
940   g_free (name);
941   name = g_strdup_printf ("send_rtp_src_%d", idx);
942   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
943   g_free (name);
944   name = g_strdup_printf ("send_rtcp_src_%d", idx);
945   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
946   g_free (name);
947   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
948   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
949   g_free (name);
950   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
951   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
952   g_free (name);
953
954   /* get the session */
955   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
956       &stream->session);
957
958   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
959       stream);
960   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
961       stream);
962   g_signal_connect (stream->session, "on-ssrc-active",
963       (GCallback) on_ssrc_active, stream);
964   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
965       stream);
966   g_signal_connect (stream->session, "on-bye-timeout",
967       (GCallback) on_bye_timeout, stream);
968   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
969       stream);
970
971   /* link the RTP pad to the session manager */
972   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
973   if (ret != GST_PAD_LINK_OK)
974     goto link_failed;
975
976   /* make tee for RTP and link to stream */
977   stream->tee[0] = gst_element_factory_make ("tee", NULL);
978   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
979
980   pad = gst_element_get_static_pad (stream->tee[0], "sink");
981   gst_pad_link (stream->send_rtp_src, pad);
982   gst_object_unref (pad);
983
984   /* link RTP sink, we're pretty sure this will work. */
985   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
986   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
987   gst_pad_link (teepad, pad);
988   gst_object_unref (pad);
989   gst_object_unref (teepad);
990
991   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
992   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
993   gst_pad_link (teepad, pad);
994   gst_object_unref (pad);
995   gst_object_unref (teepad);
996
997   /* make tee for RTCP */
998   stream->tee[1] = gst_element_factory_make ("tee", NULL);
999   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1000
1001   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1002   gst_pad_link (stream->send_rtcp_src, pad);
1003   gst_object_unref (pad);
1004
1005   /* link RTCP elements */
1006   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1007   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1008   gst_pad_link (teepad, pad);
1009   gst_object_unref (pad);
1010   gst_object_unref (teepad);
1011
1012   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1013   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1014   gst_pad_link (teepad, pad);
1015   gst_object_unref (pad);
1016   gst_object_unref (teepad);
1017
1018   /* make selector for the RTP receivers */
1019   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1020   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1021   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1022
1023   pad = gst_element_get_static_pad (stream->selector[0], "src");
1024   gst_pad_link (pad, stream->recv_rtp_sink);
1025   gst_object_unref (pad);
1026
1027   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1028   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1029   gst_pad_link (pad, selpad);
1030   gst_object_unref (pad);
1031   gst_object_unref (selpad);
1032
1033   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1034   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1035   gst_pad_link (pad, selpad);
1036   gst_object_unref (pad);
1037   gst_object_unref (selpad);
1038
1039   /* make selector for the RTCP receivers */
1040   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1041   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1042   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1043
1044   pad = gst_element_get_static_pad (stream->selector[1], "src");
1045   gst_pad_link (pad, stream->recv_rtcp_sink);
1046   gst_object_unref (pad);
1047
1048   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1049   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1050   gst_pad_link (pad, selpad);
1051   gst_object_unref (pad);
1052   gst_object_unref (selpad);
1053
1054   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1055   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1056   gst_pad_link (pad, selpad);
1057   gst_object_unref (pad);
1058   gst_object_unref (selpad);
1059
1060   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1061    * values */
1062   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1063   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1064   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1065   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1066
1067   /* be notified of caps changes */
1068   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1069       (GCallback) caps_notify, stream);
1070
1071   stream->prepared = TRUE;
1072
1073   return TRUE;
1074
1075   /* ERRORS */
1076 link_failed:
1077   {
1078     GST_WARNING ("failed to link stream %d", idx);
1079     return FALSE;
1080   }
1081 }
1082
1083 static void
1084 unlock_streams (GstRTSPMedia * media)
1085 {
1086   guint i, n_streams;
1087
1088   /* unlock the udp src elements */
1089   n_streams = gst_rtsp_media_n_streams (media);
1090   for (i = 0; i < n_streams; i++) {
1091     GstRTSPMediaStream *stream;
1092
1093     stream = gst_rtsp_media_get_stream (media, i);
1094
1095     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1096     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1097   }
1098 }
1099
1100 static gboolean
1101 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1102 {
1103   GstMessageType type;
1104
1105   type = GST_MESSAGE_TYPE (message);
1106
1107   switch (type) {
1108     case GST_MESSAGE_STATE_CHANGED:
1109       break;
1110     case GST_MESSAGE_BUFFERING:
1111     {
1112       gint percent;
1113
1114       gst_message_parse_buffering (message, &percent);
1115
1116       /* no state management needed for live pipelines */
1117       if (media->is_live)
1118         break;
1119
1120       if (percent == 100) {
1121         /* a 100% message means buffering is done */
1122         media->buffering = FALSE;
1123         /* if the desired state is playing, go back */
1124         if (media->target_state == GST_STATE_PLAYING) {
1125           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1126           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1127         } else {
1128           GST_INFO ("Buffering done");
1129         }
1130       } else {
1131         /* buffering busy */
1132         if (media->buffering == FALSE) {
1133           if (media->target_state == GST_STATE_PLAYING) {
1134             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1135             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1136             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1137           } else {
1138             GST_INFO ("Buffering ...");
1139           }
1140         }
1141         media->buffering = TRUE;
1142       }
1143       break;
1144     }
1145     case GST_MESSAGE_LATENCY:
1146     {
1147       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1148       break;
1149     }
1150     case GST_MESSAGE_ERROR:
1151     {
1152       GError *gerror;
1153       gchar *debug;
1154
1155       gst_message_parse_error (message, &gerror, &debug);
1156       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1157       g_error_free (gerror);
1158       g_free (debug);
1159       break;
1160     }
1161     case GST_MESSAGE_WARNING:
1162     {
1163       GError *gerror;
1164       gchar *debug;
1165
1166       gst_message_parse_warning (message, &gerror, &debug);
1167       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1168       g_error_free (gerror);
1169       g_free (debug);
1170       break;
1171     }
1172     case GST_MESSAGE_ELEMENT:
1173       break;
1174     case GST_MESSAGE_STREAM_STATUS:
1175       break;
1176     default:
1177       GST_INFO ("%p: got message type %s", media,
1178           gst_message_type_get_name (type));
1179       break;
1180   }
1181   return TRUE;
1182 }
1183
1184 static gboolean
1185 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1186 {
1187   GstRTSPMediaClass *klass;
1188   gboolean ret;
1189
1190   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1191
1192   if (klass->handle_message)
1193     ret = klass->handle_message (media, message);
1194   else
1195     ret = FALSE;
1196
1197   return ret;
1198 }
1199
1200 static void
1201 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1202 {
1203   GstRTSPMediaStream *stream;
1204   gchar *name;
1205   gint i;
1206
1207   i = media->streams->len + 1;
1208
1209   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1210
1211   stream = g_new0 (GstRTSPMediaStream, 1);
1212   stream->payloader = element;
1213
1214   name = g_strdup_printf ("dynpay%d", i);
1215
1216   /* ghost the pad of the payloader to the element */
1217   stream->srcpad = gst_ghost_pad_new (name, pad);
1218   gst_pad_set_active (stream->srcpad, TRUE);
1219   gst_element_add_pad (media->element, stream->srcpad);
1220   g_free (name);
1221
1222   /* add stream now */
1223   g_array_append_val (media->streams, stream);
1224
1225   setup_stream (stream, i, media);
1226
1227   for (i = 0; i < 2; i++) {
1228     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1229     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1230     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1231     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1232     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1233   }
1234 }
1235
1236 static void
1237 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1238 {
1239   GST_INFO ("no more pads");
1240   if (media->fakesink) {
1241     gst_object_ref (media->fakesink);
1242     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1243     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1244     gst_object_unref (media->fakesink);
1245     media->fakesink = NULL;
1246     GST_INFO ("removed fakesink");
1247   }
1248 }
1249
1250 /**
1251  * gst_rtsp_media_prepare:
1252  * @obj: a #GstRTSPMedia
1253  *
1254  * Prepare @media for streaming. This function will create the pipeline and
1255  * other objects to manage the streaming.
1256  *
1257  * It will preroll the pipeline and collect vital information about the streams
1258  * such as the duration.
1259  *
1260  * Returns: %TRUE on success.
1261  */
1262 gboolean
1263 gst_rtsp_media_prepare (GstRTSPMedia * media)
1264 {
1265   GstStateChangeReturn ret;
1266   guint i, n_streams;
1267   GstRTSPMediaClass *klass;
1268   GstBus *bus;
1269   GList *walk;
1270
1271   if (media->prepared)
1272     goto was_prepared;
1273
1274   if (!media->reusable && media->reused)
1275     goto is_reused;
1276
1277   GST_INFO ("preparing media %p", media);
1278
1279   /* reset some variables */
1280   media->is_live = FALSE;
1281   media->buffering = FALSE;
1282
1283   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1284
1285   /* add the pipeline bus to our custom mainloop */
1286   media->source = gst_bus_create_watch (bus);
1287   gst_object_unref (bus);
1288
1289   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1290
1291   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1292   media->id = g_source_attach (media->source, klass->context);
1293
1294   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1295
1296   /* add stuff to the bin */
1297   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1298
1299   /* link streams we already have, other streams might appear when we have
1300    * dynamic elements */
1301   n_streams = gst_rtsp_media_n_streams (media);
1302   for (i = 0; i < n_streams; i++) {
1303     GstRTSPMediaStream *stream;
1304
1305     stream = gst_rtsp_media_get_stream (media, i);
1306
1307     setup_stream (stream, i, media);
1308   }
1309
1310   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1311     GstElement *elem = walk->data;
1312
1313     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1314     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1315
1316     /* we add a fakesink here in order to make the state change async. We remove
1317      * the fakesink again in the no-more-pads callback. */
1318     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1319     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1320   }
1321
1322   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1323   /* first go to PAUSED */
1324   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1325   media->target_state = GST_STATE_PAUSED;
1326
1327   switch (ret) {
1328     case GST_STATE_CHANGE_SUCCESS:
1329       GST_INFO ("SUCCESS state change for media %p", media);
1330       break;
1331     case GST_STATE_CHANGE_ASYNC:
1332       GST_INFO ("ASYNC state change for media %p", media);
1333       break;
1334     case GST_STATE_CHANGE_NO_PREROLL:
1335       /* we need to go to PLAYING */
1336       GST_INFO ("NO_PREROLL state change: live media %p", media);
1337       media->is_live = TRUE;
1338       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1339       if (ret == GST_STATE_CHANGE_FAILURE)
1340         goto state_failed;
1341       break;
1342     case GST_STATE_CHANGE_FAILURE:
1343       goto state_failed;
1344   }
1345
1346   /* now wait for all pads to be prerolled */
1347   ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1348   if (ret == GST_STATE_CHANGE_FAILURE)
1349     goto state_failed;
1350
1351   /* collect stats about the media */
1352   collect_media_stats (media);
1353
1354   GST_INFO ("object %p is prerolled", media);
1355
1356   media->prepared = TRUE;
1357
1358   return TRUE;
1359
1360   /* OK */
1361 was_prepared:
1362   {
1363     return TRUE;
1364   }
1365   /* ERRORS */
1366 state_failed:
1367   {
1368     GST_WARNING ("failed to preroll pipeline");
1369     unlock_streams (media);
1370     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1371     return FALSE;
1372   }
1373 is_reused:
1374   {
1375     GST_WARNING ("can not reuse media %p", media);
1376     return FALSE;
1377   }
1378 }
1379
1380 /**
1381  * gst_rtsp_media_unprepare:
1382  * @obj: a #GstRTSPMedia
1383  *
1384  * Unprepare @media. After this call, the media should be prepared again before
1385  * it can be used again. If the media is set to be non-reusable, a new instance
1386  * must be created.
1387  *
1388  * Returns: %TRUE on success.
1389  */
1390 gboolean
1391 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1392 {
1393   GstRTSPMediaClass *klass;
1394   gboolean success;
1395
1396   if (!media->prepared)
1397     return TRUE;
1398
1399   GST_INFO ("unprepare media %p", media);
1400   media->target_state = GST_STATE_NULL;
1401
1402   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1403   if (klass->unprepare)
1404     success = klass->unprepare (media);
1405   else
1406     success = TRUE;
1407
1408   media->prepared = FALSE;
1409   media->reused = TRUE;
1410
1411   /* when the media is not reusable, this will effectively unref the media and
1412    * recreate it */
1413   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1414
1415   return success;
1416 }
1417
1418 static gboolean
1419 default_unprepare (GstRTSPMedia * media)
1420 {
1421   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1422
1423   return TRUE;
1424 }
1425
1426 /**
1427  * gst_rtsp_media_set_state:
1428  * @media: a #GstRTSPMedia
1429  * @state: the target state of the media
1430  * @transports: a GArray of #GstRTSPMediaTrans pointers
1431  *
1432  * Set the state of @media to @state and for the transports in @transports.
1433  *
1434  * Returns: %TRUE on success.
1435  */
1436 gboolean
1437 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1438     GArray * transports)
1439 {
1440   gint i;
1441   GstStateChangeReturn ret;
1442   gboolean add, remove, do_state;
1443   gint old_active;
1444
1445   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1446   g_return_val_if_fail (transports != NULL, FALSE);
1447
1448   /* NULL and READY are the same */
1449   if (state == GST_STATE_READY)
1450     state = GST_STATE_NULL;
1451
1452   add = remove = FALSE;
1453
1454   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1455       media);
1456
1457   switch (state) {
1458     case GST_STATE_NULL:
1459       /* unlock the streams so that they follow the state changes from now on */
1460       unlock_streams (media);
1461       /* fallthrough */
1462     case GST_STATE_PAUSED:
1463       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1464       if (media->target_state == GST_STATE_PLAYING)
1465         remove = TRUE;
1466       break;
1467     case GST_STATE_PLAYING:
1468       /* we're going to PLAYING, add */
1469       add = TRUE;
1470       break;
1471     default:
1472       break;
1473   }
1474   old_active = media->active;
1475
1476   for (i = 0; i < transports->len; i++) {
1477     GstRTSPMediaTrans *tr;
1478     GstRTSPMediaStream *stream;
1479     GstRTSPTransport *trans;
1480
1481     /* we need a non-NULL entry in the array */
1482     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1483     if (tr == NULL)
1484       continue;
1485
1486     /* we need a transport */
1487     if (!(trans = tr->transport))
1488       continue;
1489
1490     /* get the stream and add the destinations */
1491     stream = gst_rtsp_media_get_stream (media, tr->idx);
1492     switch (trans->lower_transport) {
1493       case GST_RTSP_LOWER_TRANS_UDP:
1494       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1495       {
1496         gchar *dest;
1497         gint min, max;
1498
1499         dest = trans->destination;
1500         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1501           min = trans->port.min;
1502           max = trans->port.max;
1503         } else {
1504           min = trans->client_port.min;
1505           max = trans->client_port.max;
1506         }
1507
1508         if (add && !tr->active) {
1509           GST_INFO ("adding %s:%d-%d", dest, min, max);
1510           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1511           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1512           stream->transports = g_list_prepend (stream->transports, tr);
1513           tr->active = TRUE;
1514           media->active++;
1515         } else if (remove && tr->active) {
1516           GST_INFO ("removing %s:%d-%d", dest, min, max);
1517           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1518           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1519           stream->transports = g_list_remove (stream->transports, tr);
1520           tr->active = FALSE;
1521           media->active--;
1522         }
1523         break;
1524       }
1525       case GST_RTSP_LOWER_TRANS_TCP:
1526         if (add && !tr->active) {
1527           GST_INFO ("adding TCP %s", trans->destination);
1528           stream->transports = g_list_prepend (stream->transports, tr);
1529           tr->active = TRUE;
1530           media->active++;
1531         } else if (remove && tr->active) {
1532           GST_INFO ("removing TCP %s", trans->destination);
1533           stream->transports = g_list_remove (stream->transports, tr);
1534           tr->active = FALSE;
1535           media->active--;
1536         }
1537         break;
1538       default:
1539         GST_INFO ("Unknown transport %d", trans->lower_transport);
1540         break;
1541     }
1542   }
1543
1544   /* we just added the first media, do the playing state change */
1545   if (old_active == 0 && add)
1546     do_state = TRUE;
1547   /* if we have no more active media, do the downward state changes */
1548   else if (media->active == 0)
1549     do_state = TRUE;
1550   else
1551     do_state = FALSE;
1552
1553   GST_INFO ("active %d media %p", media->active, media);
1554
1555   if (do_state && media->target_state != state) {
1556     if (state == GST_STATE_NULL) {
1557       gst_rtsp_media_unprepare (media);
1558     } else {
1559       GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
1560       media->target_state = state;
1561       ret = gst_element_set_state (media->pipeline, state);
1562     }
1563   }
1564
1565   /* remember where we are */
1566   if (state == GST_STATE_PAUSED)
1567     collect_media_stats (media);
1568
1569   return TRUE;
1570 }
1571
1572 /**
1573  * gst_rtsp_media_remove_elements:
1574  * @media: a #GstRTSPMedia
1575  *
1576  * Remove all elements and the pipeline controlled by @media.
1577  */
1578 void
1579 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1580 {
1581   gint i, j;
1582
1583   unlock_streams (media);
1584
1585   for (i = 0; i < media->streams->len; i++) {
1586     GstRTSPMediaStream *stream;
1587
1588     GST_INFO ("Removing elements of stream %d from pipeline", i);
1589
1590     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1591
1592     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1593
1594     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
1595
1596     for (j = 0; j < 2; j++) {
1597       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
1598       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
1599       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
1600       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
1601       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
1602       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
1603
1604       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
1605       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
1606       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
1607       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
1608       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
1609       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
1610     }
1611     if (stream->caps)
1612       gst_caps_unref (stream->caps);
1613     stream->caps = NULL;
1614     gst_rtsp_media_stream_free (stream);
1615   }
1616   g_array_remove_range (media->streams, 0, media->streams->len);
1617
1618   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
1619   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
1620
1621   gst_object_unref (media->pipeline);
1622   media->pipeline = NULL;
1623 }