media: fix message
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
24
25 #include "rtsp-media.h"
26
27 #define DEFAULT_SHARED         FALSE
28 #define DEFAULT_REUSABLE       FALSE
29
30 /* define to dump received RTCP packets */
31 #undef DUMP_STATS
32
33 enum
34 {
35   PROP_0,
36   PROP_SHARED,
37   PROP_REUSABLE,
38   PROP_LAST
39 };
40
41 enum
42 {
43   SIGNAL_UNPREPARED,
44   SIGNAL_LAST
45 };
46
47 static GQuark ssrc_stream_map_key;
48
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50     GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52     const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
54
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static void unlock_streams (GstRTSPMedia *media);
58
59 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
60
61 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
62
63 static void
64 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
65 {
66   GObjectClass *gobject_class;
67   GError *error = NULL;
68
69   gobject_class = G_OBJECT_CLASS (klass);
70
71   gobject_class->get_property = gst_rtsp_media_get_property;
72   gobject_class->set_property = gst_rtsp_media_set_property;
73   gobject_class->finalize = gst_rtsp_media_finalize;
74
75   g_object_class_install_property (gobject_class, PROP_SHARED,
76       g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
77           DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
78
79   g_object_class_install_property (gobject_class, PROP_REUSABLE,
80       g_param_spec_boolean ("reusable", "Reusable",
81           "If this media pipeline can be reused after an unprepare",
82           DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
83
84   gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
85       g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
86       G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
87       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
88
89   klass->context = g_main_context_new ();
90   klass->loop = g_main_loop_new (klass->context, TRUE);
91
92   klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
93   if (error != NULL) {
94     g_critical ("could not start bus thread: %s", error->message);
95   }
96   klass->handle_message = default_handle_message;
97
98   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
99 }
100
101 static void
102 gst_rtsp_media_init (GstRTSPMedia * media)
103 {
104   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
105   media->is_live = FALSE;
106   media->buffering = FALSE;
107 }
108
109 static void
110 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
111 {
112   if (stream->session)
113     g_object_unref (stream->session);
114
115   if (stream->caps)
116     gst_caps_unref (stream->caps);
117
118   g_list_free (stream->transports);
119
120   g_free (stream);
121 }
122
123 static void
124 gst_rtsp_media_finalize (GObject * obj)
125 {
126   GstRTSPMedia *media;
127   guint i;
128
129   media = GST_RTSP_MEDIA (obj);
130
131   g_message ("finalize media %p", media);
132
133   if (media->pipeline) {
134     unlock_streams (media);
135     gst_element_set_state (media->pipeline, GST_STATE_NULL);
136     gst_object_unref (media->pipeline);
137   }
138
139   for (i = 0; i < media->streams->len; i++) {
140     GstRTSPMediaStream *stream;
141
142     stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
143
144     gst_rtsp_media_stream_free (stream);
145   }
146   g_array_free (media->streams, TRUE);
147
148   g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
149   g_list_free (media->dynamic);
150
151   if (media->source) {
152     g_source_destroy (media->source);
153     g_source_unref (media->source);
154   }
155
156   G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
157 }
158
159 static void
160 gst_rtsp_media_get_property (GObject *object, guint propid,
161     GValue *value, GParamSpec *pspec)
162 {
163   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
164
165   switch (propid) {
166     case PROP_SHARED:
167       g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
168       break;
169     case PROP_REUSABLE:
170       g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
171       break;
172     default:
173       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
174   }
175 }
176
177 static void
178 gst_rtsp_media_set_property (GObject *object, guint propid,
179     const GValue *value, GParamSpec *pspec)
180 {
181   GstRTSPMedia *media = GST_RTSP_MEDIA (object);
182
183   switch (propid) {
184     case PROP_SHARED:
185       gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
186       break;
187     case PROP_REUSABLE:
188       gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
189       break;
190     default:
191       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
192   }
193 }
194
195 static gpointer
196 do_loop (GstRTSPMediaClass *klass)
197 {
198   g_message ("enter mainloop");
199   g_main_loop_run (klass->loop);
200   g_message ("exit mainloop");
201
202   return NULL;
203 }
204
205 static void
206 collect_media_stats (GstRTSPMedia *media)
207 {
208   GstFormat format;
209   gint64 position, duration;
210
211   media->range.unit = GST_RTSP_RANGE_NPT;
212
213   if (media->is_live) {
214     media->range.min.type = GST_RTSP_TIME_NOW;
215     media->range.min.seconds = -1;
216     media->range.max.type = GST_RTSP_TIME_END;
217     media->range.max.seconds = -1;
218   }
219   else {
220     /* get the position */
221     format = GST_FORMAT_TIME;
222     if (!gst_element_query_position (media->pipeline, &format, &position)) {
223       g_message ("position query failed");
224       position = 0;
225     }
226
227     /* get the duration */
228     format = GST_FORMAT_TIME;
229     if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
230       g_message ("duration query failed");
231       duration = -1;
232     }
233
234     g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
235         GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
236
237     if (position == -1) {
238       media->range.min.type = GST_RTSP_TIME_NOW;
239       media->range.min.seconds = -1;
240     }
241     else {
242       media->range.min.type = GST_RTSP_TIME_SECONDS;
243       media->range.min.seconds = ((gdouble)position) / GST_SECOND;
244     }
245     if (duration == -1) {
246       media->range.max.type = GST_RTSP_TIME_END;
247       media->range.max.seconds = -1;
248     }
249     else {
250       media->range.max.type = GST_RTSP_TIME_SECONDS;
251       media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
252     }
253   }
254 }
255
256 /**
257  * gst_rtsp_media_new:
258  *
259  * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
260  * element to produde RTP data for one or more related (audio/video/..) 
261  * streams.
262  *
263  * Returns: a new #GstRTSPMedia object.
264  */
265 GstRTSPMedia *
266 gst_rtsp_media_new (void)
267 {
268   GstRTSPMedia *result;
269
270   result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
271
272   return result;
273 }
274
275 /**
276  * gst_rtsp_media_set_shared:
277  * @media: a #GstRTSPMedia
278  * @shared: the new value
279  *
280  * Set or unset if the pipeline for @media can be shared will multiple clients.
281  * When @shared is %TRUE, client requests for this media will share the media
282  * pipeline.
283  */
284 void
285 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
286 {
287   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
288
289   media->shared = shared;
290 }
291
292 /**
293  * gst_rtsp_media_is_shared:
294  * @media: a #GstRTSPMedia
295  *
296  * Check if the pipeline for @media can be shared between multiple clients.
297  *
298  * Returns: %TRUE if the media can be shared between clients.
299  */
300 gboolean
301 gst_rtsp_media_is_shared (GstRTSPMedia *media)
302 {
303   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
304
305   return media->shared;
306 }
307
308 /**
309  * gst_rtsp_media_set_reusable:
310  * @media: a #GstRTSPMedia
311  * @reusable: the new value
312  *
313  * Set or unset if the pipeline for @media can be reused after the pipeline has
314  * been unprepared.
315  */
316 void
317 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
318 {
319   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
320
321   media->reusable = reusable;
322 }
323
324 /**
325  * gst_rtsp_media_is_reusable:
326  * @media: a #GstRTSPMedia
327  *
328  * Check if the pipeline for @media can be reused after an unprepare.
329  *
330  * Returns: %TRUE if the media can be reused
331  */
332 gboolean
333 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
334 {
335   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
336
337   return media->reusable;
338 }
339
340 /**
341  * gst_rtsp_media_n_streams:
342  * @media: a #GstRTSPMedia
343  *
344  * Get the number of streams in this media.
345  *
346  * Returns: The number of streams.
347  */
348 guint
349 gst_rtsp_media_n_streams (GstRTSPMedia *media)
350 {
351   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
352
353   return media->streams->len;
354 }
355
356 /**
357  * gst_rtsp_media_get_stream:
358  * @media: a #GstRTSPMedia
359  * @idx: the stream index
360  *
361  * Retrieve the stream with index @idx from @media.
362  *
363  * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
364  * that index did not exist.
365  */
366 GstRTSPMediaStream *
367 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
368 {
369   GstRTSPMediaStream *res;
370   
371   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
372
373   if (idx < media->streams->len)
374     res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
375   else
376     res = NULL;
377
378   return res;
379 }
380
381 /**
382  * gst_rtsp_media_seek:
383  * @stream: a #GstRTSPMediaStream
384  * @range: a #GstRTSPTimeRange
385  *
386  * Seek the pipeline to @range.
387  *
388  * Returns: %TRUE on success.
389  */
390 gboolean
391 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
392 {
393   GstSeekFlags flags;
394   gboolean res;
395   gint64 start, stop;
396   GstSeekType start_type, stop_type;
397
398   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
399   g_return_val_if_fail (range != NULL, FALSE);
400
401   if (range->unit != GST_RTSP_RANGE_NPT)
402     goto not_supported;
403
404   /* depends on the current playing state of the pipeline. We might need to
405    * queue this until we get EOS. */
406   flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
407
408   start_type = stop_type = GST_SEEK_TYPE_NONE;
409
410   switch (range->min.type) {
411     case GST_RTSP_TIME_NOW:
412       start = -1;
413       break;
414     case GST_RTSP_TIME_SECONDS:
415       /* only seek when something changed */
416       if (media->range.min.seconds == range->min.seconds) {
417         start = -1;
418       } else {
419         start = range->min.seconds * GST_SECOND;
420         start_type = GST_SEEK_TYPE_SET;
421       }
422       break;
423     case GST_RTSP_TIME_END:
424     default:
425       goto weird_type;
426   }
427   switch (range->max.type) {
428     case GST_RTSP_TIME_SECONDS:
429       /* only seek when something changed */
430       if (media->range.max.seconds == range->max.seconds) {
431         stop = -1;
432       } else {
433         stop = range->max.seconds * GST_SECOND;
434         stop_type = GST_SEEK_TYPE_SET;
435       }
436       break;
437     case GST_RTSP_TIME_END:
438       stop = -1;
439       stop_type = GST_SEEK_TYPE_SET;
440       break;
441     case GST_RTSP_TIME_NOW:
442     default:
443       goto weird_type;
444   }
445   
446   if (start != -1 || stop != -1) {
447     g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
448                 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
449
450     res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
451         flags, start_type, start, stop_type, stop);
452
453     /* and block for the seek to complete */
454     g_message ("done seeking %d", res);
455     gst_element_get_state (media->pipeline, NULL, NULL, -1);
456     g_message ("prerolled again");
457
458     collect_media_stats (media);
459   }
460   else {
461     g_message ("no seek needed");
462     res = TRUE;
463   }
464
465   return res;
466
467   /* ERRORS */
468 not_supported:
469   {
470     g_warning ("seek unit %d not supported", range->unit);
471     return FALSE;
472   }
473 weird_type:
474   {
475     g_warning ("weird range type %d not supported", range->min.type);
476     return FALSE;
477   }
478 }
479
480 /**
481  * gst_rtsp_media_stream_rtp:
482  * @stream: a #GstRTSPMediaStream
483  * @buffer: a #GstBuffer
484  *
485  * Handle an RTP buffer for the stream. This method is usually called when a
486  * message has been received from a client using the TCP transport.
487  *
488  * This function takes ownership of @buffer.
489  *
490  * Returns: a GstFlowReturn.
491  */
492 GstFlowReturn
493 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
494 {
495   GstFlowReturn ret;
496
497   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
498
499   return ret;
500 }
501
502 /**
503  * gst_rtsp_media_stream_rtcp:
504  * @stream: a #GstRTSPMediaStream
505  * @buffer: a #GstBuffer
506  *
507  * Handle an RTCP buffer for the stream. This method is usually called when a
508  * message has been received from a client using the TCP transport.
509  *
510  * This function takes ownership of @buffer.
511  *
512  * Returns: a GstFlowReturn.
513  */
514 GstFlowReturn
515 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
516 {
517   GstFlowReturn ret;
518
519   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
520
521   return ret;
522 }
523
524 /* Allocate the udp ports and sockets */
525 static gboolean
526 alloc_udp_ports (GstRTSPMediaStream * stream)
527 {
528   GstStateChangeReturn ret;
529   GstElement *udpsrc0, *udpsrc1;
530   GstElement *udpsink0, *udpsink1;
531   gint tmp_rtp, tmp_rtcp;
532   guint count;
533   gint rtpport, rtcpport, sockfd;
534
535   udpsrc0 = NULL;
536   udpsrc1 = NULL;
537   udpsink0 = NULL;
538   udpsink1 = NULL;
539   count = 0;
540
541   /* Start with random port */
542   tmp_rtp = 0;
543
544   /* try to allocate 2 UDP ports, the RTP port should be an even
545    * number and the RTCP port should be the next (uneven) port */
546 again:
547   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
548   if (udpsrc0 == NULL)
549     goto no_udp_protocol;
550   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
551
552   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
553   if (ret == GST_STATE_CHANGE_FAILURE) {
554     if (tmp_rtp != 0) {
555       tmp_rtp += 2;
556       if (++count > 20)
557         goto no_ports;
558
559       gst_element_set_state (udpsrc0, GST_STATE_NULL);
560       gst_object_unref (udpsrc0);
561
562       goto again;
563     }
564     goto no_udp_protocol;
565   }
566
567   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
568
569   /* check if port is even */
570   if ((tmp_rtp & 1) != 0) {
571     /* port not even, close and allocate another */
572     if (++count > 20)
573       goto no_ports;
574
575     gst_element_set_state (udpsrc0, GST_STATE_NULL);
576     gst_object_unref (udpsrc0);
577
578     tmp_rtp++;
579     goto again;
580   }
581
582   /* allocate port+1 for RTCP now */
583   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
584   if (udpsrc1 == NULL)
585     goto no_udp_rtcp_protocol;
586
587   /* set port */
588   tmp_rtcp = tmp_rtp + 1;
589   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
590
591   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
592   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
593   if (ret == GST_STATE_CHANGE_FAILURE) {
594
595     if (++count > 20)
596       goto no_ports;
597
598     gst_element_set_state (udpsrc0, GST_STATE_NULL);
599     gst_object_unref (udpsrc0);
600
601     gst_element_set_state (udpsrc1, GST_STATE_NULL);
602     gst_object_unref (udpsrc1);
603
604     tmp_rtp += 2;
605     goto again;
606   }
607
608   /* all fine, do port check */
609   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
610   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
611
612   /* this should not happen... */
613   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
614     goto port_error;
615
616   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
617   if (!udpsink0)
618     goto no_udp_protocol;
619
620   g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
621   g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
622   g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
623
624   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
625   if (!udpsink1)
626     goto no_udp_protocol;
627
628   g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
629   g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
630   g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
631   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
632   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
633
634   /* we keep these elements, we configure all in configure_transport when the
635    * server told us to really use the UDP ports. */
636   stream->udpsrc[0] = udpsrc0;
637   stream->udpsrc[1] = udpsrc1;
638   stream->udpsink[0] = udpsink0;
639   stream->udpsink[1] = udpsink1;
640   stream->server_port.min = rtpport;
641   stream->server_port.max = rtcpport;
642
643   return TRUE;
644
645   /* ERRORS */
646 no_udp_protocol:
647   {
648     goto cleanup;
649   }
650 no_ports:
651   {
652     goto cleanup;
653   }
654 no_udp_rtcp_protocol:
655   {
656     goto cleanup;
657   }
658 port_error:
659   {
660     goto cleanup;
661   }
662 cleanup:
663   {
664     if (udpsrc0) {
665       gst_element_set_state (udpsrc0, GST_STATE_NULL);
666       gst_object_unref (udpsrc0);
667     }
668     if (udpsrc1) {
669       gst_element_set_state (udpsrc1, GST_STATE_NULL);
670       gst_object_unref (udpsrc1);
671     }
672     if (udpsink0) {
673       gst_element_set_state (udpsink0, GST_STATE_NULL);
674       gst_object_unref (udpsink0);
675     }
676     if (udpsink1) {
677       gst_element_set_state (udpsink1, GST_STATE_NULL);
678       gst_object_unref (udpsink1);
679     }
680     return FALSE;
681   }
682 }
683
684 static void
685 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
686 {
687   gchar *capsstr;
688   GstCaps *newcaps, *oldcaps;
689
690   if ((newcaps = GST_PAD_CAPS (pad)))
691     gst_caps_ref (newcaps);
692
693   oldcaps = stream->caps;
694   stream->caps = newcaps;
695
696   if (oldcaps)
697     gst_caps_unref (oldcaps);
698
699   capsstr = gst_caps_to_string (newcaps);
700   g_message ("stream %p received caps %s", stream, capsstr);
701   g_free (capsstr);
702 }
703
704 static void
705 dump_structure (const GstStructure *s)
706 {
707   gchar *sstr;
708
709   sstr = gst_structure_to_string (s);
710   g_message ("structure: %s", sstr);
711   g_free (sstr);
712 }
713
714 static GstRTSPMediaTrans *
715 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
716 {
717   GList *walk;
718   GstRTSPMediaTrans *result = NULL;
719   const gchar *tmp;
720   gchar *dest;
721   guint port;
722
723   if (rtcp_from == NULL)
724     return NULL;
725
726   tmp = g_strrstr (rtcp_from, ":");
727   if (tmp == NULL)
728     return NULL;
729
730   port = atoi (tmp + 1);
731   dest = g_strndup (rtcp_from, tmp - rtcp_from);
732
733   g_message ("finding %s:%d", dest, port);
734
735   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
736     GstRTSPMediaTrans *trans = walk->data;
737     gint min, max;
738
739     min = trans->transport->client_port.min;
740     max = trans->transport->client_port.max;
741
742     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
743       result = trans;
744       break;
745     }
746   }
747   g_free (dest);
748
749   return result;
750 }
751
752 static void
753 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
754 {
755   GstStructure *stats;
756   GstRTSPMediaTrans *trans;
757
758   g_message ("%p: new source %p", stream, source);
759
760   /* see if we have a stream to match with the origin of the RTCP packet */
761   trans = g_object_get_qdata (source, ssrc_stream_map_key);
762   if (trans == NULL) {
763     g_object_get (source, "stats", &stats, NULL);
764     if (stats) {
765       const gchar *rtcp_from;
766
767       dump_structure (stats);
768
769       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
770       if ((trans = find_transport (stream, rtcp_from))) {
771         g_message ("%p: found transport %p for source  %p", stream, trans, source);
772
773         /* keep ref to the source */
774         trans->rtpsource = source;
775
776         g_object_set_qdata (source, ssrc_stream_map_key, trans);
777       }
778       gst_structure_free (stats);
779     }
780   } else {
781     g_message ("%p: source %p for transport %p", stream, source, trans);
782   }
783 }
784
785 static void
786 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
787 {
788   g_message ("%p: new SDES %p", stream, source);
789 }
790
791 static void
792 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
793 {
794   GstRTSPMediaTrans *trans;
795
796   trans = g_object_get_qdata (source, ssrc_stream_map_key);
797
798   g_message ("%p: source %p in transport %p is active", stream, source, trans);
799
800   if (trans && trans->keep_alive)
801     trans->keep_alive (trans->ka_user_data);
802
803 #ifdef DUMP_STATS
804   {
805     GstStructure *stats;
806     g_object_get (source, "stats", &stats, NULL);
807     if (stats) {
808       dump_structure (stats);
809       gst_structure_free (stats);
810     }
811   }
812 #endif
813 }
814
815 static void
816 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
817 {
818   g_message ("%p: source %p bye", stream, source);
819 }
820
821 static void
822 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
823 {
824   GstRTSPMediaTrans *trans;
825
826   g_message ("%p: source %p bye timeout", stream, source);
827
828   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
829     trans->rtpsource = NULL;
830     trans->timeout = TRUE;
831   }
832 }
833
834 static void
835 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
836 {
837   GstRTSPMediaTrans *trans;
838
839   g_message ("%p: source %p timeout", stream, source);
840
841   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
842     trans->rtpsource = NULL;
843     trans->timeout = TRUE;
844   }
845 }
846
847 static GstFlowReturn
848 handle_new_buffer (GstAppSink *sink, gpointer user_data)
849 {
850   GList *walk;
851   GstBuffer *buffer;
852   GstRTSPMediaStream *stream;
853
854   buffer = gst_app_sink_pull_buffer (sink);
855   if (!buffer)
856     return GST_FLOW_OK;
857
858   stream = (GstRTSPMediaStream *) user_data;
859
860   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
861     GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
862
863     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
864       if (tr->send_rtp) 
865         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
866     }
867     else {
868       if (tr->send_rtcp) 
869         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
870     }
871   }
872   gst_buffer_unref (buffer);
873
874   return GST_FLOW_OK;
875 }
876
877 static GstAppSinkCallbacks sink_cb = {
878   NULL,  /* not interested in EOS */
879   NULL,  /* not interested in preroll buffers */
880   handle_new_buffer
881 };
882
883 /* prepare the pipeline objects to handle @stream in @media */
884 static gboolean
885 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
886 {
887   gchar *name;
888   GstPad *pad, *teepad, *selpad;
889   GstPadLinkReturn ret;
890   gint i;
891
892   /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
893    * for sending RTP/RTCP. The sender and receiver ports are shared between the
894    * elements */
895   if (!alloc_udp_ports (stream))
896     return FALSE;
897
898   /* add the ports to the pipeline */
899   for (i = 0; i < 2; i++) {
900     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
901     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
902   }
903
904   /* create elements for the TCP transfer */
905   for (i = 0; i < 2; i++) {
906     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
907     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
908     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
909     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
910     g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
911     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
912     gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
913     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
914                   &sink_cb, stream, NULL);
915   }
916
917   /* hook up the stream to the RTP session elements. */
918   name = g_strdup_printf ("send_rtp_sink_%d", idx);
919   stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
920   g_free (name);
921   name = g_strdup_printf ("send_rtp_src_%d", idx);
922   stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
923   g_free (name);
924   name = g_strdup_printf ("send_rtcp_src_%d", idx);
925   stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
926   g_free (name);
927   name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
928   stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
929   g_free (name);
930   name = g_strdup_printf ("recv_rtp_sink_%d", idx);
931   stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
932   g_free (name);
933
934   /* get the session */
935   g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
936                   &stream->session);
937
938   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
939       stream);
940   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
941       stream);
942   g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
943       stream);
944   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
945       stream);
946   g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
947       stream);
948   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
949       stream);
950
951   /* link the RTP pad to the session manager */
952   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
953   if (ret != GST_PAD_LINK_OK)
954     goto link_failed;
955
956   /* make tee for RTP and link to stream */
957   stream->tee[0] = gst_element_factory_make ("tee", NULL);
958   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
959
960   pad = gst_element_get_static_pad (stream->tee[0], "sink");
961   gst_pad_link (stream->send_rtp_src, pad);
962   gst_object_unref (pad);
963
964   /* link RTP sink, we're pretty sure this will work. */
965   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
966   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
967   gst_pad_link (teepad, pad);
968   gst_object_unref (pad);
969   gst_object_unref (teepad);
970
971   teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
972   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
973   gst_pad_link (teepad, pad);
974   gst_object_unref (pad);
975   gst_object_unref (teepad);
976
977   /* make tee for RTCP */
978   stream->tee[1] = gst_element_factory_make ("tee", NULL);
979   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
980
981   pad = gst_element_get_static_pad (stream->tee[1], "sink");
982   gst_pad_link (stream->send_rtcp_src, pad);
983   gst_object_unref (pad);
984
985   /* link RTCP elements */
986   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
987   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
988   gst_pad_link (teepad, pad);
989   gst_object_unref (pad);
990   gst_object_unref (teepad);
991
992   teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
993   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
994   gst_pad_link (teepad, pad);
995   gst_object_unref (pad);
996   gst_object_unref (teepad);
997
998   /* make selector for the RTP receivers */
999   stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1000   g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1001   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1002
1003   pad = gst_element_get_static_pad (stream->selector[0], "src");
1004   gst_pad_link (pad, stream->recv_rtp_sink);
1005   gst_object_unref (pad);
1006
1007   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1008   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1009   gst_pad_link (pad, selpad);
1010   gst_object_unref (pad);
1011   gst_object_unref (selpad);
1012
1013   selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1014   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1015   gst_pad_link (pad, selpad);
1016   gst_object_unref (pad);
1017   gst_object_unref (selpad);
1018
1019   /* make selector for the RTCP receivers */
1020   stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1021   g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1022   gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1023
1024   pad = gst_element_get_static_pad (stream->selector[1], "src");
1025   gst_pad_link (pad, stream->recv_rtcp_sink);
1026   gst_object_unref (pad);
1027
1028   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1029   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1030   gst_pad_link (pad, selpad);
1031   gst_object_unref (pad);
1032   gst_object_unref (selpad);
1033
1034   selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1035   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1036   gst_pad_link (pad, selpad);
1037   gst_object_unref (pad);
1038   gst_object_unref (selpad);
1039
1040   /* we set and keep these to playing so that they don't cause NO_PREROLL return
1041    * values */
1042   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1043   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1044   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1045   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1046  
1047   /* be notified of caps changes */
1048   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1049                   (GCallback) caps_notify, stream);
1050
1051   stream->prepared = TRUE;
1052
1053   return TRUE;
1054
1055   /* ERRORS */
1056 link_failed:
1057   {
1058     g_warning ("failed to link stream %d", idx);
1059     return FALSE;
1060   }
1061 }
1062
1063 static void
1064 unlock_streams (GstRTSPMedia *media)
1065 {
1066   guint i, n_streams;
1067
1068   /* unlock the udp src elements */
1069   n_streams = gst_rtsp_media_n_streams (media);
1070   for (i = 0; i < n_streams; i++) {
1071     GstRTSPMediaStream *stream;
1072
1073     stream = gst_rtsp_media_get_stream (media, i);
1074
1075     gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1076     gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1077   }
1078 }
1079
1080 static gboolean
1081 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1082 {
1083   GstMessageType type;
1084
1085   type = GST_MESSAGE_TYPE (message);
1086
1087   switch (type) {
1088     case GST_MESSAGE_STATE_CHANGED:
1089       break;
1090     case GST_MESSAGE_BUFFERING:
1091     {
1092       gint percent;
1093
1094       gst_message_parse_buffering (message, &percent);
1095
1096       /* no state management needed for live pipelines */
1097       if (media->is_live)
1098         break;
1099
1100       if (percent == 100) {
1101         /* a 100% message means buffering is done */
1102         media->buffering = FALSE;
1103         /* if the desired state is playing, go back */
1104         if (media->target_state == GST_STATE_PLAYING) {
1105           g_message ("Buffering done, setting pipeline to PLAYING");
1106           gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1107         }
1108         else {
1109           g_message ("Buffering done");
1110         }
1111       } else {
1112         /* buffering busy */
1113         if (media->buffering == FALSE) {
1114           if (media->target_state == GST_STATE_PLAYING) {
1115             /* we were not buffering but PLAYING, PAUSE  the pipeline. */
1116             g_message ("Buffering, setting pipeline to PAUSED ...");
1117             gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1118           }
1119           else {
1120             g_message ("Buffering ...");
1121           }
1122         }
1123         media->buffering = TRUE;
1124       }
1125       break;
1126     }
1127     case GST_MESSAGE_LATENCY:
1128     {
1129       gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1130       break;
1131     }
1132     case GST_MESSAGE_ERROR:
1133     {
1134       GError *gerror;
1135       gchar *debug;
1136
1137       gst_message_parse_error (message, &gerror, &debug);
1138       g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1139       g_error_free (gerror);
1140       g_free (debug);
1141       break;
1142     }
1143     case GST_MESSAGE_WARNING:
1144     {
1145       GError *gerror;
1146       gchar *debug;
1147
1148       gst_message_parse_warning (message, &gerror, &debug);
1149       g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1150       g_error_free (gerror);
1151       g_free (debug);
1152       break;
1153     }
1154     case GST_MESSAGE_ELEMENT:
1155     {
1156       break;
1157     }
1158     default:
1159       g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
1160       break;
1161   }
1162   return TRUE;
1163 }
1164
1165 static gboolean
1166 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1167 {
1168   GstRTSPMediaClass *klass;
1169   gboolean ret;
1170   
1171   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1172
1173   if (klass->handle_message)
1174     ret = klass->handle_message (media, message);
1175   else
1176     ret = FALSE;
1177
1178   return ret;
1179 }
1180
1181 static void
1182 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1183 {
1184   GstRTSPMediaStream *stream;
1185   gchar *name;
1186   gint i;
1187
1188   i = media->streams->len + 1;
1189
1190   g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1191
1192   stream = g_new0 (GstRTSPMediaStream, 1);
1193   stream->payloader = element;
1194
1195   name = g_strdup_printf ("dynpay%d", i);
1196
1197   /* ghost the pad of the payloader to the element */
1198   stream->srcpad = gst_ghost_pad_new (name, pad);
1199   gst_pad_set_active (stream->srcpad, TRUE);
1200   gst_element_add_pad (media->element, stream->srcpad);
1201   g_free (name);
1202
1203   /* add stream now */
1204   g_array_append_val (media->streams, stream);
1205
1206   setup_stream (stream, i, media);
1207
1208   for (i = 0; i < 2; i++) {
1209     gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1210     gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1211     gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1212     gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1213     gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1214   }
1215 }
1216
1217 static void
1218 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1219 {
1220   g_message ("no more pads");
1221   if (media->fakesink) {
1222     gst_object_ref (media->fakesink);
1223     gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1224     gst_element_set_state (media->fakesink, GST_STATE_NULL);
1225     gst_object_unref (media->fakesink);
1226     media->fakesink = NULL;
1227     g_message ("removed fakesink");
1228   }
1229 }
1230
1231 /**
1232  * gst_rtsp_media_prepare:
1233  * @obj: a #GstRTSPMedia
1234  *
1235  * Prepare @media for streaming. This function will create the pipeline and
1236  * other objects to manage the streaming.
1237  *
1238  * It will preroll the pipeline and collect vital information about the streams
1239  * such as the duration.
1240  *
1241  * Returns: %TRUE on success.
1242  */
1243 gboolean
1244 gst_rtsp_media_prepare (GstRTSPMedia *media)
1245 {
1246   GstStateChangeReturn ret;
1247   guint i, n_streams;
1248   GstRTSPMediaClass *klass;
1249   GstBus *bus;
1250   GList *walk;
1251
1252   if (media->prepared)
1253     goto was_prepared;
1254
1255   if (!media->reusable && media->reused)
1256     goto is_reused;
1257
1258   g_message ("preparing media %p", media);
1259
1260   media->pipeline = gst_pipeline_new ("media-pipeline");
1261   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1262
1263   /* add the pipeline bus to our custom mainloop */
1264   media->source = gst_bus_create_watch (bus);
1265   gst_object_unref (bus);
1266
1267   g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1268
1269   klass = GST_RTSP_MEDIA_GET_CLASS (media);
1270   media->id = g_source_attach (media->source, klass->context);
1271
1272   gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
1273
1274   media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1275
1276   /* add stuff to the bin */
1277   gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1278
1279   /* link streams we already have, other streams might appear when we have
1280    * dynamic elements */
1281   n_streams = gst_rtsp_media_n_streams (media);
1282   for (i = 0; i < n_streams; i++) {
1283     GstRTSPMediaStream *stream;
1284
1285     stream = gst_rtsp_media_get_stream (media, i);
1286
1287     setup_stream (stream, i, media);
1288   }
1289
1290   for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1291     GstElement *elem = walk->data;
1292
1293     g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1294     g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1295
1296     media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1297     gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1298   }
1299
1300   /* first go to PAUSED */
1301   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1302   media->target_state = GST_STATE_PAUSED;
1303
1304   switch (ret) {
1305     case GST_STATE_CHANGE_SUCCESS:
1306       break;
1307     case GST_STATE_CHANGE_ASYNC:
1308       break;
1309     case GST_STATE_CHANGE_NO_PREROLL:
1310       /* we need to go to PLAYING */
1311       g_message ("live media %p", media);
1312       media->is_live = TRUE;
1313       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1314       if (ret == GST_STATE_CHANGE_FAILURE)
1315         goto state_failed;
1316       break;
1317     case GST_STATE_CHANGE_FAILURE:
1318       goto state_failed;
1319   }
1320
1321   /* now wait for all pads to be prerolled */
1322   ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1323   if (ret == GST_STATE_CHANGE_FAILURE)
1324     goto state_failed;
1325
1326   /* collect stats about the media */
1327   collect_media_stats (media);
1328
1329   g_message ("object %p is prerolled", media);
1330
1331   media->prepared = TRUE;
1332
1333   return TRUE;
1334
1335   /* OK */
1336 was_prepared:
1337   {
1338     return TRUE;
1339   }
1340   /* ERRORS */
1341 state_failed:
1342   {
1343     g_warning ("failed to preroll pipeline");
1344     unlock_streams (media);
1345     gst_element_set_state (media->pipeline, GST_STATE_NULL);
1346     return FALSE;
1347   }
1348 is_reused:
1349   {
1350     g_warning ("can not reuse media %p", media);
1351     return FALSE;
1352   }
1353 }
1354
1355 /**
1356  * gst_rtsp_media_unprepare:
1357  * @obj: a #GstRTSPMedia
1358  *
1359  * Unprepare @media. After this call, the media should be prepared again before
1360  * it can be used again. If the media is set to be non-reusable, a new instance
1361  * must be created.
1362  *
1363  * Returns: %TRUE on success.
1364  */
1365 gboolean
1366 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1367 {
1368   if (!media->prepared)
1369     return TRUE;
1370
1371   g_message ("unprepare media %p", media);
1372   media->target_state = GST_STATE_NULL;
1373   gst_element_set_state (media->pipeline, GST_STATE_NULL);
1374
1375   media->prepared = FALSE;
1376   media->reused = TRUE;
1377
1378   /* when the media is not reusable, this will effectively unref the media and
1379    * recreate it */
1380   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1381
1382   return TRUE;
1383 }
1384
1385 /**
1386  * gst_rtsp_media_set_state:
1387  * @media: a #GstRTSPMedia
1388  * @state: the target state of the media
1389  * @transports: a GArray of #GstRTSPMediaTrans pointers
1390  *
1391  * Set the state of @media to @state and for the transports in @transports.
1392  *
1393  * Returns: %TRUE on success.
1394  */
1395 gboolean
1396 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1397 {
1398   gint i;
1399   GstStateChangeReturn ret;
1400   gboolean add, remove, do_state;
1401   gint old_active;
1402
1403   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1404   g_return_val_if_fail (transports != NULL, FALSE);
1405
1406   /* NULL and READY are the same */
1407   if (state == GST_STATE_READY)
1408     state = GST_STATE_NULL;
1409
1410   add = remove = FALSE;
1411
1412   g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1413
1414   switch (state) {
1415     case GST_STATE_NULL:
1416       /* unlock the streams so that they follow the state changes from now on */
1417       unlock_streams (media);
1418       /* fallthrough */
1419     case GST_STATE_PAUSED:
1420       /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1421       if (media->target_state == GST_STATE_PLAYING)
1422         remove = TRUE;
1423       break;
1424     case GST_STATE_PLAYING:
1425       /* we're going to PLAYING, add */
1426       add = TRUE;
1427       break;
1428     default:
1429       break;
1430   }
1431   old_active = media->active;
1432
1433   for (i = 0; i < transports->len; i++) {
1434     GstRTSPMediaTrans *tr;
1435     GstRTSPMediaStream *stream;
1436     GstRTSPTransport *trans;
1437
1438     /* we need a non-NULL entry in the array */
1439     tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1440     if (tr == NULL)
1441       continue;
1442
1443     /* we need a transport */
1444     if (!(trans = tr->transport))
1445       continue;
1446
1447     /* get the stream and add the destinations */
1448     stream = gst_rtsp_media_get_stream (media, tr->idx);
1449     switch (trans->lower_transport) {
1450       case GST_RTSP_LOWER_TRANS_UDP:
1451       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1452       {
1453         gchar *dest;
1454         gint min, max;
1455
1456         dest = trans->destination;
1457         min = trans->client_port.min;
1458         max = trans->client_port.max;
1459
1460         if (add && !tr->active) {
1461           g_message ("adding %s:%d-%d", dest, min, max);
1462           g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1463           g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1464           stream->transports = g_list_prepend (stream->transports, tr);
1465           tr->active = TRUE;
1466           media->active++;
1467         } else if (remove && tr->active) {
1468           g_message ("removing %s:%d-%d", dest, min, max);
1469           g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1470           g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1471           stream->transports = g_list_remove (stream->transports, tr);
1472           tr->active = FALSE;
1473           media->active--;
1474         }
1475         break;
1476       }
1477       case GST_RTSP_LOWER_TRANS_TCP:
1478         if (add && !tr->active) {
1479           g_message ("adding TCP %s", trans->destination);
1480           stream->transports = g_list_prepend (stream->transports, tr);
1481           tr->active = TRUE;
1482           media->active++;
1483         } else if (remove && tr->active) {
1484           g_message ("removing TCP %s", trans->destination);
1485           stream->transports = g_list_remove (stream->transports, tr);
1486           tr->active = FALSE;
1487           media->active--;
1488         }
1489         break;
1490       default:
1491         g_message ("Unknown transport %d", trans->lower_transport);
1492         break;
1493     }
1494   }
1495
1496   /* we just added the first media, do the playing state change */
1497   if (old_active == 0 && add)
1498     do_state = TRUE;
1499   /* if we have no more active media, do the downward state changes */
1500   else if (media->active == 0)
1501     do_state = TRUE;
1502   else
1503     do_state = FALSE;
1504
1505   g_message ("active %d media %p", media->active, media);
1506
1507   if (do_state && media->target_state != state) {
1508     if (state == GST_STATE_NULL) {
1509       gst_rtsp_media_unprepare (media);
1510     } else {
1511       g_message ("state %s media %p", gst_element_state_get_name (state), media);
1512       media->target_state = state;
1513       ret = gst_element_set_state (media->pipeline, state);
1514     }
1515   }
1516
1517   /* remember where we are */
1518   if (state == GST_STATE_PAUSED)
1519     collect_media_stats (media);
1520
1521   return TRUE;
1522 }
1523