2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
4 * 2002 Kristian Rietveld <kris@gtk.org>
5 * 2002,2003 Colin Walters <walters@gnu.org>
6 * 2001,2010 Bastien Nocera <hadess@hadess.net>
7 * 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Library General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Library General Public License for more details.
21 * You should have received a copy of the GNU Library General Public
22 * License along with this library; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
28 * SECTION:element-rtmpsrc
31 * This plugin reads data from a local or remote location specified
32 * by an URI. This location can be specified using any protocol supported by
33 * the RTMP library, i.e. rtmp, rtmpt, rtmps, rtmpe, rtmfp, rtmpte and rtmpts.
35 * ## Example launch lines
37 * gst-launch-1.0 -v rtmpsrc location=rtmp://somehost/someurl ! fakesink
38 * ]| Open an RTMP location and pass its content to fakesink.
46 #include <glib/gi18n-lib.h>
48 #include "gstrtmpsrc.h"
60 GST_DEBUG_CATEGORY_STATIC (rtmpsrc_debug);
61 #define GST_CAT_DEFAULT rtmpsrc_debug
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
79 #define DEFAULT_LOCATION NULL
80 #define DEFAULT_TIMEOUT 120
82 static void gst_rtmp_src_uri_handler_init (gpointer g_iface,
85 static void gst_rtmp_src_set_property (GObject * object, guint prop_id,
86 const GValue * value, GParamSpec * pspec);
87 static void gst_rtmp_src_get_property (GObject * object, guint prop_id,
88 GValue * value, GParamSpec * pspec);
89 static void gst_rtmp_src_finalize (GObject * object);
91 static gboolean gst_rtmp_src_connect (GstRTMPSrc * src);
92 static gboolean gst_rtmp_src_unlock (GstBaseSrc * src);
93 static gboolean gst_rtmp_src_stop (GstBaseSrc * src);
94 static gboolean gst_rtmp_src_start (GstBaseSrc * src);
95 static gboolean gst_rtmp_src_is_seekable (GstBaseSrc * src);
96 static gboolean gst_rtmp_src_prepare_seek_segment (GstBaseSrc * src,
97 GstEvent * event, GstSegment * segment);
98 static gboolean gst_rtmp_src_do_seek (GstBaseSrc * src, GstSegment * segment);
99 static GstFlowReturn gst_rtmp_src_create (GstPushSrc * pushsrc,
100 GstBuffer ** buffer);
101 static gboolean gst_rtmp_src_query (GstBaseSrc * src, GstQuery * query);
103 #define gst_rtmp_src_parent_class parent_class
104 G_DEFINE_TYPE_WITH_CODE (GstRTMPSrc, gst_rtmp_src, GST_TYPE_PUSH_SRC,
105 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
106 gst_rtmp_src_uri_handler_init));
109 gst_rtmp_src_class_init (GstRTMPSrcClass * klass)
111 GObjectClass *gobject_class;
112 GstElementClass *gstelement_class;
113 GstBaseSrcClass *gstbasesrc_class;
114 GstPushSrcClass *gstpushsrc_class;
116 gobject_class = G_OBJECT_CLASS (klass);
117 gstelement_class = GST_ELEMENT_CLASS (klass);
118 gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
119 gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
121 gobject_class->finalize = gst_rtmp_src_finalize;
122 gobject_class->set_property = gst_rtmp_src_set_property;
123 gobject_class->get_property = gst_rtmp_src_get_property;
126 g_object_class_install_property (gobject_class, PROP_LOCATION,
127 g_param_spec_string ("location", "RTMP Location",
128 "Location of the RTMP url to read",
129 DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
131 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
132 g_param_spec_int ("timeout", "RTMP Timeout",
133 "Time without receiving any data from the server to wait before to timeout the session",
135 DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
139 gst_element_class_set_static_metadata (gstelement_class,
143 "Bastien Nocera <hadess@hadess.net>, "
144 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
146 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_src_start);
147 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_src_stop);
148 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp_src_unlock);
149 gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_rtmp_src_is_seekable);
150 gstbasesrc_class->prepare_seek_segment =
151 GST_DEBUG_FUNCPTR (gst_rtmp_src_prepare_seek_segment);
152 gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_rtmp_src_do_seek);
153 gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_rtmp_src_create);
154 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_rtmp_src_query);
156 GST_DEBUG_CATEGORY_INIT (rtmpsrc_debug, "rtmpsrc", 0, "RTMP Source");
160 gst_rtmp_src_init (GstRTMPSrc * rtmpsrc)
165 if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
166 GST_ERROR_OBJECT (rtmpsrc, "WSAStartup failed: 0x%08x", WSAGetLastError ());
170 rtmpsrc->cur_offset = 0;
171 rtmpsrc->last_timestamp = 0;
172 rtmpsrc->timeout = DEFAULT_TIMEOUT;
174 gst_base_src_set_format (GST_BASE_SRC (rtmpsrc), GST_FORMAT_TIME);
178 gst_rtmp_src_finalize (GObject * object)
180 GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (object);
182 g_free (rtmpsrc->uri);
189 G_OBJECT_CLASS (parent_class)->finalize (object);
193 * URI interface support.
197 gst_rtmp_src_uri_get_type (GType type)
202 static const gchar *const *
203 gst_rtmp_src_uri_get_protocols (GType type)
205 static const gchar *protocols[] =
206 { "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
212 gst_rtmp_src_uri_get_uri (GstURIHandler * handler)
214 GstRTMPSrc *src = GST_RTMP_SRC (handler);
216 /* FIXME: make thread-safe */
217 return g_strdup (src->uri);
221 gst_rtmp_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
224 GstRTMPSrc *src = GST_RTMP_SRC (handler);
226 if (GST_STATE (src) >= GST_STATE_PAUSED) {
227 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
228 "Changing the URI on rtmpsrc when it is running is not supported");
241 if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
242 !host.av_len || !playpath.av_len) {
243 GST_ERROR_OBJECT (src, "Failed to parse URI %s", uri);
244 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
245 "Could not parse RTMP URI");
246 /* FIXME: we should not be freeing RTMP internals to avoid leaking */
247 free (playpath.av_val);
250 free (playpath.av_val);
251 src->uri = g_strdup (uri);
254 GST_DEBUG_OBJECT (src, "Changed URI to %s", GST_STR_NULL (uri));
260 gst_rtmp_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
262 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
264 iface->get_type = gst_rtmp_src_uri_get_type;
265 iface->get_protocols = gst_rtmp_src_uri_get_protocols;
266 iface->get_uri = gst_rtmp_src_uri_get_uri;
267 iface->set_uri = gst_rtmp_src_uri_set_uri;
271 gst_rtmp_src_set_property (GObject * object, guint prop_id,
272 const GValue * value, GParamSpec * pspec)
276 src = GST_RTMP_SRC (object);
280 gst_rtmp_src_uri_set_uri (GST_URI_HANDLER (src),
281 g_value_get_string (value), NULL);
285 src->timeout = g_value_get_int (value);
289 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
295 gst_rtmp_src_get_property (GObject * object, guint prop_id, GValue * value,
300 src = GST_RTMP_SRC (object);
304 g_value_set_string (value, src->uri);
307 g_value_set_int (value, src->timeout);
310 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
316 * Read a new buffer from src->reqoffset, takes care of events
317 * and seeking and such.
320 gst_rtmp_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
330 src = GST_RTMP_SRC (pushsrc);
332 g_return_val_if_fail (src->rtmp != NULL, GST_FLOW_ERROR);
334 if (!RTMP_IsConnected (src->rtmp)) {
335 GST_DEBUG_OBJECT (src, "reconnecting");
336 if (!gst_rtmp_src_connect (src))
337 return GST_FLOW_ERROR;
340 size = GST_BASE_SRC_CAST (pushsrc)->blocksize;
342 GST_DEBUG ("reading from %" G_GUINT64_FORMAT
343 ", size %u", src->cur_offset, size);
345 buf = gst_buffer_new_allocate (NULL, size, NULL);
346 if (G_UNLIKELY (buf == NULL)) {
347 GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
348 return GST_FLOW_ERROR;
352 gst_buffer_map (buf, &map, GST_MAP_WRITE);
357 int read = RTMP_Read (src->rtmp, (char *) data, todo);
359 if (G_UNLIKELY (read == 0 && todo == size))
362 if (G_UNLIKELY (read == 0))
365 if (G_UNLIKELY (read < 0))
376 GST_LOG (" got size %d", read);
378 gst_buffer_unmap (buf, &map);
379 gst_buffer_resize (buf, 0, bsize);
382 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
383 src->discont = FALSE;
386 GST_BUFFER_TIMESTAMP (buf) = src->last_timestamp;
387 GST_BUFFER_OFFSET (buf) = src->cur_offset;
389 src->cur_offset += size;
390 if (src->last_timestamp == GST_CLOCK_TIME_NONE)
391 src->last_timestamp = src->rtmp->m_mediaStamp * GST_MSECOND;
393 src->last_timestamp =
394 MAX (src->last_timestamp, src->rtmp->m_mediaStamp * GST_MSECOND);
396 GST_LOG_OBJECT (src, "Created buffer of size %u at %" G_GINT64_FORMAT
397 " with timestamp %" GST_TIME_FORMAT, size, GST_BUFFER_OFFSET (buf),
398 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
401 /* we're done, return the buffer */
408 gst_buffer_unmap (buf, &map);
409 gst_buffer_unref (buf);
410 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read data"));
411 return GST_FLOW_ERROR;
415 gst_buffer_unmap (buf, &map);
416 gst_buffer_unref (buf);
417 if (src->cur_offset == 0) {
418 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
419 ("Failed to read any data from stream, check your URL"));
420 return GST_FLOW_ERROR;
422 GST_DEBUG_OBJECT (src, "Reading data gave EOS");
429 gst_rtmp_src_query (GstBaseSrc * basesrc, GstQuery * query)
431 gboolean ret = FALSE;
432 GstRTMPSrc *src = GST_RTMP_SRC (basesrc);
434 switch (GST_QUERY_TYPE (query)) {
436 gst_query_set_uri (query, src->uri);
439 case GST_QUERY_POSITION:{
442 gst_query_parse_position (query, &format, NULL);
443 if (format == GST_FORMAT_TIME) {
444 gst_query_set_position (query, format, src->last_timestamp);
449 case GST_QUERY_DURATION:{
453 gst_query_parse_duration (query, &format, NULL);
454 if (format == GST_FORMAT_TIME && src->rtmp) {
455 duration = RTMP_GetDuration (src->rtmp);
456 if (duration != 0.0) {
457 gst_query_set_duration (query, format, duration * GST_SECOND);
463 case GST_QUERY_SCHEDULING:{
464 gst_query_set_scheduling (query,
465 GST_SCHEDULING_FLAG_SEQUENTIAL |
466 GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
467 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
478 ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
484 gst_rtmp_src_is_seekable (GstBaseSrc * basesrc)
488 src = GST_RTMP_SRC (basesrc);
490 return src->seekable;
494 gst_rtmp_src_prepare_seek_segment (GstBaseSrc * basesrc, GstEvent * event,
495 GstSegment * segment)
498 GstSeekType cur_type, stop_type;
504 src = GST_RTMP_SRC (basesrc);
506 gst_event_parse_seek (event, &rate, &format, &flags,
507 &cur_type, &cur, &stop_type, &stop);
509 if (!src->seekable) {
510 GST_LOG_OBJECT (src, "Not a seekable stream");
515 GST_LOG_OBJECT (src, "Not connected yet");
519 if (format != GST_FORMAT_TIME) {
520 GST_LOG_OBJECT (src, "Seeking only supported in TIME format");
524 if (stop_type != GST_SEEK_TYPE_NONE) {
525 GST_LOG_OBJECT (src, "Setting a stop position is not supported");
529 gst_segment_init (segment, GST_FORMAT_TIME);
530 gst_segment_do_seek (segment, rate, format, flags, cur_type, cur, stop_type,
537 gst_rtmp_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment)
541 src = GST_RTMP_SRC (basesrc);
543 if (segment->format != GST_FORMAT_TIME) {
544 GST_LOG_OBJECT (src, "Only time based seeks are supported");
549 GST_LOG_OBJECT (src, "Not connected yet");
554 if (src->cur_offset == 0 && segment->start == 0)
557 if (!src->seekable) {
558 GST_LOG_OBJECT (src, "Not a seekable stream");
562 /* If we have just disconnected in unlock(), we need to re-connect
563 * and also let librtmp read some data before sending a seek,
564 * otherwise it will stall. Calling create() does both. */
565 if (!RTMP_IsConnected (src->rtmp)) {
566 GstBuffer *buffer = NULL;
567 gst_rtmp_src_create (GST_PUSH_SRC (basesrc), &buffer);
568 gst_buffer_replace (&buffer, NULL);
571 src->last_timestamp = GST_CLOCK_TIME_NONE;
572 if (!RTMP_SendSeek (src->rtmp, segment->start / GST_MSECOND)) {
573 GST_ERROR_OBJECT (src, "Seeking failed");
574 src->seekable = FALSE;
579 /* This is set here so that the call to create() above doesn't clear it */
582 GST_DEBUG_OBJECT (src, "Seek to %" GST_TIME_FORMAT " successfull",
583 GST_TIME_ARGS (segment->start));
589 gst_rtmp_src_connect (GstRTMPSrc * src)
591 RTMP_Init (src->rtmp);
592 src->rtmp->Link.timeout = src->timeout;
593 if (!RTMP_SetupURL (src->rtmp, src->uri)) {
594 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
595 ("Failed to setup URL '%s'", src->uri));
598 src->seekable = !(src->rtmp->Link.lFlags & RTMP_LF_LIVE);
599 GST_INFO_OBJECT (src, "seekable %d", src->seekable);
601 /* open if required */
602 if (!RTMP_IsConnected (src->rtmp)) {
603 if (!RTMP_Connect (src->rtmp, NULL)) {
604 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
605 ("Could not connect to RTMP stream \"%s\" for reading", src->uri));
613 /* open the file, do stuff necessary to go to PAUSED state */
615 gst_rtmp_src_start (GstBaseSrc * basesrc)
619 src = GST_RTMP_SRC (basesrc);
622 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("No filename given"));
627 src->last_timestamp = 0;
630 src->rtmp = RTMP_Alloc ();
632 GST_ERROR_OBJECT (src, "Could not allocate librtmp's RTMP context");
636 if (!gst_rtmp_src_connect (src))
643 RTMP_Free (src->rtmp);
650 gst_rtmp_src_unlock (GstBaseSrc * basesrc)
652 GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (basesrc);
654 GST_DEBUG_OBJECT (rtmpsrc, "unlock");
656 /* This closes the socket, which means that any pending socket calls
659 RTMP_Close (rtmpsrc->rtmp);
667 gst_rtmp_src_stop (GstBaseSrc * basesrc)
671 src = GST_RTMP_SRC (basesrc);
674 RTMP_Free (src->rtmp);
679 src->last_timestamp = 0;