2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/app/gstappsrc.h>
23 #include <gst/app/gstappsink.h>
25 #include "rtsp-media.h"
/* Defaults used when the "shared" and "reusable" properties are installed. */
27 #define DEFAULT_SHARED FALSE
28 #define DEFAULT_REUSABLE FALSE
30 /* define to dump received RTCP packets */
/* Quark used as the qdata key under which on_new_ssrc() attaches a transport
 * to an RTP source object; it is set in class_init. */
47 static GQuark ssrc_stream_map_key;
/* GObject property accessors and finalizer installed by class_init. */
49 static void gst_rtsp_media_get_property (GObject *object, guint propid,
50 GValue *value, GParamSpec *pspec);
51 static void gst_rtsp_media_set_property (GObject *object, guint propid,
52 const GValue *value, GParamSpec *pspec);
53 static void gst_rtsp_media_finalize (GObject * obj);
/* Forward declarations of helpers defined further down in this file. */
55 static gpointer do_loop (GstRTSPMediaClass *klass);
56 static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
57 static void unlock_streams (GstRTSPMedia *media);
/* Signal ids, filled in by class_init. */
59 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
/* Registers the GstRTSPMedia type with G_TYPE_OBJECT as its parent type. */
61 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
/* Class initialiser: installs the property accessors and the finalizer,
 * registers the "shared" and "reusable" properties and the "unprepared"
 * signal, creates the class-wide main context/loop and starts the thread
 * that runs it. */
64 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
66 GObjectClass *gobject_class;
69 gobject_class = G_OBJECT_CLASS (klass);
71 gobject_class->get_property = gst_rtsp_media_get_property;
72 gobject_class->set_property = gst_rtsp_media_set_property;
73 gobject_class->finalize = gst_rtsp_media_finalize;
/* both properties are booleans that can be read and written */
75 g_object_class_install_property (gobject_class, PROP_SHARED,
76 g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
77 DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
79 g_object_class_install_property (gobject_class, PROP_REUSABLE,
80 g_param_spec_boolean ("reusable", "Reusable",
81 "If this media pipeline can be reused after an unprepare",
82 DEFAULT_REUSABLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "unprepared" carries no arguments and has no return value */
84 gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
85 g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
86 G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
87 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/* the context and loop are stored on the class; do_loop() runs the loop in a
 * joinable thread and gst_rtsp_media_prepare() attaches bus watches to the
 * context */
89 klass->context = g_main_context_new ();
90 klass->loop = g_main_loop_new (klass->context, TRUE);
92 klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
94 g_critical ("could not start bus thread: %s", error->message);
/* default bus message handler, called through the class by bus_message() */
96 klass->handle_message = default_handle_message;
/* key used with g_object_set_qdata()/g_object_get_qdata() on RTP sources */
98 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
/* Instance initialiser: creates the (initially empty) array that holds the
 * GstRTSPMediaStream pointers and clears the live and buffering flags. */
102 gst_rtsp_media_init (GstRTSPMedia * media)
/* not zero-terminated, new elements are cleared, one pointer per element */
104 media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
105 media->is_live = FALSE;
106 media->buffering = FALSE;
/* Release what a GstRTSPMediaStream holds: the session object, the caps, the
 * rtpbin pads stored on it by setup_stream() and the transports list. */
110 gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
113 g_object_unref (stream->session);
116 gst_caps_unref (stream->caps);
/* pads obtained from the rtpbin; each one is only unreffed when it is set */
118 if (stream->send_rtp_sink)
119 gst_object_unref (stream->send_rtp_sink);
120 if (stream->send_rtp_src)
121 gst_object_unref (stream->send_rtp_src);
122 if (stream->send_rtcp_src)
123 gst_object_unref (stream->send_rtcp_src);
124 if (stream->recv_rtcp_sink)
125 gst_object_unref (stream->recv_rtcp_sink);
126 if (stream->recv_rtp_sink)
127 gst_object_unref (stream->recv_rtp_sink);
/* g_list_free() releases the list cells only, not the data they point to */
129 g_list_free (stream->transports);
/* GObject finalize: shuts down and releases the pipeline, frees every stream
 * and the stream array, drops the references on the dynamic elements,
 * destroys the bus watch source and chains up to the parent class. */
135 gst_rtsp_media_finalize (GObject * obj)
140 media = GST_RTSP_MEDIA (obj);
142 g_message ("finalize media %p", media);
144 if (media->pipeline) {
/* clear the locked-state flag on the udp sources before going to NULL */
145 unlock_streams (media);
146 gst_element_set_state (media->pipeline, GST_STATE_NULL);
147 gst_object_unref (media->pipeline);
150 for (i = 0; i < media->streams->len; i++) {
151 GstRTSPMediaStream *stream;
153 stream = g_array_index (media->streams, GstRTSPMediaStream *, i);
155 gst_rtsp_media_stream_free (stream);
/* TRUE: free the element storage together with the GArray itself */
157 g_array_free (media->streams, TRUE);
/* unref every element in the dynamic list, then free the list cells */
159 g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL);
160 g_list_free (media->dynamic);
/* media->source is the bus watch created in gst_rtsp_media_prepare() */
163 g_source_destroy (media->source);
164 g_source_unref (media->source);
167 G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
/* GObject get_property: answers reads through the public getters
 * gst_rtsp_media_is_shared() and gst_rtsp_media_is_reusable(); an unknown
 * property id is reported with G_OBJECT_WARN_INVALID_PROPERTY_ID. */
171 gst_rtsp_media_get_property (GObject *object, guint propid,
172 GValue *value, GParamSpec *pspec)
174 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
178 g_value_set_boolean (value, gst_rtsp_media_is_shared (media));
181 g_value_set_boolean (value, gst_rtsp_media_is_reusable (media));
184 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* GObject set_property: applies writes through the public setters
 * gst_rtsp_media_set_shared() and gst_rtsp_media_set_reusable(); an unknown
 * property id is reported with G_OBJECT_WARN_INVALID_PROPERTY_ID. */
189 gst_rtsp_media_set_property (GObject *object, guint propid,
190 const GValue *value, GParamSpec *pspec)
192 GstRTSPMedia *media = GST_RTSP_MEDIA (object);
196 gst_rtsp_media_set_shared (media, g_value_get_boolean (value));
199 gst_rtsp_media_set_reusable (media, g_value_get_boolean (value));
202 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
/* Thread function started from class_init: runs the class main loop and logs
 * when the loop is entered and when g_main_loop_run() returns. */
207 do_loop (GstRTSPMediaClass *klass)
209 g_message ("enter mainloop");
210 g_main_loop_run (klass->loop);
211 g_message ("exit mainloop");
/* Fill media->range, expressed in NPT units. Live media gets a range from
 * "now" to "end". The other visible path queries the pipeline position and
 * duration in GST_FORMAT_TIME and stores them in seconds; a value of -1 is
 * stored as "now" (position) or "end" (duration) instead. */
217 collect_media_stats (GstRTSPMedia *media)
220 gint64 position, duration;
222 media->range.unit = GST_RTSP_RANGE_NPT;
224 if (media->is_live) {
/* live: no fixed start or end */
225 media->range.min.type = GST_RTSP_TIME_NOW;
226 media->range.min.seconds = -1;
227 media->range.max.type = GST_RTSP_TIME_END;
228 media->range.max.seconds = -1;
231 /* get the position */
232 format = GST_FORMAT_TIME;
233 if (!gst_element_query_position (media->pipeline, &format, &position)) {
234 g_message ("position query failed");
238 /* get the duration */
239 format = GST_FORMAT_TIME;
240 if (!gst_element_query_duration (media->pipeline, &format, &duration)) {
241 g_message ("duration query failed");
245 g_message ("stats: position %"GST_TIME_FORMAT", duration %"GST_TIME_FORMAT,
246 GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
/* unknown position: the range starts "now" */
248 if (position == -1) {
249 media->range.min.type = GST_RTSP_TIME_NOW;
250 media->range.min.seconds = -1;
/* known position: GstClockTime units divided by GST_SECOND give seconds */
253 media->range.min.type = GST_RTSP_TIME_SECONDS;
254 media->range.min.seconds = ((gdouble)position) / GST_SECOND;
/* unknown duration: the range runs to the "end" */
256 if (duration == -1) {
257 media->range.max.type = GST_RTSP_TIME_END;
258 media->range.max.seconds = -1;
261 media->range.max.type = GST_RTSP_TIME_SECONDS;
262 media->range.max.seconds = ((gdouble)duration) / GST_SECOND;
268 * gst_rtsp_media_new:
270 * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the
271 * element to produce RTP data for one or more related (audio/video/..)
274 * Returns: a new #GstRTSPMedia object.
/* Constructor: instantiates GST_TYPE_RTSP_MEDIA without setting any
 * properties, so they keep their default values. */
277 gst_rtsp_media_new (void)
279 GstRTSPMedia *result;
281 result = g_object_new (GST_TYPE_RTSP_MEDIA, NULL);
287 * gst_rtsp_media_set_shared:
288 * @media: a #GstRTSPMedia
289 * @shared: the new value
291 * Set or unset if the pipeline for @media can be shared with multiple clients.
292 * When @shared is %TRUE, client requests for this media will share the media
/* Store the "shared" flag on @media; does nothing when @media is not a
 * GstRTSPMedia (the g_return_if_fail check). */
296 gst_rtsp_media_set_shared (GstRTSPMedia *media, gboolean shared)
298 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
300 media->shared = shared;
304 * gst_rtsp_media_is_shared:
305 * @media: a #GstRTSPMedia
307 * Check if the pipeline for @media can be shared between multiple clients.
309 * Returns: %TRUE if the media can be shared between clients.
/* Return the "shared" flag of @media, or FALSE when @media is not a
 * GstRTSPMedia. */
312 gst_rtsp_media_is_shared (GstRTSPMedia *media)
314 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
316 return media->shared;
320 * gst_rtsp_media_set_reusable:
321 * @media: a #GstRTSPMedia
322 * @reusable: the new value
324 * Set or unset if the pipeline for @media can be reused after the pipeline has
/* Store the "reusable" flag on @media; does nothing when @media is not a
 * GstRTSPMedia. gst_rtsp_media_prepare() consults this flag. */
328 gst_rtsp_media_set_reusable (GstRTSPMedia *media, gboolean reusable)
330 g_return_if_fail (GST_IS_RTSP_MEDIA (media));
332 media->reusable = reusable;
336 * gst_rtsp_media_is_reusable:
337 * @media: a #GstRTSPMedia
339 * Check if the pipeline for @media can be reused after an unprepare.
341 * Returns: %TRUE if the media can be reused
/* Return the "reusable" flag of @media, or FALSE when @media is not a
 * GstRTSPMedia. */
344 gst_rtsp_media_is_reusable (GstRTSPMedia *media)
346 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
348 return media->reusable;
352 * gst_rtsp_media_n_streams:
353 * @media: a #GstRTSPMedia
355 * Get the number of streams in this media.
357 * Returns: The number of streams.
/* Return the length of the media->streams array, or 0 when @media is not a
 * GstRTSPMedia. */
360 gst_rtsp_media_n_streams (GstRTSPMedia *media)
362 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
364 return media->streams->len;
368 * gst_rtsp_media_get_stream:
369 * @media: a #GstRTSPMedia
370 * @idx: the stream index
372 * Retrieve the stream with index @idx from @media.
374 * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with
375 * that index did not exist.
/* Look up the stream pointer stored at position @idx of media->streams.
 * Returns NULL when @media is not a GstRTSPMedia. */
378 gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
380 GstRTSPMediaStream *res;
382 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
/* only index the array when idx is within bounds */
384 if (idx < media->streams->len)
385 res = g_array_index (media->streams, GstRTSPMediaStream *, idx);
393 * gst_rtsp_media_seek:
394 * @media: a #GstRTSPMedia
395 * @range: a #GstRTSPTimeRange
397 * Seek the pipeline to @range.
399 * Returns: %TRUE on success.
/* Seek the media pipeline to @range. The range unit is checked against
 * GST_RTSP_RANGE_NPT. The seek start/stop are derived from range->min and
 * range->max; a value given in seconds is compared with the one cached in
 * media->range ("only seek when something changed"). When a seek is issued
 * the function then blocks in gst_element_get_state() and refreshes the
 * cached range with collect_media_stats(). */
402 gst_rtsp_media_seek (GstRTSPMedia *media, GstRTSPTimeRange *range)
407 GstSeekType start_type, stop_type;
409 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
410 g_return_val_if_fail (range != NULL, FALSE);
412 if (range->unit != GST_RTSP_RANGE_NPT)
415 /* depends on the current playing state of the pipeline. We might need to
416 * queue this until we get EOS. */
/* NOTE(review): ACCURATE and KEY_UNIT are requested together; confirm which
 * of the two behaviours is actually wanted. */
417 flags = GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_KEY_UNIT;
/* NONE leaves the corresponding position of the seek untouched */
419 start_type = stop_type = GST_SEEK_TYPE_NONE;
/* start of the range */
421 switch (range->min.type) {
422 case GST_RTSP_TIME_NOW:
425 case GST_RTSP_TIME_SECONDS:
426 /* only seek when something changed */
427 if (media->range.min.seconds == range->min.seconds) {
/* seconds (floating point) to GstClockTime units */
430 start = range->min.seconds * GST_SECOND;
431 start_type = GST_SEEK_TYPE_SET;
434 case GST_RTSP_TIME_END:
/* end of the range */
438 switch (range->max.type) {
439 case GST_RTSP_TIME_SECONDS:
440 /* only seek when something changed */
441 if (media->range.max.seconds == range->max.seconds) {
444 stop = range->max.seconds * GST_SECOND;
445 stop_type = GST_SEEK_TYPE_SET;
448 case GST_RTSP_TIME_END:
450 stop_type = GST_SEEK_TYPE_SET;
452 case GST_RTSP_TIME_NOW:
/* a seek is only issued when start or stop differs from -1 */
457 if (start != -1 || stop != -1) {
458 g_message ("seeking to %"GST_TIME_FORMAT" - %"GST_TIME_FORMAT,
459 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
461 res = gst_element_seek (media->pipeline, 1.0, GST_FORMAT_TIME,
462 flags, start_type, start, stop_type, stop);
464 /* and block for the seek to complete */
465 g_message ("done seeking %d", res);
/* timeout -1: wait without a time limit */
466 gst_element_get_state (media->pipeline, NULL, NULL, -1);
467 g_message ("prerolled again");
/* position and duration may have changed, refresh media->range */
469 collect_media_stats (media);
472 g_message ("no seek needed");
/* warnings for a range unit or range type that is not handled */
481 g_warning ("seek unit %d not supported", range->unit);
486 g_warning ("weird range type %d not supported", range->min.type);
492 * gst_rtsp_media_stream_rtp:
493 * @stream: a #GstRTSPMediaStream
494 * @buffer: a #GstBuffer
496 * Handle an RTP buffer for the stream. This method is usually called when a
497 * message has been received from a client using the TCP transport.
499 * This function takes ownership of @buffer.
501 * Returns: a GstFlowReturn.
/* Push @buffer into stream->appsrc[0]. setup_stream() links that appsrc
 * through selector[0] to the stream's recv_rtp_sink pad on the rtpbin. */
504 gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
508 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
514 * gst_rtsp_media_stream_rtcp:
515 * @stream: a #GstRTSPMediaStream
516 * @buffer: a #GstBuffer
518 * Handle an RTCP buffer for the stream. This method is usually called when a
519 * message has been received from a client using the TCP transport.
521 * This function takes ownership of @buffer.
523 * Returns: a GstFlowReturn.
/* Push @buffer into stream->appsrc[1]. setup_stream() links that appsrc
 * through selector[1] to the stream's recv_rtcp_sink pad on the rtpbin. */
526 gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
530 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
535 /* Allocate the udp ports and sockets */
/* Creates two UDP source elements, one on an even port for RTP and one on
 * the following port for RTCP, plus two multiudpsink elements that send from
 * the sockets of those sources. The four elements and the port pair are
 * stored on @stream. */
537 alloc_udp_ports (GstRTSPMediaStream * stream)
539 GstStateChangeReturn ret;
540 GstElement *udpsrc0, *udpsrc1;
541 GstElement *udpsink0, *udpsink1;
542 gint tmp_rtp, tmp_rtcp;
544 gint rtpport, rtcpport, sockfd;
552 /* Start with random port */
555 /* try to allocate 2 UDP ports, the RTP port should be an even
556 * number and the RTCP port should be the next (uneven) port */
558 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
560 goto no_udp_protocol;
561 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
/* going to PAUSED is what tells us whether the port could be used */
563 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
564 if (ret == GST_STATE_CHANGE_FAILURE) {
570 gst_element_set_state (udpsrc0, GST_STATE_NULL);
571 gst_object_unref (udpsrc0);
575 goto no_udp_protocol;
/* read back the port the source ended up on */
578 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
580 /* check if port is even */
581 if ((tmp_rtp & 1) != 0) {
582 /* port not even, close and allocate another */
586 gst_element_set_state (udpsrc0, GST_STATE_NULL);
587 gst_object_unref (udpsrc0);
593 /* allocate port+1 for RTCP now */
594 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, "udp://0.0.0.0", NULL);
596 goto no_udp_rtcp_protocol;
599 tmp_rtcp = tmp_rtp + 1;
600 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
602 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
603 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
604 if (ret == GST_STATE_CHANGE_FAILURE) {
/* drop both sources, the pair is only useful as a whole */
609 gst_element_set_state (udpsrc0, GST_STATE_NULL);
610 gst_object_unref (udpsrc0);
612 gst_element_set_state (udpsrc1, GST_STATE_NULL);
613 gst_object_unref (udpsrc1);
619 /* all fine, do port check */
620 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
621 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
623 /* this should not happen... */
624 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
627 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
629 goto no_udp_protocol;
/* the RTP sink uses the socket of the RTP source and must not close it */
631 g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
632 g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
633 g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
635 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
637 goto no_udp_protocol;
/* same socket sharing for RTCP; this sink also has sync and async disabled */
639 g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
640 g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
641 g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
642 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
643 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
645 /* we keep these elements, we configure all in configure_transport when the
646 * server told us to really use the UDP ports. */
647 stream->udpsrc[0] = udpsrc0;
648 stream->udpsrc[1] = udpsrc1;
649 stream->udpsink[0] = udpsink0;
650 stream->udpsink[1] = udpsink1;
/* server_port.min is the RTP port, server_port.max the RTCP port */
651 stream->server_port.min = rtpport;
652 stream->server_port.max = rtcpport;
665 no_udp_rtcp_protocol:
/* cleanup: bring each element to NULL and drop its reference */
676 gst_element_set_state (udpsrc0, GST_STATE_NULL);
677 gst_object_unref (udpsrc0);
680 gst_element_set_state (udpsrc1, GST_STATE_NULL);
681 gst_object_unref (udpsrc1);
684 gst_element_set_state (udpsink0, GST_STATE_NULL);
685 gst_object_unref (udpsink0);
688 gst_element_set_state (udpsink1, GST_STATE_NULL);
689 gst_object_unref (udpsink1);
/* "notify::caps" handler, connected in setup_stream() on the stream's
 * send_rtp_sink pad: stores a new reference to the pad's current caps in
 * stream->caps, releases the caps stored before and logs the new caps. */
696 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream)
699 GstCaps *newcaps, *oldcaps;
/* the stream keeps its own reference to the caps */
701 if ((newcaps = GST_PAD_CAPS (pad)))
702 gst_caps_ref (newcaps);
704 oldcaps = stream->caps;
705 stream->caps = newcaps;
708 gst_caps_unref (oldcaps);
710 capsstr = gst_caps_to_string (newcaps);
711 g_message ("stream %p received caps %p, %s", stream, newcaps, capsstr);
/* Debug helper: logs the string form of a GstStructure. */
716 dump_structure (const GstStructure *s)
720 sstr = gst_structure_to_string (s);
721 g_message ("structure: %s", sstr);
/* Search stream->transports for the transport whose destination equals the
 * host part of @rtcp_from and whose client port range (min or max) contains
 * its port part. @rtcp_from is split at its last ':'. */
725 static GstRTSPMediaTrans *
726 find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
729 GstRTSPMediaTrans *result = NULL;
734 if (rtcp_from == NULL)
/* last ':' separates host and port */
737 tmp = g_strrstr (rtcp_from, ":");
/* NOTE(review): atoi() cannot report malformed input; text that is not a
 * number yields 0 here. */
741 port = atoi (tmp + 1);
/* copy of everything before the ':' */
742 dest = g_strndup (rtcp_from, tmp - rtcp_from);
744 g_message ("finding %s:%d", dest, port);
746 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
747 GstRTSPMediaTrans *trans = walk->data;
750 min = trans->transport->client_port.min;
751 max = trans->transport->client_port.max;
/* same destination and the port is one of the two client ports */
753 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
/* "on-new-ssrc" handler of the stream's RTP session. Reads the transport
 * attached to @source under ssrc_stream_map_key. The visible lookup path
 * reads the "rtcp-from" field of the source's "stats" structure, searches
 * the stream's transports for it and, on a match, records @source on the
 * transport and attaches the transport to @source. */
764 on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
767 GstRTSPMediaTrans *trans;
769 g_message ("%p: new source %p", stream, source);
771 /* see if we have a stream to match with the origin of the RTCP packet */
772 trans = g_object_get_qdata (source, ssrc_stream_map_key);
774 g_object_get (source, "stats", &stats, NULL);
776 const gchar *rtcp_from;
778 dump_structure (stats);
780 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
781 if ((trans = find_transport (stream, rtcp_from))) {
782 g_message ("%p: found transport %p for source %p", stream, trans, source);
784 /* keep ref to the source */
785 trans->rtpsource = source;
/* lets the other session callbacks find the transport from the source */
787 g_object_set_qdata (source, ssrc_stream_map_key, trans);
789 gst_structure_free (stats);
792 g_message ("%p: source %p for transport %p", stream, source, trans);
/* "on-ssrc-sdes" handler of the stream's RTP session: only logs the event. */
797 on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
799 g_message ("%p: new SDES %p", stream, source);
/* "on-ssrc-active" handler of the stream's RTP session: looks up the
 * transport attached to @source and, when it has a keep_alive callback,
 * calls it with the transport's ka_user_data. The remaining lines fetch and
 * dump the source's "stats" structure. */
803 on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
805 GstRTSPMediaTrans *trans;
807 trans = g_object_get_qdata (source, ssrc_stream_map_key);
809 g_message ("%p: source %p in transport %p is active", stream, source, trans);
/* activity on the source counts as a sign of life for its transport */
811 if (trans && trans->keep_alive)
812 trans->keep_alive (trans->ka_user_data);
817 g_object_get (source, "stats", &stats, NULL);
819 dump_structure (stats);
820 gst_structure_free (stats);
/* "on-bye-ssrc" handler of the stream's RTP session: only logs the event. */
827 on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
829 g_message ("%p: source %p bye", stream, source);
/* "on-bye-timeout" handler of the stream's RTP session: when a transport is
 * attached to @source, forgets the source on it and flags the transport as
 * timed out. */
833 on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
835 GstRTSPMediaTrans *trans;
837 g_message ("%p: source %p bye timeout", stream, source);
839 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
840 trans->rtpsource = NULL;
841 trans->timeout = TRUE;
/* "on-timeout" handler of the stream's RTP session: when a transport is
 * attached to @source, forgets the source on it and flags the transport as
 * timed out. */
846 on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
848 GstRTSPMediaTrans *trans;
850 g_message ("%p: source %p timeout", stream, source);
852 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
853 trans->rtpsource = NULL;
854 trans->timeout = TRUE;
/* appsink callback (presumably installed through sink_cb; confirm). Pulls
 * one buffer from @sink and offers it to every transport of the stream
 * passed as @user_data. The buffer goes to send_rtp with the interleaved.min
 * channel when @sink is the stream's appsink[0], and send_rtcp with the
 * interleaved.max channel is the other visible call. The buffer is
 * unreffed afterwards. */
859 handle_new_buffer (GstAppSink *sink, gpointer user_data)
863 GstRTSPMediaStream *stream;
865 buffer = gst_app_sink_pull_buffer (sink);
869 stream = (GstRTSPMediaStream *) user_data;
871 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
872 GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
/* appsink[0] is fed by the RTP tee in setup_stream() */
874 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
876 tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
880 tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
/* release the reference obtained from gst_app_sink_pull_buffer() */
883 gst_buffer_unref (buffer);
/* Callback table that setup_stream() installs on both appsinks of a stream
 * with gst_app_sink_set_callbacks(). */
888 static GstAppSinkCallbacks sink_cb = {
889 NULL, /* not interested in EOS */
890 NULL, /* not interested in preroll buffers */
/* Builds the per-stream part of media->pipeline around media->rtpbin:
 *  - the udp sources/sinks created by alloc_udp_ports();
 *  - an appsrc/appsink pair for RTP (index 0) and RTCP (index 1), used for
 *    the TCP transfer;
 *  - one tee per outgoing flow, fed by the rtpbin and feeding both the udp
 *    sink and the appsink;
 *  - one input-selector per incoming flow, fed by both the udp source and
 *    the appsrc and feeding the rtpbin.
 * The stream's pads on the rtpbin are looked up by names built from @idx. */
894 /* prepare the pipeline objects to handle @stream in @media */
896 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
899 GstPad *pad, *teepad, *selpad;
900 GstPadLinkReturn ret;
903 /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
904 * for sending RTP/RTCP. The sender and receiver ports are shared between the
906 if (!alloc_udp_ports (stream))
909 /* add the ports to the pipeline */
910 for (i = 0; i < 2; i++) {
911 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
912 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
915 /* create elements for the TCP transfer */
916 for (i = 0; i < 2; i++) {
917 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
918 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
/* appsinks: async and sync disabled, no signals, preroll queue of one */
919 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
920 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
921 g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
922 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
923 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
/* the stream is handed to the callbacks as user data */
924 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
925 &sink_cb, stream, NULL);
928 /* hook up the stream to the RTP session elements. */
929 name = g_strdup_printf ("send_rtp_sink_%d", idx);
930 stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
/* send_rtp_src is fetched as a static pad, the other four are request pads */
932 name = g_strdup_printf ("send_rtp_src_%d", idx);
933 stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name);
935 name = g_strdup_printf ("send_rtcp_src_%d", idx);
936 stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name);
938 name = g_strdup_printf ("recv_rtcp_sink_%d", idx);
939 stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name);
941 name = g_strdup_printf ("recv_rtp_sink_%d", idx);
942 stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name);
945 /* get the session */
946 g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx,
/* follow the lifecycle of the RTP sources seen by the session */
949 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
951 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
953 g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
955 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
957 g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
959 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
962 /* link the RTP pad to the session manager */
963 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
964 if (ret != GST_PAD_LINK_OK)
967 /* make tee for RTP and link to stream */
968 stream->tee[0] = gst_element_factory_make ("tee", NULL);
969 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]);
/* rtpbin send_rtp_src -> tee[0] */
971 pad = gst_element_get_static_pad (stream->tee[0], "sink");
972 gst_pad_link (stream->send_rtp_src, pad);
973 gst_object_unref (pad);
975 /* link RTP sink, we're pretty sure this will work. */
976 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
977 pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
978 gst_pad_link (teepad, pad);
979 gst_object_unref (pad);
980 gst_object_unref (teepad);
/* tee[0] -> appsink[0], the RTP branch for the TCP transfer */
982 teepad = gst_element_get_request_pad (stream->tee[0], "src%d");
983 pad = gst_element_get_static_pad (stream->appsink[0], "sink");
984 gst_pad_link (teepad, pad);
985 gst_object_unref (pad);
986 gst_object_unref (teepad);
988 /* make tee for RTCP */
989 stream->tee[1] = gst_element_factory_make ("tee", NULL);
990 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]);
/* rtpbin send_rtcp_src -> tee[1] */
992 pad = gst_element_get_static_pad (stream->tee[1], "sink");
993 gst_pad_link (stream->send_rtcp_src, pad);
994 gst_object_unref (pad);
996 /* link RTCP elements */
997 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
998 pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
999 gst_pad_link (teepad, pad);
1000 gst_object_unref (pad);
1001 gst_object_unref (teepad);
/* tee[1] -> appsink[1], the RTCP branch for the TCP transfer */
1003 teepad = gst_element_get_request_pad (stream->tee[1], "src%d");
1004 pad = gst_element_get_static_pad (stream->appsink[1], "sink");
1005 gst_pad_link (teepad, pad);
1006 gst_object_unref (pad);
1007 gst_object_unref (teepad);
1009 /* make selector for the RTP receivers */
1010 stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
1011 g_object_set (stream->selector[0], "select-all", TRUE, NULL);
1012 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
/* selector[0] -> rtpbin recv_rtp_sink */
1014 pad = gst_element_get_static_pad (stream->selector[0], "src");
1015 gst_pad_link (pad, stream->recv_rtp_sink);
1016 gst_object_unref (pad);
/* udpsrc[0] -> selector[0] */
1018 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1019 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1020 gst_pad_link (pad, selpad);
1021 gst_object_unref (pad);
1022 gst_object_unref (selpad);
/* appsrc[0] -> selector[0] */
1024 selpad = gst_element_get_request_pad (stream->selector[0], "sink%d");
1025 pad = gst_element_get_static_pad (stream->appsrc[0], "src");
1026 gst_pad_link (pad, selpad);
1027 gst_object_unref (pad);
1028 gst_object_unref (selpad);
1030 /* make selector for the RTCP receivers */
1031 stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
1032 g_object_set (stream->selector[1], "select-all", TRUE, NULL);
1033 gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
/* selector[1] -> rtpbin recv_rtcp_sink */
1035 pad = gst_element_get_static_pad (stream->selector[1], "src");
1036 gst_pad_link (pad, stream->recv_rtcp_sink);
1037 gst_object_unref (pad);
/* udpsrc[1] -> selector[1] */
1039 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1040 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1041 gst_pad_link (pad, selpad);
1042 gst_object_unref (pad);
1043 gst_object_unref (selpad);
/* appsrc[1] -> selector[1] */
1045 selpad = gst_element_get_request_pad (stream->selector[1], "sink%d");
1046 pad = gst_element_get_static_pad (stream->appsrc[1], "src");
1047 gst_pad_link (pad, selpad);
1048 gst_object_unref (pad);
1049 gst_object_unref (selpad);
1051 /* we set and keep these to playing so that they don't cause NO_PREROLL return
1053 gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
1054 gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
1055 gst_element_set_locked_state (stream->udpsrc[0], TRUE);
1056 gst_element_set_locked_state (stream->udpsrc[1], TRUE);
1058 /* be notified of caps changes */
1059 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
1060 (GCallback) caps_notify, stream);
1062 stream->prepared = TRUE;
/* warning emitted when linking the stream failed */
1069 g_warning ("failed to link stream %d", idx);
/* Clear the locked-state flag that setup_stream() sets on both udp sources
 * of every stream in @media. Called before the pipeline is set to NULL. */
1075 unlock_streams (GstRTSPMedia *media)
1079 /* unlock the udp src elements */
1080 n_streams = gst_rtsp_media_n_streams (media);
1081 for (i = 0; i < n_streams; i++) {
1082 GstRTSPMediaStream *stream;
1084 stream = gst_rtsp_media_get_stream (media, i);
1086 gst_element_set_locked_state (stream->udpsrc[0], FALSE);
1087 gst_element_set_locked_state (stream->udpsrc[1], FALSE);
/* Default handler for messages from the pipeline bus, installed as the
 * class handle_message vfunc in class_init.
 *  - BUFFERING: tracks media->buffering and, when target_state is PLAYING,
 *    pauses the pipeline while buffering and resumes it at 100%.
 *  - LATENCY: recalculates the pipeline latency.
 *  - ERROR / WARNING: logs the parsed message.
 * A message whose type is logged by name is the last visible branch. */
1092 default_handle_message (GstRTSPMedia *media, GstMessage *message)
1094 GstMessageType type;
1096 type = GST_MESSAGE_TYPE (message);
1099 case GST_MESSAGE_STATE_CHANGED:
1101 case GST_MESSAGE_BUFFERING:
1105 gst_message_parse_buffering (message, &percent);
1107 /* no state management needed for live pipelines */
1111 if (percent == 100) {
1112 /* a 100% message means buffering is done */
1113 media->buffering = FALSE;
1114 /* if the desired state is playing, go back */
1115 if (media->target_state == GST_STATE_PLAYING) {
1116 g_message ("Buffering done, setting pipeline to PLAYING");
1117 gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1120 g_message ("Buffering done");
1123 /* buffering busy */
/* only act on the first buffering message of a buffering period */
1124 if (media->buffering == FALSE) {
1125 if (media->target_state == GST_STATE_PLAYING) {
1126 /* we were not buffering but PLAYING, PAUSE the pipeline. */
1127 g_message ("Buffering, setting pipeline to PAUSED ...");
1128 gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1131 g_message ("Buffering ...");
1134 media->buffering = TRUE;
1138 case GST_MESSAGE_LATENCY:
1140 gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
1143 case GST_MESSAGE_ERROR:
1148 gst_message_parse_error (message, &gerror, &debug);
1149 g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
1150 g_error_free (gerror);
1154 case GST_MESSAGE_WARNING:
1159 gst_message_parse_warning (message, &gerror, &debug);
1160 g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
1161 g_error_free (gerror);
1165 case GST_MESSAGE_ELEMENT:
1167 case GST_MESSAGE_STREAM_STATUS:
1170 g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
/* Callback of the bus watch source set up in gst_rtsp_media_prepare():
 * forwards @message to the class handle_message vfunc when one is set. */
1177 bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
1179 GstRTSPMediaClass *klass;
1182 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1184 if (klass->handle_message)
1185 ret = klass->handle_message (media, message);
/* "pad-added" handler connected to the dynamic elements in
 * gst_rtsp_media_prepare(). Creates a new stream with @element as payloader,
 * exposes @pad as a ghost pad on media->element, appends the stream to
 * media->streams, wires it up with setup_stream() and sets the new
 * per-stream elements to PAUSED. */
1193 pad_added_cb (GstElement *element, GstPad *pad, GstRTSPMedia *media)
1195 GstRTSPMediaStream *stream;
/* NOTE(review): the index is len + 1 while the stream is appended at array
 * position len; confirm this offset is intended. */
1199 i = media->streams->len + 1;
1201 g_message ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i);
1203 stream = g_new0 (GstRTSPMediaStream, 1);
1204 stream->payloader = element;
1206 name = g_strdup_printf ("dynpay%d", i);
1208 /* ghost the pad of the payloader to the element */
1209 stream->srcpad = gst_ghost_pad_new (name, pad);
1210 gst_pad_set_active (stream->srcpad, TRUE);
1211 gst_element_add_pad (media->element, stream->srcpad);
1214 /* add stream now */
1215 g_array_append_val (media->streams, stream);
1217 setup_stream (stream, i, media);
/* i is reused as the RTP (0) / RTCP (1) index from here on */
1219 for (i = 0; i < 2; i++) {
1220 gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED);
1221 gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
1222 gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
1223 gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
1224 gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
/* "no-more-pads" handler connected to the dynamic elements in
 * gst_rtsp_media_prepare(): removes the fakesink that prepare added to the
 * pipeline, shuts it down and forgets it. Later calls find
 * media->fakesink == NULL and skip the removal. */
1229 no_more_pads_cb (GstElement *element, GstRTSPMedia *media)
1231 g_message ("no more pads");
1232 if (media->fakesink) {
/* hold a reference across gst_bin_remove() so the element stays valid for
 * the state change below */
1233 gst_object_ref (media->fakesink);
1234 gst_bin_remove (GST_BIN (media->pipeline), media->fakesink);
1235 gst_element_set_state (media->fakesink, GST_STATE_NULL);
1236 gst_object_unref (media->fakesink);
1237 media->fakesink = NULL;
1238 g_message ("removed fakesink");
1243 * gst_rtsp_media_prepare:
1244 * @media: a #GstRTSPMedia
1246 * Prepare @media for streaming. This function will create the pipeline and
1247 * other objects to manage the streaming.
1249 * It will preroll the pipeline and collect vital information about the streams
1250 * such as the duration.
1252 * Returns: %TRUE on success.
/* Builds and prerolls the pipeline for @media:
 *  1. creates the pipeline and attaches a watch on its bus to the class
 *     main context, with bus_message() as callback;
 *  2. adds media->element and a new rtpbin to the pipeline;
 *  3. sets up the streams that already exist and connects the pad signals
 *     of the dynamic elements;
 *  4. sets the pipeline to PAUSED (and on to PLAYING when the state change
 *     reports NO_PREROLL, i.e. live media), waits for the state change to
 *     finish and collects the media stats. */
1255 gst_rtsp_media_prepare (GstRTSPMedia *media)
1257 GstStateChangeReturn ret;
1259 GstRTSPMediaClass *klass;
1263 if (media->prepared)
/* a non-reusable media that was used before; presumably this leads to the
 * "can not reuse media" warning below (confirm) */
1266 if (!media->reusable && media->reused)
1269 g_message ("preparing media %p", media);
1271 media->pipeline = gst_pipeline_new ("media-pipeline");
1272 bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
1274 /* add the pipeline bus to our custom mainloop */
1275 media->source = gst_bus_create_watch (bus);
1276 gst_object_unref (bus);
1278 g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
/* the context is the one whose loop do_loop() runs in the class thread */
1280 klass = GST_RTSP_MEDIA_GET_CLASS (media);
1281 media->id = g_source_attach (media->source, klass->context);
1283 gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
1285 media->rtpbin = gst_element_factory_make ("gstrtpbin", "rtpbin");
1287 /* add stuff to the bin */
1288 gst_bin_add (GST_BIN (media->pipeline), media->rtpbin);
1290 /* link streams we already have, other streams might appear when we have
1291 * dynamic elements */
1292 n_streams = gst_rtsp_media_n_streams (media);
1293 for (i = 0; i < n_streams; i++) {
1294 GstRTSPMediaStream *stream;
1296 stream = gst_rtsp_media_get_stream (media, i);
1298 setup_stream (stream, i, media);
/* dynamic elements announce their streams through pad-added/no-more-pads */
1301 for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
1302 GstElement *elem = walk->data;
1304 g_signal_connect (elem, "pad-added", (GCallback) pad_added_cb, media);
1305 g_signal_connect (elem, "no-more-pads", (GCallback) no_more_pads_cb, media);
/* temporary sink, removed again by no_more_pads_cb(); presumably it keeps
 * the pipeline from finishing preroll before all pads exist (confirm) */
1307 media->fakesink = gst_element_factory_make ("fakesink", "fakesink");
1308 gst_bin_add (GST_BIN (media->pipeline), media->fakesink);
1311 /* first go to PAUSED */
1312 ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
1313 media->target_state = GST_STATE_PAUSED;
1316 case GST_STATE_CHANGE_SUCCESS:
1318 case GST_STATE_CHANGE_ASYNC:
1320 case GST_STATE_CHANGE_NO_PREROLL:
1321 /* we need to go to PLAYING */
1322 g_message ("live media %p", media);
1323 media->is_live = TRUE;
1324 ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
1325 if (ret == GST_STATE_CHANGE_FAILURE)
1328 case GST_STATE_CHANGE_FAILURE:
1332 /* now wait for all pads to be prerolled */
/* timeout -1: wait without a time limit */
1333 ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
1334 if (ret == GST_STATE_CHANGE_FAILURE)
1337 /* collect stats about the media */
1338 collect_media_stats (media);
1340 g_message ("object %p is prerolled", media);
1342 media->prepared = TRUE;
/* failure path: unlock the udp sources and shut the pipeline down */
1354 g_warning ("failed to preroll pipeline");
1355 unlock_streams (media);
1356 gst_element_set_state (media->pipeline, GST_STATE_NULL);
1361 g_warning ("can not reuse media %p", media);
1367 * gst_rtsp_media_unprepare:
1368 * @media: a #GstRTSPMedia
1370 * Unprepare @media. After this call, the media should be prepared again before
1371 * it can be used again. If the media is set to be non-reusable, a new instance must be created.
1374 * Returns: %TRUE on success.
/* Unprepares @media: the visible lines set the pipeline to GST_STATE_NULL,
 * clear the prepared flag, mark the media as used and emit the
 * SIGNAL_UNPREPARED signal.
 *
 * NOTE(review): every line of this listing carries the original file's line
 * number as a prefix, and gaps in that numbering show dropped lines (braces,
 * the statement guarded by the `if`, the return). The trailing comment in
 * this function is also cut off: its closing line is missing from the
 * listing. Only what the remaining lines show is described here. */
1377 gst_rtsp_media_unprepare (GstRTSPMedia *media)
/* media that is not prepared is handled first; the statement taken in that
 * case is not visible */
1379 if (!media->prepared)
1382 g_message ("unprepare media %p", media);
/* record the new target state before actually shutting the pipeline down */
1383 media->target_state = GST_STATE_NULL;
1384 gst_element_set_state (media->pipeline, GST_STATE_NULL);
/* reused is the flag gst_rtsp_media_prepare tests, together with reusable,
 * before it prepares the media again */
1386 media->prepared = FALSE;
1387 media->reused = TRUE;
1389 /* when the media is not reusable, this will effectively unref the media and
1391 g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL);
1397 * gst_rtsp_media_set_state:
1398 * @media: a #GstRTSPMedia
1399 * @state: the target state of the media
1400 * @transports: a GArray of #GstRTSPMediaTrans pointers
1402 * Set the state of @media to @state and add or remove the transports in @transports accordingly.
1404 * Returns: %TRUE on success.
/* Moves @media towards @state and activates or deactivates the transports in
 * @transports along the way: UDP transports are added to or removed from the
 * stream's two UDP sinks, and every handled transport is added to or removed
 * from the stream's transport list. The pipeline state itself is only changed
 * when do_state is set and the target state differs from @state.
 *
 * NOTE(review): every line of this listing carries the original file's line
 * number as a prefix, and the gaps in that numbering show that lines were
 * dropped (braces, `switch`/`break`/`else` lines, the assignments to add,
 * remove and do_state, some declarations and the return). The comments below
 * describe only what the remaining lines show. */
1407 gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
1410 GstStateChangeReturn ret;
1411 gboolean add, remove, do_state;
1414 g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
1415 g_return_val_if_fail (transports != NULL, FALSE);
1417 /* NULL and READY are the same */
1418 if (state == GST_STATE_READY)
1419 state = GST_STATE_NULL;
/* add and remove select what happens to each transport in the loop below;
 * the lines that set them to TRUE are not visible in this listing */
1421 add = remove = FALSE;
1423 g_message ("going to state %s media %p", gst_element_state_get_name (state), media);
1426 case GST_STATE_NULL:
1427 /* unlock the streams so that they follow the state changes from now on */
1428 unlock_streams (media);
1430 case GST_STATE_PAUSED:
1431 /* we're going from PLAYING to PAUSED, READY or NULL, remove */
1432 if (media->target_state == GST_STATE_PLAYING)
1435 case GST_STATE_PLAYING:
1436 /* we're going to PLAYING, add */
/* keep the active count as it was before this call; it is compared against 0
 * after the loop */
1442 old_active = media->active;
1444 for (i = 0; i < transports->len; i++) {
1445 GstRTSPMediaTrans *tr;
1446 GstRTSPMediaStream *stream;
1447 GstRTSPTransport *trans;
1449 /* we need a non-NULL entry in the array */
1450 tr = g_array_index (transports, GstRTSPMediaTrans *, i);
1454 /* we need a transport */
1455 if (!(trans = tr->transport))
1458 /* get the stream and add the destinations */
/* NOTE(review): no check of the stream returned for tr->idx is visible
 * before it is dereferenced below. */
1459 stream = gst_rtsp_media_get_stream (media, tr->idx);
1460 switch (trans->lower_transport) {
1461 case GST_RTSP_LOWER_TRANS_UDP:
1462 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1467 dest = trans->destination;
1468 min = trans->client_port.min;
1469 max = trans->client_port.max;
/* the "add" and "remove" actions go to both UDP sinks: udpsink[0] with the
 * lower client port and udpsink[1] with the upper one (presumably RTP and
 * RTCP — confirm). The lines after each list update are missing; presumably
 * they maintain tr->active and media->active — confirm. */
1471 if (add && !tr->active) {
1472 g_message ("adding %s:%d-%d", dest, min, max);
1473 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1474 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1475 stream->transports = g_list_prepend (stream->transports, tr);
1478 } else if (remove && tr->active) {
1479 g_message ("removing %s:%d-%d", dest, min, max);
1480 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1481 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1482 stream->transports = g_list_remove (stream->transports, tr);
/* TCP transports only enter or leave the stream's transport list; no sink
 * action is visible for them */
1488 case GST_RTSP_LOWER_TRANS_TCP:
1489 if (add && !tr->active) {
1490 g_message ("adding TCP %s", trans->destination);
1491 stream->transports = g_list_prepend (stream->transports, tr);
1494 } else if (remove && tr->active) {
1495 g_message ("removing TCP %s", trans->destination);
1496 stream->transports = g_list_remove (stream->transports, tr);
/* any other lower transport is only logged */
1502 g_message ("Unknown transport %d", trans->lower_transport);
1507 /* we just added the first media, do the playing state change */
1508 if (old_active == 0 && add)
1510 /* if we have no more active media, do the downward state changes */
1511 else if (media->active == 0)
1516 g_message ("active %d media %p", media->active, media);
/* only touch the pipeline when a state change was requested above and the
 * target state actually changes */
1518 if (do_state && media->target_state != state) {
/* going to NULL is delegated to gst_rtsp_media_unprepare */
1519 if (state == GST_STATE_NULL) {
1520 gst_rtsp_media_unprepare (media);
/* NOTE(review): the line between the previous and the next statement is
 * missing; presumably an else branch starts here — confirm. No check of ret
 * is visible after the state change. */
1522 g_message ("state %s media %p", gst_element_state_get_name (state), media);
1523 media->target_state = state;
1524 ret = gst_element_set_state (media->pipeline, state);
1528 /* remember where we are */
1529 if (state == GST_STATE_PAUSED)
1530 collect_media_stats (media);