2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
25 #include "rtsp-media.h"
/* Default values of the "shared" and "reusable" properties installed in
 * gst_rtsp_media_class_init. */
27 #define DEFAULT_SHARED FALSE
28 #define DEFAULT_REUSABLE FALSE
/* Quark under which a GstRTSPMediaTrans is attached to an RTP source object
 * as qdata; initialised in gst_rtsp_media_class_init. */
44 static GQuark ssrc_stream_map_key;
/* GObject property accessors and finalizer. */
46 static void gst_rtsp_media_get_property (GObject *object, guint propid,
47 GValue *value, GParamSpec *pspec);
48 static void gst_rtsp_media_set_property (GObject *object, guint propid,
49 const GValue *value, GParamSpec *pspec);
50 static void gst_rtsp_media_finalize (GObject * obj);
/* Bus-thread entry point, default bus message handler and the helper that
 * unlocks the state of the udpsrc elements; all defined further down. */
52 static gpointer do_loop (GstRTSPMediaClass *klass);
53 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
54 static void unlock_streams (GstRTSPMedia *media);
/* Ids of the signals registered in class_init, indexed by SIGNAL_* value. */
56 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
58 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
/* Class initializer: hooks up the GObject vmethods, installs the "shared" and
 * "reusable" boolean properties, registers the "unprepared" signal and creates
 * the class-wide main context/loop together with the thread that runs it. */
61 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
63 GObjectClass *gobject_class;
66 gobject_class = G_OBJECT_CLASS (klass);
68 gobject_class->get_property = gst_rtsp_media_get_property;
69 gobject_class->set_property = gst_rtsp_media_set_property;
70 gobject_class->finalize = gst_rtsp_media_finalize;
72 g_object_class_install_property (gobject_class, PROP_SHARED,
73 g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
74 DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
76 g_object_class_install_property (gobject_class, PROP_REUSABLE,
77 g_param_spec_boolean ("reusable", "Reusable",
78 "If this media pipeline can be reused after an unprepare",
79 DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "unprepared" takes no arguments and returns nothing; it is emitted from
 * gst_rtsp_media_unprepare */
81 gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
82 g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
83 G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
84 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/* one context and loop for the whole class; gst_rtsp_media_prepare attaches
 * the bus watch of every media to klass->context */
86 klass->context = g_main_context_new ();
87 klass->loop = g_main_loop_new (klass->context, TRUE);
/* joinable thread (third argument TRUE) that runs do_loop with the class */
89 klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
91 g_critical ("could not start bus thread: %s", error->message);
/* default bus message handler, called through the class from bus_message */
93 klass->handle_message = default_handle_message;
95 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
/* Instance initializer: creates the empty stream array and marks the media as
 * neither live nor buffering. */
99 gst_rtsp_media_init (GstRTSPMedia * media)
/* array of GstRTSPMediaStream pointers: not zero-terminated, elements cleared
 * to 0 when allocated */
101 media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
102 media->is_live = FALSE;
103 media->buffering = FALSE;
/* Releases what a stream holds: the reference on its RTP session, the
 * reference on its caps and the transports list. */
107 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
110 g_object_unref (stream->session);
113 gst_caps_unref (stream->caps);
/* g_list_free releases only the list nodes, not the GstRTSPMediaTrans
 * structures they point to */
115 g_list_free (stream->transports);
/* GObject finalizer: shuts down and releases the pipeline, frees all streams
 * and the stream array, drops the references on the dynamic elements, destroys
 * the bus watch source and finally chains up to the parent class. */
121 gst_rtsp_media_finalize (GObject * obj)
126 media = GST_RTSP_MEDIA (obj);
128 g_message ("finalize media %p", media);
130 if (media->pipeline) {
/* setup_stream locks the state of the udpsrc elements; unlock them first so
 * the state change to NULL below is applied to them as well */
131 unlock_streams (media);
132 gst_element_set_state (media->pipeline, GST_STATE_NULL);
133 gst_object_unref (media->pipeline);
136 for (i = 0; i < media->streams->len; i++) {
137 GstRTSPMediaStream *stream;
139 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
141 gst_rtsp_media_stream_free (stream);
/* TRUE: also free the element storage of the array */
143 g_array_free (media->streams, TRUE);
145 g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
146 g_list_free (media->dynamic);
/* detach the bus watch from the class main context before dropping it */
149 g_source_destroy (media->source);
150 g_source_unref (media->source);
153 G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
/* GObject get_property implementation: reads the "shared" and "reusable"
 * properties through the public getters; any other id is reported as invalid. */
157 gst_rtsp_media_get_property (GObject *object, guint propid,
158 GValue *value, GParamSpec *pspec)
160 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
164 g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
167 g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
170 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* GObject set_property implementation: writes the "shared" and "reusable"
 * properties through the public setters; any other id is reported as invalid. */
175 gst_rtsp_media_set_property (GObject *object, guint propid,
176 const GValue *value, GParamSpec *pspec)
178 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
182 gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
185 gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
188 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* Thread function started from class_init: runs the class main loop, which
 * dispatches the bus watches attached to klass->context, until the loop is
 * quit. */
193 do_loop (GstRTSPMediaClass *klass)
195 g_message ("enter mainloop");
196 g_main_loop_run (klass->loop);
197 g_message ("exit mainloop");
/* Fills media->range (in NPT units) from the pipeline. Live media get the open
 * range "now" .. "end". Otherwise the pipeline is queried for position and
 * duration in time format: a position of -1 is stored as "now" and a duration
 * of -1 as "end", other values are stored in seconds. */
203 collect_media_stats (GstRTSPMedia *media)
206 gint64 position, duration;
208 media->range.unit = GST_RTSP_RANGE_NPT;
210 if (media->is_live) {
211 media->range.min.type = GST_RTSP_TIME_NOW;
212 media->range.min.seconds = -1;
213 media->range.max.type = GST_RTSP_TIME_END;
214 media->range.max.seconds = -1;
217 /* get the position */
218 format = GST_FORMAT_TIME;
219 if (!gst_element_query_position (media->pipeline, &format, &position)) {
220 g_message ("position query failed");
224 /* get the duration */
/* format is an in/out argument of the query, so it is reset before reuse */
225 format = GST_FORMAT_TIME;
226 if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
227 g_message ("duration query failed");
231 g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
232 GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
234 if (position == -1) {
235 media->range.min.type = GST_RTSP_TIME_NOW;
236 media->range.min.seconds = -1;
239 media->range.min.type = GST_RTSP_TIME_SECONDS;
/* GstClockTime nanoseconds to fractional seconds */
240 media->range.min.seconds = ((gdouble)position) / GST_SECOND;
242 if (duration == -1) {
243 media->range.max.type = GST_RTSP_TIME_END;
244 media->range.max.seconds = -1;
247 media->range.max.type = GST_RTSP_TIME_SECONDS;
248 media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
254 * gst_rtsp_media_new:
256 * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
257 * element to produce RTP data for one or more related (audio/video/..)
260 * Returns: a new #GstRTSPMedia object.
263 gst_rtsp_media_new (void)
265 GstRTSPMedia *result;
/* no construct properties are passed, so "shared" and "reusable" start at
 * their defaults */
267 result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
273 * gst_rtsp_media_set_shared:
274 * @media: a #GstRTSPMedia
275 * @shared: the new value
277 * Set or unset if the pipeline for @media can be shared with multiple clients.
278 * When @shared is %TRUE, client requests for this media will share the media
282 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
/* log a critical and return without touching anything when @media is not a
 * GstRTSPMedia */
284 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
286 media->shared = shared;
290 * gst_rtsp_media_is_shared:
291 * @media: a #GstRTSPMedia
293 * Check if the pipeline for @media can be shared between multiple clients.
295 * Returns: %TRUE if the media can be shared between clients.
298 gst_rtsp_media_is_shared (GstRTSPMedia *media)
/* an invalid @media is reported as "not shared" */
300 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
302 return media->shared;
306 * gst_rtsp_media_set_reusable:
307 * @media: a #GstRTSPMedia
308 * @reusable: the new value
310 * Set or unset if the pipeline for @media can be reused after the pipeline has
314 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
/* log a critical and return without touching anything when @media is not a
 * GstRTSPMedia */
316 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
/* consulted by gst_rtsp_media_prepare, which refuses to prepare a
 * non-reusable media a second time */
318 media->reusable = reusable;
322 * gst_rtsp_media_is_reusable:
323 * @media: a #GstRTSPMedia
325 * Check if the pipeline for @media can be reused after an unprepare.
327 * Returns: %TRUE if the media can be reused
330 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
/* an invalid @media is reported as "not reusable" */
332 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
334 return media->reusable;
338 * gst_rtsp_media_n_streams:
339 * @media: a #GstRTSPMedia
341 * Get the number of streams in this media.
343 * Returns: The number of streams.
346 gst_rtsp_media_n_streams (GstRTSPMedia *media)
/* an invalid @media is reported as having 0 streams */
348 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
/* streams is created in gst_rtsp_media_init, so it exists for every instance */
350 return media->streams->len;
354 * gst_rtsp_media_get_stream:
355 * @media: a #GstRTSPMedia
356 * @idx: the stream index
358 * Retrieve the stream with index @idx from @media.
360 * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
361 * that index did not exist.
364 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
366 GstRTSPMediaStream *res;
368 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
/* bounds check: g_array_index itself does no range checking */
370 if (idx < media->streams->len)
371 res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
379 * gst_rtsp_media_seek:
380 * @media: a #GstRTSPMedia
381 * @range: a #GstRTSPTimeRange
383 * Seek the pipeline to @range.
385 * Returns: %TRUE on success.
/* Translates an NPT time range into a flushing seek on the pipeline, waits for
 * the pipeline to settle again and refreshes the cached range through
 * collect_media_stats. Ranges in other units are rejected with a warning. */
388 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
393 GstSeekType start_type, stop_type;
395 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
396 g_return_val_if_fail (range != NULL, FALSE);
398 if (range->unit != GST_RTSP_RANGE_NPT)
401 /* depends on the current playing state of the pipeline. We might need to
402 * queue this until we get EOS. */
/* NOTE(review): ACCURATE and KEY_UNIT are requested together here - confirm
 * that this combination is intended */
403 flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
/* NONE leaves the corresponding edge of the current segment untouched */
405 start_type = stop_type = GST_SEEK_TYPE_NONE;
407 switch (range->min.type) {
408 case GST_RTSP_TIME_NOW:
411 case GST_RTSP_TIME_SECONDS:
412 /* only seek when something changed */
413 if (media->range.min.seconds == range->min.seconds) {
/* seconds to GstClockTime nanoseconds */
416 start = range->min.seconds * GST_SECOND;
417 start_type = GST_SEEK_TYPE_SET;
420 case GST_RTSP_TIME_END:
424 switch (range->max.type) {
425 case GST_RTSP_TIME_SECONDS:
426 /* only seek when something changed */
427 if (media->range.max.seconds == range->max.seconds) {
430 stop = range->max.seconds * GST_SECOND;
431 stop_type = GST_SEEK_TYPE_SET;
434 case GST_RTSP_TIME_END:
436 stop_type = GST_SEEK_TYPE_SET;
438 case GST_RTSP_TIME_NOW:
443 if (start != -1 || stop != -1) {
444 g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
445 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
447 res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
448 flags, start_type, start, stop_type, stop);
450 /* and block for the seek to complete */
451 g_message ("done seeking %d", res);
/* timeout -1: wait without limit for the pending state change to finish */
452 gst_element_get_state (media->pipeline, NULL, NULL, -1);
453 g_message ("prerolled again");
/* position and duration may have changed, refresh media->range */
455 collect_media_stats (media);
458 g_message ("no seek needed");
467 g_warning ("seek unit %d not supported", range->unit);
472 g_warning ("weird range type %d not supported", range->min.type);
478 * gst_rtsp_media_stream_rtp:
479 * @stream: a #GstRTSPMediaStream
480 * @buffer: a #GstBuffer
482 * Handle an RTP buffer for the stream. This method is usually called when a
483 * message has been received from a client using the TCP transport.
485 * This function takes ownership of @buffer.
487 * Returns: a GstFlowReturn.
490 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
/* appsrc[0] is the RTP input of the stream (created in setup_stream);
 * gst_app_src_push_buffer takes ownership of @buffer */
494 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
500 * gst_rtsp_media_stream_rtcp:
501 * @stream: a #GstRTSPMediaStream
502 * @buffer: a #GstBuffer
504 * Handle an RTCP buffer for the stream. This method is usually called when a
505 * message has been received from a client using the TCP transport.
507 * This function takes ownership of @buffer.
509 * Returns: a GstFlowReturn.
512 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
/* appsrc[1] is the RTCP input of the stream (created in setup_stream);
 * gst_app_src_push_buffer takes ownership of @buffer */
516 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
521 /* Allocate the udp ports and sockets */
/* Allocates an even/odd pair of UDP ports for RTP and RTCP by bringing two
 * udpsrc elements to PAUSED, creates two multiudpsink elements that send
 * through the sockets of those sources, and stores the four elements and the
 * port pair in @stream. The code after the no_udp_rtcp_protocol label shuts
 * down and unrefs the elements again. */
523 alloc_udp_ports (GstRTSPMediaStream * stream)
525 GstStateChangeReturn ret;
526 GstElement *udpsrc0, *udpsrc1;
527 GstElement *udpsink0, *udpsink1;
528 gint tmp_rtp, tmp_rtcp;
530 gint rtpport, rtcpport, sockfd;
538 /* Start with random port */
541 /* try to allocate 2 UDP ports, the RTP port should be an even
542 * number and the RTCP port should be the next (uneven) port */
544 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
546 goto no_udp_protocol;
547 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
/* going to PAUSED makes the element open and bind its socket */
549 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
550 if (ret == GST_STATE_CHANGE_FAILURE) {
556 gst_element_set_state (udpsrc0, GST_STATE_NULL);
557 gst_object_unref (udpsrc0);
561 goto no_udp_protocol;
/* read back the port that was actually bound */
564 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
566 /* check if port is even */
567 if ((tmp_rtp & 1) != 0) {
568 /* port not even, close and allocate another */
572 gst_element_set_state (udpsrc0, GST_STATE_NULL);
573 gst_object_unref (udpsrc0);
579 /* allocate port+1 for RTCP now */
580 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
582 goto no_udp_rtcp_protocol;
585 tmp_rtcp = tmp_rtp + 1;
586 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
588 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
589 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
590 if (ret == GST_STATE_CHANGE_FAILURE) {
595 gst_element_set_state (udpsrc0, GST_STATE_NULL);
596 gst_object_unref (udpsrc0);
598 gst_element_set_state (udpsrc1, GST_STATE_NULL);
599 gst_object_unref (udpsrc1);
605 /* all fine, do port check */
606 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
607 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
609 /* this should not happen... */
610 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
613 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
615 goto no_udp_protocol;
/* the sink sends through the socket of the matching udpsrc; closefd FALSE so
 * the sink does not close a socket it does not own */
617 g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
618 g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
619 g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
621 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
623 goto no_udp_protocol;
625 g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
626 g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
627 g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
/* the RTCP sink neither syncs to the clock nor takes part in preroll */
628 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
629 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
631 /* we keep these elements, we configure all in configure_transport when the
632 * server told us to really use the UDP ports. */
633 stream->udpsrc[0] = udpsrc0;
634 stream->udpsrc[1] = udpsrc1;
635 stream->udpsink[0] = udpsink0;
636 stream->udpsink[1] = udpsink1;
/* server_port.min is the RTP port, server_port.max the RTCP port */
637 stream->server_port.min = rtpport;
638 stream->server_port.max = rtcpport;
651 no_udp_rtcp_protocol:
662 gst_element_set_state (udpsrc0, GST_STATE_NULL);
663 gst_object_unref (udpsrc0);
666 gst_element_set_state (udpsrc1, GST_STATE_NULL);
667 gst_object_unref (udpsrc1);
670 gst_element_set_state (udpsink0, GST_STATE_NULL);
671 gst_object_unref (udpsink0);
674 gst_element_set_state (udpsink1, GST_STATE_NULL);
675 gst_object_unref (udpsink1);
682 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
685 GstCaps *newcaps, *oldcaps;
687 if ((newcaps = GST_PAD_CAPS (pad)))
688 gst_caps_ref (newcaps);
690 oldcaps = stream->caps;
691 stream->caps = newcaps;
694 gst_caps_unref (oldcaps);
696 capsstr = gst_caps_to_string (newcaps);
697 g_message ("stream %p received caps %s", stream, capsstr);
/* Debug helper: logs the string serialization of @s. */
702 dump_structure (const GstStructure *s)
706 sstr = gst_structure_to_string (s);
707 g_message ("structure: %s", sstr);
/* Looks up the transport of @stream that matches the sender of an RTCP packet.
 * @rtcp_from is split at its last ':' into a destination and a port; a
 * transport matches when its destination string is equal and the port is
 * either the minimum or the maximum of its client port range. */
711 static GstRTSPMediaTrans *
712 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
715 GstRTSPMediaTrans *result = NULL;
720 if (rtcp_from == NULL)
/* the port is whatever follows the last ':' */
723 tmp = g_strrstr (rtcp_from, ":");
/* atoi performs no error checking: a non-numeric port yields 0 */
727 port = atoi (tmp + 1);
/* copy of everything before the last ':' */
728 dest = g_strndup (rtcp_from, tmp - rtcp_from);
730 g_message ("finding %s:%d", dest, port);
732 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
733 GstRTSPMediaTrans *trans = walk->data;
736 min = trans->transport->client_port.min;
737 max = trans->transport->client_port.max;
/* NOTE(review): strcmp requires transport->destination to be non-NULL -
 * confirm that callers always set it */
739 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
/* "on-new-ssrc" handler of the RTP session of a stream. Uses the "rtcp-from"
 * field of the source stats to find the transport the source belongs to and
 * links the two: the transport remembers the source and the source carries the
 * transport as qdata under ssrc_stream_map_key. */
750 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
753 GstRTSPMediaTrans *trans;
755 g_message ("%p: new source %p", stream, source);
757 /* see if we have a stream to match with the origin of the RTCP packet */
758 trans = g_object_get_qdata (source, ssrc_stream_map_key);
760 g_object_get (source, "stats", &stats, NULL);
762 const gchar *rtcp_from;
764 dump_structure (stats);
766 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
767 if ((trans = find_transport (stream, rtcp_from))) {
768 g_message ("%p: found transport %p for source %p", stream, trans, source);
770 /* keep ref to the source */
/* NOTE(review): this line stores the pointer without g_object_ref; the
 * timeout handlers reset it to NULL - confirm the intended ownership */
771 trans->rtpsource = source;
/* lets the other session callbacks get from the source to its transport */
773 g_object_set_qdata (source, ssrc_stream_map_key, trans);
/* the stats structure returned by g_object_get is owned by us */
775 gst_structure_free (stats);
778 g_message ("%p: source %p for transport %p", stream, source, trans);
/* "on-ssrc-sdes" handler of the RTP session of a stream; only logs. */
783 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
785 g_message ("%p: new SDES %p", stream, source);
789 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
791 GstRTSPMediaTrans *trans;
793 trans = g_object_get_qdata (source, ssrc_stream_map_key);
795 g_message ("%p: source %p in transport %p is active", stream, trans, source);
797 if (trans && trans->keep_alive)
798 trans->keep_alive (trans->ka_user_data);
/* "on-bye-ssrc" handler of the RTP session of a stream; only logs. */
802 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
804 g_message ("%p: source %p bye", stream, source);
/* "on-bye-timeout" handler of the RTP session of a stream: when @source has a
 * transport attached, the transport forgets the source and is flagged as
 * timed out. */
808 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
810 GstRTSPMediaTrans *trans;
812 g_message ("%p: source %p bye timeout", stream, source);
/* the transport was attached to the source in on_new_ssrc */
814 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
815 trans->rtpsource = NULL;
816 trans->timeout = TRUE;
/* "on-timeout" handler of the RTP session of a stream: when @source has a
 * transport attached, the transport forgets the source and is flagged as
 * timed out. */
821 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
823 GstRTSPMediaTrans *trans;
825 g_message ("%p: source %p timeout", stream, source);
/* the transport was attached to the source in on_new_ssrc */
827 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
828 trans->rtpsource = NULL;
829 trans->timeout = TRUE;
/* appsink callback for the RTP and RTCP appsinks of a stream (@user_data is
 * the GstRTSPMediaStream). Pulls the buffer from @sink and hands it to the
 * transports of the stream: through send_rtp with the lower interleaved
 * channel when @sink is appsink[0], through send_rtcp with the upper
 * interleaved channel otherwise. */
834 handle_new_buffer (GstAppSink *sink, gpointer user_data)
838 GstRTSPMediaStream *stream;
/* we own the returned reference */
840 buffer = gst_app_sink_pull_buffer (sink);
844 stream = (GstRTSPMediaStream *) user_data;
846 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
847 GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
/* appsink[0] carries RTP, the other appsink RTCP */
849 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
851 tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
855 tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
/* drop the reference obtained from gst_app_sink_pull_buffer */
858 gst_buffer_unref (buffer);
/* Callback table installed on both appsinks of every stream by setup_stream.
 * The EOS and preroll callbacks are unused.
 * NOTE(review): the remaining entry is not visible in this listing; it is
 * presumably handle_new_buffer - confirm. */
863 static GstAppSinkCallbacks sink_cb = {
864 NULL, /* not interested in EOS */
865 NULL, /* not interested in preroll buffers */
869 /* prepare the pipeline objects to handle @stream in @media */
/* Builds the pipeline part for one stream. @idx is used as the number in the
 * rtpbin pad names and to fetch the internal RTP session.
 *
 * Outgoing RTP and RTCP from rtpbin each go through a tee that feeds a
 * multiudpsink (UDP transport) and an appsink (TCP transport). Incoming RTP
 * and RTCP from the udpsrc and appsrc elements are merged with an
 * input-selector and fed into the rtpbin receive pads. */
871 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
874 GstPad *pad, *teepad, *selpad;
875 GstPadLinkReturn ret;
878 /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
879 * for sending RTP/RTCP. The sender and receiver ports are shared between the
881 if (!alloc_udp_ports (stream))
884 /* add the ports to the pipeline */
885 for (i = 0; i < 2; i++) {
886 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
887 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
890 /* create elements for the TCP transfer */
891 for (i = 0; i < 2; i++) {
892 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
893 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
/* the appsinks neither sync to the clock nor take part in preroll; buffers
 * are delivered through the callbacks in sink_cb rather than through signals */
894 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
895 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
896 g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
897 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
898 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
899 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
900 &sink_cb, stream, NULL);
903 /* hook up the stream to the RTP session elements. */
904 name = g_strdup_printf ("send_rtp_sink_%d", idx);
905 stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
/* send_rtp_src is fetched as a static pad, unlike the other rtpbin pads */
907 name = g_strdup_printf ("send_rtp_src_%d", idx);
908 stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
910 name = g_strdup_printf ("send_rtcp_src_%d", idx);
911 stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
913 name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
914 stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
916 name = g_strdup_printf ("recv_rtp_sink_%d", idx);
917 stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
920 /* get the session */
921 g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
/* track sources appearing, becoming active and timing out in this session */
924 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
926 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
928 g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
930 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
932 g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
934 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
937 /* link the RTP pad to the session manager */
/* this is the only link in this function whose result is checked */
938 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
939 if (ret != GST_PAD_LINK_OK)
942 /* make tee for RTP and link to stream */
943 stream->tee[0] = gst_element_factory_make ("tee", NULL);
944 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
946 pad = gst_element_get_static_pad (stream->tee[0], "sink");
947 gst_pad_link (stream->send_rtp_src, pad);
948 gst_object_unref (pad);
950 /* link RTP sink, we're pretty sure this will work. */
951 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
952 pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
953 gst_pad_link (teepad, pad);
954 gst_object_unref (pad);
955 gst_object_unref (teepad);
/* second branch of the RTP tee: the appsink used for TCP transports */
957 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
958 pad = gst_element_get_static_pad (stream->appsink[0], "sink");
959 gst_pad_link (teepad, pad);
960 gst_object_unref (pad);
961 gst_object_unref (teepad);
963 /* make tee for RTCP */
964 stream->tee[1] = gst_element_factory_make ("tee", NULL);
965 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
967 pad = gst_element_get_static_pad (stream->tee[1], "sink");
968 gst_pad_link (stream->send_rtcp_src, pad);
969 gst_object_unref (pad);
971 /* link RTCP elements */
972 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
973 pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
974 gst_pad_link (teepad, pad);
975 gst_object_unref (pad);
976 gst_object_unref (teepad);
978 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
979 pad = gst_element_get_static_pad (stream->appsink[1], "sink");
980 gst_pad_link (teepad, pad);
981 gst_object_unref (pad);
982 gst_object_unref (teepad);
984 /* make selector for the RTP receivers */
985 stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
/* select-all: forward data from every sink pad instead of only the active one */
986 g_object_set (stream->selector[0], "select-all", TRUE, NULL);
987 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
989 pad = gst_element_get_static_pad (stream->selector[0], "src");
990 gst_pad_link (pad, stream->recv_rtp_sink);
991 gst_object_unref (pad);
993 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
994 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
995 gst_pad_link (pad, selpad);
996 gst_object_unref (pad);
997 gst_object_unref (selpad);
999 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1000 pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1001 gst_pad_link (pad, selpad);
1002 gst_object_unref (pad);
1003 gst_object_unref (selpad);
1005 /* make selector for the RTCP receivers */
1006 stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1007 g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1008 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1010 pad = gst_element_get_static_pad (stream->selector[1], "src");
1011 gst_pad_link (pad, stream->recv_rtcp_sink);
1012 gst_object_unref (pad);
1014 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1015 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1016 gst_pad_link (pad, selpad);
1017 gst_object_unref (pad);
1018 gst_object_unref (selpad);
1020 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1021 pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1022 gst_pad_link (pad, selpad);
1023 gst_object_unref (pad);
1024 gst_object_unref (selpad);
1026 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1028 gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1029 gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
/* locked state: the sources ignore state changes of the pipeline until
 * unlock_streams is called */
1030 gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1031 gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1033 /* be notified of caps changes */
1034 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1035 (GCallback) caps_notify, stream);
1037 stream->prepared = TRUE;
1044 g_warning ("failed to link stream %d", idx);
/* Clears the locked-state flag that setup_stream set on the two udpsrc
 * elements of every stream, so that they follow the state changes of the
 * pipeline again. */
1050 unlock_streams (GstRTSPMedia *media)
1054 /* unlock the udp src elements */
1055 n_streams = gst_rtsp_media_n_streams (media);
1056 for (i = 0; i < n_streams; i++) {
1057 GstRTSPMediaStream *stream;
1059 stream = gst_rtsp_media_get_stream (media, i);
1061 gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1062 gst_element_set_locked_state (stream->udpsrc[1], FALSE);
/* Default handler for messages on the pipeline bus (installed as
 * klass->handle_message in class_init). Pauses and resumes the pipeline around
 * buffering, recalculates latency on request, logs errors and warnings and
 * logs the type name of other messages. */
1067 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1069 GstMessageType type;
1071 type = GST_MESSAGE_TYPE (message);
1074 case GST_MESSAGE_STATE_CHANGED:
1076 case GST_MESSAGE_BUFFERING:
1080 gst_message_parse_buffering (message, &percent);
1082 /* no state management needed for live pipelines */
1086 if (percent == 100) {
1087 /* a 100% message means buffering is done */
1088 media->buffering = FALSE;
1089 /* if the desired state is playing, go back */
1090 if (media->target_state == GST_STATE_PLAYING) {
1091 g_message ("Buffering done, setting pipeline to PLAYING");
1092 gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1095 g_message ("Buffering done");
1098 /* buffering busy */
/* only act on the first buffering message of a buffering period */
1099 if (media->buffering == FALSE) {
1100 if (media->target_state == GST_STATE_PLAYING) {
1101 /* we were not buffering but PLAYING, PAUSE the pipeline. */
1102 g_message ("Buffering, setting pipeline to PAUSED ...");
1103 gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1106 g_message ("Buffering ...");
1109 media->buffering = TRUE;
1113 case GST_MESSAGE_LATENCY:
1115 gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1118 case GST_MESSAGE_ERROR:
/* the parsed GError is owned by us and released with g_error_free */
1123 gst_message_parse_error (message, &gerror, &debug);
1124 g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1125 g_error_free (gerror);
1129 case GST_MESSAGE_WARNING:
1134 gst_message_parse_warning (message, &gerror, &debug);
1135 g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1136 g_error_free (gerror);
1140 case GST_MESSAGE_ELEMENT:
1145 g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
/* Callback of the bus watch source created in gst_rtsp_media_prepare; runs in
 * the class main loop thread and forwards @message to the handle_message
 * method of the class when one is set. */
1152 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1154 GstRTSPMediaClass *klass;
1157 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1159 if (klass->handle_message)
1160 ret = klass->handle_message (media, message);
/* "pad-added" handler for dynamic payloader elements (connected in
 * gst_rtsp_media_prepare). Creates a new stream for @pad, exposes the pad on
 * media->element through a ghost pad, appends the stream to the media, sets it
 * up and brings the newly created stream elements to PAUSED. */
1168 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1170 GstRTSPMediaStream *stream;
/* number used for the ghost pad name and passed to setup_stream as idx; it is
 * one larger than the array index the stream gets below */
1174 i = media->streams->len + 1;
1176 g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1178 stream = g_new0 (GstRTSPMediaStream, 1);
1179 stream->payloader = element;
1181 name = g_strdup_printf ("dynpay%d", i);
1183 /* ghost the pad of the payloader to the element */
1184 stream->srcpad = gst_ghost_pad_new (name, pad);
1185 gst_pad_set_active (stream->srcpad, TRUE);
1186 gst_element_add_pad (media->element, stream->srcpad);
1189 /* add stream now */
1190 g_array_append_val (media->streams, stream);
/* NOTE(review): the result of setup_stream is not checked here; when it fails
 * before creating the elements, the state changes below would act on NULL
 * pointers - confirm and handle */
1192 setup_stream (stream, i, media);
/* these elements were added after the pipeline changed state, so they have to
 * be brought to PAUSED explicitly */
1194 for (i = 0; i < 2; i++) {
1195 gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1196 gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1197 gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1198 gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1199 gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
/* "no-more-pads" handler for dynamic elements: removes the fakesink that
 * gst_rtsp_media_prepare added to the pipeline for them, shuts it down and
 * clears media->fakesink. */
1204 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1206 g_message ("no more pads");
1207 if (media->fakesink) {
/* hold our own reference so the element survives gst_bin_remove and can still
 * be set to NULL */
1208 gst_object_ref (media->fakesink);
1209 gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1210 gst_element_set_state (media->fakesink, GST_STATE_NULL);
1211 gst_object_unref (media->fakesink);
1212 media->fakesink = NULL;
1213 g_message ("removed fakesink");
1218 * gst_rtsp_media_prepare:
1219 * @media: a #GstRTSPMedia
1221 * Prepare @media for streaming. This function will create the pipeline and
1222 * other objects to manage the streaming.
1224 * It will preroll the pipeline and collect vital information about the streams
1225 * such as the duration.
1227 * Returns: %TRUE on success.
/* Creates the pipeline around media->element and an rtpbin, attaches the
 * pipeline bus to the class main context, sets up all known streams, connects
 * to the dynamic elements and prerolls the pipeline (switching to PLAYING for
 * live media). On success the media range is collected and the media is marked
 * prepared. */
1230 gst_rtsp_media_prepare (GstRTSPMedia *media)
1232 GstStateChangeReturn ret;
1234 GstRTSPMediaClass *klass;
1238 if (media->prepared)
/* media->reused is set by gst_rtsp_media_unprepare */
1241 if (!media->reusable && media->reused)
1244 g_message ("preparing media %p", media);
1246 media->pipeline = gst_pipeline_new ("media-pipeline");
1247 bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1249 /* add the pipeline bus to our custom mainloop */
1250 media->source = gst_bus_create_watch (bus);
1251 gst_object_unref (bus);
1253 g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1255 klass = GST_RTSP_MEDIA_GET_CLASS (media);
/* bus_message is dispatched from the thread running do_loop */
1256 media->id = g_source_attach (media->source, klass->context);
1258 gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
/* NOTE(review): no NULL check on the factory result is visible here; the
 * element is missing when the plugin providing "gstrtpbin" is not installed */
1260 media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1262 /* add stuff to the bin */
1263 gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1265 /* link streams we already have, other streams might appear when we have
1266 * dynamic elements */
1267 n_streams = gst_rtsp_media_n_streams (media);
1268 for (i = 0; i < n_streams; i++) {
1269 GstRTSPMediaStream *stream;
1271 stream = gst_rtsp_media_get_stream (media, i);
1273 setup_stream (stream, i, media);
1276 for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1277 GstElement *elem = walk->data;
1279 g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1280 g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
/* temporary sink for dynamic media; removed again in no_more_pads_cb */
1282 media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1283 gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1286 /* first go to PAUSED */
1287 ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1288 media->target_state = GST_STATE_PAUSED;
1291 case GST_STATE_CHANGE_SUCCESS:
1293 case GST_STATE_CHANGE_ASYNC:
1295 case GST_STATE_CHANGE_NO_PREROLL:
1296 /* we need to go to PLAYING */
1297 g_message ("live media %p", media);
1298 media->is_live = TRUE;
1299 ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1300 if (ret == GST_STATE_CHANGE_FAILURE)
1303 case GST_STATE_CHANGE_FAILURE:
1307 /* now wait for all pads to be prerolled */
/* timeout -1: wait without limit */
1308 ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1309 if (ret == GST_STATE_CHANGE_FAILURE)
1312 /* collect stats about the media */
1313 collect_media_stats (media);
1315 g_message ("object %p is prerolled", media);
1317 media->prepared = TRUE;
1329 g_warning ("failed to preroll pipeline");
/* the udpsrc elements are state-locked in setup_stream; unlock them so they
 * go to NULL together with the pipeline */
1330 unlock_streams (media);
1331 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1336 g_warning ("can not reuse media %p", media);
1342 * gst_rtsp_media_unprepare:
1343 * @obj: a #GstRTSPMedia
1345 * Unprepare @media. After this call, the media should be prepared again before
1346 * it can be used again. If the media is set to be non-reusable, a new instance
1349 * Returns: %TRUE on success.
1352 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1354 if (!media->prepared)
1357 g_message ("unprepare media %p", media);
1358 media->target_state = GST_STATE_NULL;
1359 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1361 media->prepared = FALSE;
1362 media->reused = TRUE;
1364 /* when the media is not reusable, this will effectively unref the media and
1366 g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1372 * gst_rtsp_media_set_state:
1373 * @media: a #GstRTSPMedia
1374 * @state: the target state of the media
1375 * @transports: a GArray of #GstRTSPMediaTrans pointers
1377 * Set the state of @media to @state and for the transports in @transports.
1379 * Returns: %TRUE on success.
1382 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1385 GstStateChangeReturn ret;
/* add / remove: whether inactive transports get activated or active ones get
 * deactivated in the loop below. do_state: gates the pipeline state change at
 * the end of the function. */
1386 gboolean add, remove, do_state;
/* both arguments are mandatory; bail out with FALSE otherwise */
1389 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1390 g_return_val_if_fail (transports != NULL, FALSE);
1392 /* NULL and READY are the same */
1393 if (state == GST_STATE_READY)
1394 state = GST_STATE_NULL;
/* start with neither direction selected; the per-state cases decide */
1396 add = remove = FALSE;
1398 g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1401 case GST_STATE_NULL:
1402 /* unlock the streams so that they follow the state changes from now on */
1403 unlock_streams (media);
1405 case GST_STATE_PAUSED:
1406 /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1407 if (media->target_state == GST_STATE_PLAYING)
1410 case GST_STATE_PLAYING:
1411 /* we're going to PLAYING, add */
/* snapshot the active count so it can be compared after the loop */
1417 old_active = media->active;
/* walk every transport handed in by the caller */
1419 for (i = 0; i < transports->len; i++) {
1420 GstRTSPMediaTrans *tr;
1421 GstRTSPMediaStream *stream;
1422 GstRTSPTransport *trans;
1424 /* we need a non-NULL entry in the array */
1425 tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1429 /* we need a transport */
1430 if (!(trans = tr->transport))
1433 /* get the stream and add the destinations */
1434 stream = gst_rtsp_media_get_stream (media, tr->idx);
/* what has to be done depends on the lower transport */
1435 switch (trans->lower_transport) {
/* unicast and multicast UDP share one code path */
1436 case GST_RTSP_LOWER_TRANS_UDP:
1437 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
/* destination address and the client port pair from the transport */
1442 dest = trans->destination;
1443 min = trans->client_port.min;
1444 max = trans->client_port.max;
/* only transports that are not active yet are added */
1446 if (add && !tr->active) {
1447 g_message ("adding %s:%d-%d", dest, min, max);
/* udpsink[0] gets the lower client port, udpsink[1] the upper one */
1448 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1449 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
/* track the transport on its stream */
1450 stream->transports = g_list_prepend (stream->transports, tr);
/* only transports that are currently active are removed */
1453 } else if (remove && tr->active) {
1454 g_message ("removing %s:%d-%d", dest, min, max);
/* mirror of the add path: same sinks, same ports */
1455 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1456 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1457 stream->transports = g_list_remove (stream->transports, tr);
/* TCP: the visible code only updates the stream's transport list, no udpsink
 * destination is touched */
1463 case GST_RTSP_LOWER_TRANS_TCP:
1464 if (add && !tr->active) {
1465 g_message ("adding TCP %s", trans->destination);
1466 stream->transports = g_list_prepend (stream->transports, tr);
1469 } else if (remove && tr->active) {
1470 g_message ("removing TCP %s", trans->destination);
1471 stream->transports = g_list_remove (stream->transports, tr);
/* any other lower transport is only logged */
1477 g_message ("Unknown transport %d", trans->lower_transport);
1482 /* we just added the first media, do the playing state change */
1483 if (old_active == 0 && add)
1485 /* if we have no more active media, do the downward state changes */
1486 else if (media->active == 0)
1491 g_message ("active %d media %p", media->active, media);
/* touch the pipeline only when a state change was requested above and the
 * requested state differs from the current target */
1493 if (do_state && media->target_state != state) {
/* going to NULL is delegated to gst_rtsp_media_unprepare() */
1494 if (state == GST_STATE_NULL) {
1495 gst_rtsp_media_unprepare (media);
1497 g_message ("state %s media %p", gst_element_state_get_name (state), media);
/* record the new target, then ask the pipeline to go there */
1498 media->target_state = state;
1499 ret = gst_element_set_state (media->pipeline, state);
1503 /* remember where we are */
1504 if (state == GST_STATE_PAUSED)
1505 collect_media_stats (media);