2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
25 #include "rtsp-media.h"
/* Default values of the "shared" and "reusable" properties that
 * gst_rtsp_media_class_init() installs. */
27 #define DEFAULT_SHARED FALSE
28 #define DEFAULT_REUSABLE FALSE
30 /* define to dump received RTCP packets */
/* Debug category is defined in another translation unit; every GST_INFO and
 * GST_WARNING in this file logs to it through GST_CAT_DEFAULT. */
47 GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
48 #define GST_CAT_DEFAULT rtsp_media_debug
/* Quark used as the qdata key that links an RTP source object to the
 * GstRTSPMediaTrans it belongs to; initialised in class_init. */
50 static GQuark ssrc_stream_map_key;
/* GObject vfuncs */
52 static void gst_rtsp_media_get_property (GObject * object, guint propid,
53 GValue * value, GParamSpec * pspec);
54 static void gst_rtsp_media_set_property (GObject * object, guint propid,
55 const GValue * value, GParamSpec * pspec);
56 static void gst_rtsp_media_finalize (GObject * obj);
/* thread function of the class-wide bus thread and the default class vfuncs */
58 static gpointer do_loop (GstRTSPMediaClass * klass);
59 static gboolean default_handle_message (GstRTSPMedia * media,
60 GstMessage * message);
61 static gboolean default_unprepare (GstRTSPMedia * media);
62 static void unlock_streams (GstRTSPMedia * media);
/* signal ids, indexed by the SIGNAL_* enumeration */
64 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
66 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
69 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
71 GObjectClass *gobject_class;
74 gobject_class = G_OBJECT_CLASS (klass);
76 gobject_class->get_property = gst_rtsp_media_get_property;
77 gobject_class->set_property = gst_rtsp_media_set_property;
78 gobject_class->finalize = gst_rtsp_media_finalize;
80 g_object_class_install_property (gobject_class, PROP_SHARED,
81 g_param_spec_boolean ("shared", "Shared",
82 "If this media pipeline can be shared", DEFAULT_SHARED,
83 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
85 g_object_class_install_property (gobject_class, PROP_REUSABLE,
86 g_param_spec_boolean ("reusable", "Reusable",
87 "If this media pipeline can be reused after an unprepare",
88 DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
90 gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
91 g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
92 G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
93 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
95 klass->context = g_main_context_new ();
96 klass->loop = g_main_loop_new (klass->context, TRUE);
98 klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
100 g_critical ("could not start bus thread: %s", error->message);
102 klass->handle_message = default_handle_message;
103 klass->unprepare = default_unprepare;
105 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
109 gst_rtsp_media_init (GstRTSPMedia * media)
111 media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
/* Drop the references a GstRTSPMediaStream holds: its session object, its
 * caps, the five pads obtained from rtpbin in setup_stream() and the list of
 * transports.
 * NOTE(review): any guards around the session/caps unrefs and the release of
 * the stream structure itself are not visible in this excerpt - confirm. */
115 gst_rtsp_media_stream_free (GstRTSPMediaStream * stream)
118 g_object_unref (stream->session);
121 gst_caps_unref (stream->caps);
/* each pad is only released when it was actually obtained */
123 if (stream->send_rtp_sink)
124 gst_object_unref (stream->send_rtp_sink);
125 if (stream->send_rtp_src)
126 gst_object_unref (stream->send_rtp_src);
127 if (stream->send_rtcp_src)
128 gst_object_unref (stream->send_rtcp_src);
129 if (stream->recv_rtcp_sink)
130 gst_object_unref (stream->recv_rtcp_sink);
131 if (stream->recv_rtp_sink)
132 gst_object_unref (stream->recv_rtp_sink);
/* frees the list nodes only, not the transports they point to */
134 g_list_free (stream->transports);
140 gst_rtsp_media_finalize (GObject * obj)
145 media = GST_RTSP_MEDIA (obj);
147 GST_INFO ("finalize media %p", media);
149 if (media->pipeline) {
150 unlock_streams (media);
151 gst_element_set_state (media->pipeline, GST_STATE_NULL);
152 gst_object_unref (media->pipeline);
155 for (i = 0; i < media->streams->len; i++) {
156 GstRTSPMediaStream *stream;
158 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
160 gst_rtsp_media_stream_free (stream);
162 g_array_free (media->streams, TRUE);
164 g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
165 g_list_free (media->dynamic);
168 g_source_destroy (media->source);
169 g_source_unref (media->source);
172 G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
176 gst_rtsp_media_get_property (GObject * object, guint propid,
177 GValue * value, GParamSpec * pspec)
179 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
183 g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
186 g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
189 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
194 gst_rtsp_media_set_property (GObject * object, guint propid,
195 const GValue * value, GParamSpec * pspec)
197 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
201 gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
204 gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
207 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
212 do_loop (GstRTSPMediaClass * klass)
214 GST_INFO ("enter mainloop");
215 g_main_loop_run (klass->loop);
216 GST_INFO ("exit mainloop");
/* Refresh media->range, always expressed in NPT units.
 * Live media gets an open NOW..END range. For other media the pipeline is
 * queried for position and duration in GST_FORMAT_TIME and the results are
 * converted to seconds; a value of -1 maps to NOW (start) or END (stop).
 * NOTE(review): the values assigned when a query fails are not visible in
 * this excerpt. */
222 collect_media_stats (GstRTSPMedia * media)
225 gint64 position, duration;
227 media->range.unit = GST_RTSP_RANGE_NPT;
229 if (media->is_live) {
/* live media: neither a fixed start nor a fixed end */
230 media->range.min.type = GST_RTSP_TIME_NOW;
231 media->range.min.seconds = -1;
232 media->range.max.type = GST_RTSP_TIME_END;
233 media->range.max.seconds = -1;
235 /* get the position */
236 format = GST_FORMAT_TIME;
237 if (!gst_element_query_position (media->pipeline, &format, &position)) {
238 GST_INFO ("position query failed");
242 /* get the duration */
243 format = GST_FORMAT_TIME;
244 if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
245 GST_INFO ("duration query failed");
249 GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
250 GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
/* start of the range: unknown position means "now" */
252 if (position == -1) {
253 media->range.min.type = GST_RTSP_TIME_NOW;
254 media->range.min.seconds = -1;
256 media->range.min.type = GST_RTSP_TIME_SECONDS;
257 media->range.min.seconds = ((gdouble) position) / GST_SECOND;
/* end of the range: unknown duration means "end" */
259 if (duration == -1) {
260 media->range.max.type = GST_RTSP_TIME_END;
261 media->range.max.seconds = -1;
263 media->range.max.type = GST_RTSP_TIME_SECONDS;
264 media->range.max.seconds = ((gdouble) duration) / GST_SECOND;
270 * gst_rtsp_media_new:
272 * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
273 * element to produde RTP data for one or more related (audio/video/..)
276 * Returns: a new #GstRTSPMedia object.
279 gst_rtsp_media_new (void)
281 GstRTSPMedia *result;
283 result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
289 * gst_rtsp_media_set_shared:
290 * @media: a #GstRTSPMedia
291 * @shared: the new value
293 * Set or unset if the pipeline for @media can be shared will multiple clients.
294 * When @shared is %TRUE, client requests for this media will share the media
298 gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared)
300 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
302 media->shared = shared;
306 * gst_rtsp_media_is_shared:
307 * @media: a #GstRTSPMedia
309 * Check if the pipeline for @media can be shared between multiple clients.
311 * Returns: %TRUE if the media can be shared between clients.
314 gst_rtsp_media_is_shared (GstRTSPMedia * media)
316 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
318 return media->shared;
322 * gst_rtsp_media_set_reusable:
323 * @media: a #GstRTSPMedia
324 * @reusable: the new value
326 * Set or unset if the pipeline for @media can be reused after the pipeline has
330 gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable)
332 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
334 media->reusable = reusable;
338 * gst_rtsp_media_is_reusable:
339 * @media: a #GstRTSPMedia
341 * Check if the pipeline for @media can be reused after an unprepare.
343 * Returns: %TRUE if the media can be reused
346 gst_rtsp_media_is_reusable (GstRTSPMedia * media)
348 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
350 return media->reusable;
354 * gst_rtsp_media_n_streams:
355 * @media: a #GstRTSPMedia
357 * Get the number of streams in this media.
359 * Returns: The number of streams.
362 gst_rtsp_media_n_streams (GstRTSPMedia * media)
364 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
366 return media->streams->len;
370 * gst_rtsp_media_get_stream:
371 * @media: a #GstRTSPMedia
372 * @idx: the stream index
374 * Retrieve the stream with index @idx from @media.
376 * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
377 * that index did not exist.
380 gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx)
382 GstRTSPMediaStream *res;
384 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
386 if (idx < media->streams->len)
387 res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
395 * gst_rtsp_media_seek:
396 * @media: a #GstRTSPMedia
397 * @range: a #GstRTSPTimeRange
399 * Seek the pipeline to @range.
401 * Returns: %TRUE on success.
/* Seek media->pipeline to the range requested in @range.
 * Only NPT ranges are handled. The start and stop of the request are each
 * compared with the range stored in media->range and a seek is only issued
 * when at least one of them has to change. After the seek the function waits
 * for the pipeline state change to complete and refreshes media->range via
 * collect_media_stats().
 * NOTE(review): several assignments, labels and return statements are not
 * visible in this excerpt. */
404 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
409 GstSeekType start_type, stop_type;
411 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
412 g_return_val_if_fail (range != NULL, FALSE);
/* anything but NPT is rejected (see the "seek unit" warning below) */
414 if (range->unit != GST_RTSP_RANGE_NPT)
417 /* depends on the current playing state of the pipeline. We might need to
418 * queue this until we get EOS. */
/* NOTE(review): ACCURATE and KEY_UNIT are requested together - confirm
 * which behaviour is intended */
419 flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
421 start_type = stop_type = GST_SEEK_TYPE_NONE;
/* start of the requested range */
423 switch (range->min.type) {
424 case GST_RTSP_TIME_NOW:
427 case GST_RTSP_TIME_SECONDS:
428 /* only seek when something changed */
/* NOTE(review): exact floating point comparison of seconds values */
429 if (media->range.min.seconds == range->min.seconds) {
432 start = range->min.seconds * GST_SECOND;
433 start_type = GST_SEEK_TYPE_SET;
436 case GST_RTSP_TIME_END:
/* end of the requested range */
440 switch (range->max.type) {
441 case GST_RTSP_TIME_SECONDS:
442 /* only seek when something changed */
443 if (media->range.max.seconds == range->max.seconds) {
446 stop = range->max.seconds * GST_SECOND;
447 stop_type = GST_SEEK_TYPE_SET;
450 case GST_RTSP_TIME_END:
452 stop_type = GST_SEEK_TYPE_SET;
454 case GST_RTSP_TIME_NOW:
/* -1 on both sides means neither boundary needs to move */
459 if (start != -1 || stop != -1) {
460 GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
461 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
463 res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
464 flags, start_type, start, stop_type, stop);
466 /* and block for the seek to complete */
467 GST_INFO ("done seeking %d", res);
/* timeout of -1 here: waits without limit for the state change */
468 gst_element_get_state (media->pipeline, NULL, NULL, -1);
469 GST_INFO ("prerolled again");
471 collect_media_stats (media);
473 GST_INFO ("no seek needed");
/* error exits */
482 GST_WARNING ("seek unit %d not supported", range->unit);
487 GST_WARNING ("weird range type %d not supported", range->min.type);
493 * gst_rtsp_media_stream_rtp:
494 * @stream: a #GstRTSPMediaStream
495 * @buffer: a #GstBuffer
497 * Handle an RTP buffer for the stream. This method is usually called when a
498 * message has been received from a client using the TCP transport.
500 * This function takes ownership of @buffer.
502 * Returns: a GstFlowReturn.
505 gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer)
509 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
515 * gst_rtsp_media_stream_rtcp:
516 * @stream: a #GstRTSPMediaStream
517 * @buffer: a #GstBuffer
519 * Handle an RTCP buffer for the stream. This method is usually called when a
520 * message has been received from a client using the TCP transport.
522 * This function takes ownership of @buffer.
524 * Returns: a GstFlowReturn.
527 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer)
531 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
/* Creates two udpsrc elements bound to an even RTP port and the following
 * RTCP port, plus two multiudpsink elements that reuse the sockets of the
 * sources. On success the four elements and the port pair are stored in
 * @stream. The caller (setup_stream) treats a false return value as failure.
 * NOTE(review): the retry logic, the labels and the return statements are
 * not visible in this excerpt. */
536 /* Allocate the udp ports and sockets */
538 alloc_udp_ports (GstRTSPMediaStream * stream)
540 GstStateChangeReturn ret;
541 GstElement *udpsrc0, *udpsrc1;
542 GstElement *udpsink0, *udpsink1;
543 gint tmp_rtp, tmp_rtcp;
545 gint rtpport, rtcpport, sockfd;
553 /* Start with random port */
556 /* try to allocate 2 UDP ports, the RTP port should be an even
557 * number and the RTCP port should be the next (uneven) port */
559 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
561 goto no_udp_protocol;
562 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
/* going to PAUSED is what makes the source try the port */
564 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
565 if (ret == GST_STATE_CHANGE_FAILURE) {
571 gst_element_set_state (udpsrc0, GST_STATE_NULL);
572 gst_object_unref (udpsrc0);
576 goto no_udp_protocol;
/* read back the port the source ended up with */
579 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
581 /* check if port is even */
582 if ((tmp_rtp & 1) != 0) {
583 /* port not even, close and allocate another */
587 gst_element_set_state (udpsrc0, GST_STATE_NULL);
588 gst_object_unref (udpsrc0);
594 /* allocate port+1 for RTCP now */
595 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
597 goto no_udp_rtcp_protocol;
600 tmp_rtcp = tmp_rtp + 1;
601 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
603 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
604 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
605 if (ret == GST_STATE_CHANGE_FAILURE) {
610 gst_element_set_state (udpsrc0, GST_STATE_NULL);
611 gst_object_unref (udpsrc0);
613 gst_element_set_state (udpsrc1, GST_STATE_NULL);
614 gst_object_unref (udpsrc1);
620 /* all fine, do port check */
621 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
622 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
624 /* this should not happen... */
625 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
/* RTP sink: sends through the socket of the RTP source and must not close
 * it, since the source owns the descriptor */
628 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
630 goto no_udp_protocol;
632 g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
633 g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
634 g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
/* RTCP sink: same socket sharing, additionally with sync and async off */
636 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
638 goto no_udp_protocol;
640 g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
641 g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
642 g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
643 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
644 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
646 g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
647 g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
648 g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
649 g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
651 /* we keep these elements, we configure all in configure_transport when the
652 * server told us to really use the UDP ports. */
653 stream->udpsrc[0] = udpsrc0;
654 stream->udpsrc[1] = udpsrc1;
655 stream->udpsink[0] = udpsink0;
656 stream->udpsink[1] = udpsink1;
657 stream->server_port.min = rtpport;
658 stream->server_port.max = rtcpport;
/* error exits: shut down and release whatever was created so far */
671 no_udp_rtcp_protocol:
682 gst_element_set_state (udpsrc0, GST_STATE_NULL);
683 gst_object_unref (udpsrc0);
686 gst_element_set_state (udpsrc1, GST_STATE_NULL);
687 gst_object_unref (udpsrc1);
690 gst_element_set_state (udpsink0, GST_STATE_NULL);
691 gst_object_unref (udpsink0);
694 gst_element_set_state (udpsink1, GST_STATE_NULL);
695 gst_object_unref (udpsink1);
702 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
705 GstCaps *newcaps, *oldcaps;
707 if ((newcaps = GST_PAD_CAPS (pad)))
708 gst_caps_ref (newcaps);
710 oldcaps = stream->caps;
711 stream->caps = newcaps;
714 gst_caps_unref (oldcaps);
716 capsstr = gst_caps_to_string (newcaps);
717 GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr);
722 dump_structure (const GstStructure * s)
726 sstr = gst_structure_to_string (s);
727 GST_INFO ("structure: %s", sstr);
/* Look through stream->transports for the transport whose destination and
 * client port match @rtcp_from, a string that is split at its last ':' into
 * a destination part and a port.
 * NOTE(review): the remainder of this function (rest of the comparison,
 * cleanup, return) is not visible in this excerpt. */
731 static GstRTSPMediaTrans *
732 find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from)
735 GstRTSPMediaTrans *result = NULL;
740 if (rtcp_from == NULL)
/* the port follows the last ':' in the string */
743 tmp = g_strrstr (rtcp_from, ":");
/* NOTE(review): atoi() reports no errors; malformed input yields port 0 */
747 port = atoi (tmp + 1);
/* newly allocated copy of everything before the ':' */
748 dest = g_strndup (rtcp_from, tmp - rtcp_from);
750 GST_INFO ("finding %s:%d", dest, port);
752 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
753 GstRTSPMediaTrans *trans = walk->data;
756 min = trans->transport->client_port.min;
757 max = trans->transport->client_port.max;
759 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
/* Session "on-new-ssrc" handler. Looks up the transport already attached to
 * @source as qdata; the "stats" structure of the source is read and its
 * "rtcp-from" field is passed to find_transport(), and a transport found that
 * way is attached to the source as qdata.
 * NOTE(review): the conditions guarding these steps are not visible in this
 * excerpt. */
771 on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
774 GstRTSPMediaTrans *trans;
776 GST_INFO ("%p: new source %p", stream, source);
778 /* see if we have a stream to match with the origin of the RTCP packet */
779 trans = g_object_get_qdata (source, ssrc_stream_map_key);
781 g_object_get (source, "stats", &stats, NULL);
783 const gchar *rtcp_from;
785 dump_structure (stats);
787 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
788 if ((trans = find_transport (stream, rtcp_from))) {
789 GST_INFO ("%p: found transport %p for source %p", stream, trans,
792 /* keep ref to the source */
/* NOTE(review): only the pointer is stored, no reference is taken here; the
 * timeout handlers reset it to NULL */
793 trans->rtpsource = source;
795 g_object_set_qdata (source, ssrc_stream_map_key, trans);
797 gst_structure_free (stats);
800 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
805 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream)
807 GST_INFO ("%p: new SDES %p", stream, source);
/* Session "on-ssrc-active" handler. When the source is mapped to a transport
 * that has a keep_alive callback, the callback is invoked with the
 * transport's ka_user_data.
 * NOTE(review): the stats dump at the end is presumably compiled in only
 * when a debug define is set; the guard is not visible in this excerpt. */
811 on_ssrc_active (GObject * session, GObject * source,
812 GstRTSPMediaStream * stream)
814 GstRTSPMediaTrans *trans;
816 trans = g_object_get_qdata (source, ssrc_stream_map_key);
818 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
820 if (trans && trans->keep_alive)
821 trans->keep_alive (trans->ka_user_data);
826 g_object_get (source, "stats", &stats, NULL);
828 dump_structure (stats);
829 gst_structure_free (stats);
836 on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream)
838 GST_INFO ("%p: source %p bye", stream, source);
842 on_bye_timeout (GObject * session, GObject * source,
843 GstRTSPMediaStream * stream)
845 GstRTSPMediaTrans *trans;
847 GST_INFO ("%p: source %p bye timeout", stream, source);
849 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
850 trans->rtpsource = NULL;
851 trans->timeout = TRUE;
856 on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream)
858 GstRTSPMediaTrans *trans;
860 GST_INFO ("%p: source %p timeout", stream, source);
862 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
863 trans->rtpsource = NULL;
864 trans->timeout = TRUE;
/* appsink callback; @user_data is the GstRTSPMediaStream that was passed to
 * gst_app_sink_set_callbacks(). Pulls one buffer from @sink and hands it to
 * every transport of the stream: buffers from appsink[0] go to send_rtp on
 * the interleaved.min channel, buffers from the other appsink go to
 * send_rtcp on the interleaved.max channel. The pulled buffer is released
 * at the end.
 * NOTE(review): guards around the pull result and the callbacks are not
 * visible in this excerpt. */
869 handle_new_buffer (GstAppSink * sink, gpointer user_data)
873 GstRTSPMediaStream *stream;
875 buffer = gst_app_sink_pull_buffer (sink);
879 stream = (GstRTSPMediaStream *) user_data;
881 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
882 GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
/* appsink[0] carries RTP, the other one RTCP */
884 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
886 tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
889 tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
/* drop the reference obtained from the pull */
892 gst_buffer_unref (buffer);
/* Callback table installed on both appsinks of every stream in
 * setup_stream().
 * NOTE(review): the remaining members of the initializer are not visible in
 * this excerpt. */
897 static GstAppSinkCallbacks sink_cb = {
898 NULL, /* not interested in EOS */
899 NULL, /* not interested in preroll buffers */
/* Wires one stream into media->pipeline:
 *  - allocates the UDP sources/sinks and adds them to the pipeline,
 *  - creates an appsrc/appsink pair for RTP and one for RTCP,
 *  - obtains the rtpbin pads for session @idx and connects to the signals
 *    of the internal session,
 *  - links stream->srcpad to rtpbin, fans the RTP and RTCP output out over
 *    a tee to udpsink + appsink, and merges udpsrc + appsrc through an
 *    input-selector into the rtpbin receive pads.
 * NOTE(review): in the visible lines the results of
 * gst_element_factory_make() and of every gst_pad_link() except the first
 * are not checked. */
903 /* prepare the pipeline objects to handle @stream in @media */
905 setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media)
908 GstPad *pad, *teepad, *selpad;
909 GstPadLinkReturn ret;
912 /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
913 * for sending RTP/RTCP. The sender and receiver ports are shared between the
915 if (!alloc_udp_ports (stream))
918 /* add the ports to the pipeline */
919 for (i = 0; i < 2; i++) {
920 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
921 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
924 /* create elements for the TCP transfer */
925 for (i = 0; i < 2; i++) {
926 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
927 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
928 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
929 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
930 g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
931 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
932 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
/* buffers arriving at the appsink are delivered through sink_cb with the
 * stream as user data */
933 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
934 &sink_cb, stream, NULL);
937 /* hook up the stream to the RTP session elements. */
938 name = g_strdup_printf ("send_rtp_sink_%d", idx);
939 stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
941 name = g_strdup_printf ("send_rtp_src_%d", idx);
942 stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
944 name = g_strdup_printf ("send_rtcp_src_%d", idx);
945 stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
947 name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
948 stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
950 name = g_strdup_printf ("recv_rtp_sink_%d", idx);
951 stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
954 /* get the session */
955 g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
/* follow the life cycle of the remote sources in this session */
958 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
960 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
962 g_signal_connect (stream->session, "on-ssrc-active",
963 (GCallback) on_ssrc_active, stream);
964 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
966 g_signal_connect (stream->session, "on-bye-timeout",
967 (GCallback) on_bye_timeout, stream);
968 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
971 /* link the RTP pad to the session manager */
972 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
973 if (ret != GST_PAD_LINK_OK)
976 /* make tee for RTP and link to stream */
977 stream->tee[0] = gst_element_factory_make ("tee", NULL);
978 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
980 pad = gst_element_get_static_pad (stream->tee[0], "sink");
981 gst_pad_link (stream->send_rtp_src, pad);
982 gst_object_unref (pad);
984 /* link RTP sink, we're pretty sure this will work. */
985 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
986 pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
987 gst_pad_link (teepad, pad);
988 gst_object_unref (pad);
989 gst_object_unref (teepad);
991 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
992 pad = gst_element_get_static_pad (stream->appsink[0], "sink");
993 gst_pad_link (teepad, pad);
994 gst_object_unref (pad);
995 gst_object_unref (teepad);
997 /* make tee for RTCP */
998 stream->tee[1] = gst_element_factory_make ("tee", NULL);
999 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
1001 pad = gst_element_get_static_pad (stream->tee[1], "sink");
1002 gst_pad_link (stream->send_rtcp_src, pad);
1003 gst_object_unref (pad);
1005 /* link RTCP elements */
1006 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1007 pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1008 gst_pad_link (teepad, pad);
1009 gst_object_unref (pad);
1010 gst_object_unref (teepad);
1012 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1013 pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1014 gst_pad_link (teepad, pad);
1015 gst_object_unref (pad);
1016 gst_object_unref (teepad);
1018 /* make selector for the RTP receivers */
1019 stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1020 g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1021 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1023 pad = gst_element_get_static_pad (stream->selector[0], "src");
1024 gst_pad_link (pad, stream->recv_rtp_sink);
1025 gst_object_unref (pad);
1027 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1028 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1029 gst_pad_link (pad, selpad);
1030 gst_object_unref (pad);
1031 gst_object_unref (selpad);
1033 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1034 pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1035 gst_pad_link (pad, selpad);
1036 gst_object_unref (pad);
1037 gst_object_unref (selpad);
1039 /* make selector for the RTCP receivers */
1040 stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1041 g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1042 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1044 pad = gst_element_get_static_pad (stream->selector[1], "src");
1045 gst_pad_link (pad, stream->recv_rtcp_sink);
1046 gst_object_unref (pad);
1048 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1049 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1050 gst_pad_link (pad, selpad);
1051 gst_object_unref (pad);
1052 gst_object_unref (selpad);
1054 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1055 pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1056 gst_pad_link (pad, selpad);
1057 gst_object_unref (pad);
1058 gst_object_unref (selpad);
1060 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1062 gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1063 gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1064 gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1065 gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1067 /* be notified of caps changes */
1068 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1069 (GCallback) caps_notify, stream);
1071 stream->prepared = TRUE;
/* error exit for a failed link of the stream source pad */
1078 GST_WARNING ("failed to link stream %d", idx);
1084 unlock_streams (GstRTSPMedia * media)
1088 /* unlock the udp src elements */
1089 n_streams = gst_rtsp_media_n_streams (media);
1090 for (i = 0; i < n_streams; i++) {
1091 GstRTSPMediaStream *stream;
1093 stream = gst_rtsp_media_get_stream (media, i);
1095 gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1096 gst_element_set_locked_state (stream->udpsrc[1], FALSE);
/* Default handle_message class vfunc, called from bus_message() for every
 * message on the pipeline bus. Handles buffering (pause while buffering,
 * resume once 100% is reached if the target state is PLAYING), latency
 * recalculation, and logs errors, warnings and unhandled message types.
 * NOTE(review): the return value and the break statements are not visible
 * in this excerpt. */
1101 default_handle_message (GstRTSPMedia * media, GstMessage * message)
1103 GstMessageType type;
1105 type = GST_MESSAGE_TYPE (message);
1108 case GST_MESSAGE_STATE_CHANGED:
1110 case GST_MESSAGE_BUFFERING:
1114 gst_message_parse_buffering (message, &percent);
1116 /* no state management needed for live pipelines */
1120 if (percent == 100) {
1121 /* a 100% message means buffering is done */
1122 media->buffering = FALSE;
1123 /* if the desired state is playing, go back */
1124 if (media->target_state == GST_STATE_PLAYING) {
1125 GST_INFO ("Buffering done, setting pipeline to PLAYING");
1126 gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1128 GST_INFO ("Buffering done");
1131 /* buffering busy */
/* only act on the first buffering message of a buffering period */
1132 if (media->buffering == FALSE) {
1133 if (media->target_state == GST_STATE_PLAYING) {
1134 /* we were not buffering but PLAYING, PAUSE the pipeline. */
1135 GST_INFO ("Buffering, setting pipeline to PAUSED ...");
1136 gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1138 GST_INFO ("Buffering ...");
1141 media->buffering = TRUE;
1145 case GST_MESSAGE_LATENCY:
1147 gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1150 case GST_MESSAGE_ERROR:
/* errors are only logged here */
1155 gst_message_parse_error (message, &gerror, &debug);
1156 GST_WARNING ("%p: got error %s (%s)", media, gerror->message, debug);
1157 g_error_free (gerror);
1161 case GST_MESSAGE_WARNING:
1166 gst_message_parse_warning (message, &gerror, &debug);
1167 GST_WARNING ("%p: got warning %s (%s)", media, gerror->message, debug);
1168 g_error_free (gerror);
1172 case GST_MESSAGE_ELEMENT:
1174 case GST_MESSAGE_STREAM_STATUS:
1177 GST_INFO ("%p: got message type %s", media,
1178 gst_message_type_get_name (type));
/* Bus watch callback installed by gst_rtsp_media_prepare(): forwards every
 * bus message to the handle_message vfunc of the media class, if one is set.
 * NOTE(review): the value returned when no handler is set is not visible in
 * this excerpt. */
1185 bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media)
1187 GstRTSPMediaClass *klass;
1190 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1192 if (klass->handle_message)
1193 ret = klass->handle_message (media, message);
1201 pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
1203 GstRTSPMediaStream *stream;
1207 i = media->streams->len + 1;
1209 GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1211 stream = g_new0 (GstRTSPMediaStream, 1);
1212 stream->payloader = element;
1214 name = g_strdup_printf ("dynpay%d", i);
1216 /* ghost the pad of the payloader to the element */
1217 stream->srcpad = gst_ghost_pad_new (name, pad);
1218 gst_pad_set_active (stream->srcpad, TRUE);
1219 gst_element_add_pad (media->element, stream->srcpad);
1222 /* add stream now */
1223 g_array_append_val (media->streams, stream);
1225 setup_stream (stream, i, media);
1227 for (i = 0; i < 2; i++) {
1228 gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1229 gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1230 gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1231 gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1232 gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1237 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
1239 GST_INFO ("no more pads");
1240 if (media->fakesink) {
1241 gst_object_ref (media->fakesink);
1242 gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1243 gst_element_set_state (media->fakesink, GST_STATE_NULL);
1244 gst_object_unref (media->fakesink);
1245 media->fakesink = NULL;
1246 GST_INFO ("removed fakesink");
1251 * gst_rtsp_media_prepare:
1252 * @media: a #GstRTSPMedia
1254 * Prepare @media for streaming. This function will create the pipeline and
1255 * other objects to manage the streaming.
1257 * It will preroll the pipeline and collect vital information about the streams
1258 * such as the duration.
1260 * Returns: %TRUE on success.
/* Prepares @media for streaming: attaches a bus watch to the class main
 * context, adds a gstrtpbin to the pipeline, wires up the streams that exist
 * already, hooks the dynamic elements, then brings the pipeline to PAUSED
 * (or PLAYING for live media), waits for the state change to finish and
 * collects the range information.
 * NOTE(review): media->pipeline is used but not created in the visible
 * lines, and the result of creating "gstrtpbin" is not checked there.
 * Return statements and labels are not visible in this excerpt. */
1263 gst_rtsp_media_prepare (GstRTSPMedia * media)
1265 GstStateChangeReturn ret;
1267 GstRTSPMediaClass *klass;
/* nothing to do when the media was prepared before */
1271 if (media->prepared)
/* a non-reusable media can only be prepared once */
1274 if (!media->reusable && media->reused)
1277 GST_INFO ("preparing media %p", media);
1279 /* reset some variables */
1280 media->is_live = FALSE;
1281 media->buffering = FALSE;
1283 bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1285 /* add the pipeline bus to our custom mainloop */
1286 media->source = gst_bus_create_watch (bus);
1287 gst_object_unref (bus);
1289 g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1291 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1292 media->id = g_source_attach (media->source, klass->context);
1294 media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
1296 /* add stuff to the bin */
1297 gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1299 /* link streams we already have, other streams might appear when we have
1300 * dynamic elements */
1301 n_streams = gst_rtsp_media_n_streams (media);
1302 for (i = 0; i < n_streams; i++) {
1303 GstRTSPMediaStream *stream;
1305 stream = gst_rtsp_media_get_stream (media, i);
1307 setup_stream (stream, i, media);
/* streams of dynamic elements are created from pad_added_cb */
1310 for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1311 GstElement *elem = walk->data;
1313 g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1314 g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1316 /* we add a fakesink here in order to make the state change async. We remove
1317 * the fakesink again in the no-more-pads callback. */
1318 media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1319 gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1322 GST_INFO ("setting pipeline to PAUSED for media %p", media);
1323 /* first go to PAUSED */
1324 ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1325 media->target_state = GST_STATE_PAUSED;
1328 case GST_STATE_CHANGE_SUCCESS:
1329 GST_INFO ("SUCCESS state change for media %p", media);
1331 case GST_STATE_CHANGE_ASYNC:
1332 GST_INFO ("ASYNC state change for media %p", media);
1334 case GST_STATE_CHANGE_NO_PREROLL:
1335 /* we need to go to PLAYING */
1336 GST_INFO ("NO_PREROLL state change: live media %p", media);
1337 media->is_live = TRUE;
1338 ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1339 if (ret == GST_STATE_CHANGE_FAILURE)
1342 case GST_STATE_CHANGE_FAILURE:
1346 /* now wait for all pads to be prerolled */
/* timeout of -1 here: waits without limit */
1347 ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1348 if (ret == GST_STATE_CHANGE_FAILURE)
1351 /* collect stats about the media */
1352 collect_media_stats (media);
1354 GST_INFO ("object %p is prerolled", media);
1356 media->prepared = TRUE;
/* error exits */
1368 GST_WARNING ("failed to preroll pipeline");
1369 unlock_streams (media);
1370 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1375 GST_WARNING ("can not reuse media %p", media);
1381 * gst_rtsp_media_unprepare:
1382 * @obj: a #GstRTSPMedia
1384 * Unprepare @media. After this call, the media should be prepared again before
1385 * it can be used again. If the media is set to be non-reusable, a new instance
1388 * Returns: %TRUE on success.
1391 gst_rtsp_media_unprepare (GstRTSPMedia * media)
1393 GstRTSPMediaClass *klass;
1396 if (!media->prepared)
1399 GST_INFO ("unprepare media %p", media);
1400 media->target_state = GST_STATE_NULL;
1402 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1403 if (klass->unprepare)
1404 success = klass->unprepare (media);
1408 media->prepared = FALSE;
1409 media->reused = TRUE;
1411 /* when the media is not reusable, this will effectively unref the media and
1413 g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1419 default_unprepare (GstRTSPMedia * media)
1421 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1427 * gst_rtsp_media_set_state:
1428 * @media: a #GstRTSPMedia
1429 * @state: the target state of the media
1430 * @transports: a GArray of #GstRTSPMediaTrans pointers
1432 * Set the state of @media to @state and for the transports in @transports.
1434 * Returns: %TRUE on success.
1437 gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
1438 GArray * transports)
1441 GstStateChangeReturn ret;
1442 gboolean add, remove, do_state;
1445 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1446 g_return_val_if_fail (transports != NULL, FALSE);
1448 /* NULL and READY are the same */
1449 if (state == GST_STATE_READY)
1450 state = GST_STATE_NULL;
1452 add = remove = FALSE;
1454 GST_INFO ("going to state %s media %p", gst_element_state_get_name (state),
1458 case GST_STATE_NULL:
1459 /* unlock the streams so that they follow the state changes from now on */
1460 unlock_streams (media);
1462 case GST_STATE_PAUSED:
1463 /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1464 if (media->target_state == GST_STATE_PLAYING)
1467 case GST_STATE_PLAYING:
1468 /* we're going to PLAYING, add */
1474 old_active = media->active;
1476 for (i = 0; i < transports->len; i++) {
1477 GstRTSPMediaTrans *tr;
1478 GstRTSPMediaStream *stream;
1479 GstRTSPTransport *trans;
1481 /* we need a non-NULL entry in the array */
1482 tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1486 /* we need a transport */
1487 if (!(trans = tr->transport))
1490 /* get the stream and add the destinations */
1491 stream = gst_rtsp_media_get_stream (media, tr->idx);
1492 switch (trans->lower_transport) {
1493 case GST_RTSP_LOWER_TRANS_UDP:
1494 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1499 dest = trans->destination;
1500 if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1501 min = trans->port.min;
1502 max = trans->port.max;
1504 min = trans->client_port.min;
1505 max = trans->client_port.max;
1508 if (add && !tr->active) {
1509 GST_INFO ("adding %s:%d-%d", dest, min, max);
1510 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1511 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1512 stream->transports = g_list_prepend (stream->transports, tr);
1515 } else if (remove && tr->active) {
1516 GST_INFO ("removing %s:%d-%d", dest, min, max);
1517 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1518 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1519 stream->transports = g_list_remove (stream->transports, tr);
1525 case GST_RTSP_LOWER_TRANS_TCP:
1526 if (add && !tr->active) {
1527 GST_INFO ("adding TCP %s", trans->destination);
1528 stream->transports = g_list_prepend (stream->transports, tr);
1531 } else if (remove && tr->active) {
1532 GST_INFO ("removing TCP %s", trans->destination);
1533 stream->transports = g_list_remove (stream->transports, tr);
1539 GST_INFO ("Unknown transport %d", trans->lower_transport);
1544 /* we just added the first media, do the playing state change */
1545 if (old_active == 0 && add)
1547 /* if we have no more active media, do the downward state changes */
1548 else if (media->active == 0)
1553 GST_INFO ("active %d media %p", media->active, media);
1555 if (do_state && media->target_state != state) {
1556 if (state == GST_STATE_NULL) {
1557 gst_rtsp_media_unprepare (media);
1559 GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
1560 media->target_state = state;
1561 ret = gst_element_set_state (media->pipeline, state);
1565 /* remember where we are */
1566 if (state == GST_STATE_PAUSED)
1567 collect_media_stats (media);
1573 * gst_rtsp_media_remove_elements:
1574 * @media: a #GstRTSPMedia
1576 * Remove all elements and the pipeline controlled by @media.
1579 gst_rtsp_media_remove_elements (GstRTSPMedia * media)
1583 unlock_streams (media);
1585 for (i = 0; i < media->streams->len; i++) {
1586 GstRTSPMediaStream *stream;
1588 GST_INFO ("Removing elements of stream %d from pipeline", i);
1590 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
1592 gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
1594 g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
1596 for (j = 0; j < 2; j++) {
1597 gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL);
1598 gst_element_set_state (stream->udpsink[j], GST_STATE_NULL);
1599 gst_element_set_state (stream->appsrc[j], GST_STATE_NULL);
1600 gst_element_set_state (stream->appsink[j], GST_STATE_NULL);
1601 gst_element_set_state (stream->tee[j], GST_STATE_NULL);
1602 gst_element_set_state (stream->selector[j], GST_STATE_NULL);
1604 gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]);
1605 gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]);
1606 gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]);
1607 gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]);
1608 gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]);
1609 gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]);
1612 gst_caps_unref (stream->caps);
1613 stream->caps = NULL;
1614 gst_rtsp_media_stream_free (stream);
1616 g_array_remove_range (media->streams, 0, media->streams->len);
1618 gst_element_set_state (media->rtpbin, GST_STATE_NULL);
1619 gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
1621 gst_object_unref (media->pipeline);
1622 media->pipeline = NULL;