rtsp-server: Run gst-indent
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gst/app/gstappsrc.h>
24 #include <gst/app/gstappsink.h>
25
26 #include "rtsp-media.h"
27
28 #define DEFAULT_SHARED         FALSE
29 #define DEFAULT_REUSABLE       FALSE
30 #define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
31 //#define DEFAULT_PROTOCOLS      GST_RTSP_LOWER_TRANS_UDP_MCAST
32 #define DEFAULT_EOS_SHUTDOWN   FALSE
33
34 /* define to dump received RTCP packets */
35 #undef DUMP_STATS
36
37 enum
38 {
39   PROP_0,
40   PROP_SHARED,
41   PROP_REUSABLE,
42   PROP_PROTOCOLS,
43   PROP_EOS_SHUTDOWN,
44   PROP_LAST
45 };
46
47 enum
48 {
49   SIGNAL_UNPREPARED,
50   SIGNAL_LAST
51 };
52
53 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
54 #define GST_CAT_DEFAULT rtsp_media_debug
55
56 static GQuark ssrc_stream_map_key;
57
58 static void gst_rtsp_media_get_property (GObject * object, guint propid,
59     GValue * value, GParamSpec * pspec);
60 static void gst_rtsp_media_set_property (GObject * object, guint propid,
61     const GValue * value, GParamSpec * pspec);
62 static void gst_rtsp_media_finalize (GObject * obj);
63
64 static gpointer do_loop (GstRTSPMediaClass * klass);
65 static gboolean default_handle_message (GstRTSPMedia * media,
66     GstMessage * message);
67 static gboolean default_unprepare (GstRTSPMedia * media);
68 static void unlock_streams (GstRTSPMedia * media);
69
70 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
71
72 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
73
74 static void
75 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
76 {
77   GObjectClass *gobject_class;
78   GError *error = NULL;
79
80   gobject_class = G_OBJECT_CLASS (klass);
81
82   gobject_class->get_property = gst_rtsp_media_get_property;
83   gobject_class->set_property = gst_rtsp_media_set_property;
84   gobject_class->finalize = gst_rtsp_media_finalize;
85
86   g_object_class_install_property (gobject_class, PROP_SHARED,
87       g_param_spec_boolean ("shared", "Shared",
88           "If this media pipeline can be shared", DEFAULT_SHARED,
89           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
90
91   g_object_class_install_property (gobject_class, PROP_REUSABLE,
92       g_param_spec_boolean ("reusable", "Reusable",
93           "If this media pipeline can be reused after an unprepare",
94           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
95
96   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
97       g_param_spec_flags ("protocols", "Protocols",
98           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
99           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100
101   g_object_class_install_property (gobject_class, PROP_EOS_SHUTDOWN,
102       g_param_spec_boolean ("eos-shutdown", "EOS Shutdown",
103           "Send an EOS event to the pipeline before unpreparing",
104           DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
107       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
108       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
109       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
110
111   klass->context = g_main_context_new ();
112   klass->loop = g_main_loop_new (klass->context, TRUE);
113
114   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
115   if (error != NULL) {
116     g_critical ("could not start bus thread: %s", error->message);
117   }
118   klass->handle_message = default_handle_message;
119   klass->unprepare = default_unprepare;
120
121   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
122 }
123
124 static void
125 gst_rtsp_media_init (GstRTSPMedia * media)
126 {
127   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
128   media->lock = g_mutex_new ();
129   media->cond = g_cond_new ();
130
131   media->shared = DEFAULT_SHARED;
132   media->reusable = DEFAULT_REUSABLE;
133   media->protocols = DEFAULT_PROTOCOLS;
134   media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
135 }
136
137 /* FIXME. this should be done in multiudpsink */
138 typedef struct
139 {
140   gint count;
141   gchar *dest;
142   gint min, max;
143 } RTSPDestination;
144
145 static gint
146 dest_compare (RTSPDestination * a, RTSPDestination * b)
147 {
148   if ((a->min == b->min) && (a->max == b->max)
149       && (strcmp (a->dest, b->dest) == 0))
150     return 0;
151
152   return 1;
153 }
154
155 static RTSPDestination *
156 create_destination (const gchar * dest, gint min, gint max)
157 {
158   RTSPDestination *res;
159
160   res = g_slice_new (RTSPDestination);
161   res->count = 1;
162   res->dest = g_strdup (dest);
163   res->min = min;
164   res->max = max;
165
166   return res;
167 }
168
169 static void
170 free_destination (RTSPDestination * dest)
171 {
172   g_free (dest->dest);
173   g_slice_free (RTSPDestination, dest);
174 }
175
176 void
177 gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans)
178 {
179   if (trans->transport) {
180     gst_rtsp_transport_free (trans->transport);
181     trans->transport = NULL;
182   }
183   if (trans->rtpsource) {
184     g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL);
185     trans->rtpsource = NULL;
186   }
187 }
188
189 static void
190 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
191 {
192   if (stream->session)
193     g_object_unref (stream->session);
194
195   if (stream->caps)
196     gst_caps_unref (stream->caps);
197
198   if (stream->send_rtp_sink)
199     gst_object_unref (stream->send_rtp_sink);
200   if (stream->send_rtp_src)
201     gst_object_unref (stream->send_rtp_src);
202   if (stream->send_rtcp_src)
203     gst_object_unref (stream->send_rtcp_src);
204   if (stream->recv_rtcp_sink)
205     gst_object_unref (stream->recv_rtcp_sink);
206   if (stream->recv_rtp_sink)
207     gst_object_unref (stream->recv_rtp_sink);
208
209   g_list_free (stream->transports);
210
211   g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
212   g_list_free (stream->destinations);
213
214   g_free (stream);
215 }
216
217 static void
218 gst_rtsp_media_finalize (GObject * obj)
219 {
220   GstRTSPMedia *media;
221   guint i;
222
223   media = GST_RTSP_MEDIA (obj);
224
225   GST_INFO ("finalize media %p", media);
226
227   if (media->pipeline) {
228     unlock_streams (media);
229     gst_element_set_state (media->pipeline, GST_STATE_NULL);
230     gst_object_unref (media->pipeline);
231   }
232
233   for (i = 0; i < media->streams->len; i++) {
234     GstRTSPMediaStream *stream;
235
236     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
237
238     gst_rtsp_media_stream_free (stream);
239   }
240   g_array_free (media->streams, TRUE);
241
242   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
243   g_list_free (media->dynamic);
244
245   if (media->source) {
246     g_source_destroy (media->source);
247     g_source_unref (media->source);
248   }
249   g_mutex_free (media->lock);
250   g_cond_free (media->cond);
251
252   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
253 }
254
255 static void
256 gst_rtsp_media_get_property (GObject * object, guint propid,
257     GValue * value, GParamSpec * pspec)
258 {
259   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
260
261   switch (propid) {
262     case PROP_SHARED:
263       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
264       break;
265     case PROP_REUSABLE:
266       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
267       break;
268     case PROP_PROTOCOLS:
269       g_value_set_flags (value, gst_rtsp_media_get_protocols (media));
270       break;
271     case PROP_EOS_SHUTDOWN:
272       g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
273       break;
274     default:
275       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
276   }
277 }
278
279 static void
280 gst_rtsp_media_set_property (GObject * object, guint propid,
281     const GValue * value, GParamSpec * pspec)
282 {
283   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
284
285   switch (propid) {
286     case PROP_SHARED:
287       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
288       break;
289     case PROP_REUSABLE:
290       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
291       break;
292     case PROP_PROTOCOLS:
293       gst_rtsp_media_set_protocols (media, g_value_get_flags (value));
294       break;
295     case PROP_EOS_SHUTDOWN:
296       gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
297       break;
298     default:
299       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
300   }
301 }
302
303 static gpointer
304 do_loop (GstRTSPMediaClass * klass)
305 {
306   GST_INFO ("enter mainloop");
307   g_main_loop_run (klass->loop);
308   GST_INFO ("exit mainloop");
309
310   return NULL;
311 }
312
313 static void
314 collect_media_stats (GstRTSPMedia * media)
315 {
316   GstFormat format;
317   gint64 position, duration;
318
319   media->range.unit = GST_RTSP_RANGE_NPT;
320
321   if (media->is_live) {
322     media->range.min.type = GST_RTSP_TIME_NOW;
323     media->range.min.seconds = -1;
324     media->range.max.type = GST_RTSP_TIME_END;
325     media->range.max.seconds = -1;
326   } else {
327     /* get the position */
328     format = GST_FORMAT_TIME;
329     if (!gst_element_query_position (media->pipeline, &format, &position)) {
330       GST_INFO ("position query failed");
331       position = 0;
332     }
333
334     /* get the duration */
335     format = GST_FORMAT_TIME;
336     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
337       GST_INFO ("duration query failed");
338       duration = -1;
339     }
340
341     GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
342         GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
343
344     if (position == -1) {
345       media->range.min.type = GST_RTSP_TIME_NOW;
346       media->range.min.seconds = -1;
347     } else {
348       media->range.min.type = GST_RTSP_TIME_SECONDS;
349       media->range.min.seconds = ((gdouble) position) / GST_SECOND;
350     }
351     if (duration == -1) {
352       media->range.max.type = GST_RTSP_TIME_END;
353       media->range.max.seconds = -1;
354     } else {
355       media->range.max.type = GST_RTSP_TIME_SECONDS;
356       media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
357     }
358   }
359 }
360
361 /**
362  * gst_rtsp_media_new:
363  *
364  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
365  * element to produde RTP data for one or more related (audio/video/..) 
366  * streams.
367  *
368  * Returns: a new #GstRTSPMedia object.
369  */
370 GstRTSPMedia *
371 gst_rtsp_media_new (void)
372 {
373   GstRTSPMedia *result;
374
375   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
376
377   return result;
378 }
379
380 /**
381  * gst_rtsp_media_set_shared:
382  * @media: a #GstRTSPMedia
383  * @shared: the new value
384  *
385  * Set or unset if the pipeline for @media can be shared will multiple clients.
386  * When @shared is %TRUE, client requests for this media will share the media
387  * pipeline.
388  */
389 void
390 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
391 {
392   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
393
394   media->shared = shared;
395 }
396
397 /**
398  * gst_rtsp_media_is_shared:
399  * @media: a #GstRTSPMedia
400  *
401  * Check if the pipeline for @media can be shared between multiple clients.
402  *
403  * Returns: %TRUE if the media can be shared between clients.
404  */
405 gboolean
406 gst_rtsp_media_is_shared (GstRTSPMedia * media)
407 {
408   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
409
410   return media->shared;
411 }
412
413 /**
414  * gst_rtsp_media_set_reusable:
415  * @media: a #GstRTSPMedia
416  * @reusable: the new value
417  *
418  * Set or unset if the pipeline for @media can be reused after the pipeline has
419  * been unprepared.
420  */
421 void
422 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
423 {
424   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
425
426   media->reusable = reusable;
427 }
428
429 /**
430  * gst_rtsp_media_is_reusable:
431  * @media: a #GstRTSPMedia
432  *
433  * Check if the pipeline for @media can be reused after an unprepare.
434  *
435  * Returns: %TRUE if the media can be reused
436  */
437 gboolean
438 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
439 {
440   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
441
442   return media->reusable;
443 }
444
445 /**
446  * gst_rtsp_media_set_protocols:
447  * @media: a #GstRTSPMedia
448  * @protocols: the new flags
449  *
450  * Configure the allowed lower transport for @media.
451  */
452 void
453 gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols)
454 {
455   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
456
457   media->protocols = protocols;
458 }
459
460 /**
461  * gst_rtsp_media_get_protocols:
462  * @media: a #GstRTSPMedia
463  *
464  * Get the allowed protocols of @media.
465  *
466  * Returns: a #GstRTSPLowerTrans
467  */
468 GstRTSPLowerTrans
469 gst_rtsp_media_get_protocols (GstRTSPMedia * media)
470 {
471   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media),
472       GST_RTSP_LOWER_TRANS_UNKNOWN);
473
474   return media->protocols;
475 }
476
477 /**
478  * gst_rtsp_media_set_eos_shutdown:
479  * @media: a #GstRTSPMedia
480  * @eos_shutdown: the new value
481  *
482  * Set or unset if an EOS event will be sent to the pipeline for @media before
483  * it is unprepared.
484  */
485 void
486 gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown)
487 {
488   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
489
490   media->eos_shutdown = eos_shutdown;
491 }
492
493 /**
494  * gst_rtsp_media_is_eos_shutdown:
495  * @media: a #GstRTSPMedia
496  *
497  * Check if the pipeline for @media will send an EOS down the pipeline before
498  * unpreparing.
499  *
500  * Returns: %TRUE if the media will send EOS before unpreparing.
501  */
502 gboolean
503 gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media)
504 {
505   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
506
507   return media->eos_shutdown;
508 }
509
510 /**
511  * gst_rtsp_media_n_streams:
512  * @media: a #GstRTSPMedia
513  *
514  * Get the number of streams in this media.
515  *
516  * Returns: The number of streams.
517  */
518 guint
519 gst_rtsp_media_n_streams (GstRTSPMedia * media)
520 {
521   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
522
523   return media->streams->len;
524 }
525
526 /**
527  * gst_rtsp_media_get_stream:
528  * @media: a #GstRTSPMedia
529  * @idx: the stream index
530  *
531  * Retrieve the stream with index @idx from @media.
532  *
533  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
534  * that index did not exist.
535  */
536 GstRTSPMediaStream *
537 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
538 {
539   GstRTSPMediaStream *res;
540
541   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
542
543   if (idx < media->streams->len)
544     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
545   else
546     res = NULL;
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_media_seek:
553  * @media: a #GstRTSPMedia
554  * @range: a #GstRTSPTimeRange
555  *
556  * Seek the pipeline to @range.
557  *
558  * Returns: %TRUE on success.
559  */
560 gboolean
561 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
562 {
563   GstSeekFlags flags;
564   gboolean res;
565   gint64 start, stop;
566   GstSeekType start_type, stop_type;
567
568   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
569   g_return_val_if_fail (range != NULL, FALSE);
570
571   if (range->unit != GST_RTSP_RANGE_NPT)
572     goto not_supported;
573
574   /* depends on the current playing state of the pipeline. We might need to
575    * queue this until we get EOS. */
576   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
577
578   start_type = stop_type = GST_SEEK_TYPE_NONE;
579
580   switch (range->min.type) {
581     case GST_RTSP_TIME_NOW:
582       start = -1;
583       break;
584     case GST_RTSP_TIME_SECONDS:
585       /* only seek when something changed */
586       if (media->range.min.seconds == range->min.seconds) {
587         start = -1;
588       } else {
589         start = range->min.seconds * GST_SECOND;
590         start_type = GST_SEEK_TYPE_SET;
591       }
592       break;
593     case GST_RTSP_TIME_END:
594     default:
595       goto weird_type;
596   }
597   switch (range->max.type) {
598     case GST_RTSP_TIME_SECONDS:
599       /* only seek when something changed */
600       if (media->range.max.seconds == range->max.seconds) {
601         stop = -1;
602       } else {
603         stop = range->max.seconds * GST_SECOND;
604         stop_type = GST_SEEK_TYPE_SET;
605       }
606       break;
607     case GST_RTSP_TIME_END:
608       stop = -1;
609       stop_type = GST_SEEK_TYPE_SET;
610       break;
611     case GST_RTSP_TIME_NOW:
612     default:
613       goto weird_type;
614   }
615
616   if (start != -1 || stop != -1) {
617     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
618         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
619
620     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
621         flags, start_type, start, stop_type, stop);
622
623     /* and block for the seek to complete */
624     GST_INFO ("done seeking %d", res);
625     gst_element_get_state (media->pipeline, NULL, NULL, -1);
626     GST_INFO ("prerolled again");
627
628     collect_media_stats (media);
629   } else {
630     GST_INFO ("no seek needed");
631     res = TRUE;
632   }
633
634   return res;
635
636   /* ERRORS */
637 not_supported:
638   {
639     GST_WARNING ("seek unit %d not supported", range->unit);
640     return FALSE;
641   }
642 weird_type:
643   {
644     GST_WARNING ("weird range type %d not supported", range->min.type);
645     return FALSE;
646   }
647 }
648
649 /**
650  * gst_rtsp_media_stream_rtp:
651  * @stream: a #GstRTSPMediaStream
652  * @buffer: a #GstBuffer
653  *
654  * Handle an RTP buffer for the stream. This method is usually called when a
655  * message has been received from a client using the TCP transport.
656  *
657  * This function takes ownership of @buffer.
658  *
659  * Returns: a GstFlowReturn.
660  */
661 GstFlowReturn
662 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
663 {
664   GstFlowReturn ret;
665
666   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
667
668   return ret;
669 }
670
671 /**
672  * gst_rtsp_media_stream_rtcp:
673  * @stream: a #GstRTSPMediaStream
674  * @buffer: a #GstBuffer
675  *
676  * Handle an RTCP buffer for the stream. This method is usually called when a
677  * message has been received from a client using the TCP transport.
678  *
679  * This function takes ownership of @buffer.
680  *
681  * Returns: a GstFlowReturn.
682  */
683 GstFlowReturn
684 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
685 {
686   GstFlowReturn ret;
687
688   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
689
690   return ret;
691 }
692
693 /* Allocate the udp ports and sockets */
694 static gboolean
695 alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
696 {
697   GstStateChangeReturn ret;
698   GstElement *udpsrc0, *udpsrc1;
699   GstElement *udpsink0, *udpsink1;
700   gint tmp_rtp, tmp_rtcp;
701   guint count;
702   gint rtpport, rtcpport, sockfd;
703   const gchar *host;
704
705   udpsrc0 = NULL;
706   udpsrc1 = NULL;
707   udpsink0 = NULL;
708   udpsink1 = NULL;
709   count = 0;
710
711   /* Start with random port */
712   tmp_rtp = 0;
713
714   if (media->is_ipv6)
715     host = "udp://[::0]";
716   else
717     host = "udp://0.0.0.0";
718
719   /* try to allocate 2 UDP ports, the RTP port should be an even
720    * number and the RTCP port should be the next (uneven) port */
721 again:
722   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
723   if (udpsrc0 == NULL)
724     goto no_udp_protocol;
725   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
726
727   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
728   if (ret == GST_STATE_CHANGE_FAILURE) {
729     if (tmp_rtp != 0) {
730       tmp_rtp += 2;
731       if (++count > 20)
732         goto no_ports;
733
734       gst_element_set_state (udpsrc0, GST_STATE_NULL);
735       gst_object_unref (udpsrc0);
736
737       goto again;
738     }
739     goto no_udp_protocol;
740   }
741
742   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
743
744   /* check if port is even */
745   if ((tmp_rtp & 1) != 0) {
746     /* port not even, close and allocate another */
747     if (++count > 20)
748       goto no_ports;
749
750     gst_element_set_state (udpsrc0, GST_STATE_NULL);
751     gst_object_unref (udpsrc0);
752
753     tmp_rtp++;
754     goto again;
755   }
756
757   /* allocate port+1 for RTCP now */
758   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL);
759   if (udpsrc1 == NULL)
760     goto no_udp_rtcp_protocol;
761
762   /* set port */
763   tmp_rtcp = tmp_rtp + 1;
764   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
765
766   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
767   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
768   if (ret == GST_STATE_CHANGE_FAILURE) {
769
770     if (++count > 20)
771       goto no_ports;
772
773     gst_element_set_state (udpsrc0, GST_STATE_NULL);
774     gst_object_unref (udpsrc0);
775
776     gst_element_set_state (udpsrc1, GST_STATE_NULL);
777     gst_object_unref (udpsrc1);
778
779     tmp_rtp += 2;
780     goto again;
781   }
782
783   /* all fine, do port check */
784   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
785   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
786
787   /* this should not happen... */
788   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
789     goto port_error;
790
791   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
792   if (!udpsink0)
793     goto no_udp_protocol;
794
795   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
796   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
797   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
798
799   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
800   if (!udpsink1)
801     goto no_udp_protocol;
802
803   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
804           "send-duplicates")) {
805     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
806     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
807     stream->filter_duplicates = FALSE;
808   } else {
809     GST_WARNING ("multiudpsink version found without send-duplicates property");
810     stream->filter_duplicates = TRUE;
811   }
812
813   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
814   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
815   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
816   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
817   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
818
819   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
820   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
821   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
822   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
823
824   /* we keep these elements, we configure all in configure_transport when the
825    * server told us to really use the UDP ports. */
826   stream->udpsrc[0] = udpsrc0;
827   stream->udpsrc[1] = udpsrc1;
828   stream->udpsink[0] = udpsink0;
829   stream->udpsink[1] = udpsink1;
830   stream->server_port.min = rtpport;
831   stream->server_port.max = rtcpport;
832
833   return TRUE;
834
835   /* ERRORS */
836 no_udp_protocol:
837   {
838     goto cleanup;
839   }
840 no_ports:
841   {
842     goto cleanup;
843   }
844 no_udp_rtcp_protocol:
845   {
846     goto cleanup;
847   }
848 port_error:
849   {
850     goto cleanup;
851   }
852 cleanup:
853   {
854     if (udpsrc0) {
855       gst_element_set_state (udpsrc0, GST_STATE_NULL);
856       gst_object_unref (udpsrc0);
857     }
858     if (udpsrc1) {
859       gst_element_set_state (udpsrc1, GST_STATE_NULL);
860       gst_object_unref (udpsrc1);
861     }
862     if (udpsink0) {
863       gst_element_set_state (udpsink0, GST_STATE_NULL);
864       gst_object_unref (udpsink0);
865     }
866     if (udpsink1) {
867       gst_element_set_state (udpsink1, GST_STATE_NULL);
868       gst_object_unref (udpsink1);
869     }
870     return FALSE;
871   }
872 }
873
874 /* executed from streaming thread */
875 static void
876 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
877 {
878   gchar *capsstr;
879   GstCaps *newcaps, *oldcaps;
880
881   if ((newcaps = GST_PAD_CAPS (pad)))
882     gst_caps_ref (newcaps);
883
884   oldcaps = stream->caps;
885   stream->caps = newcaps;
886
887   if (oldcaps)
888     gst_caps_unref (oldcaps);
889
890   capsstr = gst_caps_to_string (newcaps);
891   GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
892   g_free (capsstr);
893 }
894
895 static void
896 dump_structure (const GstStructure * s)
897 {
898   gchar *sstr;
899
900   sstr = gst_structure_to_string (s);
901   GST_INFO ("structure: %s", sstr);
902   g_free (sstr);
903 }
904
905 static GstRTSPMediaTrans *
906 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
907 {
908   GList *walk;
909   GstRTSPMediaTrans *result = NULL;
910   const gchar *tmp;
911   gchar *dest;
912   guint port;
913
914   if (rtcp_from == NULL)
915     return NULL;
916
917   tmp = g_strrstr (rtcp_from, ":");
918   if (tmp == NULL)
919     return NULL;
920
921   port = atoi (tmp + 1);
922   dest = g_strndup (rtcp_from, tmp - rtcp_from);
923
924   GST_INFO ("finding %s:%d", dest, port);
925
926   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
927     GstRTSPMediaTrans *trans = walk->data;
928     gint min, max;
929
930     min = trans->transport->client_port.min;
931     max = trans->transport->client_port.max;
932
933     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
934             || max == port)) {
935       result = trans;
936       break;
937     }
938   }
939   g_free (dest);
940
941   return result;
942 }
943
944 static void
945 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
946 {
947   GstStructure *stats;
948   GstRTSPMediaTrans *trans;
949
950   GST_INFO ("%p: new source %p", stream, source);
951
952   /* see if we have a stream to match with the origin of the RTCP packet */
953   trans = g_object_get_qdata (source, ssrc_stream_map_key);
954   if (trans == NULL) {
955     g_object_get (source, "stats", &stats, NULL);
956     if (stats) {
957       const gchar *rtcp_from;
958
959       dump_structure (stats);
960
961       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
962       if ((trans = find_transport (stream, rtcp_from))) {
963         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
964             source);
965
966         /* keep ref to the source */
967         trans->rtpsource = source;
968
969         g_object_set_qdata (source, ssrc_stream_map_key, trans);
970       }
971       gst_structure_free (stats);
972     }
973   } else {
974     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
975   }
976 }
977
978 static void
979 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
980 {
981   GST_INFO ("%p: new SDES %p", stream, source);
982 }
983
984 static void
985 on_ssrc_active (GObject * session, GObject * source,
986     GstRTSPMediaStream * stream)
987 {
988   GstRTSPMediaTrans *trans;
989
990   trans = g_object_get_qdata (source, ssrc_stream_map_key);
991
992   GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
993
994   if (trans && trans->keep_alive)
995     trans->keep_alive (trans->ka_user_data);
996
997 #ifdef DUMP_STATS
998   {
999     GstStructure *stats;
1000     g_object_get (source, "stats", &stats, NULL);
1001     if (stats) {
1002       dump_structure (stats);
1003       gst_structure_free (stats);
1004     }
1005   }
1006 #endif
1007 }
1008
1009 static void
1010 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1011 {
1012   GST_INFO ("%p: source %p bye", stream, source);
1013 }
1014
1015 static void
1016 on_bye_timeout (GObject * session, GObject * source,
1017     GstRTSPMediaStream * stream)
1018 {
1019   GstRTSPMediaTrans *trans;
1020
1021   GST_INFO ("%p: source %p bye timeout", stream, source);
1022
1023   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1024     trans->rtpsource = NULL;
1025     trans->timeout = TRUE;
1026   }
1027 }
1028
1029 static void
1030 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
1031 {
1032   GstRTSPMediaTrans *trans;
1033
1034   GST_INFO ("%p: source %p timeout", stream, source);
1035
1036   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1037     trans->rtpsource = NULL;
1038     trans->timeout = TRUE;
1039   }
1040 }
1041
1042 static GstFlowReturn
1043 handle_new_buffer (GstAppSink * sink, gpointer user_data)
1044 {
1045   GList *walk;
1046   GstBuffer *buffer;
1047   GstRTSPMediaStream *stream;
1048
1049   buffer = gst_app_sink_pull_buffer (sink);
1050   if (!buffer)
1051     return GST_FLOW_OK;
1052
1053   stream = (GstRTSPMediaStream *) user_data;
1054
1055   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
1056     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
1057
1058     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
1059       if (tr->send_rtp)
1060         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
1061     } else {
1062       if (tr->send_rtcp)
1063         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
1064     }
1065   }
1066   gst_buffer_unref (buffer);
1067
1068   return GST_FLOW_OK;
1069 }
1070
1071 static GstAppSinkCallbacks sink_cb = {
1072   NULL,                         /* not interested in EOS */
1073   NULL,                         /* not interested in preroll buffers */
1074   handle_new_buffer
1075 };
1076
1077 /* prepare the pipeline objects to handle @stream in @media */
1078 static gboolean
1079 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
1080 {
1081   gchar *name;
1082   GstPad *pad, *teepad, *selpad;
1083   GstPadLinkReturn ret;
1084   gint i;
1085
1086   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
1087    * for sending RTP/RTCP. The sender and receiver ports are shared between the
1088    * elements */
1089   if (!alloc_udp_ports (media, stream))
1090     return FALSE;
1091
1092   /* add the ports to the pipeline */
1093   for (i = 0; i < 2; i++) {
1094     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
1095     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
1096   }
1097
1098   /* create elements for the TCP transfer */
1099   for (i = 0; i < 2; i++) {
1100     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1101     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
1102     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1103     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
1104     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
1105     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
1106     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
1107     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
1108         &sink_cb, stream, NULL);
1109   }
1110
1111   /* hook up the stream to the RTP session elements. */
1112   name = g_strdup_printf ("send_rtp_sink_%d", idx);
1113   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1114   g_free (name);
1115   name = g_strdup_printf ("send_rtp_src_%d", idx);
1116   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
1117   g_free (name);
1118   name = g_strdup_printf ("send_rtcp_src_%d", idx);
1119   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
1120   g_free (name);
1121   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
1122   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
1123   g_free (name);
1124   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
1125   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
1126   g_free (name);
1127
1128   /* get the session */
1129   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
1130       &stream->session);
1131
1132   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1133       stream);
1134   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1135       stream);
1136   g_signal_connect (stream->session, "on-ssrc-active",
1137       (GCallback) on_ssrc_active, stream);
1138   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1139       stream);
1140   g_signal_connect (stream->session, "on-bye-timeout",
1141       (GCallback) on_bye_timeout, stream);
1142   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
1143       stream);
1144
1145   /* link the RTP pad to the session manager */
1146   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
1147   if (ret != GST_PAD_LINK_OK)
1148     goto link_failed;
1149
1150   /* make tee for RTP and link to stream */
1151   stream->tee[0] = gst_element_factory_make ("tee", NULL);
1152   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
1153
1154   pad = gst_element_get_static_pad (stream->tee[0], "sink");
1155   gst_pad_link (stream->send_rtp_src, pad);
1156   gst_object_unref (pad);
1157
1158   /* link RTP sink, we're pretty sure this will work. */
1159   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1160   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
1161   gst_pad_link (teepad, pad);
1162   gst_object_unref (pad);
1163   gst_object_unref (teepad);
1164
1165   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
1166   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
1167   gst_pad_link (teepad, pad);
1168   gst_object_unref (pad);
1169   gst_object_unref (teepad);
1170
1171   /* make tee for RTCP */
1172   stream->tee[1] = gst_element_factory_make ("tee", NULL);
1173   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1174
1175   pad = gst_element_get_static_pad (stream->tee[1], "sink");
1176   gst_pad_link (stream->send_rtcp_src, pad);
1177   gst_object_unref (pad);
1178
1179   /* link RTCP elements */
1180   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1181   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1182   gst_pad_link (teepad, pad);
1183   gst_object_unref (pad);
1184   gst_object_unref (teepad);
1185
1186   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1187   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1188   gst_pad_link (teepad, pad);
1189   gst_object_unref (pad);
1190   gst_object_unref (teepad);
1191
1192   /* make selector for the RTP receivers */
1193   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1194   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1195   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1196
1197   pad = gst_element_get_static_pad (stream->selector[0], "src");
1198   gst_pad_link (pad, stream->recv_rtp_sink);
1199   gst_object_unref (pad);
1200
1201   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1202   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1203   gst_pad_link (pad, selpad);
1204   gst_object_unref (pad);
1205   gst_object_unref (selpad);
1206
1207   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1208   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1209   gst_pad_link (pad, selpad);
1210   gst_object_unref (pad);
1211   gst_object_unref (selpad);
1212
1213   /* make selector for the RTCP receivers */
1214   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1215   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1216   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1217
1218   pad = gst_element_get_static_pad (stream->selector[1], "src");
1219   gst_pad_link (pad, stream->recv_rtcp_sink);
1220   gst_object_unref (pad);
1221
1222   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1223   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1224   gst_pad_link (pad, selpad);
1225   gst_object_unref (pad);
1226   gst_object_unref (selpad);
1227
1228   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1229   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1230   gst_pad_link (pad, selpad);
1231   gst_object_unref (pad);
1232   gst_object_unref (selpad);
1233
1234   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1235    * values */
1236   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1237   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1238   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1239   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1240
1241   /* be notified of caps changes */
1242   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1243       (GCallback) caps_notify, stream);
1244
1245   stream->prepared = TRUE;
1246
1247   return TRUE;
1248
1249   /* ERRORS */
1250 link_failed:
1251   {
1252     GST_WARNING ("failed to link stream %d", idx);
1253     return FALSE;
1254   }
1255 }
1256
1257 static void
1258 unlock_streams (GstRTSPMedia * media)
1259 {
1260   guint i, n_streams;
1261
1262   /* unlock the udp src elements */
1263   n_streams = gst_rtsp_media_n_streams (media);
1264   for (i = 0; i < n_streams; i++) {
1265     GstRTSPMediaStream *stream;
1266
1267     stream = gst_rtsp_media_get_stream (media, i);
1268
1269     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1270     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1271   }
1272 }
1273
1274 static void
1275 gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
1276 {
1277   g_mutex_lock (media->lock);
1278   /* never overwrite the error status */
1279   if (media->status != GST_RTSP_MEDIA_STATUS_ERROR)
1280     media->status = status;
1281   GST_DEBUG ("setting new status to %d", status);
1282   g_cond_broadcast (media->cond);
1283   g_mutex_unlock (media->lock);
1284 }
1285
1286 static GstRTSPMediaStatus
1287 gst_rtsp_media_get_status (GstRTSPMedia * media)
1288 {
1289   GstRTSPMediaStatus result;
1290   GTimeVal timeout;
1291
1292   g_mutex_lock (media->lock);
1293   g_get_current_time (&timeout);
1294   g_time_val_add (&timeout, 20 * G_USEC_PER_SEC);
1295   /* while we are preparing, wait */
1296   while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) {
1297     GST_DEBUG ("waiting for status change");
1298     if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) {
1299       GST_DEBUG ("timeout, assuming error status");
1300       media->status = GST_RTSP_MEDIA_STATUS_ERROR;
1301     }
1302   }
1303   /* could be success or error */
1304   result = media->status;
1305   GST_DEBUG ("got status %d", result);
1306   g_mutex_unlock (media->lock);
1307
1308   return result;
1309 }
1310
1311 static gboolean
1312 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1313 {
1314   GstMessageType type;
1315
1316   type = GST_MESSAGE_TYPE (message);
1317
1318   switch (type) {
1319     case GST_MESSAGE_STATE_CHANGED:
1320       break;
1321     case GST_MESSAGE_BUFFERING:
1322     {
1323       gint percent;
1324
1325       gst_message_parse_buffering (message, &percent);
1326
1327       /* no state management needed for live pipelines */
1328       if (media->is_live)
1329         break;
1330
1331       if (percent == 100) {
1332         /* a 100% message means buffering is done */
1333         media->buffering = FALSE;
1334         /* if the desired state is playing, go back */
1335         if (media->target_state == GST_STATE_PLAYING) {
1336           GST_INFO ("Buffering done, setting pipeline to PLAYING");
1337           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1338         } else {
1339           GST_INFO ("Buffering done");
1340         }
1341       } else {
1342         /* buffering busy */
1343         if (media->buffering == FALSE) {
1344           if (media->target_state == GST_STATE_PLAYING) {
1345             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1346             GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1347             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1348           } else {
1349             GST_INFO ("Buffering ...");
1350           }
1351         }
1352         media->buffering = TRUE;
1353       }
1354       break;
1355     }
1356     case GST_MESSAGE_LATENCY:
1357     {
1358       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1359       break;
1360     }
1361     case GST_MESSAGE_ERROR:
1362     {
1363       GError *gerror;
1364       gchar *debug;
1365
1366       gst_message_parse_error (message, &gerror, &debug);
1367       GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1368       g_error_free (gerror);
1369       g_free (debug);
1370
1371       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_ERROR);
1372       break;
1373     }
1374     case GST_MESSAGE_WARNING:
1375     {
1376       GError *gerror;
1377       gchar *debug;
1378
1379       gst_message_parse_warning (message, &gerror, &debug);
1380       GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1381       g_error_free (gerror);
1382       g_free (debug);
1383       break;
1384     }
1385     case GST_MESSAGE_ELEMENT:
1386       break;
1387     case GST_MESSAGE_STREAM_STATUS:
1388       break;
1389     case GST_MESSAGE_ASYNC_DONE:
1390       GST_INFO ("%p: got ASYNC_DONE", media);
1391       collect_media_stats (media);
1392
1393       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
1394       break;
1395     case GST_MESSAGE_EOS:
1396       GST_INFO ("%p: got EOS", media);
1397       if (media->eos_pending) {
1398         GST_DEBUG ("shutting down after EOS");
1399         gst_element_set_state (media->pipeline, GST_STATE_NULL);
1400         media->eos_pending = FALSE;
1401         g_object_unref (media);
1402       }
1403       break;
1404     default:
1405       GST_INFO ("%p: got message type %s", media,
1406           gst_message_type_get_name (type));
1407       break;
1408   }
1409   return TRUE;
1410 }
1411
1412 static gboolean
1413 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1414 {
1415   GstRTSPMediaClass *klass;
1416   gboolean ret;
1417
1418   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1419
1420   if (klass->handle_message)
1421     ret = klass->handle_message (media, message);
1422   else
1423     ret = FALSE;
1424
1425   return ret;
1426 }
1427
1428 /* called from streaming threads */
1429 static void
1430 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1431 {
1432   GstRTSPMediaStream *stream;
1433   gchar *name;
1434   gint i;
1435
1436   i = media->streams->len + 1;
1437
1438   GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1439
1440   stream = g_new0 (GstRTSPMediaStream, 1);
1441   stream->payloader = element;
1442
1443   name = g_strdup_printf ("dynpay%d", i);
1444
1445   /* ghost the pad of the payloader to the element */
1446   stream->srcpad = gst_ghost_pad_new (name, pad);
1447   gst_pad_set_active (stream->srcpad, TRUE);
1448   gst_element_add_pad (media->element, stream->srcpad);
1449   g_free (name);
1450
1451   /* add stream now */
1452   g_array_append_val (media->streams, stream);
1453
1454   setup_stream (stream, i, media);
1455
1456   for (i = 0; i < 2; i++) {
1457     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1458     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1459     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1460     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1461     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1462   }
1463 }
1464
1465 static void
1466 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1467 {
1468   GST_INFO ("no more pads");
1469   if (media->fakesink) {
1470     gst_object_ref (media->fakesink);
1471     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1472     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1473     gst_object_unref (media->fakesink);
1474     media->fakesink = NULL;
1475     GST_INFO ("removed fakesink");
1476   }
1477 }
1478
1479 /**
1480  * gst_rtsp_media_prepare:
1481  * @media: a #GstRTSPMedia
1482  *
1483  * Prepare @media for streaming. This function will create the pipeline and
1484  * other objects to manage the streaming.
1485  *
1486  * It will preroll the pipeline and collect vital information about the streams
1487  * such as the duration.
1488  *
1489  * Returns: %TRUE on success.
1490  */
1491 gboolean
1492 gst_rtsp_media_prepare (GstRTSPMedia * media)
1493 {
1494   GstStateChangeReturn ret;
1495   GstRTSPMediaStatus status;
1496   guint i, n_streams;
1497   GstRTSPMediaClass *klass;
1498   GstBus *bus;
1499   GList *walk;
1500
1501   if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED)
1502     goto was_prepared;
1503
1504   if (!media->reusable && media->reused)
1505     goto is_reused;
1506
1507   GST_INFO ("preparing media %p", media);
1508
1509   /* reset some variables */
1510   media->is_live = FALSE;
1511   media->buffering = FALSE;
1512   /* we're preparing now */
1513   media->status = GST_RTSP_MEDIA_STATUS_PREPARING;
1514
1515   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1516
1517   /* add the pipeline bus to our custom mainloop */
1518   media->source = gst_bus_create_watch (bus);
1519   gst_object_unref (bus);
1520
1521   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1522
1523   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1524   media->id = g_source_attach (media->source, klass->context);
1525
1526   media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1527
1528   /* add stuff to the bin */
1529   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1530
1531   /* link streams we already have, other streams might appear when we have
1532    * dynamic elements */
1533   n_streams = gst_rtsp_media_n_streams (media);
1534   for (i = 0; i < n_streams; i++) {
1535     GstRTSPMediaStream *stream;
1536
1537     stream = gst_rtsp_media_get_stream (media, i);
1538
1539     setup_stream (stream, i, media);
1540   }
1541
1542   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1543     GstElement *elem = walk->data;
1544
1545     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1546     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1547
1548     /* we add a fakesink here in order to make the state change async. We remove
1549      * the fakesink again in the no-more-pads callback. */
1550     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1551     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1552   }
1553
1554   GST_INFO ("setting pipeline to PAUSED for media %p", media);
1555   /* first go to PAUSED */
1556   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1557   media->target_state = GST_STATE_PAUSED;
1558
1559   switch (ret) {
1560     case GST_STATE_CHANGE_SUCCESS:
1561       GST_INFO ("SUCCESS state change for media %p", media);
1562       break;
1563     case GST_STATE_CHANGE_ASYNC:
1564       GST_INFO ("ASYNC state change for media %p", media);
1565       break;
1566     case GST_STATE_CHANGE_NO_PREROLL:
1567       /* we need to go to PLAYING */
1568       GST_INFO ("NO_PREROLL state change: live media %p", media);
1569       media->is_live = TRUE;
1570       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1571       if (ret == GST_STATE_CHANGE_FAILURE)
1572         goto state_failed;
1573       break;
1574     case GST_STATE_CHANGE_FAILURE:
1575       goto state_failed;
1576   }
1577
1578   /* now wait for all pads to be prerolled */
1579   status = gst_rtsp_media_get_status (media);
1580   if (status == GST_RTSP_MEDIA_STATUS_ERROR)
1581     goto state_failed;
1582
1583   GST_INFO ("object %p is prerolled", media);
1584
1585   return TRUE;
1586
1587   /* OK */
1588 was_prepared:
1589   {
1590     return TRUE;
1591   }
1592   /* ERRORS */
1593 is_reused:
1594   {
1595     GST_WARNING ("can not reuse media %p", media);
1596     return FALSE;
1597   }
1598 state_failed:
1599   {
1600     GST_WARNING ("failed to preroll pipeline");
1601     unlock_streams (media);
1602     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1603     gst_rtsp_media_unprepare (media);
1604     return FALSE;
1605   }
1606 }
1607
1608 /**
1609  * gst_rtsp_media_unprepare:
1610  * @media: a #GstRTSPMedia
1611  *
1612  * Unprepare @media. After this call, the media should be prepared again before
1613  * it can be used again. If the media is set to be non-reusable, a new instance
1614  * must be created.
1615  *
1616  * Returns: %TRUE on success.
1617  */
1618 gboolean
1619 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1620 {
1621   GstRTSPMediaClass *klass;
1622   gboolean success;
1623
1624   if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED)
1625     return TRUE;
1626
1627   GST_INFO ("unprepare media %p", media);
1628   media->target_state = GST_STATE_NULL;
1629
1630   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1631   if (klass->unprepare)
1632     success = klass->unprepare (media);
1633   else
1634     success = TRUE;
1635
1636   media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED;
1637   media->reused = TRUE;
1638
1639   /* when the media is not reusable, this will effectively unref the media and
1640    * recreate it */
1641   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1642
1643   return success;
1644 }
1645
1646 static gboolean
1647 default_unprepare (GstRTSPMedia * media)
1648 {
1649   if (media->eos_shutdown) {
1650     GST_DEBUG ("sending EOS for shutdown");
1651     /* ref so that we don't disappear */
1652     g_object_ref (media);
1653     media->eos_pending = TRUE;
1654     gst_element_send_event (media->pipeline, gst_event_new_eos ());
1655     /* we need to go to playing again for the EOS to propagate, normally in this
1656      * state, nothing is receiving data from us anymore so this is ok. */
1657     gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1658   } else {
1659     GST_DEBUG ("shutting down");
1660     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1661   }
1662   return TRUE;
1663 }
1664
1665 static void
1666 add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1667     gchar * dest, gint min, gint max)
1668 {
1669   gboolean do_add = TRUE;
1670   RTSPDestination *ndest;
1671
1672   if (stream->filter_duplicates) {
1673     RTSPDestination fdest;
1674     GList *find;
1675
1676     fdest.dest = dest;
1677     fdest.min = min;
1678     fdest.max = max;
1679
1680     /* first see if we already added this destination */
1681     find =
1682         g_list_find_custom (stream->destinations, &fdest,
1683         (GCompareFunc) dest_compare);
1684     if (find) {
1685       ndest = (RTSPDestination *) find->data;
1686
1687       GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
1688           ndest->count);
1689       ndest->count++;
1690       do_add = FALSE;
1691     }
1692   }
1693
1694   if (do_add) {
1695     GST_INFO ("adding %s:%d-%d", dest, min, max);
1696     g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1697     g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1698
1699     if (stream->filter_duplicates) {
1700       ndest = create_destination (dest, min, max);
1701       stream->destinations = g_list_prepend (stream->destinations, ndest);
1702     }
1703   }
1704 }
1705
1706 static void
1707 remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
1708     gchar * dest, gint min, gint max)
1709 {
1710   gboolean do_remove = TRUE;
1711   RTSPDestination *ndest = NULL;
1712   GList *find = NULL;
1713
1714   if (stream->filter_duplicates) {
1715     RTSPDestination fdest;
1716
1717     fdest.dest = dest;
1718     fdest.min = min;
1719     fdest.max = max;
1720
1721     /* first see if we already added this destination */
1722     find =
1723         g_list_find_custom (stream->destinations, &fdest,
1724         (GCompareFunc) dest_compare);
1725     if (!find)
1726       return;
1727
1728     ndest = (RTSPDestination *) find->data;
1729     if (--ndest->count > 0) {
1730       do_remove = FALSE;
1731       GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
1732           ndest->count);
1733     }
1734   }
1735
1736   if (do_remove) {
1737     GST_INFO ("removing %s:%d-%d", dest, min, max);
1738     g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1739     g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1740
1741     if (stream->filter_duplicates) {
1742       stream->destinations = g_list_delete_link (stream->destinations, find);
1743       free_destination (ndest);
1744     }
1745   }
1746 }
1747
1748 /**
1749  * gst_rtsp_media_set_state:
1750  * @media: a #GstRTSPMedia
1751  * @state: the target state of the media
1752  * @transports: a #GArray of #GstRTSPMediaTrans pointers
1753  *
1754  * Set the state of @media to @state and for the transports in @transports.
1755  *
1756  * Returns: %TRUE on success.
1757  */
1758 gboolean
1759 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1760     GArray * transports)
1761 {
1762   gint i;
1763   GstStateChangeReturn ret;
1764   gboolean add, remove, do_state;
1765   gint old_active;
1766
1767   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1768   g_return_val_if_fail (transports != NULL, FALSE);
1769
1770   /* NULL and READY are the same */
1771   if (state == GST_STATE_READY)
1772     state = GST_STATE_NULL;
1773
1774   add = remove = FALSE;
1775
1776   GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1777       media);
1778
1779   switch (state) {
1780     case GST_STATE_NULL:
1781       /* unlock the streams so that they follow the state changes from now on */
1782       unlock_streams (media);
1783       /* fallthrough */
1784     case GST_STATE_PAUSED:
1785       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1786       if (media->target_state == GST_STATE_PLAYING)
1787         remove = TRUE;
1788       break;
1789     case GST_STATE_PLAYING:
1790       /* we're going to PLAYING, add */
1791       add = TRUE;
1792       break;
1793     default:
1794       break;
1795   }
1796   old_active = media->active;
1797
1798   for (i = 0; i < transports->len; i++) {
1799     GstRTSPMediaTrans *tr;
1800     GstRTSPMediaStream *stream;
1801     GstRTSPTransport *trans;
1802
1803     /* we need a non-NULL entry in the array */
1804     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1805     if (tr == NULL)
1806       continue;
1807
1808     /* we need a transport */
1809     if (!(trans = tr->transport))
1810       continue;
1811
1812     /* get the stream and add the destinations */
1813     stream = gst_rtsp_media_get_stream (media, tr->idx);
1814     switch (trans->lower_transport) {
1815       case GST_RTSP_LOWER_TRANS_UDP:
1816       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1817       {
1818         gchar *dest;
1819         gint min, max;
1820
1821         dest = trans->destination;
1822         if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1823           min = trans->port.min;
1824           max = trans->port.max;
1825         } else {
1826           min = trans->client_port.min;
1827           max = trans->client_port.max;
1828         }
1829
1830         if (add && !tr->active) {
1831           add_udp_destination (media, stream, dest, min, max);
1832           stream->transports = g_list_prepend (stream->transports, tr);
1833           tr->active = TRUE;
1834           media->active++;
1835         } else if (remove && tr->active) {
1836           remove_udp_destination (media, stream, dest, min, max);
1837           stream->transports = g_list_remove (stream->transports, tr);
1838           tr->active = FALSE;
1839           media->active--;
1840         }
1841         break;
1842       }
1843       case GST_RTSP_LOWER_TRANS_TCP:
1844         if (add && !tr->active) {
1845           GST_INFO ("adding TCP %s", trans->destination);
1846           stream->transports = g_list_prepend (stream->transports, tr);
1847           tr->active = TRUE;
1848           media->active++;
1849         } else if (remove && tr->active) {
1850           GST_INFO ("removing TCP %s", trans->destination);
1851           stream->transports = g_list_remove (stream->transports, tr);
1852           tr->active = FALSE;
1853           media->active--;
1854         }
1855         break;
1856       default:
1857         GST_INFO ("Unknown transport %d", trans->lower_transport);
1858         break;
1859     }
1860   }
1861
1862   /* we just added the first media, do the playing state change */
1863   if (old_active == 0 && add)
1864     do_state = TRUE;
1865   /* if we have no more active media, do the downward state changes */
1866   else if (media->active == 0)
1867     do_state = TRUE;
1868   else
1869     do_state = FALSE;
1870
1871   GST_INFO ("active %d media %p", media->active, media);
1872
1873   if (do_state && media->target_state != state) {
1874     if (state == GST_STATE_NULL) {
1875       gst_rtsp_media_unprepare (media);
1876     } else {
1877       GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
1878       media->target_state = state;
1879       ret = gst_element_set_state (media->pipeline, state);
1880     }
1881   }
1882
1883   /* remember where we are */
1884   if (state == GST_STATE_PAUSED)
1885     collect_media_stats (media);
1886
1887   return TRUE;
1888 }
1889
1890 /**
1891  * gst_rtsp_media_remove_elements:
1892  * @media: a #GstRTSPMedia
1893  *
1894  * Remove all elements and the pipeline controlled by @media.
1895  */
1896 void
1897 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1898 {
1899   gint i, j;
1900
1901   unlock_streams (media);
1902
1903   for (i = 0; i < media->streams->len; i++) {
1904     GstRTSPMediaStream *stream;
1905
1906     GST_INFO ("Removing elements of stream %d from pipeline", i);
1907
1908     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1909
1910     gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1911
1912     g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
1913
1914     for (j = 0; j < 2; j++) {
1915       gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
1916       gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
1917       gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
1918       gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
1919       gst_element_set_state (stream->tee[j], GST_STATE_NULL);
1920       gst_element_set_state (stream->selector[j], GST_STATE_NULL);
1921
1922       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
1923       gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
1924       gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
1925       gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
1926       gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
1927       gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
1928     }
1929     if (stream->caps)
1930       gst_caps_unref (stream->caps);
1931     stream->caps = NULL;
1932     gst_rtsp_media_stream_free (stream);
1933   }
1934   g_array_remove_range (media->streams, 0, media->streams->len);
1935
1936   gst_element_set_state (media->rtpbin, GST_STATE_NULL);
1937   gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
1938
1939   gst_object_unref (media->pipeline);
1940   media->pipeline = NULL;
1941 }