2 * Copyright (C) 2014 David Schleef <ds@schleef.org>
3 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4 * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19 * Boston, MA 02110-1335, USA.
22 * SECTION:element-rtmp2src
24 * The rtmp2src element receives input streams from an RTMP server.
27 * <title>Example launch line</title>
29 * gst-launch -v rtmp2src ! decodebin ! fakesink
31 * FIXME Describe what the pipeline does.
39 #include "gstrtmp2elements.h"
40 #include "gstrtmp2src.h"
42 #include "gstrtmp2locationhandler.h"
43 #include "rtmp/rtmpclient.h"
44 #include "rtmp/rtmpmessage.h"
46 #include <gst/base/gstpushsrc.h>
49 GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
50 #define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
53 #define GST_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SRC,GstRtmp2Src))
54 #define GST_IS_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SRC))
58 GstPushSrc parent_instance;
61 GstRtmpLocation location;
62 gboolean async_connect;
66 /* If both self->lock and OBJECT_LOCK are needed,
67 * self->lock must be taken first */
71 gboolean running, flushing;
79 GMainContext *context;
81 GCancellable *cancellable;
82 GstRtmpConnection *connection;
92 GstPushSrcClass parent_class;
95 /* GObject virtual functions */
96 static void gst_rtmp2_src_set_property (GObject * object,
97 guint property_id, const GValue * value, GParamSpec * pspec);
98 static void gst_rtmp2_src_get_property (GObject * object,
99 guint property_id, GValue * value, GParamSpec * pspec);
100 static void gst_rtmp2_src_finalize (GObject * object);
101 static void gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface);
103 /* GstBaseSrc virtual functions */
104 static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
105 static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
106 static gboolean gst_rtmp2_src_unlock (GstBaseSrc * src);
107 static gboolean gst_rtmp2_src_unlock_stop (GstBaseSrc * src);
108 static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
109 guint size, GstBuffer ** outbuf);
110 static gboolean gst_rtmp2_src_query (GstBaseSrc * src, GstQuery * query);
113 static void gst_rtmp2_src_task_func (gpointer user_data);
114 static void client_connect_done (GObject * source, GAsyncResult * result,
116 static void start_play_done (GObject * object, GAsyncResult * result,
118 static void connect_task_done (GObject * object, GAsyncResult * result,
121 static GstStructure *gst_rtmp2_src_get_stats (GstRtmp2Src * self);
137 PROP_TLS_VALIDATION_FLAGS,
144 #define DEFAULT_IDLE_TIMEOUT 0
148 static GstStaticPadTemplate gst_rtmp2_src_src_template =
149 GST_STATIC_PAD_TEMPLATE ("src",
152 GST_STATIC_CAPS ("video/x-flv")
155 /* class initialization */
157 G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
158 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
159 gst_rtmp2_src_uri_handler_init);
160 G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
161 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2src, "rtmp2src",
162 GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SRC, rtmp2_element_init (plugin));
165 gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
167 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
168 GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass);
170 gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
171 &gst_rtmp2_src_src_template);
173 gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
174 "RTMP source element", "Source", "Source element for RTMP streams",
175 "Make.TV, Inc. <info@make.tv>");
177 gobject_class->set_property = gst_rtmp2_src_set_property;
178 gobject_class->get_property = gst_rtmp2_src_get_property;
179 gobject_class->finalize = gst_rtmp2_src_finalize;
180 base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start);
181 base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop);
182 base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock);
183 base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop);
184 base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create);
185 base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query);
187 g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
188 g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
189 g_object_class_override_property (gobject_class, PROP_HOST, "host");
190 g_object_class_override_property (gobject_class, PROP_PORT, "port");
191 g_object_class_override_property (gobject_class, PROP_APPLICATION,
193 g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
194 g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
196 g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
197 g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
198 g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
199 g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
200 g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
201 "tls-validation-flags");
202 g_object_class_override_property (gobject_class, PROP_FLASH_VERSION,
205 g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
206 g_param_spec_boolean ("async-connect", "Async connect",
207 "Connect on READY, otherwise on first push", TRUE,
208 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_STATS,
211 g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
212 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
214 g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
215 g_param_spec_uint ("idle-timeout", "Idle timeout",
216 "The maximum allowed time in seconds for valid packets not to arrive "
217 "from the peer (0 = no timeout)",
218 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
219 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221 GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
222 "debug category for rtmp2src element");
226 gst_rtmp2_src_init (GstRtmp2Src * self)
228 self->async_connect = TRUE;
229 self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
231 g_mutex_init (&self->lock);
232 g_cond_init (&self->cond);
234 self->task = gst_task_new (gst_rtmp2_src_task_func, self, NULL);
235 g_rec_mutex_init (&self->task_lock);
236 gst_task_set_lock (self->task, &self->task_lock);
240 gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface)
242 gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SRC);
246 gst_rtmp2_src_set_property (GObject * object, guint property_id,
247 const GValue * value, GParamSpec * pspec)
249 GstRtmp2Src *self = GST_RTMP2_SRC (object);
251 switch (property_id) {
253 gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
254 g_value_get_string (value));
257 GST_OBJECT_LOCK (self);
258 self->location.scheme = g_value_get_enum (value);
259 GST_OBJECT_UNLOCK (self);
262 GST_OBJECT_LOCK (self);
263 g_free (self->location.host);
264 self->location.host = g_value_dup_string (value);
265 GST_OBJECT_UNLOCK (self);
268 GST_OBJECT_LOCK (self);
269 self->location.port = g_value_get_int (value);
270 GST_OBJECT_UNLOCK (self);
272 case PROP_APPLICATION:
273 GST_OBJECT_LOCK (self);
274 g_free (self->location.application);
275 self->location.application = g_value_dup_string (value);
276 GST_OBJECT_UNLOCK (self);
279 GST_OBJECT_LOCK (self);
280 g_free (self->location.stream);
281 self->location.stream = g_value_dup_string (value);
282 GST_OBJECT_UNLOCK (self);
284 case PROP_SECURE_TOKEN:
285 GST_OBJECT_LOCK (self);
286 g_free (self->location.secure_token);
287 self->location.secure_token = g_value_dup_string (value);
288 GST_OBJECT_UNLOCK (self);
291 GST_OBJECT_LOCK (self);
292 g_free (self->location.username);
293 self->location.username = g_value_dup_string (value);
294 GST_OBJECT_UNLOCK (self);
297 GST_OBJECT_LOCK (self);
298 g_free (self->location.password);
299 self->location.password = g_value_dup_string (value);
300 GST_OBJECT_UNLOCK (self);
303 GST_OBJECT_LOCK (self);
304 self->location.authmod = g_value_get_enum (value);
305 GST_OBJECT_UNLOCK (self);
308 GST_OBJECT_LOCK (self);
309 self->location.timeout = g_value_get_uint (value);
310 GST_OBJECT_UNLOCK (self);
312 case PROP_TLS_VALIDATION_FLAGS:
313 GST_OBJECT_LOCK (self);
314 self->location.tls_flags = g_value_get_flags (value);
315 GST_OBJECT_UNLOCK (self);
317 case PROP_FLASH_VERSION:
318 GST_OBJECT_LOCK (self);
319 g_free (self->location.flash_ver);
320 self->location.flash_ver = g_value_dup_string (value);
321 GST_OBJECT_UNLOCK (self);
323 case PROP_ASYNC_CONNECT:
324 GST_OBJECT_LOCK (self);
325 self->async_connect = g_value_get_boolean (value);
326 GST_OBJECT_UNLOCK (self);
328 case PROP_IDLE_TIMEOUT:
329 GST_OBJECT_LOCK (self);
330 self->idle_timeout = g_value_get_uint (value);
331 GST_OBJECT_UNLOCK (self);
334 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
340 gst_rtmp2_src_get_property (GObject * object, guint property_id,
341 GValue * value, GParamSpec * pspec)
343 GstRtmp2Src *self = GST_RTMP2_SRC (object);
345 switch (property_id) {
347 GST_OBJECT_LOCK (self);
348 g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
350 GST_OBJECT_UNLOCK (self);
353 GST_OBJECT_LOCK (self);
354 g_value_set_enum (value, self->location.scheme);
355 GST_OBJECT_UNLOCK (self);
358 GST_OBJECT_LOCK (self);
359 g_value_set_string (value, self->location.host);
360 GST_OBJECT_UNLOCK (self);
363 GST_OBJECT_LOCK (self);
364 g_value_set_int (value, self->location.port);
365 GST_OBJECT_UNLOCK (self);
367 case PROP_APPLICATION:
368 GST_OBJECT_LOCK (self);
369 g_value_set_string (value, self->location.application);
370 GST_OBJECT_UNLOCK (self);
373 GST_OBJECT_LOCK (self);
374 g_value_set_string (value, self->location.stream);
375 GST_OBJECT_UNLOCK (self);
377 case PROP_SECURE_TOKEN:
378 GST_OBJECT_LOCK (self);
379 g_value_set_string (value, self->location.secure_token);
380 GST_OBJECT_UNLOCK (self);
383 GST_OBJECT_LOCK (self);
384 g_value_set_string (value, self->location.username);
385 GST_OBJECT_UNLOCK (self);
388 GST_OBJECT_LOCK (self);
389 g_value_set_string (value, self->location.password);
390 GST_OBJECT_UNLOCK (self);
393 GST_OBJECT_LOCK (self);
394 g_value_set_enum (value, self->location.authmod);
395 GST_OBJECT_UNLOCK (self);
398 GST_OBJECT_LOCK (self);
399 g_value_set_uint (value, self->location.timeout);
400 GST_OBJECT_UNLOCK (self);
402 case PROP_TLS_VALIDATION_FLAGS:
403 GST_OBJECT_LOCK (self);
404 g_value_set_flags (value, self->location.tls_flags);
405 GST_OBJECT_UNLOCK (self);
407 case PROP_FLASH_VERSION:
408 GST_OBJECT_LOCK (self);
409 g_value_set_string (value, self->location.flash_ver);
410 GST_OBJECT_UNLOCK (self);
412 case PROP_ASYNC_CONNECT:
413 GST_OBJECT_LOCK (self);
414 g_value_set_boolean (value, self->async_connect);
415 GST_OBJECT_UNLOCK (self);
418 g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
420 case PROP_IDLE_TIMEOUT:
421 GST_OBJECT_LOCK (self);
422 g_value_set_uint (value, self->idle_timeout);
423 GST_OBJECT_UNLOCK (self);
426 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
432 gst_rtmp2_src_finalize (GObject * object)
434 GstRtmp2Src *self = GST_RTMP2_SRC (object);
436 gst_buffer_replace (&self->message, NULL);
438 g_clear_object (&self->cancellable);
439 g_clear_object (&self->connection);
441 g_clear_object (&self->task);
442 g_rec_mutex_clear (&self->task_lock);
444 g_mutex_clear (&self->lock);
445 g_cond_clear (&self->cond);
447 g_clear_pointer (&self->stats, gst_structure_free);
448 gst_rtmp_location_clear (&self->location);
450 G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
454 gst_rtmp2_src_start (GstBaseSrc * src)
456 GstRtmp2Src *self = GST_RTMP2_SRC (src);
459 GST_OBJECT_LOCK (self);
460 async = self->async_connect;
461 GST_OBJECT_UNLOCK (self);
463 GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
465 g_clear_object (&self->cancellable);
467 self->running = TRUE;
468 self->cancellable = g_cancellable_new ();
470 self->sent_header = FALSE;
471 self->last_ts = GST_CLOCK_TIME_NONE;
472 self->timeout = FALSE;
473 self->started = FALSE;
476 gst_task_start (self->task);
483 quit_invoker (gpointer user_data)
485 g_main_loop_quit (user_data);
486 return G_SOURCE_REMOVE;
490 stop_task (GstRtmp2Src * self)
492 gst_task_stop (self->task);
493 self->running = FALSE;
495 if (self->cancellable) {
496 GST_DEBUG_OBJECT (self, "Cancelling");
497 g_cancellable_cancel (self->cancellable);
501 GST_DEBUG_OBJECT (self, "Stopping loop");
502 g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
503 quit_invoker, g_main_loop_ref (self->loop),
504 (GDestroyNotify) g_main_loop_unref);
507 g_cond_broadcast (&self->cond);
511 gst_rtmp2_src_stop (GstBaseSrc * src)
513 GstRtmp2Src *self = GST_RTMP2_SRC (src);
515 GST_DEBUG_OBJECT (self, "stop");
517 g_mutex_lock (&self->lock);
519 g_mutex_unlock (&self->lock);
521 gst_task_join (self->task);
527 gst_rtmp2_src_unlock (GstBaseSrc * src)
529 GstRtmp2Src *self = GST_RTMP2_SRC (src);
531 GST_DEBUG_OBJECT (self, "unlock");
533 g_mutex_lock (&self->lock);
534 self->flushing = TRUE;
535 g_cond_broadcast (&self->cond);
536 g_mutex_unlock (&self->lock);
542 gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
544 GstRtmp2Src *self = GST_RTMP2_SRC (src);
546 GST_DEBUG_OBJECT (self, "unlock_stop");
548 g_mutex_lock (&self->lock);
549 self->flushing = FALSE;
550 g_mutex_unlock (&self->lock);
556 on_timeout (GstRtmp2Src * self)
558 g_mutex_lock (&self->lock);
559 self->timeout = TRUE;
560 g_cond_broadcast (&self->cond);
561 g_mutex_unlock (&self->lock);
563 return G_SOURCE_REMOVE;
567 gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
570 GstRtmp2Src *self = GST_RTMP2_SRC (src);
571 GstBuffer *message, *buffer;
573 guint32 timestamp = 0;
574 GSource *timeout = NULL;
575 GstFlowReturn ret = GST_FLOW_OK;
577 static const guint8 flv_header_data[] = {
578 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
579 0x09, 0x00, 0x00, 0x00, 0x00,
582 GST_LOG_OBJECT (self, "create");
584 g_mutex_lock (&self->lock);
587 gst_task_start (self->task);
590 /* wait until GMainLoop begins running so that we can attach
591 * timeout source safely.
592 * If the task stopped meanwhile, "running" will be FALSE
593 * than stop_task() will wake up us as well
595 while ((!self->started && self->running) && (!self->loop
596 || !g_main_loop_is_running (self->loop)))
597 g_cond_wait (&self->cond, &self->lock);
599 GST_OBJECT_LOCK (self);
600 if (self->idle_timeout && self->context) {
601 timeout = g_timeout_source_new_seconds (self->idle_timeout);
603 g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
604 g_source_attach (timeout, self->context);
606 GST_OBJECT_UNLOCK (self);
608 while (!self->message) {
609 if (!self->running) {
613 if (self->flushing) {
614 ret = GST_FLOW_FLUSHING;
618 GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
622 g_cond_wait (&self->cond, &self->lock);
626 g_source_destroy (timeout);
627 g_source_unref (timeout);
630 message = self->message;
631 self->message = NULL;
632 g_cond_signal (&self->cond);
633 g_mutex_unlock (&self->lock);
635 meta = gst_buffer_get_rtmp_meta (message);
637 GST_ELEMENT_ERROR (self, CORE, FAILED,
638 ("Internal error: No RTMP meta on buffer"),
639 ("No RTMP meta on %" GST_PTR_FORMAT, message));
640 gst_buffer_unref (message);
641 return GST_FLOW_ERROR;
644 if (GST_BUFFER_DTS_IS_VALID (message)) {
645 GstClockTime last_ts = self->last_ts, ts = GST_BUFFER_DTS (message);
647 if (GST_CLOCK_TIME_IS_VALID (last_ts) && last_ts > ts) {
648 GST_LOG_OBJECT (self, "Timestamp regression: %" GST_TIME_FORMAT
649 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (last_ts), GST_TIME_ARGS (ts));
653 timestamp = ts / GST_MSECOND;
656 buffer = gst_buffer_copy_region (message, GST_BUFFER_COPY_MEMORY, 0, -1);
659 guint8 *tag_header = g_malloc (11);
661 gst_memory_new_wrapped (0, tag_header, 11, 0, 11, tag_header, g_free);
662 GST_WRITE_UINT8 (tag_header, meta->type);
663 GST_WRITE_UINT24_BE (tag_header + 1, meta->size);
664 GST_WRITE_UINT24_BE (tag_header + 4, timestamp);
665 GST_WRITE_UINT8 (tag_header + 7, timestamp >> 24);
666 GST_WRITE_UINT24_BE (tag_header + 8, 0);
667 gst_buffer_prepend_memory (buffer, memory);
671 guint8 *tag_footer = g_malloc (4);
673 gst_memory_new_wrapped (0, tag_footer, 4, 0, 4, tag_footer, g_free);
674 GST_WRITE_UINT32_BE (tag_footer, meta->size + 11);
675 gst_buffer_append_memory (buffer, memory);
678 if (!self->sent_header) {
679 GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
680 (guint8 *) flv_header_data, sizeof flv_header_data, 0,
681 sizeof flv_header_data, NULL, NULL);
682 gst_buffer_prepend_memory (buffer, memory);
683 self->sent_header = TRUE;
686 GST_BUFFER_DTS (buffer) = self->last_ts;
690 gst_buffer_unref (message);
695 g_source_destroy (timeout);
696 g_source_unref (timeout);
698 /* Keep the unlock after the destruction of the timeout source to workaround
699 * https://gitlab.gnome.org/GNOME/glib/-/issues/803
701 g_mutex_unlock (&self->lock);
707 gst_rtmp2_src_query (GstBaseSrc * basesrc, GstQuery * query)
709 gboolean ret = FALSE;
711 switch (GST_QUERY_TYPE (query)) {
712 case GST_QUERY_SCHEDULING:{
713 gst_query_set_scheduling (query,
714 GST_SCHEDULING_FLAG_SEQUENTIAL |
715 GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
716 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
728 GST_BASE_SRC_CLASS (gst_rtmp2_src_parent_class)->query (basesrc, query);
734 main_loop_running_cb (GstRtmp2Src * self)
736 GST_TRACE_OBJECT (self, "Main loop running now");
738 g_mutex_lock (&self->lock);
739 self->started = TRUE;
740 g_cond_broadcast (&self->cond);
741 g_mutex_unlock (&self->lock);
743 return G_SOURCE_REMOVE;
748 gst_rtmp2_src_task_func (gpointer user_data)
750 GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
751 GMainContext *context;
756 GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
757 g_mutex_lock (&self->lock);
759 context = self->context = g_main_context_new ();
760 g_main_context_push_thread_default (context);
761 loop = self->loop = g_main_loop_new (context, TRUE);
763 source = g_idle_source_new ();
764 g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
766 g_source_attach (source, self->context);
767 g_source_unref (source);
769 connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
771 g_clear_pointer (&self->stats, gst_structure_free);
773 GST_OBJECT_LOCK (self);
774 gst_rtmp_client_connect_async (&self->location, self->cancellable,
775 client_connect_done, connector);
776 GST_OBJECT_UNLOCK (self);
779 g_mutex_unlock (&self->lock);
780 g_main_loop_run (loop);
781 g_mutex_lock (&self->lock);
783 if (self->connection) {
784 self->stats = gst_rtmp_connection_get_stats (self->connection);
787 g_clear_pointer (&self->loop, g_main_loop_unref);
788 g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
789 g_cond_broadcast (&self->cond);
791 /* Run loop cleanup */
792 g_mutex_unlock (&self->lock);
793 while (g_main_context_pending (context)) {
794 GST_DEBUG_OBJECT (self, "iterating main context to clean up");
795 g_main_context_iteration (context, FALSE);
797 g_main_context_pop_thread_default (context);
798 g_mutex_lock (&self->lock);
800 g_clear_pointer (&self->context, g_main_context_unref);
801 gst_buffer_replace (&self->message, NULL);
803 g_mutex_unlock (&self->lock);
804 GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task exiting");
808 client_connect_done (GObject * source, GAsyncResult * result,
811 GTask *task = user_data;
812 GstRtmp2Src *self = g_task_get_source_object (task);
813 GError *error = NULL;
814 GstRtmpConnection *connection;
816 connection = gst_rtmp_client_connect_finish (result, &error);
818 g_task_return_error (task, error);
819 g_object_unref (task);
823 g_task_set_task_data (task, connection, g_object_unref);
825 if (g_task_return_error_if_cancelled (task)) {
826 g_object_unref (task);
830 GST_OBJECT_LOCK (self);
831 gst_rtmp_client_start_play_async (connection, self->location.stream,
832 g_task_get_cancellable (task), start_play_done, task);
833 GST_OBJECT_UNLOCK (self);
837 start_play_done (GObject * source, GAsyncResult * result, gpointer user_data)
839 GTask *task = G_TASK (user_data);
840 GstRtmp2Src *self = g_task_get_source_object (task);
841 GstRtmpConnection *connection = g_task_get_task_data (task);
842 GError *error = NULL;
844 if (g_task_return_error_if_cancelled (task)) {
845 g_object_unref (task);
849 if (gst_rtmp_client_start_play_finish (connection, result,
850 &self->stream_id, &error)) {
851 g_task_return_pointer (task, g_object_ref (connection),
852 gst_rtmp_connection_close_and_unref);
854 g_task_return_error (task, error);
857 g_task_set_task_data (task, NULL, NULL);
858 g_object_unref (task);
862 got_message (GstRtmpConnection * connection, GstBuffer * buffer,
865 GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
866 GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
867 guint32 min_size = 1;
869 g_return_if_fail (meta);
871 if (meta->mstream != self->stream_id) {
872 GST_DEBUG_OBJECT (self, "Ignoring %s message with stream %" G_GUINT32_FORMAT
873 " != %" G_GUINT32_FORMAT, gst_rtmp_message_type_get_nick (meta->type),
874 meta->mstream, self->stream_id);
878 switch (meta->type) {
879 case GST_RTMP_MESSAGE_TYPE_VIDEO:
883 case GST_RTMP_MESSAGE_TYPE_AUDIO:
887 case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
891 GST_DEBUG_OBJECT (self, "Ignoring %s message, wrong type",
892 gst_rtmp_message_type_get_nick (meta->type));
896 if (meta->size < min_size) {
897 GST_DEBUG_OBJECT (self, "Ignoring too small %s message (%" G_GUINT32_FORMAT
898 " < %" G_GUINT32_FORMAT ")",
899 gst_rtmp_message_type_get_nick (meta->type), meta->size, min_size);
903 g_mutex_lock (&self->lock);
904 while (self->message) {
905 if (!self->running) {
908 g_cond_wait (&self->cond, &self->lock);
911 self->message = gst_buffer_ref (buffer);
912 g_cond_signal (&self->cond);
915 g_mutex_unlock (&self->lock);
920 error_callback (GstRtmpConnection * connection, GstRtmp2Src * self)
922 g_mutex_lock (&self->lock);
923 if (self->cancellable) {
924 g_cancellable_cancel (self->cancellable);
925 } else if (self->loop) {
926 GST_INFO_OBJECT (self, "Connection error");
929 g_mutex_unlock (&self->lock);
933 control_callback (GstRtmpConnection * connection, gint uc_type,
934 guint stream_id, GstRtmp2Src * self)
936 GST_INFO_OBJECT (self, "stream %u got %s", stream_id,
937 gst_rtmp_user_control_type_get_nick (uc_type));
939 if (uc_type == GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF && stream_id == 1) {
940 GST_INFO_OBJECT (self, "went EOS");
946 send_connect_error (GstRtmp2Src * self, GError * error)
949 GST_ERROR_OBJECT (self, "Connect failed with NULL error");
950 GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
954 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
955 GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
956 GST_STR_NULL (error->message));
960 GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
961 g_quark_to_string (error->domain), error->code,
962 GST_STR_NULL (error->message));
964 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
965 GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
966 ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
967 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
968 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
969 ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
971 GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
972 ("Failed to connect"),
973 ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
974 GST_STR_NULL (error->message)));
979 connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
981 GstRtmp2Src *self = GST_RTMP2_SRC (object);
982 GTask *task = G_TASK (result);
983 GError *error = NULL;
985 g_mutex_lock (&self->lock);
987 g_warn_if_fail (g_task_is_valid (task, object));
989 if (self->cancellable == g_task_get_cancellable (task)) {
990 g_clear_object (&self->cancellable);
993 self->connection = g_task_propagate_pointer (task, &error);
994 if (self->connection) {
995 gst_rtmp_connection_set_input_handler (self->connection,
996 got_message, g_object_ref (self), g_object_unref);
997 g_signal_connect_object (self->connection, "error",
998 G_CALLBACK (error_callback), self, 0);
999 g_signal_connect_object (self->connection, "stream-control",
1000 G_CALLBACK (control_callback), self, 0);
1002 send_connect_error (self, error);
1004 g_error_free (error);
1007 g_cond_broadcast (&self->cond);
1008 g_mutex_unlock (&self->lock);
1011 static GstStructure *
1012 gst_rtmp2_src_get_stats (GstRtmp2Src * self)
1016 g_mutex_lock (&self->lock);
1018 if (self->connection) {
1019 s = gst_rtmp_connection_get_stats (self->connection);
1020 } else if (self->stats) {
1021 s = gst_structure_copy (self->stats);
1023 s = gst_rtmp_connection_get_null_stats ();
1026 g_mutex_unlock (&self->lock);