2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
4 * 2002 Kristian Rietveld <kris@gtk.org>
5 * 2002,2003 Colin Walters <walters@gnu.org>
6 * 2001,2010 Bastien Nocera <hadess@hadess.net>
7 * 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Library General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Library General Public License for more details.
21 * You should have received a copy of the GNU Library General Public
22 * License along with this library; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
28 * SECTION:element-rtmpsrc
31 * This plugin reads data from a local or remote location specified
32 * by a URI. This location can be specified using any protocol supported by
33 * the RTMP library, i.e. rtmp, rtmpt, rtmps, rtmpe, rtmfp, rtmpte and rtmpts.
35 * ## Example launch lines
37 * gst-launch-1.0 -v rtmpsrc location=rtmp://somehost/someurl ! fakesink
38 * ]| Open an RTMP location and pass its content to fakesink.
46 #include <glib/gi18n-lib.h>
48 #include "gstrtmpsrc.h"
/* Debug category for this element; GST_CAT_DEFAULT is defined to it so the
 * logging macros used in this file report under rtmpsrc_debug. The category
 * itself is initialized at the end of gst_rtmp_src_class_init(). */
60 GST_DEBUG_CATEGORY_STATIC (rtmpsrc_debug);
61 #define GST_CAT_DEFAULT rtmpsrc_debug
/* Static pad template named "src"; it is added to the element class in
 * gst_rtmp_src_class_init(). */
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
/* Default values for the "location" and "timeout" properties.
 * DEFAULT_TIMEOUT is also assigned to each instance in gst_rtmp_src_init(). */
79 #define DEFAULT_LOCATION NULL
80 #define DEFAULT_TIMEOUT 120
/* Forward declarations: the GstURIHandler interface initializer, the GObject
 * property accessors and finalizer, the internal connect helper, and the
 * GstBaseSrc / GstPushSrc virtual-method implementations that
 * gst_rtmp_src_class_init() installs. */
82 static void gst_rtmp_src_uri_handler_init (gpointer g_iface,
85 static void gst_rtmp_src_set_property (GObject * object, guint prop_id,
86 const GValue * value, GParamSpec * pspec);
87 static void gst_rtmp_src_get_property (GObject * object, guint prop_id,
88 GValue * value, GParamSpec * pspec);
89 static void gst_rtmp_src_finalize (GObject * object);
91 static gboolean gst_rtmp_src_connect (GstRTMPSrc * src);
92 static gboolean gst_rtmp_src_unlock (GstBaseSrc * src);
93 static gboolean gst_rtmp_src_stop (GstBaseSrc * src);
94 static gboolean gst_rtmp_src_start (GstBaseSrc * src);
95 static gboolean gst_rtmp_src_is_seekable (GstBaseSrc * src);
96 static gboolean gst_rtmp_src_prepare_seek_segment (GstBaseSrc * src,
97 GstEvent * event, GstSegment * segment);
98 static gboolean gst_rtmp_src_do_seek (GstBaseSrc * src, GstSegment * segment);
99 static GstFlowReturn gst_rtmp_src_create (GstPushSrc * pushsrc,
100 GstBuffer ** buffer);
101 static gboolean gst_rtmp_src_query (GstBaseSrc * src, GstQuery * query);
/* Type registration: GstRTMPSrc derives from GST_TYPE_PUSH_SRC and implements
 * GST_TYPE_URI_HANDLER through gst_rtmp_src_uri_handler_init(). The #define
 * makes the identifier gst_rtmp_src_parent_class expand to parent_class,
 * which is the name this file uses when chaining up (finalize, query). */
103 #define gst_rtmp_src_parent_class parent_class
104 G_DEFINE_TYPE_WITH_CODE (GstRTMPSrc, gst_rtmp_src, GST_TYPE_PUSH_SRC,
105 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
106 gst_rtmp_src_uri_handler_init));
/* Class initializer for GstRTMPSrc.
 *
 * Installs the GObject finalizer and property accessors, registers the
 * "location" (string) and "timeout" (int) properties, adds the static src
 * pad template and element metadata, hooks up the GstBaseSrc / GstPushSrc
 * virtual methods, and initializes the rtmpsrc_debug category.
 *
 * @klass: the class structure being initialized
 */
109 gst_rtmp_src_class_init (GstRTMPSrcClass * klass)
111 GObjectClass *gobject_class;
112 GstElementClass *gstelement_class;
113 GstBaseSrcClass *gstbasesrc_class;
114 GstPushSrcClass *gstpushsrc_class;
/* The same class structure viewed as each ancestor class. */
116 gobject_class = G_OBJECT_CLASS (klass);
117 gstelement_class = GST_ELEMENT_CLASS (klass);
118 gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
119 gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
/* GObject virtual methods. */
121 gobject_class->finalize = gst_rtmp_src_finalize;
122 gobject_class->set_property = gst_rtmp_src_set_property;
123 gobject_class->get_property = gst_rtmp_src_get_property;
/* Properties; both are read/write. */
126 g_object_class_install_property (gobject_class, PROP_LOCATION,
127 g_param_spec_string ("location", "RTMP Location",
128 "Location of the RTMP url to read",
129 DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
131 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
132 g_param_spec_int ("timeout", "RTMP Timeout",
133 "Time without receiving any data from the server to wait before to timeout the session",
135 DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Pad template and element metadata. */
137 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
139 gst_element_class_set_static_metadata (gstelement_class,
143 "Bastien Nocera <hadess@hadess.net>, "
144 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
/* Source virtual methods: lifecycle (start/stop/unlock), seeking, buffer
 * creation and query handling. */
146 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_src_start);
147 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_src_stop);
148 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp_src_unlock);
149 gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_rtmp_src_is_seekable);
150 gstbasesrc_class->prepare_seek_segment =
151 GST_DEBUG_FUNCPTR (gst_rtmp_src_prepare_seek_segment);
152 gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_rtmp_src_do_seek);
153 gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_rtmp_src_create);
154 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_rtmp_src_query);
156 GST_DEBUG_CATEGORY_INIT (rtmpsrc_debug, "rtmpsrc", 0, "RTMP Source");
/* Instance initializer: zeroes the read offset and last timestamp, applies
 * DEFAULT_TIMEOUT and switches the base source to GST_FORMAT_TIME.
 *
 * A WSAStartup() call requesting Winsock 2.2 is also made here and a failure
 * is logged. NOTE(review): the preprocessor guard and the wsa_data
 * declaration are on lines not visible in this view -- presumably this part
 * is Windows-only; confirm.
 *
 * @rtmpsrc: the instance being initialized
 */
160 gst_rtmp_src_init (GstRTMPSrc * rtmpsrc)
165 if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
166 GST_ERROR_OBJECT (rtmpsrc, "WSAStartup failed: 0x%08x", WSAGetLastError ());
170 rtmpsrc->cur_offset = 0;
171 rtmpsrc->last_timestamp = 0;
172 rtmpsrc->timeout = DEFAULT_TIMEOUT;
174 gst_base_src_set_format (GST_BASE_SRC (rtmpsrc), GST_FORMAT_TIME);
/* GObject finalizer: frees the stored URI string and chains up to the parent
 * class finalizer.
 *
 * @object: the GstRTMPSrc being finalized
 */
178 gst_rtmp_src_finalize (GObject * object)
180 GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (object);
182 g_free (rtmpsrc->uri);
189 G_OBJECT_CLASS (parent_class)->finalize (object);
193 * URI interface support.
/* GstURIHandler get_type implementation, installed by
 * gst_rtmp_src_uri_handler_init(). The body (and thus the value returned) is
 * on lines not visible in this view. */
197 gst_rtmp_src_uri_get_type (GType type)
/* GstURIHandler get_protocols implementation: defines a static,
 * NULL-terminated array of the URI schemes this element handles. */
202 static const gchar *const *
203 gst_rtmp_src_uri_get_protocols (GType type)
205 static const gchar *protocols[] =
206 { "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
/* GstURIHandler get_uri implementation.
 *
 * Returns a newly allocated copy of the stored URI (g_strdup() yields NULL
 * when src->uri is NULL). src->uri is read here without taking any lock,
 * which is what the FIXME below refers to.
 *
 * @handler: the GstRTMPSrc, as a GstURIHandler
 */
212 gst_rtmp_src_uri_get_uri (GstURIHandler * handler)
214 GstRTMPSrc *src = GST_RTMP_SRC (handler);
216 /* FIXME: make thread-safe */
217 return g_strdup (src->uri);
/* GstURIHandler set_uri implementation.
 *
 * Rejects the change with GST_URI_ERROR_BAD_STATE when the element is in
 * PAUSED or a higher state. The URI is validated with RTMP_ParseURL(): a
 * parse failure, an empty host or an empty playpath is logged and reported
 * as GST_URI_ERROR_BAD_URI. A copy of the URI is stored in src->uri and the
 * new value is logged.
 *
 * @handler: the GstRTMPSrc, as a GstURIHandler
 * @uri: the URI to set
 * (the GError out-parameter used by g_set_error() below is declared on a
 * line not visible in this view)
 */
221 gst_rtmp_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
224 GstRTMPSrc *src = GST_RTMP_SRC (handler);
/* The URI may only be changed while the element is below PAUSED. */
226 if (GST_STATE (src) >= GST_STATE_PAUSED) {
227 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
228 "Changing the URI on rtmpsrc when it is running is not supported");
/* Parse only to validate: the URI must yield a host and a playpath. */
241 if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
242 !host.av_len || !playpath.av_len) {
243 GST_ERROR_OBJECT (src, "Failed to parse URI %s", uri);
244 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
245 "Could not parse RTMP URI");
247 /* FIXME: we should not be freeing RTMP internals to avoid leaking */
248 free (playpath.av_val);
/* playpath.av_val is released here too -- presumably the success-path
 * counterpart of the free() above; the branch structure is not fully
 * visible in this view, confirm. */
253 free (playpath.av_val);
255 src->uri = g_strdup (uri);
258 GST_DEBUG_OBJECT (src, "Changed URI to %s", GST_STR_NULL (uri));
/* GstURIHandler interface initializer: points the interface's get_type,
 * get_protocols, get_uri and set_uri methods at the gst_rtmp_src_uri_*
 * functions of this file.
 *
 * @g_iface: the GstURIHandlerInterface to fill in
 * @iface_data: not referenced in the visible body
 */
264 gst_rtmp_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
266 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
268 iface->get_type = gst_rtmp_src_uri_get_type;
269 iface->get_protocols = gst_rtmp_src_uri_get_protocols;
270 iface->get_uri = gst_rtmp_src_uri_get_uri;
271 iface->set_uri = gst_rtmp_src_uri_set_uri;
/* GObject set_property implementation.
 *
 * A string value is applied through gst_rtmp_src_uri_set_uri(); the GError
 * out-parameter is NULL and the call's result is unused, so a rejected URI
 * is not reported back to the caller from here. An int value is stored in
 * src->timeout. Unknown property ids trigger the standard GObject warning.
 * (The switch/case labels are on lines not visible in this view.)
 */
275 gst_rtmp_src_set_property (GObject * object, guint prop_id,
276 const GValue * value, GParamSpec * pspec)
280 src = GST_RTMP_SRC (object);
284 gst_rtmp_src_uri_set_uri (GST_URI_HANDLER (src),
285 g_value_get_string (value), NULL);
289 src->timeout = g_value_get_int (value);
293 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property implementation: exposes src->uri as a string value
 * and src->timeout as an int value; unknown property ids trigger the
 * standard GObject warning. (The switch/case labels are on lines not visible
 * in this view.) */
299 gst_rtmp_src_get_property (GObject * object, guint prop_id, GValue * value,
304 src = GST_RTMP_SRC (object);
308 g_value_set_string (value, src->uri);
311 g_value_set_int (value, src->timeout);
314 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
320 * Read the next buffer from the RTMP stream, reconnecting first if the
321 * session is no longer connected, and stamp it with the current offset
321 * and timestamp.
/* GstPushSrc create implementation: produce the next buffer of stream data.
 *
 * Reconnects through gst_rtmp_src_connect() when librtmp reports the session
 * as not connected, allocates a buffer of the base source's blocksize and
 * fills it with RTMP_Read(). The buffer is then resized to bsize and stamped
 * with src->last_timestamp and src->cur_offset. Afterwards last_timestamp
 * is updated from the session's m_mediaStamp (scaled by GST_MSECOND).
 *
 * Visible failure paths return GST_FLOW_ERROR: missing RTMP context, failed
 * reconnect, failed allocation, failed read (posted as an element error),
 * and running out of data before anything was ever read (cur_offset == 0).
 * Running out of data later is logged as EOS.
 *
 * Several short lines (local declarations, loop header, byte accounting,
 * jumps, labels and the final returns) are not visible in this view.
 *
 * @pushsrc: the source
 * @buffer: out location for the created buffer
 */
324 gst_rtmp_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
334 src = GST_RTMP_SRC (pushsrc);
336 g_return_val_if_fail (src->rtmp != NULL, GST_FLOW_ERROR);
/* The session may have been closed (gst_rtmp_src_unlock() calls
 * RTMP_Close()); re-establish it before reading. */
338 if (!RTMP_IsConnected (src->rtmp)) {
339 GST_DEBUG_OBJECT (src, "reconnecting");
340 if (!gst_rtmp_src_connect (src))
341 return GST_FLOW_ERROR;
/* Request one blocksize worth of data per buffer. */
344 size = GST_BASE_SRC_CAST (pushsrc)->blocksize;
346 GST_DEBUG ("reading from %" G_GUINT64_FORMAT
347 ", size %u", src->cur_offset, size);
349 buf = gst_buffer_new_allocate (NULL, size, NULL);
350 if (G_UNLIKELY (buf == NULL)) {
351 GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
352 return GST_FLOW_ERROR;
356 gst_buffer_map (buf, &map, GST_MAP_WRITE);
/* Read up to todo bytes into the mapped buffer. The three checks below
 * distinguish: 0 bytes with todo still equal to size, 0 bytes otherwise,
 * and a negative (error) result; the action taken for each is on lines not
 * visible in this view. */
361 int read = RTMP_Read (src->rtmp, (char *) data, todo);
363 if (G_UNLIKELY (read == 0 && todo == size))
366 if (G_UNLIKELY (read == 0))
369 if (G_UNLIKELY (read < 0))
380 GST_LOG (" got size %d", read);
/* Trim the buffer to bsize. */
382 gst_buffer_unmap (buf, &map);
383 gst_buffer_resize (buf, 0, bsize);
/* Mark the buffer as a discontinuity and clear the pending flag (the
 * guarding condition is on a line not visible in this view). */
386 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
387 src->discont = FALSE;
390 GST_BUFFER_TIMESTAMP (buf) = src->last_timestamp;
391 GST_BUFFER_OFFSET (buf) = src->cur_offset;
/* NOTE(review): the offset advances by the requested size although the
 * buffer was resized to bsize above; if bsize can be smaller than size
 * (short read), cur_offset would overstate the bytes delivered -- confirm
 * whether bsize was intended here and in the log message below. */
393 src->cur_offset += size;
/* last_timestamp is GST_CLOCK_TIME_NONE after gst_rtmp_src_do_seek() reset
 * it: take the stream's stamp as-is. Otherwise never let it go backwards. */
394 if (src->last_timestamp == GST_CLOCK_TIME_NONE)
395 src->last_timestamp = src->rtmp->m_mediaStamp * GST_MSECOND;
397 src->last_timestamp =
398 MAX (src->last_timestamp, src->rtmp->m_mediaStamp * GST_MSECOND);
400 GST_LOG_OBJECT (src, "Created buffer of size %u at %" G_GINT64_FORMAT
401 " with timestamp %" GST_TIME_FORMAT, size, GST_BUFFER_OFFSET (buf),
402 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
405 /* we're done, return the buffer */
/* Read failure: release the buffer and post a RESOURCE/READ error. */
412 gst_buffer_unmap (buf, &map);
413 gst_buffer_unref (buf);
414 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read data"));
415 return GST_FLOW_ERROR;
/* No data: release the buffer. If nothing was ever read from this stream
 * treat it as an error (likely a bad URL), otherwise log end of stream. */
419 gst_buffer_unmap (buf, &map);
420 gst_buffer_unref (buf);
421 if (src->cur_offset == 0) {
422 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
423 ("Failed to read any data from stream, check your URL"));
424 return GST_FLOW_ERROR;
426 GST_DEBUG_OBJECT (src, "Reading data gave EOS");
/* GstBaseSrc query implementation. As far as visible in this view it:
 *  - answers a URI query with src->uri (its case label is not visible);
 *  - answers POSITION in GST_FORMAT_TIME with src->last_timestamp;
 *  - answers DURATION in GST_FORMAT_TIME from RTMP_GetDuration() scaled by
 *    GST_SECOND, only when an RTMP context exists and the reported duration
 *    is non-zero;
 *  - answers SCHEDULING as sequential and bandwidth-limited, push mode only;
 *  - calls the parent class query handler (the condition under which it is
 *    reached is on lines not visible in this view).
 *
 * @basesrc: the source
 * @query: the query to answer
 */
433 gst_rtmp_src_query (GstBaseSrc * basesrc, GstQuery * query)
435 gboolean ret = FALSE;
436 GstRTMPSrc *src = GST_RTMP_SRC (basesrc);
438 switch (GST_QUERY_TYPE (query)) {
440 gst_query_set_uri (query, src->uri);
443 case GST_QUERY_POSITION:{
446 gst_query_parse_position (query, &format, NULL);
447 if (format == GST_FORMAT_TIME) {
448 gst_query_set_position (query, format, src->last_timestamp);
453 case GST_QUERY_DURATION:{
457 gst_query_parse_duration (query, &format, NULL);
458 if (format == GST_FORMAT_TIME && src->rtmp) {
459 duration = RTMP_GetDuration (src->rtmp);
/* A duration of exactly 0.0 is not reported to the query. */
460 if (duration != 0.0) {
461 gst_query_set_duration (query, format, duration * GST_SECOND);
467 case GST_QUERY_SCHEDULING:{
468 gst_query_set_scheduling (query,
469 GST_SCHEDULING_FLAG_SEQUENTIAL |
470 GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
471 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
482 ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
/* GstBaseSrc is_seekable implementation: reports src->seekable, which
 * gst_rtmp_src_connect() derives from the RTMP_LF_LIVE link flag and
 * gst_rtmp_src_do_seek() clears when RTMP_SendSeek() fails. */
488 gst_rtmp_src_is_seekable (GstBaseSrc * basesrc)
492 src = GST_RTMP_SRC (basesrc);
494 return src->seekable;
/* GstBaseSrc prepare_seek_segment implementation.
 *
 * Parses the seek event and logs a reason when the stream is not seekable,
 * the source is not connected yet, the seek format is not GST_FORMAT_TIME,
 * or a stop position is requested. Otherwise the segment is initialized in
 * GST_FORMAT_TIME and updated with gst_segment_do_seek().
 *
 * The return statements are on lines not visible in this view; presumably
 * each logged condition rejects the seek -- confirm.
 *
 * @basesrc: the source
 * @event: the seek event
 * @segment: segment to configure from the event
 */
498 gst_rtmp_src_prepare_seek_segment (GstBaseSrc * basesrc, GstEvent * event,
499 GstSegment * segment)
502 GstSeekType cur_type, stop_type;
508 src = GST_RTMP_SRC (basesrc);
510 gst_event_parse_seek (event, &rate, &format, &flags,
511 &cur_type, &cur, &stop_type, &stop);
513 if (!src->seekable) {
514 GST_LOG_OBJECT (src, "Not a seekable stream");
519 GST_LOG_OBJECT (src, "Not connected yet");
523 if (format != GST_FORMAT_TIME) {
524 GST_LOG_OBJECT (src, "Seeking only supported in TIME format");
/* Only open-ended seeks are handled. */
528 if (stop_type != GST_SEEK_TYPE_NONE) {
529 GST_LOG_OBJECT (src, "Setting a stop position is not supported");
533 gst_segment_init (segment, GST_FORMAT_TIME);
534 gst_segment_do_seek (segment, rate, format, flags, cur_type, cur, stop_type,
/* GstBaseSrc do_seek implementation.
 *
 * Logs a reason when the segment is not in GST_FORMAT_TIME, the source is
 * not connected yet, or the stream is not seekable. A seek to 0 while
 * nothing has been read yet (cur_offset == 0) is special-cased. If librtmp
 * is not connected, one buffer is produced with gst_rtmp_src_create() and
 * discarded so that the session is re-established first. last_timestamp is
 * then reset to GST_CLOCK_TIME_NONE and RTMP_SendSeek() is called with the
 * segment start divided by GST_MSECOND; on failure the stream is marked
 * non-seekable.
 *
 * The return statements and the actions of the early checks are on lines
 * not visible in this view.
 *
 * @basesrc: the source
 * @segment: the configured seek segment
 */
541 gst_rtmp_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment)
545 src = GST_RTMP_SRC (basesrc);
547 if (segment->format != GST_FORMAT_TIME) {
548 GST_LOG_OBJECT (src, "Only time based seeks are supported");
553 GST_LOG_OBJECT (src, "Not connected yet");
/* Seeking to the start before any data was read. */
558 if (src->cur_offset == 0 && segment->start == 0)
561 if (!src->seekable) {
562 GST_LOG_OBJECT (src, "Not a seekable stream");
566 /* If we have just disconnected in unlock(), we need to re-connect
567 * and also let librtmp read some data before sending a seek,
568 * otherwise it will stall. Calling create() does both. */
/* NOTE(review): the result of gst_rtmp_src_create() is not used here, so a
 * failed reconnect is not detected before RTMP_SendSeek() below -- confirm
 * that this is intended. */
569 if (!RTMP_IsConnected (src->rtmp)) {
570 GstBuffer *buffer = NULL;
571 gst_rtmp_src_create (GST_PUSH_SRC (basesrc), &buffer);
572 gst_buffer_replace (&buffer, NULL);
/* GST_CLOCK_TIME_NONE makes gst_rtmp_src_create() take the next
 * m_mediaStamp as-is instead of clamping it with MAX(). */
575 src->last_timestamp = GST_CLOCK_TIME_NONE;
576 if (!RTMP_SendSeek (src->rtmp, segment->start / GST_MSECOND)) {
577 GST_ERROR_OBJECT (src, "Seeking failed");
578 src->seekable = FALSE;
583 /* This is set here so that the call to create() above doesn't clear it */
586 GST_DEBUG_OBJECT (src, "Seek to %" GST_TIME_FORMAT " successfull",
587 GST_TIME_ARGS (segment->start));
/* Set up and open the RTMP session for src->uri.
 *
 * Re-initializes the already allocated RTMP context with RTMP_Init(),
 * applies src->timeout as the link timeout and configures the URL with
 * RTMP_SetupURL(). src->seekable is set to TRUE unless the link carries the
 * RTMP_LF_LIVE flag. RTMP_Connect() is called when the session is not yet
 * connected. URL setup and connect failures are posted as
 * RESOURCE/OPEN_READ element errors.
 *
 * Called from gst_rtmp_src_start() and, for reconnecting, from
 * gst_rtmp_src_create(). The return statements are on lines not visible in
 * this view.
 *
 * @src: the source; src->rtmp is dereferenced unconditionally
 */
593 gst_rtmp_src_connect (GstRTMPSrc * src)
595 RTMP_Init (src->rtmp);
596 src->rtmp->Link.timeout = src->timeout;
597 if (!RTMP_SetupURL (src->rtmp, src->uri)) {
598 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
599 ("Failed to setup URL '%s'", src->uri));
/* Streams flagged as live are treated as non-seekable. */
602 src->seekable = !(src->rtmp->Link.lFlags & RTMP_LF_LIVE);
603 GST_INFO_OBJECT (src, "seekable %d", src->seekable);
605 /* open if required */
606 if (!RTMP_IsConnected (src->rtmp)) {
607 if (!RTMP_Connect (src->rtmp, NULL)) {
608 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
609 ("Could not connect to RTMP stream \"%s\" for reading", src->uri));
/* GstBaseSrc start implementation.
 *
 * Posts a RESOURCE/OPEN_READ error ("No filename given"), resets
 * last_timestamp to 0, allocates the librtmp context with RTMP_Alloc()
 * (logging an error when that fails) and connects through
 * gst_rtmp_src_connect(). An RTMP_Free() call is also present. The
 * conditions guarding the error paths, the point at which RTMP_Free() is
 * reached (presumably cleanup after a failed connect -- confirm) and the
 * return statements are on lines not visible in this view.
 */
617 /* open the file, do stuff necessary to go to PAUSED state */
619 gst_rtmp_src_start (GstBaseSrc * basesrc)
623 src = GST_RTMP_SRC (basesrc);
626 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("No filename given"));
631 src->last_timestamp = 0;
634 src->rtmp = RTMP_Alloc ();
636 GST_ERROR_OBJECT (src, "Could not allocate librtmp's RTMP context");
640 if (!gst_rtmp_src_connect (src))
647 RTMP_Free (src->rtmp);
/* GstBaseSrc unlock implementation: logs the request and closes the RTMP
 * session with RTMP_Close(). Per the comment in the body, closing the socket
 * affects any pending socket calls (the end of that comment is not visible
 * in this view). gst_rtmp_src_create() and gst_rtmp_src_do_seek() check
 * RTMP_IsConnected() and reconnect afterwards.
 *
 * @basesrc: the source
 */
654 gst_rtmp_src_unlock (GstBaseSrc * basesrc)
656 GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (basesrc);
658 GST_DEBUG_OBJECT (rtmpsrc, "unlock");
660 /* This closes the socket, which means that any pending socket calls
663 RTMP_Close (rtmpsrc->rtmp);
/* GstBaseSrc stop implementation: releases the librtmp context with
 * RTMP_Free() and resets last_timestamp to 0. Any guard around the free and
 * the remaining resets are on lines not visible in this view.
 *
 * @basesrc: the source
 */
671 gst_rtmp_src_stop (GstBaseSrc * basesrc)
675 src = GST_RTMP_SRC (basesrc);
678 RTMP_Free (src->rtmp);
683 src->last_timestamp = 0;