Add stdlib.h for atoi()
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED         FALSE
29 #define DEFAULT_REUSABLE       FALSE
30 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32
33 /* define to dump received RTCP packets */
34 #undef DUMP_STATS
35
36 enum
37 {
38   PROP_0,
39   PROP_SHARED,
40   PROP_REUSABLE,
41   PROP_PROTOCOLS,
42   PROP_LAST
43 };
44
45 enum
46 {
47   SIGNAL_UNPREPARED,
48   SIGNAL_LAST
49 };
50
51 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
52 #define GST_CAT_DEFAULT rtsp_media_debug
53
54 static GQuark ssrc_stream_map_key;
55
56 static void gst_rtsp_media_get_property (GObject * object, guint propid,
57     GValue * value, GParamSpec * pspec);
58 static void gst_rtsp_media_set_property (GObject * object, guint propid,
59     const GValue * value, GParamSpec * pspec);
60 static void gst_rtsp_media_finalize (GObject * obj);
61
62 static gpointer do_loop (GstRTSPMediaClass * klass);
63 static gboolean default_handle_message (GstRTSPMedia * media,
64     GstMessage * message);
65 static gboolean default_unprepare (GstRTSPMedia * media);
66 static void unlock_streams (GstRTSPMedia * media);
67
68 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
69
70 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
71
72 static void
73 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
74 {
75   GObjectClass *gobject_class;
76   GError *error = NULL;
77
78   gobject_class = G_OBJECT_CLASS (klass);
79
80   gobject_class->get_property = gst_rtsp_media_get_property;
81   gobject_class->set_property = gst_rtsp_media_set_property;
82   gobject_class->finalize = gst_rtsp_media_finalize;
83
84   g_object_class_install_property (gobject_class, PROP_SHARED,
85       g_param_spec_boolean ("shared", "Shared",
86           "If this media pipeline can be shared", DEFAULT_SHARED,
87           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
88
89   g_object_class_install_property (gobject_class, PROP_REUSABLE,
90       g_param_spec_boolean ("reusable", "Reusable",
91           "If this media pipeline can be reused after an unprepare",
92           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
93
94   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
95       g_param_spec_flags ("protocols", "Protocols",
96           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
97           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98
99   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
100       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
101       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
102       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
103
104   klass->context = g_main_context_new ();
105   klass->loop = g_main_loop_new (klass->context, TRUE);
106
107   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
108   if (error != NULL) {
109     g_critical ("could not start bus thread: %s", error->message);
110   }
111   klass->handle_message = default_handle_message;
112   klass->unprepare = default_unprepare;
113
114   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
115 }
116
117 static void
118 gst_rtsp_media_init (GstRTSPMedia * media)
119 {
120   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
121   media->lock = g_mutex_new ();
122   media->cond = g_cond_new ();
123
124   media->shared = DEFAULT_SHARED;
125   media->reusable = DEFAULT_REUSABLE;
126   media->protocols = DEFAULT_PROTOCOLS;
127 }
128
129 static void
130 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
131 {
132   if (stream->session)
133     g_object_unref (stream->session);
134
135   if (stream->caps)
136     gst_caps_unref (stream->caps);
137
138   if (stream->send_rtp_sink)
139     gst_object_unref (stream->send_rtp_sink);
140   if (stream->send_rtp_src)
141     gst_object_unref (stream->send_rtp_src);
142   if (stream->send_rtcp_src)
143     gst_object_unref (stream->send_rtcp_src);
144   if (stream->recv_rtcp_sink)
145     gst_object_unref (stream->recv_rtcp_sink);
146   if (stream->recv_rtp_sink)
147     gst_object_unref (stream->recv_rtp_sink);
148
149   g_list_free (stream->transports);
150
151   g_free (stream);
152 }
153
154 static void
155 gst_rtsp_media_finalize (GObject * obj)
156 {
157   GstRTSPMedia *media;
158   guint i;
159
160   media = GST_RTSP_MEDIA (obj);
161
162   GST_INFO ("finalize media %p", media);
163
164   if (media->pipeline) {
165     unlock_streams (media);
166     gst_element_set_state (media->pipeline, GST_STATE_NULL);
167     gst_object_unref (media->pipeline);
168   }
169
170   for (i = 0; i < media->streams->len; i++) {
171     GstRTSPMediaStream *stream;
172
173     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
174
175     gst_rtsp_media_stream_free (stream);
176   }
177   g_array_free (media->streams, TRUE);
178
179   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
180   g_list_free (media->dynamic);
181
182   if (media->source) {
183     g_source_destroy (media->source);
184     g_source_unref (media->source);
185   }
186   g_mutex_free (media->lock);
187   g_cond_free (media->cond);
188
189   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
190 }
191
192 static void
193 gst_rtsp_media_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec)
195 {
196   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
197
198   switch (propid) {
199     case PROP_SHARED:
200       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
201       break;
202     case PROP_REUSABLE:
203       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
204       break;
205     case PROP_PROTOCOLS:
206       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
207       break;
208     default:
209       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
210   }
211 }
212
213 static void
214 gst_rtsp_media_set_property (GObject * object, guint propid,
215     const GValue * value, GParamSpec * pspec)
216 {
217   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
218
219   switch (propid) {
220     case PROP_SHARED:
221       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
222       break;
223     case PROP_REUSABLE:
224       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
225       break;
226     case PROP_PROTOCOLS:
227       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
228       break;
229     default:
230       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
231   }
232 }
233
234 static gpointer
235 do_loop (GstRTSPMediaClass * klass)
236 {
237   GST_INFO ("enter mainloop");
238   g_main_loop_run (klass->loop);
239   GST_INFO ("exit mainloop");
240
241   return NULL;
242 }
243
244 static void
245 collect_media_stats (GstRTSPMedia * media)
246 {
247   GstFormat format;
248   gint64 position, duration;
249
250   media->range.unit = GST_RTSP_RANGE_NPT;
251
252   if (media->is_live) {
253     media->range.min.type = GST_RTSP_TIME_NOW;
254     media->range.min.seconds = -1;
255     media->range.max.type = GST_RTSP_TIME_END;
256     media->range.max.seconds = -1;
257   } else {
258     /* get the position */
259     format = GST_FORMAT_TIME;
260     if (!gst_element_query_position (media->pipeline, &format, &position)) {
261       GST_INFO ("position query failed");
262       position = 0;
263     }
264
265     /* get the duration */
266     format = GST_FORMAT_TIME;
267     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
268       GST_INFO ("duration query failed");
269       duration = -1;
270     }
271
272     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
273         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
274
275     if (position == -1) {
276       media->range.min.type = GST_RTSP_TIME_NOW;
277       media->range.min.seconds = -1;
278     } else {
279       media->range.min.type = GST_RTSP_TIME_SECONDS;
280       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
281     }
282     if (duration == -1) {
283       media->range.max.type = GST_RTSP_TIME_END;
284       media->range.max.seconds = -1;
285     } else {
286       media->range.max.type = GST_RTSP_TIME_SECONDS;
287       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
288     }
289   }
290 }
291
292 /**
293  * gst_rtsp_media_new:
294  *
295  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
296  * element to produde RTP data for one or more related (audio/video/..) 
297  * streams.
298  *
299  * Returns: a new #GstRTSPMedia object.
300  */
301 GstRTSPMedia *
302 gst_rtsp_media_new (void)
303 {
304   GstRTSPMedia *result;
305
306   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
307
308   return result;
309 }
310
311 /**
312  * gst_rtsp_media_set_shared:
313  * @media: a #GstRTSPMedia
314  * @shared: the new value
315  *
316  * Set or unset if the pipeline for @media can be shared will multiple clients.
317  * When @shared is %TRUE, client requests for this media will share the media
318  * pipeline.
319  */
320 void
321 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
322 {
323   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
324
325   media->shared = shared;
326 }
327
328 /**
329  * gst_rtsp_media_is_shared:
330  * @media: a #GstRTSPMedia
331  *
332  * Check if the pipeline for @media can be shared between multiple clients.
333  *
334  * Returns: %TRUE if the media can be shared between clients.
335  */
336 gboolean
337 gst_rtsp_media_is_shared (GstRTSPMedia * media)
338 {
339   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
340
341   return media->shared;
342 }
343
344 /**
345  * gst_rtsp_media_set_reusable:
346  * @media: a #GstRTSPMedia
347  * @reusable: the new value
348  *
349  * Set or unset if the pipeline for @media can be reused after the pipeline has
350  * been unprepared.
351  */
352 void
353 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
354 {
355   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
356
357   media->reusable = reusable;
358 }
359
360 /**
361  * gst_rtsp_media_is_reusable:
362  * @media: a #GstRTSPMedia
363  *
364  * Check if the pipeline for @media can be reused after an unprepare.
365  *
366  * Returns: %TRUE if the media can be reused
367  */
368 gboolean
369 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
370 {
371   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
372
373   return media->reusable;
374 }
375
376 /**
377  * gst_rtsp_media_set_protocols:
378  * @media: a #GstRTSPMedia
379  * @protocols: the new flags
380  *
381  * Configure the allowed lower transport for @media.
382  */
383 void
384 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
385 {
386   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
387
388   media->protocols = protocols;
389 }
390
391 /**
392  * gst_rtsp_media_get_protocols:
393  * @media: a #GstRTSPMedia
394  *
395  * Get the allowed protocols of @media.
396  *
397  * Returns: a #GstRTSPLowerTrans
398  */
399 GstRTSPLowerTrans
400 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), GST_RTSP_LOWER_TRANS_UNKNOWN);
403
404   return media->protocols;
405 }
406
407 /**
408  * gst_rtsp_media_n_streams:
409  * @media: a #GstRTSPMedia
410  *
411  * Get the number of streams in this media.
412  *
413  * Returns: The number of streams.
414  */
415 guint
416 gst_rtsp_media_n_streams (GstRTSPMedia * media)
417 {
418   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
419
420   return media->streams->len;
421 }
422
423 /**
424  * gst_rtsp_media_get_stream:
425  * @media: a #GstRTSPMedia
426  * @idx: the stream index
427  *
428  * Retrieve the stream with index @idx from @media.
429  *
430  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
431  * that index did not exist.
432  */
433 GstRTSPMediaStream *
434 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
435 {
436   GstRTSPMediaStream *res;
437
438   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
439
440   if (idx < media->streams->len)
441     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
442   else
443     res = NULL;
444
445   return res;
446 }
447
448 /**
449  * gst_rtsp_media_seek:
450  * @stream: a #GstRTSPMediaStream
451  * @range: a #GstRTSPTimeRange
452  *
453  * Seek the pipeline to @range.
454  *
455  * Returns: %TRUE on success.
456  */
457 gboolean
458 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
459 {
460   GstSeekFlags flags;
461   gboolean res;
462   gint64 start, stop;
463   GstSeekType start_type, stop_type;
464
465   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
466   g_return_val_if_fail (range != NULL, FALSE);
467
468   if (range->unit != GST_RTSP_RANGE_NPT)
469     goto not_supported;
470
471   /* depends on the current playing state of the pipeline. We might need to
472    * queue this until we get EOS. */
473   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
474
475   start_type = stop_type = GST_SEEK_TYPE_NONE;
476
477   switch (range->min.type) {
478     case GST_RTSP_TIME_NOW:
479       start = -1;
480       break;
481     case GST_RTSP_TIME_SECONDS:
482       /* only seek when something changed */
483       if (media->range.min.seconds == range->min.seconds) {
484         start = -1;
485       } else {
486         start = range->min.seconds * GST_SECOND;
487         start_type = GST_SEEK_TYPE_SET;
488       }
489       break;
490     case GST_RTSP_TIME_END:
491     default:
492       goto weird_type;
493   }
494   switch (range->max.type) {
495     case GST_RTSP_TIME_SECONDS:
496       /* only seek when something changed */
497       if (media->range.max.seconds == range->max.seconds) {
498         stop = -1;
499       } else {
500         stop = range->max.seconds * GST_SECOND;
501         stop_type = GST_SEEK_TYPE_SET;
502       }
503       break;
504     case GST_RTSP_TIME_END:
505       stop = -1;
506       stop_type = GST_SEEK_TYPE_SET;
507       break;
508     case GST_RTSP_TIME_NOW:
509     default:
510       goto weird_type;
511   }
512
513   if (start != -1 || stop != -1) {
514     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
515         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
516
517     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
518         flags, start_type, start, stop_type, stop);
519
520     /* and block for the seek to complete */
521     GST_INFO ("done seeking %d", res);
522     gst_element_get_state (media->pipeline, NULL, NULL, -1);
523     GST_INFO ("prerolled again");
524
525     collect_media_stats (media);
526   } else {
527     GST_INFO ("no seek needed");
528     res = TRUE;
529   }
530
531   return res;
532
533   /* ERRORS */
534 not_supported:
535   {
536     GST_WARNING ("seek unit %d not supported", range->unit);
537     return FALSE;
538   }
539 weird_type:
540   {
541     GST_WARNING ("weird range type %d not supported", range->min.type);
542     return FALSE;
543   }
544 }
545
546 /**
547  * gst_rtsp_media_stream_rtp:
548  * @stream: a #GstRTSPMediaStream
549  * @buffer: a #GstBuffer
550  *
551  * Handle an RTP buffer for the stream. This method is usually called when a
552  * message has been received from a client using the TCP transport.
553  *
554  * This function takes ownership of @buffer.
555  *
556  * Returns: a GstFlowReturn.
557  */
558 GstFlowReturn
559 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
560 {
561   GstFlowReturn ret;
562
563   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
564
565   return ret;
566 }
567
568 /**
569  * gst_rtsp_media_stream_rtcp:
570  * @stream: a #GstRTSPMediaStream
571  * @buffer: a #GstBuffer
572  *
573  * Handle an RTCP buffer for the stream. This method is usually called when a
574  * message has been received from a client using the TCP transport.
575  *
576  * This function takes ownership of @buffer.
577  *
578  * Returns: a GstFlowReturn.
579  */
580 GstFlowReturn
581 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
582 {
583   GstFlowReturn ret;
584
585   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
586
587   return ret;
588 }
589
590 /* Allocate the udp ports and sockets */
591 static gboolean
592 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
593 {
594   GstStateChangeReturn ret;
595   GstElement *udpsrc0, *udpsrc1;
596   GstElement *udpsink0, *udpsink1;
597   gint tmp_rtp, tmp_rtcp;
598   guint count;
599   gint rtpport, rtcpport, sockfd;
600   const gchar *host;
601
602   udpsrc0 = NULL;
603   udpsrc1 = NULL;
604   udpsink0 = NULL;
605   udpsink1 = NULL;
606   count = 0;
607
608   /* Start with random port */
609   tmp_rtp = 0;
610
611   if (media->is_ipv6)
612     host = "udp://[::0]";
613   else
614     host = "udp://0.0.0.0";
615
616   /* try to allocate 2 UDP ports, the RTP port should be an even
617    * number and the RTCP port should be the next (uneven) port */
618 again:
619   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
620   if (udpsrc0 == NULL)
621     goto no_udp_protocol;
622   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
623
624   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
625   if (ret == GST_STATE_CHANGE_FAILURE) {
626     if (tmp_rtp != 0) {
627       tmp_rtp += 2;
628       if (++count > 20)
629         goto no_ports;
630
631       gst_element_set_state (udpsrc0, GST_STATE_NULL);
632       gst_object_unref (udpsrc0);
633
634       goto again;
635     }
636     goto no_udp_protocol;
637   }
638
639   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
640
641   /* check if port is even */
642   if ((tmp_rtp & 1) != 0) {
643     /* port not even, close and allocate another */
644     if (++count > 20)
645       goto no_ports;
646
647     gst_element_set_state (udpsrc0, GST_STATE_NULL);
648     gst_object_unref (udpsrc0);
649
650     tmp_rtp++;
651     goto again;
652   }
653
654   /* allocate port+1 for RTCP now */
655   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
656   if (udpsrc1 == NULL)
657     goto no_udp_rtcp_protocol;
658
659   /* set port */
660   tmp_rtcp = tmp_rtp + 1;
661   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
662
663   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
664   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
665   if (ret == GST_STATE_CHANGE_FAILURE) {
666
667     if (++count > 20)
668       goto no_ports;
669
670     gst_element_set_state (udpsrc0, GST_STATE_NULL);
671     gst_object_unref (udpsrc0);
672
673     gst_element_set_state (udpsrc1, GST_STATE_NULL);
674     gst_object_unref (udpsrc1);
675
676     tmp_rtp += 2;
677     goto again;
678   }
679
680   /* all fine, do port check */
681   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
682   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
683
684   /* this should not happen... */
685   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
686     goto port_error;
687
688   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
689   if (!udpsink0)
690     goto no_udp_protocol;
691
692   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
693   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
694   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
695
696   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
697   if (!udpsink1)
698     goto no_udp_protocol;
699
700   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
701   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
702   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
703   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
704   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
705
706   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
707   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
708   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
709   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
710
711   /* we keep these elements, we configure all in configure_transport when the
712    * server told us to really use the UDP ports. */
713   stream->udpsrc[0] = udpsrc0;
714   stream->udpsrc[1] = udpsrc1;
715   stream->udpsink[0] = udpsink0;
716   stream->udpsink[1] = udpsink1;
717   stream->server_port.min = rtpport;
718   stream->server_port.max = rtcpport;
719
720   return TRUE;
721
722   /* ERRORS */
723 no_udp_protocol:
724   {
725     goto cleanup;
726   }
727 no_ports:
728   {
729     goto cleanup;
730   }
731 no_udp_rtcp_protocol:
732   {
733     goto cleanup;
734   }
735 port_error:
736   {
737     goto cleanup;
738   }
739 cleanup:
740   {
741     if (udpsrc0) {
742       gst_element_set_state (udpsrc0, GST_STATE_NULL);
743       gst_object_unref (udpsrc0);
744     }
745     if (udpsrc1) {
746       gst_element_set_state (udpsrc1, GST_STATE_NULL);
747       gst_object_unref (udpsrc1);
748     }
749     if (udpsink0) {
750       gst_element_set_state (udpsink0, GST_STATE_NULL);
751       gst_object_unref (udpsink0);
752     }
753     if (udpsink1) {
754       gst_element_set_state (udpsink1, GST_STATE_NULL);
755       gst_object_unref (udpsink1);
756     }
757     return FALSE;
758   }
759 }
760
761 static void
762 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
763 {
764   gchar *capsstr;
765   GstCaps *newcaps, *oldcaps;
766
767   if ((newcaps = GST_PAD_CAPS (pad)))
768     gst_caps_ref (newcaps);
769
770   oldcaps = stream->caps;
771   stream->caps = newcaps;
772
773   if (oldcaps)
774     gst_caps_unref (oldcaps);
775
776   capsstr = gst_caps_to_string (newcaps);
777   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
778   g_free (capsstr);
779 }
780
781 static void
782 dump_structure (const GstStructure * s)
783 {
784   gchar *sstr;
785
786   sstr = gst_structure_to_string (s);
787   GST_INFO ("structure: %s", sstr);
788   g_free (sstr);
789 }
790
791 static GstRTSPMediaTrans *
792 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
793 {
794   GList *walk;
795   GstRTSPMediaTrans *result = NULL;
796   const gchar *tmp;
797   gchar *dest;
798   guint port;
799
800   if (rtcp_from == NULL)
801     return NULL;
802
803   tmp = g_strrstr (rtcp_from, ":");
804   if (tmp == NULL)
805     return NULL;
806
807   port = atoi (tmp + 1);
808   dest = g_strndup (rtcp_from, tmp - rtcp_from);
809
810   GST_INFO ("finding %s:%d", dest, port);
811
812   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
813     GstRTSPMediaTrans *trans = walk->data;
814     gint min, max;
815
816     min = trans->transport->client_port.min;
817     max = trans->transport->client_port.max;
818
819     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
820             || max == port)) {
821       result = trans;
822       break;
823     }
824   }
825   g_free (dest);
826
827   return result;
828 }
829
830 static void
831 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
832 {
833   GstStructure *stats;
834   GstRTSPMediaTrans *trans;
835
836   GST_INFO ("%p: new source %p", stream, source);
837
838   /* see if we have a stream to match with the origin of the RTCP packet */
839   trans = g_object_get_qdata (source, ssrc_stream_map_key);
840   if (trans == NULL) {
841     g_object_get (source, "stats", &stats, NULL);
842     if (stats) {
843       const gchar *rtcp_from;
844
845       dump_structure (stats);
846
847       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
848       if ((trans = find_transport (stream, rtcp_from))) {
849         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
850             source);
851
852         /* keep ref to the source */
853         trans->rtpsource = source;
854
855         g_object_set_qdata (source, ssrc_stream_map_key, trans);
856       }
857       gst_structure_free (stats);
858     }
859   } else {
860     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
861   }
862 }
863
864 static void
865 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
866 {
867   GST_INFO ("%p: new SDES %p", stream, source);
868 }
869
870 static void
871 on_ssrc_active (GObject * session, GObject * source,
872     GstRTSPMediaStream * stream)
873 {
874   GstRTSPMediaTrans *trans;
875
876   trans = g_object_get_qdata (source, ssrc_stream_map_key);
877
878   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
879
880   if (trans && trans->keep_alive)
881     trans->keep_alive (trans->ka_user_data);
882
883 #ifdef DUMP_STATS
884   {
885     GstStructure *stats;
886     g_object_get (source, "stats", &stats, NULL);
887     if (stats) {
888       dump_structure (stats);
889       gst_structure_free (stats);
890     }
891   }
892 #endif
893 }
894
895 static void
896 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
897 {
898   GST_INFO ("%p: source %p bye", stream, source);
899 }
900
901 static void
902 on_bye_timeout (GObject * session, GObject * source,
903     GstRTSPMediaStream * stream)
904 {
905   GstRTSPMediaTrans *trans;
906
907   GST_INFO ("%p: source %p bye timeout", stream, source);
908
909   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
910     trans->rtpsource = NULL;
911     trans->timeout = TRUE;
912   }
913 }
914
915 static void
916 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
917 {
918   GstRTSPMediaTrans *trans;
919
920   GST_INFO ("%p: source %p timeout", stream, source);
921
922   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
923     trans->rtpsource = NULL;
924     trans->timeout = TRUE;
925   }
926 }
927
928 static GstFlowReturn
929 handle_new_buffer (GstAppSink * sink, gpointer user_data)
930 {
931   GList *walk;
932   GstBuffer *buffer;
933   GstRTSPMediaStream *stream;
934
935   buffer = gst_app_sink_pull_buffer (sink);
936   if (!buffer)
937     return GST_FLOW_OK;
938
939   stream = (GstRTSPMediaStream *) user_data;
940
941   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
942     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
943
944     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
945       if (tr->send_rtp)
946         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
947     } else {
948       if (tr->send_rtcp)
949         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
950     }
951   }
952   gst_buffer_unref (buffer);
953
954   return GST_FLOW_OK;
955 }
956
957 static GstAppSinkCallbacks sink_cb = {
958   NULL,                         /* not interested in EOS */
959   NULL,                         /* not interested in preroll buffers */
960   handle_new_buffer
961 };
962
963 /* prepare the pipeline objects to handle @stream in @media */
964 static gboolean
965 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
966 {
967   gchar *name;
968   GstPad *pad, *teepad, *selpad;
969   GstPadLinkReturn ret;
970   gint i;
971
972   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
973    * for sending RTP/RTCP. The sender and receiver ports are shared between the
974    * elements */
975   if (!alloc_udp_ports (media, stream))
976     return FALSE;
977
978   /* add the ports to the pipeline */
979   for (i = 0; i < 2; i++) {
980     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
981     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
982   }
983
984   /* create elements for the TCP transfer */
985   for (i = 0; i < 2; i++) {
986     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
987     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
988     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
989     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
990     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
991     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
992     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
993     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
994         &sink_cb, stream, NULL);
995   }
996
997   /* hook up the stream to the RTP session elements. */
998   name = g_strdup_printf ("send_rtp_sink_%d", idx);
999   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1000   g_free (name);
1001   name = g_strdup_printf ("send_rtp_src_%d", idx);
1002   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1003   g_free (name);
1004   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1005   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1006   g_free (name);
1007   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1008   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1009   g_free (name);
1010   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1011   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1012   g_free (name);
1013
1014   /* get the session */
1015   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1016       &stream->session);
1017
1018   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1019       stream);
1020   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1021       stream);
1022   g_signal_connect (stream->session, "on-ssrc-active",
1023       (GCallback) on_ssrc_active, stream);
1024   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1025       stream);
1026   g_signal_connect (stream->session, "on-bye-timeout",
1027       (GCallback) on_bye_timeout, stream);
1028   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1029       stream);
1030
1031   /* link the RTP pad to the session manager */
1032   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1033   if (ret != GST_PAD_LINK_OK)
1034     goto link_failed;
1035
1036   /* make tee for RTP and link to stream */
1037   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1038   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1039
1040   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1041   gst_pad_link (stream->send_rtp_src, pad);
1042   gst_object_unref (pad);
1043
1044   /* link RTP sink, we're pretty sure this will work. */
1045   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1046   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1047   gst_pad_link (teepad, pad);
1048   gst_object_unref (pad);
1049   gst_object_unref (teepad);
1050
1051   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1052   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1053   gst_pad_link (teepad, pad);
1054   gst_object_unref (pad);
1055   gst_object_unref (teepad);
1056
1057   /* make tee for RTCP */
1058   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1059   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1060
1061   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1062   gst_pad_link (stream->send_rtcp_src, pad);
1063   gst_object_unref (pad);
1064
1065   /* link RTCP elements */
1066   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1067   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1068   gst_pad_link (teepad, pad);
1069   gst_object_unref (pad);
1070   gst_object_unref (teepad);
1071
1072   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1073   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1074   gst_pad_link (teepad, pad);
1075   gst_object_unref (pad);
1076   gst_object_unref (teepad);
1077
1078   /* make selector for the RTP receivers */
1079   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1080   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1081   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1082
1083   pad = gst_element_get_static_pad (stream->selector[0], "src");
1084   gst_pad_link (pad, stream->recv_rtp_sink);
1085   gst_object_unref (pad);
1086
1087   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1088   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1089   gst_pad_link (pad, selpad);
1090   gst_object_unref (pad);
1091   gst_object_unref (selpad);
1092
1093   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1094   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1095   gst_pad_link (pad, selpad);
1096   gst_object_unref (pad);
1097   gst_object_unref (selpad);
1098
1099   /* make selector for the RTCP receivers */
1100   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1101   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1102   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1103
1104   pad = gst_element_get_static_pad (stream->selector[1], "src");
1105   gst_pad_link (pad, stream->recv_rtcp_sink);
1106   gst_object_unref (pad);
1107
1108   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1109   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1110   gst_pad_link (pad, selpad);
1111   gst_object_unref (pad);
1112   gst_object_unref (selpad);
1113
1114   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1115   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1116   gst_pad_link (pad, selpad);
1117   gst_object_unref (pad);
1118   gst_object_unref (selpad);
1119
1120   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1121    * values */
1122   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1123   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1124   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1125   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1126
1127   /* be notified of caps changes */
1128   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1129       (GCallback) caps_notify, stream);
1130
1131   stream->prepared = TRUE;
1132
1133   return TRUE;
1134
1135   /* ERRORS */
1136 link_failed:
1137   {
1138     GST_WARNING ("failed to link stream %d", idx);
1139     return FALSE;
1140   }
1141 }
1142
1143 static void
1144 unlock_streams (GstRTSPMedia * media)
1145 {
1146   guint i, n_streams;
1147
1148   /* unlock the udp src elements */
1149   n_streams = gst_rtsp_media_n_streams (media);
1150   for (i = 0; i < n_streams; i++) {
1151     GstRTSPMediaStream *stream;
1152
1153     stream = gst_rtsp_media_get_stream (media, i);
1154
1155     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1156     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1157   }
1158 }
1159
1160 static void
1161 gst_rtsp_media_set_status (GstRTSPMedia *media, GstRTSPMediaStatus status)
1162 {
1163   g_mutex_lock (media->lock);
1164   /* never overwrite the error status */
1165   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1166     media->status = status;
1167   GST_DEBUG ("setting new status to %d", status);
1168   g_cond_broadcast (media->cond);
1169   g_mutex_unlock (media->lock);
1170 }
1171
1172 static GstRTSPMediaStatus
1173 gst_rtsp_media_get_status (GstRTSPMedia *media)
1174 {
1175   GstRTSPMediaStatus result;
1176   GTimeVal timeout;
1177
1178   g_mutex_lock (media->lock);
1179   g_get_current_time (&timeout);
1180   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1181   /* while we are preparing, wait */
1182   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1183     GST_DEBUG ("waiting for status change");
1184     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1185       GST_DEBUG ("timeout, assuming error status");
1186       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1187     }
1188   }
1189   /* could be success or error */
1190   result = media->status;
1191   GST_DEBUG ("got status %d", result);
1192   g_mutex_unlock (media->lock);
1193
1194   return result;
1195 }
1196
1197 static gboolean
1198 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1199 {
1200   GstMessageType type;
1201
1202   type = GST_MESSAGE_TYPE (message);
1203
1204   switch (type) {
1205     case GST_MESSAGE_STATE_CHANGED:
1206       break;
1207     case GST_MESSAGE_BUFFERING:
1208     {
1209       gint percent;
1210
1211       gst_message_parse_buffering (message, &percent);
1212
1213       /* no state management needed for live pipelines */
1214       if (media->is_live)
1215         break;
1216
1217       if (percent == 100) {
1218         /* a 100% message means buffering is done */
1219         media->buffering = FALSE;
1220         /* if the desired state is playing, go back */
1221         if (media->target_state == GST_STATE_PLAYING) {
1222           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1223           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1224         } else {
1225           GST_INFO ("Buffering done");
1226         }
1227       } else {
1228         /* buffering busy */
1229         if (media->buffering == FALSE) {
1230           if (media->target_state == GST_STATE_PLAYING) {
1231             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1232             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1233             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1234           } else {
1235             GST_INFO ("Buffering ...");
1236           }
1237         }
1238         media->buffering = TRUE;
1239       }
1240       break;
1241     }
1242     case GST_MESSAGE_LATENCY:
1243     {
1244       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1245       break;
1246     }
1247     case GST_MESSAGE_ERROR:
1248     {
1249       GError *gerror;
1250       gchar *debug;
1251
1252       gst_message_parse_error (message, &gerror, &debug);
1253       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1254       g_error_free (gerror);
1255       g_free (debug);
1256
1257       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1258       break;
1259     }
1260     case GST_MESSAGE_WARNING:
1261     {
1262       GError *gerror;
1263       gchar *debug;
1264
1265       gst_message_parse_warning (message, &gerror, &debug);
1266       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1267       g_error_free (gerror);
1268       g_free (debug);
1269       break;
1270     }
1271     case GST_MESSAGE_ELEMENT:
1272       break;
1273     case GST_MESSAGE_STREAM_STATUS:
1274       break;
1275     case GST_MESSAGE_ASYNC_DONE:
1276       GST_INFO ("%p: got ASYNC_DONE", media);
1277       collect_media_stats (media);
1278
1279       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1280       break;
1281     default:
1282       GST_INFO ("%p: got message type %s", media,
1283           gst_message_type_get_name (type));
1284       break;
1285   }
1286   return TRUE;
1287 }
1288
1289 static gboolean
1290 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1291 {
1292   GstRTSPMediaClass *klass;
1293   gboolean ret;
1294
1295   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1296
1297   if (klass->handle_message)
1298     ret = klass->handle_message (media, message);
1299   else
1300     ret = FALSE;
1301
1302   return ret;
1303 }
1304
1305 static void
1306 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1307 {
1308   GstRTSPMediaStream *stream;
1309   gchar *name;
1310   gint i;
1311
1312   i = media->streams->len + 1;
1313
1314   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1315
1316   stream = g_new0 (GstRTSPMediaStream, 1);
1317   stream->payloader = element;
1318
1319   name = g_strdup_printf ("dynpay%d", i);
1320
1321   /* ghost the pad of the payloader to the element */
1322   stream->srcpad = gst_ghost_pad_new (name, pad);
1323   gst_pad_set_active (stream->srcpad, TRUE);
1324   gst_element_add_pad (media->element, stream->srcpad);
1325   g_free (name);
1326
1327   /* add stream now */
1328   g_array_append_val (media->streams, stream);
1329
1330   setup_stream (stream, i, media);
1331
1332   for (i = 0; i < 2; i++) {
1333     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1334     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1335     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1336     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1337     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1338   }
1339 }
1340
1341 static void
1342 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1343 {
1344   GST_INFO ("no more pads");
1345   if (media->fakesink) {
1346     gst_object_ref (media->fakesink);
1347     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1348     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1349     gst_object_unref (media->fakesink);
1350     media->fakesink = NULL;
1351     GST_INFO ("removed fakesink");
1352   }
1353 }
1354
1355 /**
1356  * gst_rtsp_media_prepare:
1357  * @obj: a #GstRTSPMedia
1358  *
1359  * Prepare @media for streaming. This function will create the pipeline and
1360  * other objects to manage the streaming.
1361  *
1362  * It will preroll the pipeline and collect vital information about the streams
1363  * such as the duration.
1364  *
1365  * Returns: %TRUE on success.
1366  */
1367 gboolean
1368 gst_rtsp_media_prepare (GstRTSPMedia * media)
1369 {
1370   GstStateChangeReturn ret;
1371   GstRTSPMediaStatus status;
1372   guint i, n_streams;
1373   GstRTSPMediaClass *klass;
1374   GstBus *bus;
1375   GList *walk;
1376
1377   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1378     goto was_prepared;
1379
1380   if (!media->reusable && media->reused)
1381     goto is_reused;
1382
1383   GST_INFO ("preparing media %p", media);
1384
1385   /* reset some variables */
1386   media->is_live = FALSE;
1387   media->buffering = FALSE;
1388   /* we're preparing now */
1389   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1390
1391   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1392
1393   /* add the pipeline bus to our custom mainloop */
1394   media->source = gst_bus_create_watch (bus);
1395   gst_object_unref (bus);
1396
1397   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1398
1399   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1400   media->id = g_source_attach (media->source, klass->context);
1401
1402   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1403
1404   /* add stuff to the bin */
1405   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1406
1407   /* link streams we already have, other streams might appear when we have
1408    * dynamic elements */
1409   n_streams = gst_rtsp_media_n_streams (media);
1410   for (i = 0; i < n_streams; i++) {
1411     GstRTSPMediaStream *stream;
1412
1413     stream = gst_rtsp_media_get_stream (media, i);
1414
1415     setup_stream (stream, i, media);
1416   }
1417
1418   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1419     GstElement *elem = walk->data;
1420
1421     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1422     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1423
1424     /* we add a fakesink here in order to make the state change async. We remove
1425      * the fakesink again in the no-more-pads callback. */
1426     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1427     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1428   }
1429
1430   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1431   /* first go to PAUSED */
1432   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1433   media->target_state = GST_STATE_PAUSED;
1434
1435   switch (ret) {
1436     case GST_STATE_CHANGE_SUCCESS:
1437       GST_INFO ("SUCCESS state change for media %p", media);
1438       break;
1439     case GST_STATE_CHANGE_ASYNC:
1440       GST_INFO ("ASYNC state change for media %p", media);
1441       break;
1442     case GST_STATE_CHANGE_NO_PREROLL:
1443       /* we need to go to PLAYING */
1444       GST_INFO ("NO_PREROLL state change: live media %p", media);
1445       media->is_live = TRUE;
1446       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1447       if (ret == GST_STATE_CHANGE_FAILURE)
1448         goto state_failed;
1449       break;
1450     case GST_STATE_CHANGE_FAILURE:
1451       goto state_failed;
1452   }
1453
1454   /* now wait for all pads to be prerolled */
1455   status = gst_rtsp_media_get_status (media);
1456   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1457     goto state_failed;
1458
1459   GST_INFO ("object %p is prerolled", media);
1460
1461   return TRUE;
1462
1463   /* OK */
1464 was_prepared:
1465   {
1466     return TRUE;
1467   }
1468   /* ERRORS */
1469 is_reused:
1470   {
1471     GST_WARNING ("can not reuse media %p", media);
1472     return FALSE;
1473   }
1474 state_failed:
1475   {
1476     GST_WARNING ("failed to preroll pipeline");
1477     unlock_streams (media);
1478     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1479     gst_rtsp_media_unprepare (media);
1480     return FALSE;
1481   }
1482 }
1483
1484 /**
1485  * gst_rtsp_media_unprepare:
1486  * @obj: a #GstRTSPMedia
1487  *
1488  * Unprepare @media. After this call, the media should be prepared again before
1489  * it can be used again. If the media is set to be non-reusable, a new instance
1490  * must be created.
1491  *
1492  * Returns: %TRUE on success.
1493  */
1494 gboolean
1495 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1496 {
1497   GstRTSPMediaClass *klass;
1498   gboolean success;
1499
1500   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1501     return TRUE;
1502
1503   GST_INFO ("unprepare media %p", media);
1504   media->target_state = GST_STATE_NULL;
1505
1506   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1507   if (klass->unprepare)
1508     success = klass->unprepare (media);
1509   else
1510     success = TRUE;
1511
1512   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1513   media->reused = TRUE;
1514
1515   /* when the media is not reusable, this will effectively unref the media and
1516    * recreate it */
1517   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1518
1519   return success;
1520 }
1521
1522 static gboolean
1523 default_unprepare (GstRTSPMedia * media)
1524 {
1525   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1526
1527   return TRUE;
1528 }
1529
1530 /**
1531  * gst_rtsp_media_set_state:
1532  * @media: a #GstRTSPMedia
1533  * @state: the target state of the media
1534  * @transports: a GArray of #GstRTSPMediaTrans pointers
1535  *
1536  * Set the state of @media to @state and for the transports in @transports.
1537  *
1538  * Returns: %TRUE on success.
1539  */
1540 gboolean
1541 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1542     GArray * transports)
1543 {
1544   gint i;
1545   GstStateChangeReturn ret;
1546   gboolean add, remove, do_state;
1547   gint old_active;
1548
1549   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1550   g_return_val_if_fail (transports != NULL, FALSE);
1551
1552   /* NULL and READY are the same */
1553   if (state == GST_STATE_READY)
1554     state = GST_STATE_NULL;
1555
1556   add = remove = FALSE;
1557
1558   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1559       media);
1560
1561   switch (state) {
1562     case GST_STATE_NULL:
1563       /* unlock the streams so that they follow the state changes from now on */
1564       unlock_streams (media);
1565       /* fallthrough */
1566     case GST_STATE_PAUSED:
1567       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1568       if (media->target_state == GST_STATE_PLAYING)
1569         remove = TRUE;
1570       break;
1571     case GST_STATE_PLAYING:
1572       /* we're going to PLAYING, add */
1573       add = TRUE;
1574       break;
1575     default:
1576       break;
1577   }
1578   old_active = media->active;
1579
1580   for (i = 0; i < transports->len; i++) {
1581     GstRTSPMediaTrans *tr;
1582     GstRTSPMediaStream *stream;
1583     GstRTSPTransport *trans;
1584
1585     /* we need a non-NULL entry in the array */
1586     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1587     if (tr == NULL)
1588       continue;
1589
1590     /* we need a transport */
1591     if (!(trans = tr->transport))
1592       continue;
1593
1594     /* get the stream and add the destinations */
1595     stream = gst_rtsp_media_get_stream (media, tr->idx);
1596     switch (trans->lower_transport) {
1597       case GST_RTSP_LOWER_TRANS_UDP:
1598       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1599       {
1600         gchar *dest;
1601         gint min, max;
1602
1603         dest = trans->destination;
1604         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1605           min = trans->port.min;
1606           max = trans->port.max;
1607         } else {
1608           min = trans->client_port.min;
1609           max = trans->client_port.max;
1610         }
1611
1612         if (add && !tr->active) {
1613           GST_INFO ("adding %s:%d-%d", dest, min, max);
1614           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1615           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1616           stream->transports = g_list_prepend (stream->transports, tr);
1617           tr->active = TRUE;
1618           media->active++;
1619         } else if (remove && tr->active) {
1620           GST_INFO ("removing %s:%d-%d", dest, min, max);
1621           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1622           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1623           stream->transports = g_list_remove (stream->transports, tr);
1624           tr->active = FALSE;
1625           media->active--;
1626         }
1627         break;
1628       }
1629       case GST_RTSP_LOWER_TRANS_TCP:
1630         if (add && !tr->active) {
1631           GST_INFO ("adding TCP %s", trans->destination);
1632           stream->transports = g_list_prepend (stream->transports, tr);
1633           tr->active = TRUE;
1634           media->active++;
1635         } else if (remove && tr->active) {
1636           GST_INFO ("removing TCP %s", trans->destination);
1637           stream->transports = g_list_remove (stream->transports, tr);
1638           tr->active = FALSE;
1639           media->active--;
1640         }
1641         break;
1642       default:
1643         GST_INFO ("Unknown transport %d", trans->lower_transport);
1644         break;
1645     }
1646   }
1647
1648   /* we just added the first media, do the playing state change */
1649   if (old_active == 0 && add)
1650     do_state = TRUE;
1651   /* if we have no more active media, do the downward state changes */
1652   else if (media->active == 0)
1653     do_state = TRUE;
1654   else
1655     do_state = FALSE;
1656
1657   GST_INFO ("active %d media %p", media->active, media);
1658
1659   if (do_state && media->target_state != state) {
1660     if (state == GST_STATE_NULL) {
1661       gst_rtsp_media_unprepare (media);
1662     } else {
1663       GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
1664       media->target_state = state;
1665       ret = gst_element_set_state (media->pipeline, state);
1666     }
1667   }
1668
1669   /* remember where we are */
1670   if (state == GST_STATE_PAUSED)
1671     collect_media_stats (media);
1672
1673   return TRUE;
1674 }
1675
1676 /**
1677  * gst_rtsp_media_remove_elements:
1678  * @media: a #GstRTSPMedia
1679  *
1680  * Remove all elements and the pipeline controlled by @media.
1681  */
1682 void
1683 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1684 {
1685   gint i, j;
1686
1687   unlock_streams (media);
1688
1689   for (i = 0; i < media->streams->len; i++) {
1690     GstRTSPMediaStream *stream;
1691
1692     GST_INFO ("Removing elements of stream %d from pipeline", i);
1693
1694     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1695
1696     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1697
1698     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
1699
1700     for (j = 0; j < 2; j++) {
1701       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
1702       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
1703       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
1704       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
1705       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
1706       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
1707
1708       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
1709       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
1710       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
1711       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
1712       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
1713       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
1714     }
1715     if (stream->caps)
1716       gst_caps_unref (stream->caps);
1717     stream->caps = NULL;
1718     gst_rtsp_media_stream_free (stream);
1719   }
1720   g_array_remove_range (media->streams, 0, media->streams->len);
1721
1722   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
1723   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
1724
1725   gst_object_unref (media->pipeline);
1726   media->pipeline = NULL;
1727 }