2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
25 #include "rtsp-media.h"
27 #define DEFAULT_SHARED FALSE
28 #define DEFAULT_REUSABLE FALSE
30 /* define to dump received RTCP packets */
47 static GQuark ssrc_stream_map_key;
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50 GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52 const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static void unlock_streams (GstRTSPMedia *media);
59 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
61 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
64 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
66 GObjectClass *gobject_class;
69 gobject_class = G_OBJECT_CLASS (klass);
71 gobject_class->get_property = gst_rtsp_media_get_property;
72 gobject_class->set_property = gst_rtsp_media_set_property;
73 gobject_class->finalize = gst_rtsp_media_finalize;
75 g_object_class_install_property (gobject_class, PROP_SHARED,
76 g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
77 DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
79 g_object_class_install_property (gobject_class, PROP_REUSABLE,
80 g_param_spec_boolean ("reusable", "Reusable",
81 "If this media pipeline can be reused after an unprepare",
82 DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
84 gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
85 g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
86 G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
87 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
89 klass->context = g_main_context_new ();
90 klass->loop = g_main_loop_new (klass->context, TRUE);
92 klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
94 g_critical ("could not start bus thread: %s", error->message);
96 klass->handle_message = default_handle_message;
98 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
102 gst_rtsp_media_init (GstRTSPMedia * media)
104 media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
105 media->is_live = FALSE;
106 media->buffering = FALSE;
110 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
113 g_object_unref (stream->session);
116 gst_caps_unref (stream->caps);
118 g_list_free (stream->transports);
124 gst_rtsp_media_finalize (GObject * obj)
129 media = GST_RTSP_MEDIA (obj);
131 g_message ("finalize media %p", media);
133 if (media->pipeline) {
134 unlock_streams (media);
135 gst_element_set_state (media->pipeline, GST_STATE_NULL);
136 gst_object_unref (media->pipeline);
139 for (i = 0; i < media->streams->len; i++) {
140 GstRTSPMediaStream *stream;
142 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
144 gst_rtsp_media_stream_free (stream);
146 g_array_free (media->streams, TRUE);
148 g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
149 g_list_free (media->dynamic);
152 g_source_destroy (media->source);
153 g_source_unref (media->source);
156 G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
160 gst_rtsp_media_get_property (GObject *object, guint propid,
161 GValue *value, GParamSpec *pspec)
163 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
167 g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
170 g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
173 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
178 gst_rtsp_media_set_property (GObject *object, guint propid,
179 const GValue *value, GParamSpec *pspec)
181 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
185 gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
188 gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
191 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
196 do_loop (GstRTSPMediaClass *klass)
198 g_message ("enter mainloop");
199 g_main_loop_run (klass->loop);
200 g_message ("exit mainloop");
206 collect_media_stats (GstRTSPMedia *media)
209 gint64 position, duration;
211 media->range.unit = GST_RTSP_RANGE_NPT;
213 if (media->is_live) {
214 media->range.min.type = GST_RTSP_TIME_NOW;
215 media->range.min.seconds = -1;
216 media->range.max.type = GST_RTSP_TIME_END;
217 media->range.max.seconds = -1;
220 /* get the position */
221 format = GST_FORMAT_TIME;
222 if (!gst_element_query_position (media->pipeline, &format, &position)) {
223 g_message ("position query failed");
227 /* get the duration */
228 format = GST_FORMAT_TIME;
229 if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
230 g_message ("duration query failed");
234 g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
235 GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
237 if (position == -1) {
238 media->range.min.type = GST_RTSP_TIME_NOW;
239 media->range.min.seconds = -1;
242 media->range.min.type = GST_RTSP_TIME_SECONDS;
243 media->range.min.seconds = ((gdouble)position) / GST_SECOND;
245 if (duration == -1) {
246 media->range.max.type = GST_RTSP_TIME_END;
247 media->range.max.seconds = -1;
250 media->range.max.type = GST_RTSP_TIME_SECONDS;
251 media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
257 * gst_rtsp_media_new:
259 * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
260 * element to produde RTP data for one or more related (audio/video/..)
263 * Returns: a new #GstRTSPMedia object.
266 gst_rtsp_media_new (void)
268 GstRTSPMedia *result;
270 result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
276 * gst_rtsp_media_set_shared:
277 * @media: a #GstRTSPMedia
278 * @shared: the new value
280 * Set or unset if the pipeline for @media can be shared will multiple clients.
281 * When @shared is %TRUE, client requests for this media will share the media
285 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
287 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
289 media->shared = shared;
293 * gst_rtsp_media_is_shared:
294 * @media: a #GstRTSPMedia
296 * Check if the pipeline for @media can be shared between multiple clients.
298 * Returns: %TRUE if the media can be shared between clients.
301 gst_rtsp_media_is_shared (GstRTSPMedia *media)
303 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
305 return media->shared;
309 * gst_rtsp_media_set_reusable:
310 * @media: a #GstRTSPMedia
311 * @reusable: the new value
313 * Set or unset if the pipeline for @media can be reused after the pipeline has
317 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
319 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
321 media->reusable = reusable;
325 * gst_rtsp_media_is_reusable:
326 * @media: a #GstRTSPMedia
328 * Check if the pipeline for @media can be reused after an unprepare.
330 * Returns: %TRUE if the media can be reused
333 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
335 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
337 return media->reusable;
341 * gst_rtsp_media_n_streams:
342 * @media: a #GstRTSPMedia
344 * Get the number of streams in this media.
346 * Returns: The number of streams.
349 gst_rtsp_media_n_streams (GstRTSPMedia *media)
351 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
353 return media->streams->len;
357 * gst_rtsp_media_get_stream:
358 * @media: a #GstRTSPMedia
359 * @idx: the stream index
361 * Retrieve the stream with index @idx from @media.
363 * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
364 * that index did not exist.
367 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
369 GstRTSPMediaStream *res;
371 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
373 if (idx < media->streams->len)
374 res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
382 * gst_rtsp_media_seek:
383 * @stream: a #GstRTSPMediaStream
384 * @range: a #GstRTSPTimeRange
386 * Seek the pipeline to @range.
388 * Returns: %TRUE on success.
391 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
396 GstSeekType start_type, stop_type;
398 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
399 g_return_val_if_fail (range != NULL, FALSE);
401 if (range->unit != GST_RTSP_RANGE_NPT)
404 /* depends on the current playing state of the pipeline. We might need to
405 * queue this until we get EOS. */
406 flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
408 start_type = stop_type = GST_SEEK_TYPE_NONE;
410 switch (range->min.type) {
411 case GST_RTSP_TIME_NOW:
414 case GST_RTSP_TIME_SECONDS:
415 /* only seek when something changed */
416 if (media->range.min.seconds == range->min.seconds) {
419 start = range->min.seconds * GST_SECOND;
420 start_type = GST_SEEK_TYPE_SET;
423 case GST_RTSP_TIME_END:
427 switch (range->max.type) {
428 case GST_RTSP_TIME_SECONDS:
429 /* only seek when something changed */
430 if (media->range.max.seconds == range->max.seconds) {
433 stop = range->max.seconds * GST_SECOND;
434 stop_type = GST_SEEK_TYPE_SET;
437 case GST_RTSP_TIME_END:
439 stop_type = GST_SEEK_TYPE_SET;
441 case GST_RTSP_TIME_NOW:
446 if (start != -1 || stop != -1) {
447 g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
448 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
450 res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
451 flags, start_type, start, stop_type, stop);
453 /* and block for the seek to complete */
454 g_message ("done seeking %d", res);
455 gst_element_get_state (media->pipeline, NULL, NULL, -1);
456 g_message ("prerolled again");
458 collect_media_stats (media);
461 g_message ("no seek needed");
470 g_warning ("seek unit %d not supported", range->unit);
475 g_warning ("weird range type %d not supported", range->min.type);
481 * gst_rtsp_media_stream_rtp:
482 * @stream: a #GstRTSPMediaStream
483 * @buffer: a #GstBuffer
485 * Handle an RTP buffer for the stream. This method is usually called when a
486 * message has been received from a client using the TCP transport.
488 * This function takes ownership of @buffer.
490 * Returns: a GstFlowReturn.
493 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
497 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
503 * gst_rtsp_media_stream_rtcp:
504 * @stream: a #GstRTSPMediaStream
505 * @buffer: a #GstBuffer
507 * Handle an RTCP buffer for the stream. This method is usually called when a
508 * message has been received from a client using the TCP transport.
510 * This function takes ownership of @buffer.
512 * Returns: a GstFlowReturn.
515 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
519 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
524 /* Allocate the udp ports and sockets */
526 alloc_udp_ports (GstRTSPMediaStream * stream)
528 GstStateChangeReturn ret;
529 GstElement *udpsrc0, *udpsrc1;
530 GstElement *udpsink0, *udpsink1;
531 gint tmp_rtp, tmp_rtcp;
533 gint rtpport, rtcpport, sockfd;
541 /* Start with random port */
544 /* try to allocate 2 UDP ports, the RTP port should be an even
545 * number and the RTCP port should be the next (uneven) port */
547 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
549 goto no_udp_protocol;
550 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
552 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
553 if (ret == GST_STATE_CHANGE_FAILURE) {
559 gst_element_set_state (udpsrc0, GST_STATE_NULL);
560 gst_object_unref (udpsrc0);
564 goto no_udp_protocol;
567 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
569 /* check if port is even */
570 if ((tmp_rtp & 1) != 0) {
571 /* port not even, close and allocate another */
575 gst_element_set_state (udpsrc0, GST_STATE_NULL);
576 gst_object_unref (udpsrc0);
582 /* allocate port+1 for RTCP now */
583 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
585 goto no_udp_rtcp_protocol;
588 tmp_rtcp = tmp_rtp + 1;
589 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
591 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
592 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
593 if (ret == GST_STATE_CHANGE_FAILURE) {
598 gst_element_set_state (udpsrc0, GST_STATE_NULL);
599 gst_object_unref (udpsrc0);
601 gst_element_set_state (udpsrc1, GST_STATE_NULL);
602 gst_object_unref (udpsrc1);
608 /* all fine, do port check */
609 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
610 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
612 /* this should not happen... */
613 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
616 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
618 goto no_udp_protocol;
620 g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
621 g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
622 g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
624 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
626 goto no_udp_protocol;
628 g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
629 g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
630 g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
631 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
632 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
634 /* we keep these elements, we configure all in configure_transport when the
635 * server told us to really use the UDP ports. */
636 stream->udpsrc[0] = udpsrc0;
637 stream->udpsrc[1] = udpsrc1;
638 stream->udpsink[0] = udpsink0;
639 stream->udpsink[1] = udpsink1;
640 stream->server_port.min = rtpport;
641 stream->server_port.max = rtcpport;
654 no_udp_rtcp_protocol:
665 gst_element_set_state (udpsrc0, GST_STATE_NULL);
666 gst_object_unref (udpsrc0);
669 gst_element_set_state (udpsrc1, GST_STATE_NULL);
670 gst_object_unref (udpsrc1);
673 gst_element_set_state (udpsink0, GST_STATE_NULL);
674 gst_object_unref (udpsink0);
677 gst_element_set_state (udpsink1, GST_STATE_NULL);
678 gst_object_unref (udpsink1);
685 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
688 GstCaps *newcaps, *oldcaps;
690 if ((newcaps = GST_PAD_CAPS (pad)))
691 gst_caps_ref (newcaps);
693 oldcaps = stream->caps;
694 stream->caps = newcaps;
697 gst_caps_unref (oldcaps);
699 capsstr = gst_caps_to_string (newcaps);
700 g_message ("stream %p received caps %s", stream, capsstr);
705 dump_structure (const GstStructure *s)
709 sstr = gst_structure_to_string (s);
710 g_message ("structure: %s", sstr);
714 static GstRTSPMediaTrans *
715 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
718 GstRTSPMediaTrans *result = NULL;
723 if (rtcp_from == NULL)
726 tmp = g_strrstr (rtcp_from, ":");
730 port = atoi (tmp + 1);
731 dest = g_strndup (rtcp_from, tmp - rtcp_from);
733 g_message ("finding %s:%d", dest, port);
735 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
736 GstRTSPMediaTrans *trans = walk->data;
739 min = trans->transport->client_port.min;
740 max = trans->transport->client_port.max;
742 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
753 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
756 GstRTSPMediaTrans *trans;
758 g_message ("%p: new source %p", stream, source);
760 /* see if we have a stream to match with the origin of the RTCP packet */
761 trans = g_object_get_qdata (source, ssrc_stream_map_key);
763 g_object_get (source, "stats", &stats, NULL);
765 const gchar *rtcp_from;
767 dump_structure (stats);
769 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
770 if ((trans = find_transport (stream, rtcp_from))) {
771 g_message ("%p: found transport %p for source %p", stream, trans, source);
773 /* keep ref to the source */
774 trans->rtpsource = source;
776 g_object_set_qdata (source, ssrc_stream_map_key, trans);
778 gst_structure_free (stats);
781 g_message ("%p: source %p for transport %p", stream, source, trans);
786 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
788 g_message ("%p: new SDES %p", stream, source);
792 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
794 GstRTSPMediaTrans *trans;
796 trans = g_object_get_qdata (source, ssrc_stream_map_key);
798 g_message ("%p: source %p in transport %p is active", stream, source, trans);
800 if (trans && trans->keep_alive)
801 trans->keep_alive (trans->ka_user_data);
806 g_object_get (source, "stats", &stats, NULL);
808 dump_structure (stats);
809 gst_structure_free (stats);
816 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
818 g_message ("%p: source %p bye", stream, source);
822 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
824 GstRTSPMediaTrans *trans;
826 g_message ("%p: source %p bye timeout", stream, source);
828 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
829 trans->rtpsource = NULL;
830 trans->timeout = TRUE;
835 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
837 GstRTSPMediaTrans *trans;
839 g_message ("%p: source %p timeout", stream, source);
841 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
842 trans->rtpsource = NULL;
843 trans->timeout = TRUE;
848 handle_new_buffer (GstAppSink *sink, gpointer user_data)
852 GstRTSPMediaStream *stream;
854 buffer = gst_app_sink_pull_buffer (sink);
858 stream = (GstRTSPMediaStream *) user_data;
860 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
861 GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
863 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
865 tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
869 tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
872 gst_buffer_unref (buffer);
877 static GstAppSinkCallbacks sink_cb = {
878 NULL, /* not interested in EOS */
879 NULL, /* not interested in preroll buffers */
883 /* prepare the pipeline objects to handle @stream in @media */
885 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
888 GstPad *pad, *teepad, *selpad;
889 GstPadLinkReturn ret;
892 /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
893 * for sending RTP/RTCP. The sender and receiver ports are shared between the
895 if (!alloc_udp_ports (stream))
898 /* add the ports to the pipeline */
899 for (i = 0; i < 2; i++) {
900 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
901 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
904 /* create elements for the TCP transfer */
905 for (i = 0; i < 2; i++) {
906 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
907 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
908 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
909 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
910 g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
911 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
912 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
913 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
914 &sink_cb, stream, NULL);
917 /* hook up the stream to the RTP session elements. */
918 name = g_strdup_printf ("send_rtp_sink_%d", idx);
919 stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
921 name = g_strdup_printf ("send_rtp_src_%d", idx);
922 stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
924 name = g_strdup_printf ("send_rtcp_src_%d", idx);
925 stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
927 name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
928 stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
930 name = g_strdup_printf ("recv_rtp_sink_%d", idx);
931 stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
934 /* get the session */
935 g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
938 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
940 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
942 g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
944 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
946 g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
948 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
951 /* link the RTP pad to the session manager */
952 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
953 if (ret != GST_PAD_LINK_OK)
956 /* make tee for RTP and link to stream */
957 stream->tee[0] = gst_element_factory_make ("tee", NULL);
958 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
960 pad = gst_element_get_static_pad (stream->tee[0], "sink");
961 gst_pad_link (stream->send_rtp_src, pad);
962 gst_object_unref (pad);
964 /* link RTP sink, we're pretty sure this will work. */
965 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
966 pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
967 gst_pad_link (teepad, pad);
968 gst_object_unref (pad);
969 gst_object_unref (teepad);
971 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
972 pad = gst_element_get_static_pad (stream->appsink[0], "sink");
973 gst_pad_link (teepad, pad);
974 gst_object_unref (pad);
975 gst_object_unref (teepad);
977 /* make tee for RTCP */
978 stream->tee[1] = gst_element_factory_make ("tee", NULL);
979 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
981 pad = gst_element_get_static_pad (stream->tee[1], "sink");
982 gst_pad_link (stream->send_rtcp_src, pad);
983 gst_object_unref (pad);
985 /* link RTCP elements */
986 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
987 pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
988 gst_pad_link (teepad, pad);
989 gst_object_unref (pad);
990 gst_object_unref (teepad);
992 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
993 pad = gst_element_get_static_pad (stream->appsink[1], "sink");
994 gst_pad_link (teepad, pad);
995 gst_object_unref (pad);
996 gst_object_unref (teepad);
998 /* make selector for the RTP receivers */
999 stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1000 g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1001 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1003 pad = gst_element_get_static_pad (stream->selector[0], "src");
1004 gst_pad_link (pad, stream->recv_rtp_sink);
1005 gst_object_unref (pad);
1007 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1008 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1009 gst_pad_link (pad, selpad);
1010 gst_object_unref (pad);
1011 gst_object_unref (selpad);
1013 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1014 pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1015 gst_pad_link (pad, selpad);
1016 gst_object_unref (pad);
1017 gst_object_unref (selpad);
1019 /* make selector for the RTCP receivers */
1020 stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1021 g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1022 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1024 pad = gst_element_get_static_pad (stream->selector[1], "src");
1025 gst_pad_link (pad, stream->recv_rtcp_sink);
1026 gst_object_unref (pad);
1028 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1029 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1030 gst_pad_link (pad, selpad);
1031 gst_object_unref (pad);
1032 gst_object_unref (selpad);
1034 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1035 pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1036 gst_pad_link (pad, selpad);
1037 gst_object_unref (pad);
1038 gst_object_unref (selpad);
1040 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1042 gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1043 gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1044 gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1045 gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1047 /* be notified of caps changes */
1048 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1049 (GCallback) caps_notify, stream);
1051 stream->prepared = TRUE;
1058 g_warning ("failed to link stream %d", idx);
1064 unlock_streams (GstRTSPMedia *media)
1068 /* unlock the udp src elements */
1069 n_streams = gst_rtsp_media_n_streams (media);
1070 for (i = 0; i < n_streams; i++) {
1071 GstRTSPMediaStream *stream;
1073 stream = gst_rtsp_media_get_stream (media, i);
1075 gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1076 gst_element_set_locked_state (stream->udpsrc[1], FALSE);
1081 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1083 GstMessageType type;
1085 type = GST_MESSAGE_TYPE (message);
1088 case GST_MESSAGE_STATE_CHANGED:
1090 case GST_MESSAGE_BUFFERING:
1094 gst_message_parse_buffering (message, &percent);
1096 /* no state management needed for live pipelines */
1100 if (percent == 100) {
1101 /* a 100% message means buffering is done */
1102 media->buffering = FALSE;
1103 /* if the desired state is playing, go back */
1104 if (media->target_state == GST_STATE_PLAYING) {
1105 g_message ("Buffering done, setting pipeline to PLAYING");
1106 gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1109 g_message ("Buffering done");
1112 /* buffering busy */
1113 if (media->buffering == FALSE) {
1114 if (media->target_state == GST_STATE_PLAYING) {
1115 /* we were not buffering but PLAYING, PAUSE the pipeline. */
1116 g_message ("Buffering, setting pipeline to PAUSED ...");
1117 gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1120 g_message ("Buffering ...");
1123 media->buffering = TRUE;
1127 case GST_MESSAGE_LATENCY:
1129 gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1132 case GST_MESSAGE_ERROR:
1137 gst_message_parse_error (message, &gerror, &debug);
1138 g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1139 g_error_free (gerror);
1143 case GST_MESSAGE_WARNING:
1148 gst_message_parse_warning (message, &gerror, &debug);
1149 g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1150 g_error_free (gerror);
1154 case GST_MESSAGE_ELEMENT:
1159 g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
1166 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1168 GstRTSPMediaClass *klass;
1171 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1173 if (klass->handle_message)
1174 ret = klass->handle_message (media, message);
1182 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1184 GstRTSPMediaStream *stream;
1188 i = media->streams->len + 1;
1190 g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1192 stream = g_new0 (GstRTSPMediaStream, 1);
1193 stream->payloader = element;
1195 name = g_strdup_printf ("dynpay%d", i);
1197 /* ghost the pad of the payloader to the element */
1198 stream->srcpad = gst_ghost_pad_new (name, pad);
1199 gst_pad_set_active (stream->srcpad, TRUE);
1200 gst_element_add_pad (media->element, stream->srcpad);
1203 /* add stream now */
1204 g_array_append_val (media->streams, stream);
1206 setup_stream (stream, i, media);
1208 for (i = 0; i < 2; i++) {
1209 gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1210 gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1211 gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1212 gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1213 gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
1218 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1220 g_message ("no more pads");
1221 if (media->fakesink) {
1222 gst_object_ref (media->fakesink);
1223 gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1224 gst_element_set_state (media->fakesink, GST_STATE_NULL);
1225 gst_object_unref (media->fakesink);
1226 media->fakesink = NULL;
1227 g_message ("removed fakesink");
1232 * gst_rtsp_media_prepare:
1233 * @obj: a #GstRTSPMedia
1235 * Prepare @media for streaming. This function will create the pipeline and
1236 * other objects to manage the streaming.
1238 * It will preroll the pipeline and collect vital information about the streams
1239 * such as the duration.
1241 * Returns: %TRUE on success.
1244 gst_rtsp_media_prepare (GstRTSPMedia *media)
1246 GstStateChangeReturn ret;
1248 GstRTSPMediaClass *klass;
1252 if (media->prepared)
1255 if (!media->reusable && media->reused)
1258 g_message ("preparing media %p", media);
1260 media->pipeline = gst_pipeline_new ("media-pipeline");
1261 bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1263 /* add the pipeline bus to our custom mainloop */
1264 media->source = gst_bus_create_watch (bus);
1265 gst_object_unref (bus);
1267 g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
1269 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1270 media->id = g_source_attach (media->source, klass->context);
1272 gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
1274 media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1276 /* add stuff to the bin */
1277 gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1279 /* link streams we already have, other streams might appear when we have
1280 * dynamic elements */
1281 n_streams = gst_rtsp_media_n_streams (media);
1282 for (i = 0; i < n_streams; i++) {
1283 GstRTSPMediaStream *stream;
1285 stream = gst_rtsp_media_get_stream (media, i);
1287 setup_stream (stream, i, media);
1290 for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1291 GstElement *elem = walk->data;
1293 g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1294 g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
1296 media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1297 gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1300 /* first go to PAUSED */
1301 ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1302 media->target_state = GST_STATE_PAUSED;
1305 case GST_STATE_CHANGE_SUCCESS:
1307 case GST_STATE_CHANGE_ASYNC:
1309 case GST_STATE_CHANGE_NO_PREROLL:
1310 /* we need to go to PLAYING */
1311 g_message ("live media %p", media);
1312 media->is_live = TRUE;
1313 ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1314 if (ret == GST_STATE_CHANGE_FAILURE)
1317 case GST_STATE_CHANGE_FAILURE:
1321 /* now wait for all pads to be prerolled */
1322 ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1323 if (ret == GST_STATE_CHANGE_FAILURE)
1326 /* collect stats about the media */
1327 collect_media_stats (media);
1329 g_message ("object %p is prerolled", media);
1331 media->prepared = TRUE;
1343 g_warning ("failed to preroll pipeline");
1344 unlock_streams (media);
1345 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1350 g_warning ("can not reuse media %p", media);
1356 * gst_rtsp_media_unprepare:
1357 * @obj: a #GstRTSPMedia
1359 * Unprepare @media. After this call, the media should be prepared again before
1360 * it can be used again. If the media is set to be non-reusable, a new instance
1363 * Returns: %TRUE on success.
1366 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1368 if (!media->prepared)
1371 g_message ("unprepare media %p", media);
1372 media->target_state = GST_STATE_NULL;
1373 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1375 media->prepared = FALSE;
1376 media->reused = TRUE;
1378 /* when the media is not reusable, this will effectively unref the media and
1380 g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1386 * gst_rtsp_media_set_state:
1387 * @media: a #GstRTSPMedia
1388 * @state: the target state of the media
1389 * @transports: a GArray of #GstRTSPMediaTrans pointers
1391 * Set the state of @media to @state and for the transports in @transports.
1393 * Returns: %TRUE on success.
1396 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1399 GstStateChangeReturn ret;
1400 gboolean add, remove, do_state;
1403 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1404 g_return_val_if_fail (transports != NULL, FALSE);
1406 /* NULL and READY are the same */
1407 if (state == GST_STATE_READY)
1408 state = GST_STATE_NULL;
1410 add = remove = FALSE;
1412 g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1415 case GST_STATE_NULL:
1416 /* unlock the streams so that they follow the state changes from now on */
1417 unlock_streams (media);
1419 case GST_STATE_PAUSED:
1420 /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1421 if (media->target_state == GST_STATE_PLAYING)
1424 case GST_STATE_PLAYING:
1425 /* we're going to PLAYING, add */
1431 old_active = media->active;
1433 for (i = 0; i < transports->len; i++) {
1434 GstRTSPMediaTrans *tr;
1435 GstRTSPMediaStream *stream;
1436 GstRTSPTransport *trans;
1438 /* we need a non-NULL entry in the array */
1439 tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1443 /* we need a transport */
1444 if (!(trans = tr->transport))
1447 /* get the stream and add the destinations */
1448 stream = gst_rtsp_media_get_stream (media, tr->idx);
1449 switch (trans->lower_transport) {
1450 case GST_RTSP_LOWER_TRANS_UDP:
1451 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1456 dest = trans->destination;
1457 min = trans->client_port.min;
1458 max = trans->client_port.max;
1460 if (add && !tr->active) {
1461 g_message ("adding %s:%d-%d", dest, min, max);
1462 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1463 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1464 stream->transports = g_list_prepend (stream->transports, tr);
1467 } else if (remove && tr->active) {
1468 g_message ("removing %s:%d-%d", dest, min, max);
1469 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1470 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1471 stream->transports = g_list_remove (stream->transports, tr);
1477 case GST_RTSP_LOWER_TRANS_TCP:
1478 if (add && !tr->active) {
1479 g_message ("adding TCP %s", trans->destination);
1480 stream->transports = g_list_prepend (stream->transports, tr);
1483 } else if (remove && tr->active) {
1484 g_message ("removing TCP %s", trans->destination);
1485 stream->transports = g_list_remove (stream->transports, tr);
1491 g_message ("Unknown transport %d", trans->lower_transport);
1496 /* we just added the first media, do the playing state change */
1497 if (old_active == 0 && add)
1499 /* if we have no more active media, do the downward state changes */
1500 else if (media->active == 0)
1505 g_message ("active %d media %p", media->active, media);
1507 if (do_state && media->target_state != state) {
1508 if (state == GST_STATE_NULL) {
1509 gst_rtsp_media_unprepare (media);
1511 g_message ("state %s media %p", gst_element_state_get_name (state), media);
1512 media->target_state = state;
1513 ret = gst_element_set_state (media->pipeline, state);
1517 /* remember where we are */
1518 if (state == GST_STATE_PAUSED)
1519 collect_media_stats (media);