2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
25 #include "rtsp-media.h"
/* Defaults for the shared and reusable properties installed in class_init. */
27 #define DEFAULT_SHARED FALSE
28 #define DEFAULT_REUSABLE FALSE
30 /* define to dump received RTCP packets */
/* qdata key under which an RTP source object carries its GstRTSPMediaTrans;
 * initialised in class_init and used by the RTP session callbacks below. */
47 static GQuark ssrc_stream_map_key;
/* forward declarations: GObject vfuncs first, then file-local helpers */
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50 GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52 const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static gboolean default_unprepare (GstRTSPMedia *media);
58 static void unlock_streams (GstRTSPMedia *media);
/* signal ids, sized by SIGNAL_LAST; the unprepared entry is set in class_init */
60 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
/* registers GstRTSPMedia with G_TYPE_OBJECT as its parent type */
62 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
/* Class initializer for GstRTSPMedia. It installs the property accessors and
 * the finalizer, registers the shared and reusable boolean properties and the
 * unprepared signal, creates the class-wide main context and main loop, starts
 * the thread that runs that loop (do_loop), sets the default handle_message
 * and unprepare implementations and initialises ssrc_stream_map_key.
 *
 * NOTE(review): this listing omits some original lines (the embedded numbering
 * jumps, for example 67 -> 70 and 93 -> 95), so the declaration of error and
 * the condition guarding g_critical are not visible here. */
65 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
67 GObjectClass *gobject_class;
70 gobject_class = G_OBJECT_CLASS (klass);
/* hook up the property accessors and the finalizer */
72 gobject_class->get_property = gst_rtsp_media_get_property;
73 gobject_class->set_property = gst_rtsp_media_set_property;
74 gobject_class->finalize = gst_rtsp_media_finalize;
/* both properties are read/write booleans with the DEFAULT_* values above */
76 g_object_class_install_property (gobject_class, PROP_SHARED,
77 g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
78 DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
80 g_object_class_install_property (gobject_class, PROP_REUSABLE,
81 g_param_spec_boolean ("reusable", "Reusable",
82 "If this media pipeline can be reused after an unprepare",
83 DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* unprepared takes no arguments and returns nothing; its class handler slot
 * is GstRTSPMediaClass.unprepared */
85 gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
86 g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
87 G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
88 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/* one context and loop per class; bus watches are attached to this context
 * in gst_rtsp_media_prepare */
90 klass->context = g_main_context_new ();
91 klass->loop = g_main_loop_new (klass->context, TRUE);
/* do_loop runs klass->loop in a separate thread */
93 klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
/* NOTE(review): the guard for this call (original line 94) is not visible;
 * confirm it only runs when error was set, since error->message is read. */
95 g_critical ("could not start bus thread: %s", error->message);
/* default virtual method implementations */
97 klass->handle_message = default_handle_message;
98 klass->unprepare = default_unprepare;
100 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
/* Instance initializer: creates the array that holds GstRTSPMediaStream
 * pointers and marks the media as neither live nor buffering. */
104 gst_rtsp_media_init (GstRTSPMedia * media)
/* not zero-terminated, clear flag set, one pointer per element */
106 media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
107 media->is_live = FALSE;
108 media->buffering = FALSE;
/* Releases what a stream holds: the session object, the caps, the five rtpbin
 * pads (each only when set) and the transports list.
 *
 * NOTE(review): the lines around the session and caps unrefs (numbering jumps
 * 112 -> 115 and 115 -> 118) and anything after g_list_free (131 -> 137) are
 * not visible in this listing, so NULL guards and the release of the stream
 * structure itself cannot be confirmed from here. */
112 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
115 g_object_unref (stream->session);
118 gst_caps_unref (stream->caps);
/* pads obtained from rtpbin in setup_stream */
120 if (stream->send_rtp_sink)
121 gst_object_unref (stream->send_rtp_sink);
122 if (stream->send_rtp_src)
123 gst_object_unref (stream->send_rtp_src);
124 if (stream->send_rtcp_src)
125 gst_object_unref (stream->send_rtcp_src);
126 if (stream->recv_rtcp_sink)
127 gst_object_unref (stream->recv_rtcp_sink);
128 if (stream->recv_rtp_sink)
129 gst_object_unref (stream->recv_rtp_sink);
/* g_list_free releases the list cells only, not the data they point to */
131 g_list_free (stream->transports);
/* GObject finalizer. When a pipeline exists it unlocks the udpsrc elements,
 * sets the pipeline to NULL and drops the pipeline reference. It then frees
 * every stream and the stream array, unrefs the elements on the dynamic list,
 * destroys and unrefs the bus watch source and chains up to the parent class.
 *
 * NOTE(review): the line before g_source_destroy is missing from this listing
 * (numbering jumps 162 -> 165); confirm media->source is checked for NULL,
 * since it is only assigned in gst_rtsp_media_prepare. */
137 gst_rtsp_media_finalize (GObject * obj)
142 media = GST_RTSP_MEDIA (obj);
144 g_message ("finalize media %p", media);
146 if (media->pipeline) {
/* the udpsrc elements were state-locked in setup_stream; unlock them so the
 * pipeline state change reaches them too */
147 unlock_streams (media);
148 gst_element_set_state (media->pipeline, GST_STATE_NULL);
149 gst_object_unref (media->pipeline);
152 for (i = 0; i < media->streams->len; i++) {
153 GstRTSPMediaStream *stream;
155 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
157 gst_rtsp_media_stream_free (stream);
159 g_array_free (media->streams, TRUE);
/* drop one reference per element on the dynamic list, then the list itself */
161 g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
162 g_list_free (media->dynamic);
165 g_source_destroy (media->source);
166 g_source_unref (media->source);
169 G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
/* GObject get_property vfunc: stores the shared or reusable flag of the media
 * into @value and warns about any other property id.
 *
 * NOTE(review): the switch statement and its case labels are on lines this
 * listing omits; presumably PROP_SHARED and PROP_REUSABLE select the two
 * calls below -- confirm against the full file. */
173 gst_rtsp_media_get_property (GObject *object, guint propid,
174 GValue *value, GParamSpec *pspec)
176 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
180 g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
183 g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
186 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* GObject set_property vfunc: forwards the boolean in @value to
 * gst_rtsp_media_set_shared or gst_rtsp_media_set_reusable and warns about
 * any other property id.
 *
 * NOTE(review): the switch statement and its case labels are on lines this
 * listing omits; presumably PROP_SHARED and PROP_REUSABLE select the two
 * calls below -- confirm against the full file. */
191 gst_rtsp_media_set_property (GObject *object, guint propid,
192 const GValue *value, GParamSpec *pspec)
194 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
198 gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
201 gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
204 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* Thread function started from class_init: runs the class main loop and logs
 * a message before entering and after leaving it. The return statement is on
 * a line this listing omits. */
209 do_loop (GstRTSPMediaClass *klass)
211 g_message ("enter mainloop");
/* blocks until the loop is quit */
212 g_main_loop_run (klass->loop);
213 g_message ("exit mainloop");
/* Refreshes media->range, always expressed in NPT units. For live media the
 * range becomes now..end with both seconds fields set to -1. The remaining
 * code queries position and duration from the pipeline in GST_FORMAT_TIME;
 * a value of -1 maps to now (min) or end (max), any other value is divided by
 * GST_SECOND and stored as seconds.
 *
 * NOTE(review): the else lines and the statements that follow each failed
 * query are not visible in this listing (numbering jumps 230 -> 233,
 * 236 -> 240, 243 -> 247); presumably the value is forced to -1 on failure --
 * confirm, because position and duration are read unconditionally below. */
219 collect_media_stats (GstRTSPMedia *media)
222 gint64 position, duration;
224 media->range.unit = GST_RTSP_RANGE_NPT;
226 if (media->is_live) {
227 media->range.min.type = GST_RTSP_TIME_NOW;
228 media->range.min.seconds = -1;
229 media->range.max.type = GST_RTSP_TIME_END;
230 media->range.max.seconds = -1;
233 /* get the position */
234 format = GST_FORMAT_TIME;
235 if (!gst_element_query_position (media->pipeline, &format, &position)) {
236 g_message ("position query failed");
240 /* get the duration */
241 format = GST_FORMAT_TIME;
242 if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
243 g_message ("duration query failed");
247 g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
248 GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
/* lower bound of the range */
250 if (position == -1) {
251 media->range.min.type = GST_RTSP_TIME_NOW;
252 media->range.min.seconds = -1;
255 media->range.min.type = GST_RTSP_TIME_SECONDS;
256 media->range.min.seconds = ((gdouble)position) / GST_SECOND;
/* upper bound of the range */
258 if (duration == -1) {
259 media->range.max.type = GST_RTSP_TIME_END;
260 media->range.max.seconds = -1;
263 media->range.max.type = GST_RTSP_TIME_SECONDS;
264 media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
270 * gst_rtsp_media_new:
272 * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
273 * element to produce RTP data for one or more related (audio/video/..)
276 * Returns: a new #GstRTSPMedia object.
/* Constructor: instantiates GST_TYPE_RTSP_MEDIA with no construct properties,
 * so shared and reusable keep their defaults. The return statement is on a
 * line this listing omits. */
279 gst_rtsp_media_new (void)
281 GstRTSPMedia *result;
283 result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
289 * gst_rtsp_media_set_shared:
290 * @media: a #GstRTSPMedia
291 * @shared: the new value
293 * Set or unset if the pipeline for @media can be shared with multiple clients.
294 * When @shared is %TRUE, client requests for this media will share the media
/* Setter for the shared flag. Does nothing when @media is not a GstRTSPMedia
 * (g_return_if_fail). The flag is stored as-is; no notification is emitted
 * by the visible code. */
298 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
300 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
302 media->shared = shared;
306 * gst_rtsp_media_is_shared:
307 * @media: a #GstRTSPMedia
309 * Check if the pipeline for @media can be shared between multiple clients.
311 * Returns: %TRUE if the media can be shared between clients.
/* Getter for the shared flag; yields FALSE when @media is not a
 * GstRTSPMedia. */
314 gst_rtsp_media_is_shared (GstRTSPMedia *media)
316 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
318 return media->shared;
322 * gst_rtsp_media_set_reusable:
323 * @media: a #GstRTSPMedia
324 * @reusable: the new value
326 * Set or unset if the pipeline for @media can be reused after the pipeline has
/* Setter for the reusable flag, which gst_rtsp_media_prepare consults
 * together with media->reused. Does nothing when @media is not a
 * GstRTSPMedia. */
330 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
332 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
334 media->reusable = reusable;
338 * gst_rtsp_media_is_reusable:
339 * @media: a #GstRTSPMedia
341 * Check if the pipeline for @media can be reused after an unprepare.
343 * Returns: %TRUE if the media can be reused
/* Getter for the reusable flag; yields FALSE when @media is not a
 * GstRTSPMedia. */
346 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
348 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
350 return media->reusable;
354 * gst_rtsp_media_n_streams:
355 * @media: a #GstRTSPMedia
357 * Get the number of streams in this media.
359 * Returns: The number of streams.
/* Number of entries currently in media->streams; 0 when @media is not a
 * GstRTSPMedia. */
362 gst_rtsp_media_n_streams (GstRTSPMedia *media)
364 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
366 return media->streams->len;
370 * gst_rtsp_media_get_stream:
371 * @media: a #GstRTSPMedia
372 * @idx: the stream index
374 * Retrieve the stream with index @idx from @media.
376 * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
377 * that index did not exist.
/* Bounds-checked lookup of the stream pointer stored at @idx in
 * media->streams.
 *
 * NOTE(review): the branch for idx >= len and the return statement are on
 * lines this listing omits (numbering jumps 387 -> 395). The API comment
 * promises NULL for an unknown index -- confirm res is assigned on that path,
 * since it has no initialiser here. */
380 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
382 GstRTSPMediaStream *res;
384 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
386 if (idx < media->streams->len)
387 res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
395 * gst_rtsp_media_seek:
396 * @media: a #GstRTSPMedia
397 * @range: a #GstRTSPTimeRange
399 * Seek the pipeline to @range.
401 * Returns: %TRUE on success.
/* Seeks the media pipeline to @range. Both arguments are validated first.
 * The unit check and the two switches translate the range bounds into start
 * and stop values in GST_FORMAT_TIME (seconds multiplied by GST_SECOND). When
 * at least one of them differs from -1 the pipeline is seeked at rate 1.0,
 * the call blocks until the pipeline has settled again and the media range is
 * re-collected; otherwise only a message is logged.
 *
 * NOTE(review): many short lines are missing from this listing (break and
 * goto statements, labels, the initial values of start and stop, the value
 * assigned to stop for GST_RTSP_TIME_END, the return statements). The two
 * g_warning calls at the end presumably sit under error labels reached from
 * the unit check and the switches -- confirm against the full file. */
404 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
409 GstSeekType start_type, stop_type;
411 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
412 g_return_val_if_fail (range != NULL, FALSE);
/* only NPT ranges are handled */
414 if (range->unit != GST_RTSP_RANGE_NPT)
417 /* depends on the current playing state of the pipeline. We might need to
418 * queue this until we get EOS. */
/* NOTE(review): ACCURATE and KEY_UNIT are requested together; check the
 * GstSeekFlags documentation for whether that combination is intended. */
419 flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
/* by default neither bound is changed */
421 start_type = stop_type = GST_SEEK_TYPE_NONE;
/* lower bound */
423 switch (range->min.type) {
424 case GST_RTSP_TIME_NOW:
427 case GST_RTSP_TIME_SECONDS:
428 /* only seek when something changed */
429 if (media->range.min.seconds == range->min.seconds) {
432 start = range->min.seconds * GST_SECOND;
433 start_type = GST_SEEK_TYPE_SET;
436 case GST_RTSP_TIME_END:
/* upper bound */
440 switch (range->max.type) {
441 case GST_RTSP_TIME_SECONDS:
442 /* only seek when something changed */
443 if (media->range.max.seconds == range->max.seconds) {
446 stop = range->max.seconds * GST_SECOND;
447 stop_type = GST_SEEK_TYPE_SET;
450 case GST_RTSP_TIME_END:
452 stop_type = GST_SEEK_TYPE_SET;
454 case GST_RTSP_TIME_NOW:
459 if (start != -1 || stop != -1) {
460 g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
461 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
463 res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
464 flags, start_type, start, stop_type, stop);
466 /* and block for the seek to complete */
467 g_message ("done seeking %d", res);
/* timeout of -1: wait without limit for the state to settle */
468 gst_element_get_state (media->pipeline, NULL, NULL, -1);
469 g_message ("prerolled again");
/* the position changed, so refresh media->range */
471 collect_media_stats (media);
474 g_message ("no seek needed");
483 g_warning ("seek unit %d not supported", range->unit);
488 g_warning ("weird range type %d not supported", range->min.type);
494 * gst_rtsp_media_stream_rtp:
495 * @stream: a #GstRTSPMediaStream
496 * @buffer: a #GstBuffer
498 * Handle an RTP buffer for the stream. This method is usually called when a
499 * message has been received from a client using the TCP transport.
501 * This function takes ownership of @buffer.
503 * Returns: a GstFlowReturn.
/* Pushes @buffer into the first appsrc of the stream (index 0, the one linked
 * to the RTP receive selector in setup_stream) and keeps the flow result in
 * ret. The declaration of ret and the return statement are on lines this
 * listing omits. */
506 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
510 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
516 * gst_rtsp_media_stream_rtcp:
517 * @stream: a #GstRTSPMediaStream
518 * @buffer: a #GstBuffer
520 * Handle an RTCP buffer for the stream. This method is usually called when a
521 * message has been received from a client using the TCP transport.
523 * This function takes ownership of @buffer.
525 * Returns: a GstFlowReturn.
/* Pushes @buffer into the second appsrc of the stream (index 1, the one
 * linked to the RTCP receive selector in setup_stream) and keeps the flow
 * result in ret. The declaration of ret and the return statement are on lines
 * this listing omits. */
528 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
532 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
537 /* Allocate the udp ports and sockets */
/* Creates two udp source elements bound to an even RTP port and the port
 * right after it for RTCP, then two multiudpsink elements that send through
 * the sockets of those sources. On success the four elements and the port
 * pair are stored in @stream.
 *
 * NOTE(review): most control flow is on lines this listing omits (the retry
 * label and its goto statements, the retry counter, NULL checks before the
 * goto no_udp_* jumps, most error labels and the return statements). The
 * comments below describe only what the visible lines do. */
539 alloc_udp_ports (GstRTSPMediaStream * stream)
541 GstStateChangeReturn ret;
542 GstElement *udpsrc0, *udpsrc1;
543 GstElement *udpsink0, *udpsink1;
544 gint tmp_rtp, tmp_rtcp;
546 gint rtpport, rtcpport, sockfd;
554 /* Start with random port */
557 /* try to allocate 2 UDP ports, the RTP port should be an even
558 * number and the RTCP port should be the next (uneven) port */
560 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
562 goto no_udp_protocol;
563 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
/* going to PAUSED is what makes the source try to use the port */
565 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
566 if (ret == GST_STATE_CHANGE_FAILURE) {
572 gst_element_set_state (udpsrc0, GST_STATE_NULL);
573 gst_object_unref (udpsrc0);
577 goto no_udp_protocol;
/* read back the port the source ended up with */
580 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
582 /* check if port is even */
583 if ((tmp_rtp & 1) != 0) {
584 /* port not even, close and allocate another */
588 gst_element_set_state (udpsrc0, GST_STATE_NULL);
589 gst_object_unref (udpsrc0);
595 /* allocate port+1 for RTCP now */
596 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
598 goto no_udp_rtcp_protocol;
601 tmp_rtcp = tmp_rtp + 1;
602 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
604 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
605 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
606 if (ret == GST_STATE_CHANGE_FAILURE) {
/* both sources are torn down before the retry */
611 gst_element_set_state (udpsrc0, GST_STATE_NULL);
612 gst_object_unref (udpsrc0);
614 gst_element_set_state (udpsrc1, GST_STATE_NULL);
615 gst_object_unref (udpsrc1);
621 /* all fine, do port check */
622 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
623 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
625 /* this should not happen... */
626 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
/* RTP sink: reuses the socket of the RTP source and must not close it */
629 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
631 goto no_udp_protocol;
633 g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
634 g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
635 g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
/* RTCP sink: same socket sharing, with sync and async switched off */
637 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
639 goto no_udp_protocol;
641 g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
642 g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
643 g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
644 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
645 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
647 /* we keep these elements, we configure all in configure_transport when the
648 * server told us to really use the UDP ports. */
649 stream->udpsrc[0] = udpsrc0;
650 stream->udpsrc[1] = udpsrc1;
651 stream->udpsink[0] = udpsink0;
652 stream->udpsink[1] = udpsink1;
653 stream->server_port.min = rtpport;
654 stream->server_port.max = rtcpport;
667 no_udp_rtcp_protocol:
/* shared cleanup: every element is set to NULL and unreffed.
 * NOTE(review): the NULL guards around each pair are not visible here;
 * confirm they exist, since not every element is created on every path. */
678 gst_element_set_state (udpsrc0, GST_STATE_NULL);
679 gst_object_unref (udpsrc0);
682 gst_element_set_state (udpsrc1, GST_STATE_NULL);
683 gst_object_unref (udpsrc1);
686 gst_element_set_state (udpsink0, GST_STATE_NULL);
687 gst_object_unref (udpsink0);
690 gst_element_set_state (udpsink1, GST_STATE_NULL);
691 gst_object_unref (udpsink1);
/* Handler for notify::caps on the send_rtp_sink pad (connected in
 * setup_stream). It takes a reference on the current pad caps, swaps them
 * into stream->caps, unrefs the previous caps and logs the new ones.
 *
 * NOTE(review): newcaps can be NULL according to the check on the first
 * statement, yet it is passed to gst_caps_to_string below; the guard around
 * the old caps unref and the release of capsstr are on lines this listing
 * omits -- confirm both. */
698 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
701 GstCaps *newcaps, *oldcaps;
703 if ((newcaps = GST_PAD_CAPS (pad)))
704 gst_caps_ref (newcaps);
706 oldcaps = stream->caps;
707 stream->caps = newcaps;
710 gst_caps_unref (oldcaps);
712 capsstr = gst_caps_to_string (newcaps);
713 g_message ("stream %p received caps %p, %s", stream, newcaps, capsstr);
/* Debug helper: serialises @s to a string and logs it. The declaration and
 * the release of sstr are on lines this listing omits. */
718 dump_structure (const GstStructure *s)
722 sstr = gst_structure_to_string (s);
723 g_message ("structure: %s", sstr);
/* Looks for the transport of @stream whose destination equals the host part
 * of @rtcp_from and whose client port range starts or ends at the port part.
 * @rtcp_from is split at its last colon.
 *
 * NOTE(review): the early returns, the statements inside the match branch,
 * the release of dest and the final return are on lines this listing omits.
 * atoi reports no errors, so a non-numeric port text parses as 0. */
727 static GstRTSPMediaTrans *
728 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
731 GstRTSPMediaTrans *result = NULL;
736 if (rtcp_from == NULL)
/* last colon separates host and port */
739 tmp = g_strrstr (rtcp_from, ":");
743 port = atoi (tmp + 1);
/* newly allocated copy of the host part */
744 dest = g_strndup (rtcp_from, tmp - rtcp_from);
746 g_message ("finding %s:%d", dest, port);
748 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
749 GstRTSPMediaTrans *trans = walk->data;
752 min = trans->transport->client_port.min;
753 max = trans->transport->client_port.max;
755 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
/* Session callback for on-new-ssrc. It reads the transport already attached
 * to @source as qdata; the remaining visible code fetches the stats of the
 * source, looks up a transport by the rtcp-from field and, when one is found,
 * records the source in the transport and attaches the transport to the
 * source as qdata.
 *
 * NOTE(review): the conditions that select between the lookup and the final
 * log line are on lines this listing omits (presumably the lookup runs only
 * when no qdata was set yet -- confirm). */
766 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
769 GstRTSPMediaTrans *trans;
771 g_message ("%p: new source %p", stream, source);
773 /* see if we have a stream to match with the origin of the RTCP packet */
774 trans = g_object_get_qdata (source, ssrc_stream_map_key);
776 g_object_get (source, "stats", &stats, NULL);
778 const gchar *rtcp_from;
780 dump_structure (stats);
782 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
783 if ((trans = find_transport (stream, rtcp_from))) {
784 g_message ("%p: found transport %p for source %p", stream, trans, source);
/* NOTE(review): the pointer is stored without a visible g_object_ref, even
 * though the comment below speaks of keeping a ref -- confirm ownership. */
786 /* keep ref to the source */
787 trans->rtpsource = source;
789 g_object_set_qdata (source, ssrc_stream_map_key, trans);
791 gst_structure_free (stats);
794 g_message ("%p: source %p for transport %p", stream, source, trans);
/* Session callback for on-ssrc-sdes; the visible code only logs the event. */
799 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
801 g_message ("%p: new SDES %p", stream, source);
/* Session callback for on-ssrc-active. When the source has a transport
 * attached and that transport has a keep_alive function, the function is
 * called with the user data of the transport. The stats of the source are
 * then fetched, dumped and freed.
 *
 * NOTE(review): the lines around the stats dump are missing from this listing
 * (numbering jumps 814 -> 819); presumably the dump is compiled in only when
 * the RTCP dump define near the top of the file is set -- confirm. */
805 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
807 GstRTSPMediaTrans *trans;
809 trans = g_object_get_qdata (source, ssrc_stream_map_key);
811 g_message ("%p: source %p in transport %p is active", stream, source, trans);
813 if (trans && trans->keep_alive)
814 trans->keep_alive (trans->ka_user_data);
819 g_object_get (source, "stats", &stats, NULL);
821 dump_structure (stats);
822 gst_structure_free (stats);
/* Session callback for on-bye-ssrc; the visible code only logs the event. */
829 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
831 g_message ("%p: source %p bye", stream, source);
/* Session callback for on-bye-timeout. When @source has a transport attached
 * as qdata, the transport forgets the source and is flagged as timed out. */
835 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
837 GstRTSPMediaTrans *trans;
839 g_message ("%p: source %p bye timeout", stream, source);
841 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
842 trans->rtpsource = NULL;
843 trans->timeout = TRUE;
/* Session callback for on-timeout. Same handling as on_bye_timeout: when
 * @source has a transport attached as qdata, the transport forgets the source
 * and is flagged as timed out. */
848 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
850 GstRTSPMediaTrans *trans;
852 g_message ("%p: source %p timeout", stream, source);
854 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
855 trans->rtpsource = NULL;
856 trans->timeout = TRUE;
/* appsink callback: pulls one buffer from @sink and hands it to every
 * transport of the stream passed as @user_data. Buffers from the first
 * appsink go through send_rtp with the lower interleaved channel; the other
 * visible call uses send_rtcp with the upper interleaved channel. The buffer
 * is unreffed afterwards.
 *
 * NOTE(review): the check on the pulled buffer, the guards around the two
 * send calls, the else line and the return statement are on lines this
 * listing omits -- confirm send_rtp and send_rtcp are checked for NULL. */
861 handle_new_buffer (GstAppSink *sink, gpointer user_data)
865 GstRTSPMediaStream *stream;
867 buffer = gst_app_sink_pull_buffer (sink);
871 stream = (GstRTSPMediaStream *) user_data;
873 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
874 GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
/* appsink[0] carries RTP, see setup_stream */
876 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
878 tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
882 tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
885 gst_buffer_unref (buffer);
/* Callback table installed on both appsinks of a stream in setup_stream. The
 * first two slots are unused; the remaining initialisers are on lines this
 * listing omits (presumably handle_new_buffer -- confirm). */
890 static GstAppSinkCallbacks sink_cb = {
891 NULL, /* not interested in EOS */
892 NULL, /* not interested in preroll buffers */
896 /* prepare the pipeline objects to handle @stream in @media */
/* Steps performed by the visible code, in order:
 *  1. allocate the udp sources and sinks (alloc_udp_ports) and add them to
 *     the pipeline;
 *  2. create an appsrc and an appsink per direction for the TCP transport,
 *     add them to the pipeline and install sink_cb on the appsinks;
 *  3. obtain the pads of session @idx from rtpbin and the session object,
 *     and connect the RTP session callbacks;
 *  4. link the stream source pad to rtpbin;
 *  5. fan the outgoing RTP and RTCP out through a tee to the udp sink and
 *     the appsink;
 *  6. merge incoming RTP and RTCP from the udp source and the appsrc through
 *     an input-selector into rtpbin;
 *  7. set the udp sources to PLAYING and lock their state;
 *  8. watch for caps changes on the RTP sink pad and mark the stream
 *     prepared.
 *
 * NOTE(review): the results of gst_element_factory_make are used without a
 * visible NULL check, and only the first gst_pad_link result is inspected.
 * The g_free calls for name, the goto statements, the error labels and the
 * return statements are on lines this listing omits. */
898 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
901 GstPad *pad, *teepad, *selpad;
902 GstPadLinkReturn ret;
905 /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
906 * for sending RTP/RTCP. The sender and receiver ports are shared between the
908 if (!alloc_udp_ports (stream))
911 /* add the ports to the pipeline */
912 for (i = 0; i < 2; i++) {
913 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
914 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
917 /* create elements for the TCP transfer */
918 for (i = 0; i < 2; i++) {
919 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
920 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
921 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
922 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
923 g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
924 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
925 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
/* the stream is the user data seen by handle_new_buffer */
926 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
927 &sink_cb, stream, NULL);
930 /* hook up the stream to the RTP session elements. */
931 name = g_strdup_printf ("send_rtp_sink_%d", idx);
932 stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
/* send_rtp_src is the only one fetched as a static pad */
934 name = g_strdup_printf ("send_rtp_src_%d", idx);
935 stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
937 name = g_strdup_printf ("send_rtcp_src_%d", idx);
938 stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
940 name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
941 stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
943 name = g_strdup_printf ("recv_rtp_sink_%d", idx);
944 stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
947 /* get the session */
948 g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
/* the trailing arguments of the connect calls below are on omitted lines */
951 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
953 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
955 g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
957 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
959 g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
961 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
964 /* link the RTP pad to the session manager */
965 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
966 if (ret != GST_PAD_LINK_OK)
969 /* make tee for RTP and link to stream */
970 stream->tee[0] = gst_element_factory_make ("tee", NULL);
971 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
973 pad = gst_element_get_static_pad (stream->tee[0], "sink");
974 gst_pad_link (stream->send_rtp_src, pad);
975 gst_object_unref (pad);
977 /* link RTP sink, we're pretty sure this will work. */
978 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
979 pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
980 gst_pad_link (teepad, pad);
981 gst_object_unref (pad);
982 gst_object_unref (teepad);
/* second RTP branch feeds the appsink used for the TCP transport */
984 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
985 pad = gst_element_get_static_pad (stream->appsink[0], "sink");
986 gst_pad_link (teepad, pad);
987 gst_object_unref (pad);
988 gst_object_unref (teepad);
990 /* make tee for RTCP */
991 stream->tee[1] = gst_element_factory_make ("tee", NULL);
992 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
994 pad = gst_element_get_static_pad (stream->tee[1], "sink");
995 gst_pad_link (stream->send_rtcp_src, pad);
996 gst_object_unref (pad);
998 /* link RTCP elements */
999 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1000 pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
1001 gst_pad_link (teepad, pad);
1002 gst_object_unref (pad);
1003 gst_object_unref (teepad);
1005 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1006 pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1007 gst_pad_link (teepad, pad);
1008 gst_object_unref (pad);
1009 gst_object_unref (teepad);
1011 /* make selector for the RTP receivers */
1012 stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1013 g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1014 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
1016 pad = gst_element_get_static_pad (stream->selector[0], "src");
1017 gst_pad_link (pad, stream->recv_rtp_sink);
1018 gst_object_unref (pad);
/* first selector input: the udp source */
1020 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1021 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1022 gst_pad_link (pad, selpad);
1023 gst_object_unref (pad);
1024 gst_object_unref (selpad);
/* second selector input: the appsrc fed by gst_rtsp_media_stream_rtp */
1026 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1027 pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1028 gst_pad_link (pad, selpad);
1029 gst_object_unref (pad);
1030 gst_object_unref (selpad);
1032 /* make selector for the RTCP receivers */
1033 stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1034 g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1035 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
1037 pad = gst_element_get_static_pad (stream->selector[1], "src");
1038 gst_pad_link (pad, stream->recv_rtcp_sink);
1039 gst_object_unref (pad);
1041 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1042 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1043 gst_pad_link (pad, selpad);
1044 gst_object_unref (pad);
1045 gst_object_unref (selpad);
1047 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1048 pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1049 gst_pad_link (pad, selpad);
1050 gst_object_unref (pad);
1051 gst_object_unref (selpad);
1053 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1055 gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1056 gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1057 gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1058 gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1060 /* be notified of caps changes */
1061 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1062 (GCallback) caps_notify, stream);
1064 stream->prepared = TRUE;
1071 g_warning ("failed to link stream %d", idx);
/* Clears the locked-state flag that setup_stream set on both udp sources of
 * every stream, so that later state changes of the pipeline reach them.
 *
 * NOTE(review): stream->udpsrc is dereferenced for every stream without a
 * visible check that setup_stream completed for it -- confirm. */
1077 unlock_streams (GstRTSPMedia *media)
1081 /* unlock the udp src elements */
1082 n_streams = gst_rtsp_media_n_streams (media);
1083 for (i = 0; i < n_streams; i++) {
1084 GstRTSPMediaStream *stream;
1086 stream = gst_rtsp_media_get_stream (media, i);
1088 gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1089 gst_element_set_locked_state (stream->udpsrc[1], FALSE);
/* Default handle_message implementation, called from bus_message for every
 * message on the pipeline bus.
 *  - BUFFERING: at 100 percent buffering is marked done and the pipeline is
 *    set back to PLAYING when that is the target state; below 100 percent
 *    the pipeline is set to PAUSED the first time, when the target state is
 *    PLAYING, and buffering is marked busy.
 *  - LATENCY: the pipeline latency is recalculated.
 *  - ERROR and WARNING: the message is parsed and logged as a warning.
 *  - other types shown here: nothing visible is done, or the type name is
 *    logged.
 *
 * NOTE(review): the switch line, break statements, else lines, the live
 * check under the BUFFERING case, the release of debug and the return
 * statement are on lines this listing omits. */
1094 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1096 GstMessageType type;
1098 type = GST_MESSAGE_TYPE (message);
1101 case GST_MESSAGE_STATE_CHANGED:
1103 case GST_MESSAGE_BUFFERING:
1107 gst_message_parse_buffering (message, &percent);
1109 /* no state management needed for live pipelines */
1113 if (percent == 100) {
1114 /* a 100% message means buffering is done */
1115 media->buffering = FALSE;
1116 /* if the desired state is playing, go back */
1117 if (media->target_state == GST_STATE_PLAYING) {
1118 g_message ("Buffering done, setting pipeline to PLAYING");
1119 gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1122 g_message ("Buffering done");
1125 /* buffering busy */
1126 if (media->buffering == FALSE) {
1127 if (media->target_state == GST_STATE_PLAYING) {
1128 /* we were not buffering but PLAYING, PAUSE the pipeline. */
1129 g_message ("Buffering, setting pipeline to PAUSED ...");
1130 gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1133 g_message ("Buffering ...");
1136 media->buffering = TRUE;
1140 case GST_MESSAGE_LATENCY:
1142 gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1145 case GST_MESSAGE_ERROR:
1150 gst_message_parse_error (message, &gerror, &debug);
1151 g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1152 g_error_free (gerror);
1156 case GST_MESSAGE_WARNING:
1161 gst_message_parse_warning (message, &gerror, &debug);
1162 g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1163 g_error_free (gerror);
1167 case GST_MESSAGE_ELEMENT:
1169 case GST_MESSAGE_STREAM_STATUS:
1172 g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
/* Bus watch callback installed by gst_rtsp_media_prepare: dispatches
 * @message to the handle_message vfunc of the media class when one is set.
 * The declaration of ret, the value used when no handler is set and the
 * return statement are on lines this listing omits. */
1179 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1181 GstRTSPMediaClass *klass;
1184 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1186 if (klass->handle_message)
1187 ret = klass->handle_message (media, message);
/* pad-added handler for the dynamic elements (connected in
 * gst_rtsp_media_prepare). It allocates a new zeroed stream whose payloader
 * is @element, exposes @pad as a ghost pad on media->element, appends the
 * stream to media->streams, wires it up with setup_stream and sets the new
 * per-stream elements to PAUSED.
 *
 * NOTE(review): i is computed as len + 1 before the stream is appended, so
 * the index passed to setup_stream and used in the pad name is one higher
 * than the array slot the stream ends up in -- confirm this is intended. The
 * result of setup_stream is not checked here, and the release of name is on
 * a line this listing omits. */
1195 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1197 GstRTSPMediaStream *stream;
1201 i = media->streams->len + 1;
1203 g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1205 stream = g_new0 (GstRTSPMediaStream, 1);
1206 stream->payloader = element;
1208 name = g_strdup_printf ("dynpay%d", i);
1210 /* ghost the pad of the payloader to the element */
1211 stream->srcpad = gst_ghost_pad_new (name, pad);
1212 gst_pad_set_active (stream->srcpad, TRUE);
1213 gst_element_add_pad (media->element, stream->srcpad);
1216 /* add stream now */
1217 g_array_append_val (media->streams, stream);
1219 setup_stream (stream, i, media);
/* i is reused as the loop counter from here on; the udp sources are not
 * touched because setup_stream already set them to PLAYING */
1221 for (i = 0; i < 2; i++) {
1222 gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1223 gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1224 gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1225 gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1226 gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
/* no-more-pads handler for the dynamic elements: removes the fakesink that
 * gst_rtsp_media_prepare added to the pipeline, if it is still there. */
1231 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1233 g_message ("no more pads");
1234 if (media->fakesink) {
/* hold a reference across the removal so the element can still be set to
 * NULL afterwards */
1235 gst_object_ref (media->fakesink);
1236 gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1237 gst_element_set_state (media->fakesink, GST_STATE_NULL);
1238 gst_object_unref (media->fakesink);
1239 media->fakesink = NULL;
1240 g_message ("removed fakesink");
1245 * gst_rtsp_media_prepare:
1246 * @media: a #GstRTSPMedia
1248 * Prepare @media for streaming. This function will create the pipeline and
1249 * other objects to manage the streaming.
1251 * It will preroll the pipeline and collect vital information about the streams
1252 * such as the duration.
1254 * Returns: %TRUE on success.
/* Prepares @media for streaming. Visible steps, in order:
 *  1. the already-prepared check and the check that rejects a used media
 *     that is not reusable;
 *  2. create a bus watch for the pipeline and attach it to the class main
 *     context, with bus_message as callback;
 *  3. create rtpbin and add it to the pipeline;
 *  4. run setup_stream for every stream known so far;
 *  5. connect pad-added and no-more-pads on every dynamic element and add a
 *     fakesink to the pipeline;
 *  6. set the pipeline to PAUSED; a NO_PREROLL result marks the media live
 *     and moves the pipeline on to PLAYING;
 *  7. wait for the state change to complete, collect the media range and
 *     mark the media prepared.
 *
 * NOTE(review): the switch line, the goto statements, the labels above the
 * two warnings at the end and the return statements are on lines this
 * listing omits. The results of gst_element_factory_make and of setup_stream
 * are used without a visible check. */
1257 gst_rtsp_media_prepare (GstRTSPMedia *media)
1259 GstStateChangeReturn ret;
1261 GstRTSPMediaClass *klass;
1265 if (media->prepared)
1268 if (!media->reusable && media->reused)
1271 g_message ("preparing media %p", media);
1273 bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1275 /* add the pipeline bus to our custom mainloop */
1276 media->source = gst_bus_create_watch (bus);
1277 gst_object_unref (bus);
1279 g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
/* the context is the one served by the do_loop thread */
1281 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1282 media->id = g_source_attach (media->source, klass->context);
1284 media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1286 /* add stuff to the bin */
1287 gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1289 /* link streams we already have, other streams might appear when we have
1290 * dynamic elements */
1291 n_streams = gst_rtsp_media_n_streams (media);
1292 for (i = 0; i < n_streams; i++) {
1293 GstRTSPMediaStream *stream;
1295 stream = gst_rtsp_media_get_stream (media, i);
1297 setup_stream (stream, i, media);
1300 for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1301 GstElement *elem = walk->data;
1303 g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1304 g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
/* NOTE(review): whether the fakesink lines below are inside the loop body
 * cannot be told from this listing, the braces are missing; no_more_pads_cb
 * removes the fakesink again. */
1306 media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1307 gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1310 /* first go to PAUSED */
1311 ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1312 media->target_state = GST_STATE_PAUSED;
1315 case GST_STATE_CHANGE_SUCCESS:
1317 case GST_STATE_CHANGE_ASYNC:
1319 case GST_STATE_CHANGE_NO_PREROLL:
1320 /* we need to go to PLAYING */
1321 g_message ("live media %p", media);
1322 media->is_live = TRUE;
1323 ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1324 if (ret == GST_STATE_CHANGE_FAILURE)
1327 case GST_STATE_CHANGE_FAILURE:
1331 /* now wait for all pads to be prerolled */
1332 ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1333 if (ret == GST_STATE_CHANGE_FAILURE)
1336 /* collect stats about the media */
1337 collect_media_stats (media);
1339 g_message ("object %p is prerolled", media);
1341 media->prepared = TRUE;
/* failure path: unlock the udp sources so the NULL state reaches them */
1353 g_warning ("failed to preroll pipeline");
1354 unlock_streams (media);
1355 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1360 g_warning ("can not reuse media %p", media);
1366 * gst_rtsp_media_unprepare:
1367 * @media: a #GstRTSPMedia
1369 * Unprepare @media. After this call, the media should be prepared again before
1370 * it can be used again. If the media is set to be non-reusable, a new instance
 * must be created.
1373 * Returns: %TRUE on success.
/* Unprepare @media: reset the target state to NULL, run the class unprepare
 * hook when one is set, clear the prepared flag, mark the media as reused and
 * emit the SIGNAL_UNPREPARED signal.
 *
 * NOTE(review): some lines of this function (declarations, the statement
 * taken when the media is not prepared, the return statement) are not
 * visible in this excerpt. */
1376 gst_rtsp_media_unprepare (GstRTSPMedia *media)
1378 GstRTSPMediaClass *klass;
/* media that is not prepared takes a separate path (not visible here) */
1381 if (!media->prepared)
1384 g_message ("unprepare media %p", media);
1385 media->target_state = GST_STATE_NULL;
/* the teardown itself is delegated to the optional class function; its
 * result is stored in `success` */
1387 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1388 if (klass->unprepare)
1389 success = klass->unprepare (media);
/* gst_rtsp_media_prepare() checks `reused` together with `reusable`, so
 * setting it here affects whether a non-reusable media is prepared again */
1393 media->prepared = FALSE;
1394 media->reused = TRUE;
1396 /* when the media is not reusable, this will effectively unref the media and
1398 g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
/* Sets the pipeline of @media to the NULL state; the result of
 * gst_element_set_state() is not examined on the visible line.
 *
 * NOTE(review): presumably the default implementation of the class unprepare
 * hook -- confirm where it is installed. The remainder of the body (return
 * statement, closing brace) is not visible in this excerpt. */
1404 default_unprepare (GstRTSPMedia *media) {
1405 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1410 * gst_rtsp_media_set_state:
1411 * @media: a #GstRTSPMedia
1412 * @state: the target state of the media
1413 * @transports: a GArray of #GstRTSPMediaTrans pointers
1415 * Set the state of @media to @state and for the transports in @transports.
1417 * Returns: %TRUE on success.
1420 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1423 GstStateChangeReturn ret;
1424 gboolean add, remove, do_state;
1427 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1428 g_return_val_if_fail (transports != NULL, FALSE);
1430 /* NULL and READY are the same */
1431 if (state == GST_STATE_READY)
1432 state = GST_STATE_NULL;
1434 add = remove = FALSE;
1436 g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1439 case GST_STATE_NULL:
1440 /* unlock the streams so that they follow the state changes from now on */
1441 unlock_streams (media);
1443 case GST_STATE_PAUSED:
1444 /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1445 if (media->target_state == GST_STATE_PLAYING)
1448 case GST_STATE_PLAYING:
1449 /* we're going to PLAYING, add */
1455 old_active = media->active;
1457 for (i = 0; i < transports->len; i++) {
1458 GstRTSPMediaTrans *tr;
1459 GstRTSPMediaStream *stream;
1460 GstRTSPTransport *trans;
1462 /* we need a non-NULL entry in the array */
1463 tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1467 /* we need a transport */
1468 if (!(trans = tr->transport))
1471 /* get the stream and add the destinations */
1472 stream = gst_rtsp_media_get_stream (media, tr->idx);
1473 switch (trans->lower_transport) {
1474 case GST_RTSP_LOWER_TRANS_UDP:
1475 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1480 dest = trans->destination;
1481 min = trans->client_port.min;
1482 max = trans->client_port.max;
1484 if (add && !tr->active) {
1485 g_message ("adding %s:%d-%d", dest, min, max);
1486 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1487 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1488 stream->transports = g_list_prepend (stream->transports, tr);
1491 } else if (remove && tr->active) {
1492 g_message ("removing %s:%d-%d", dest, min, max);
1493 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1494 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1495 stream->transports = g_list_remove (stream->transports, tr);
1501 case GST_RTSP_LOWER_TRANS_TCP:
1502 if (add && !tr->active) {
1503 g_message ("adding TCP %s", trans->destination);
1504 stream->transports = g_list_prepend (stream->transports, tr);
1507 } else if (remove && tr->active) {
1508 g_message ("removing TCP %s", trans->destination);
1509 stream->transports = g_list_remove (stream->transports, tr);
1515 g_message ("Unknown transport %d", trans->lower_transport);
1520 /* we just added the first media, do the playing state change */
1521 if (old_active == 0 && add)
1523 /* if we have no more active media, do the downward state changes */
1524 else if (media->active == 0)
1529 g_message ("active %d media %p", media->active, media);
1531 if (do_state && media->target_state != state) {
1532 if (state == GST_STATE_NULL) {
1533 gst_rtsp_media_unprepare (media);
1535 g_message ("state %s media %p", gst_element_state_get_name (state), media);
1536 media->target_state = state;
1537 ret = gst_element_set_state (media->pipeline, state);
1541 /* remember where we are */
1542 if (state == GST_STATE_PAUSED)
1543 collect_media_stats (media);