Merging gst-plugins-bad
[platform/upstream/gstreamer.git] / gst / rtmp2 / gstrtmp2src.c
1 /* GStreamer
2  * Copyright (C) 2014 David Schleef <ds@schleef.org>
3  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
4  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
19  * Boston, MA 02110-1335, USA.
20  */
21 /**
22  * SECTION:element-rtmp2src
23  *
24  * The rtmp2src element receives input streams from an RTMP server.
25  *
26  * <refsect2>
27  * <title>Example launch line</title>
28  * |[
29  * gst-launch -v rtmp2src ! decodebin ! fakesink
30  * ]|
31  * FIXME Describe what the pipeline does.
32  * </refsect2>
33  */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include "gstrtmp2elements.h"
40 #include "gstrtmp2src.h"
41
42 #include "gstrtmp2locationhandler.h"
43 #include "rtmp/rtmpclient.h"
44 #include "rtmp/rtmpmessage.h"
45
46 #include <gst/base/gstpushsrc.h>
47 #include <string.h>
48
49 GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
50 #define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
51
52 /* prototypes */
53 #define GST_RTMP2_SRC(obj)   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SRC,GstRtmp2Src))
54 #define GST_IS_RTMP2_SRC(obj)   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SRC))
55
56 typedef struct
57 {
58   GstPushSrc parent_instance;
59
60   /* properties */
61   GstRtmpLocation location;
62   gboolean async_connect;
63   GstStructure *stats;
64   guint idle_timeout;
65
66   /* If both self->lock and OBJECT_LOCK are needed,
67    * self->lock must be taken first */
68   GMutex lock;
69   GCond cond;
70
71   gboolean running, flushing;
72   gboolean timeout;
73   gboolean started;
74
75   GstTask *task;
76   GRecMutex task_lock;
77
78   GMainLoop *loop;
79   GMainContext *context;
80
81   GCancellable *cancellable;
82   GstRtmpConnection *connection;
83   guint32 stream_id;
84
85   GstBuffer *message;
86   gboolean sent_header;
87   GstClockTime last_ts;
88 } GstRtmp2Src;
89
90 typedef struct
91 {
92   GstPushSrcClass parent_class;
93 } GstRtmp2SrcClass;
94
95 /* GObject virtual functions */
96 static void gst_rtmp2_src_set_property (GObject * object,
97     guint property_id, const GValue * value, GParamSpec * pspec);
98 static void gst_rtmp2_src_get_property (GObject * object,
99     guint property_id, GValue * value, GParamSpec * pspec);
100 static void gst_rtmp2_src_finalize (GObject * object);
101 static void gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface);
102
103 /* GstBaseSrc virtual functions */
104 static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
105 static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
106 static gboolean gst_rtmp2_src_unlock (GstBaseSrc * src);
107 static gboolean gst_rtmp2_src_unlock_stop (GstBaseSrc * src);
108 static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
109     guint size, GstBuffer ** outbuf);
110 static gboolean gst_rtmp2_src_query (GstBaseSrc * src, GstQuery * query);
111
112 /* Internal API */
113 static void gst_rtmp2_src_task_func (gpointer user_data);
114 static void client_connect_done (GObject * source, GAsyncResult * result,
115     gpointer user_data);
116 static void start_play_done (GObject * object, GAsyncResult * result,
117     gpointer user_data);
118 static void connect_task_done (GObject * object, GAsyncResult * result,
119     gpointer user_data);
120
121 static GstStructure *gst_rtmp2_src_get_stats (GstRtmp2Src * self);
122
123 enum
124 {
125   PROP_0,
126   PROP_LOCATION,
127   PROP_SCHEME,
128   PROP_HOST,
129   PROP_PORT,
130   PROP_APPLICATION,
131   PROP_STREAM,
132   PROP_SECURE_TOKEN,
133   PROP_USERNAME,
134   PROP_PASSWORD,
135   PROP_AUTHMOD,
136   PROP_TIMEOUT,
137   PROP_TLS_VALIDATION_FLAGS,
138   PROP_FLASH_VERSION,
139   PROP_ASYNC_CONNECT,
140   PROP_STATS,
141   PROP_IDLE_TIMEOUT,
142 };
143
144 #define DEFAULT_IDLE_TIMEOUT 0
145
146 /* pad templates */
147
148 static GstStaticPadTemplate gst_rtmp2_src_src_template =
149 GST_STATIC_PAD_TEMPLATE ("src",
150     GST_PAD_SRC,
151     GST_PAD_ALWAYS,
152     GST_STATIC_CAPS ("video/x-flv")
153     );
154
155 /* class initialization */
156
157 G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
158     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
159         gst_rtmp2_src_uri_handler_init);
160     G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
161 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2src, "rtmp2src",
162     GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SRC, rtmp2_element_init (plugin));
163
164 static void
165 gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
166 {
167   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
168   GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass);
169
170   gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
171       &gst_rtmp2_src_src_template);
172
173   gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
174       "RTMP source element", "Source", "Source element for RTMP streams",
175       "Make.TV, Inc. <info@make.tv>");
176
177   gobject_class->set_property = gst_rtmp2_src_set_property;
178   gobject_class->get_property = gst_rtmp2_src_get_property;
179   gobject_class->finalize = gst_rtmp2_src_finalize;
180   base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start);
181   base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop);
182   base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock);
183   base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop);
184   base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create);
185   base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query);
186
187   g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
188   g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
189   g_object_class_override_property (gobject_class, PROP_HOST, "host");
190   g_object_class_override_property (gobject_class, PROP_PORT, "port");
191   g_object_class_override_property (gobject_class, PROP_APPLICATION,
192       "application");
193   g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
194   g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
195       "secure-token");
196   g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
197   g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
198   g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
199   g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
200   g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
201       "tls-validation-flags");
202   g_object_class_override_property (gobject_class, PROP_FLASH_VERSION,
203       "flash-version");
204
205   g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
206       g_param_spec_boolean ("async-connect", "Async connect",
207           "Connect on READY, otherwise on first push", TRUE,
208           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_STATS,
211       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
212           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
213
214   g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
215       g_param_spec_uint ("idle-timeout", "Idle timeout",
216           "The maximum allowed time in seconds for valid packets not to arrive "
217           "from the peer (0 = no timeout)",
218           0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
219           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
222       "debug category for rtmp2src element");
223 }
224
225 static void
226 gst_rtmp2_src_init (GstRtmp2Src * self)
227 {
228   self->async_connect = TRUE;
229   self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
230
231   g_mutex_init (&self->lock);
232   g_cond_init (&self->cond);
233
234   self->task = gst_task_new (gst_rtmp2_src_task_func, self, NULL);
235   g_rec_mutex_init (&self->task_lock);
236   gst_task_set_lock (self->task, &self->task_lock);
237 }
238
239 static void
240 gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface)
241 {
242   gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SRC);
243 }
244
245 static void
246 gst_rtmp2_src_set_property (GObject * object, guint property_id,
247     const GValue * value, GParamSpec * pspec)
248 {
249   GstRtmp2Src *self = GST_RTMP2_SRC (object);
250
251   switch (property_id) {
252     case PROP_LOCATION:
253       gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
254           g_value_get_string (value));
255       break;
256     case PROP_SCHEME:
257       GST_OBJECT_LOCK (self);
258       self->location.scheme = g_value_get_enum (value);
259       GST_OBJECT_UNLOCK (self);
260       break;
261     case PROP_HOST:
262       GST_OBJECT_LOCK (self);
263       g_free (self->location.host);
264       self->location.host = g_value_dup_string (value);
265       GST_OBJECT_UNLOCK (self);
266       break;
267     case PROP_PORT:
268       GST_OBJECT_LOCK (self);
269       self->location.port = g_value_get_int (value);
270       GST_OBJECT_UNLOCK (self);
271       break;
272     case PROP_APPLICATION:
273       GST_OBJECT_LOCK (self);
274       g_free (self->location.application);
275       self->location.application = g_value_dup_string (value);
276       GST_OBJECT_UNLOCK (self);
277       break;
278     case PROP_STREAM:
279       GST_OBJECT_LOCK (self);
280       g_free (self->location.stream);
281       self->location.stream = g_value_dup_string (value);
282       GST_OBJECT_UNLOCK (self);
283       break;
284     case PROP_SECURE_TOKEN:
285       GST_OBJECT_LOCK (self);
286       g_free (self->location.secure_token);
287       self->location.secure_token = g_value_dup_string (value);
288       GST_OBJECT_UNLOCK (self);
289       break;
290     case PROP_USERNAME:
291       GST_OBJECT_LOCK (self);
292       g_free (self->location.username);
293       self->location.username = g_value_dup_string (value);
294       GST_OBJECT_UNLOCK (self);
295       break;
296     case PROP_PASSWORD:
297       GST_OBJECT_LOCK (self);
298       g_free (self->location.password);
299       self->location.password = g_value_dup_string (value);
300       GST_OBJECT_UNLOCK (self);
301       break;
302     case PROP_AUTHMOD:
303       GST_OBJECT_LOCK (self);
304       self->location.authmod = g_value_get_enum (value);
305       GST_OBJECT_UNLOCK (self);
306       break;
307     case PROP_TIMEOUT:
308       GST_OBJECT_LOCK (self);
309       self->location.timeout = g_value_get_uint (value);
310       GST_OBJECT_UNLOCK (self);
311       break;
312     case PROP_TLS_VALIDATION_FLAGS:
313       GST_OBJECT_LOCK (self);
314       self->location.tls_flags = g_value_get_flags (value);
315       GST_OBJECT_UNLOCK (self);
316       break;
317     case PROP_FLASH_VERSION:
318       GST_OBJECT_LOCK (self);
319       g_free (self->location.flash_ver);
320       self->location.flash_ver = g_value_dup_string (value);
321       GST_OBJECT_UNLOCK (self);
322       break;
323     case PROP_ASYNC_CONNECT:
324       GST_OBJECT_LOCK (self);
325       self->async_connect = g_value_get_boolean (value);
326       GST_OBJECT_UNLOCK (self);
327       break;
328     case PROP_IDLE_TIMEOUT:
329       GST_OBJECT_LOCK (self);
330       self->idle_timeout = g_value_get_uint (value);
331       GST_OBJECT_UNLOCK (self);
332       break;
333     default:
334       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
335       break;
336   }
337 }
338
339 static void
340 gst_rtmp2_src_get_property (GObject * object, guint property_id,
341     GValue * value, GParamSpec * pspec)
342 {
343   GstRtmp2Src *self = GST_RTMP2_SRC (object);
344
345   switch (property_id) {
346     case PROP_LOCATION:
347       GST_OBJECT_LOCK (self);
348       g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
349               TRUE));
350       GST_OBJECT_UNLOCK (self);
351       break;
352     case PROP_SCHEME:
353       GST_OBJECT_LOCK (self);
354       g_value_set_enum (value, self->location.scheme);
355       GST_OBJECT_UNLOCK (self);
356       break;
357     case PROP_HOST:
358       GST_OBJECT_LOCK (self);
359       g_value_set_string (value, self->location.host);
360       GST_OBJECT_UNLOCK (self);
361       break;
362     case PROP_PORT:
363       GST_OBJECT_LOCK (self);
364       g_value_set_int (value, self->location.port);
365       GST_OBJECT_UNLOCK (self);
366       break;
367     case PROP_APPLICATION:
368       GST_OBJECT_LOCK (self);
369       g_value_set_string (value, self->location.application);
370       GST_OBJECT_UNLOCK (self);
371       break;
372     case PROP_STREAM:
373       GST_OBJECT_LOCK (self);
374       g_value_set_string (value, self->location.stream);
375       GST_OBJECT_UNLOCK (self);
376       break;
377     case PROP_SECURE_TOKEN:
378       GST_OBJECT_LOCK (self);
379       g_value_set_string (value, self->location.secure_token);
380       GST_OBJECT_UNLOCK (self);
381       break;
382     case PROP_USERNAME:
383       GST_OBJECT_LOCK (self);
384       g_value_set_string (value, self->location.username);
385       GST_OBJECT_UNLOCK (self);
386       break;
387     case PROP_PASSWORD:
388       GST_OBJECT_LOCK (self);
389       g_value_set_string (value, self->location.password);
390       GST_OBJECT_UNLOCK (self);
391       break;
392     case PROP_AUTHMOD:
393       GST_OBJECT_LOCK (self);
394       g_value_set_enum (value, self->location.authmod);
395       GST_OBJECT_UNLOCK (self);
396       break;
397     case PROP_TIMEOUT:
398       GST_OBJECT_LOCK (self);
399       g_value_set_uint (value, self->location.timeout);
400       GST_OBJECT_UNLOCK (self);
401       break;
402     case PROP_TLS_VALIDATION_FLAGS:
403       GST_OBJECT_LOCK (self);
404       g_value_set_flags (value, self->location.tls_flags);
405       GST_OBJECT_UNLOCK (self);
406       break;
407     case PROP_FLASH_VERSION:
408       GST_OBJECT_LOCK (self);
409       g_value_set_string (value, self->location.flash_ver);
410       GST_OBJECT_UNLOCK (self);
411       break;
412     case PROP_ASYNC_CONNECT:
413       GST_OBJECT_LOCK (self);
414       g_value_set_boolean (value, self->async_connect);
415       GST_OBJECT_UNLOCK (self);
416       break;
417     case PROP_STATS:
418       g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
419       break;
420     case PROP_IDLE_TIMEOUT:
421       GST_OBJECT_LOCK (self);
422       g_value_set_uint (value, self->idle_timeout);
423       GST_OBJECT_UNLOCK (self);
424       break;
425     default:
426       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
427       break;
428   }
429 }
430
431 static void
432 gst_rtmp2_src_finalize (GObject * object)
433 {
434   GstRtmp2Src *self = GST_RTMP2_SRC (object);
435
436   gst_buffer_replace (&self->message, NULL);
437
438   g_clear_object (&self->cancellable);
439   g_clear_object (&self->connection);
440
441   g_clear_object (&self->task);
442   g_rec_mutex_clear (&self->task_lock);
443
444   g_mutex_clear (&self->lock);
445   g_cond_clear (&self->cond);
446
447   g_clear_pointer (&self->stats, gst_structure_free);
448   gst_rtmp_location_clear (&self->location);
449
450   G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
451 }
452
453 static gboolean
454 gst_rtmp2_src_start (GstBaseSrc * src)
455 {
456   GstRtmp2Src *self = GST_RTMP2_SRC (src);
457   gboolean async;
458
459   GST_OBJECT_LOCK (self);
460   async = self->async_connect;
461   GST_OBJECT_UNLOCK (self);
462
463   GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
464
465   g_clear_object (&self->cancellable);
466
467   self->running = TRUE;
468   self->cancellable = g_cancellable_new ();
469   self->stream_id = 0;
470   self->sent_header = FALSE;
471   self->last_ts = GST_CLOCK_TIME_NONE;
472   self->timeout = FALSE;
473   self->started = FALSE;
474
475   if (async) {
476     gst_task_start (self->task);
477   }
478
479   return TRUE;
480 }
481
482 static gboolean
483 quit_invoker (gpointer user_data)
484 {
485   g_main_loop_quit (user_data);
486   return G_SOURCE_REMOVE;
487 }
488
489 static void
490 stop_task (GstRtmp2Src * self)
491 {
492   gst_task_stop (self->task);
493   self->running = FALSE;
494
495   if (self->cancellable) {
496     GST_DEBUG_OBJECT (self, "Cancelling");
497     g_cancellable_cancel (self->cancellable);
498   }
499
500   if (self->loop) {
501     GST_DEBUG_OBJECT (self, "Stopping loop");
502     g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
503         quit_invoker, g_main_loop_ref (self->loop),
504         (GDestroyNotify) g_main_loop_unref);
505   }
506
507   g_cond_broadcast (&self->cond);
508 }
509
510 static gboolean
511 gst_rtmp2_src_stop (GstBaseSrc * src)
512 {
513   GstRtmp2Src *self = GST_RTMP2_SRC (src);
514
515   GST_DEBUG_OBJECT (self, "stop");
516
517   g_mutex_lock (&self->lock);
518   stop_task (self);
519   g_mutex_unlock (&self->lock);
520
521   gst_task_join (self->task);
522
523   return TRUE;
524 }
525
526 static gboolean
527 gst_rtmp2_src_unlock (GstBaseSrc * src)
528 {
529   GstRtmp2Src *self = GST_RTMP2_SRC (src);
530
531   GST_DEBUG_OBJECT (self, "unlock");
532
533   g_mutex_lock (&self->lock);
534   self->flushing = TRUE;
535   g_cond_broadcast (&self->cond);
536   g_mutex_unlock (&self->lock);
537
538   return TRUE;
539 }
540
541 static gboolean
542 gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
543 {
544   GstRtmp2Src *self = GST_RTMP2_SRC (src);
545
546   GST_DEBUG_OBJECT (self, "unlock_stop");
547
548   g_mutex_lock (&self->lock);
549   self->flushing = FALSE;
550   g_mutex_unlock (&self->lock);
551
552   return TRUE;
553 }
554
555 static gboolean
556 on_timeout (GstRtmp2Src * self)
557 {
558   g_mutex_lock (&self->lock);
559   self->timeout = TRUE;
560   g_cond_broadcast (&self->cond);
561   g_mutex_unlock (&self->lock);
562
563   return G_SOURCE_REMOVE;
564 }
565
566 static GstFlowReturn
567 gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
568     GstBuffer ** outbuf)
569 {
570   GstRtmp2Src *self = GST_RTMP2_SRC (src);
571   GstBuffer *message, *buffer;
572   GstRtmpMeta *meta;
573   guint32 timestamp = 0;
574   GSource *timeout = NULL;
575   GstFlowReturn ret = GST_FLOW_OK;
576
577   static const guint8 flv_header_data[] = {
578     0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
579     0x09, 0x00, 0x00, 0x00, 0x00,
580   };
581
582   GST_LOG_OBJECT (self, "create");
583
584   g_mutex_lock (&self->lock);
585
586   if (self->running) {
587     gst_task_start (self->task);
588   }
589
590   /* wait until GMainLoop begins running so that we can attach
591    * timeout source safely.
592    * If the task stopped meanwhile, "running" will be FALSE
593    * than stop_task() will wake up us as well
594    */
595   while ((!self->started && self->running) && (!self->loop
596           || !g_main_loop_is_running (self->loop)))
597     g_cond_wait (&self->cond, &self->lock);
598
599   GST_OBJECT_LOCK (self);
600   if (self->idle_timeout && self->context) {
601     timeout = g_timeout_source_new_seconds (self->idle_timeout);
602
603     g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
604     g_source_attach (timeout, self->context);
605   }
606   GST_OBJECT_UNLOCK (self);
607
608   while (!self->message) {
609     if (!self->running) {
610       ret = GST_FLOW_EOS;
611       goto out;
612     }
613     if (self->flushing) {
614       ret = GST_FLOW_FLUSHING;
615       goto out;
616     }
617     if (self->timeout) {
618       GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
619       ret = GST_FLOW_EOS;
620       goto out;
621     }
622     g_cond_wait (&self->cond, &self->lock);
623   }
624
625   if (timeout) {
626     g_source_destroy (timeout);
627     g_source_unref (timeout);
628   }
629
630   message = self->message;
631   self->message = NULL;
632   g_cond_signal (&self->cond);
633   g_mutex_unlock (&self->lock);
634
635   meta = gst_buffer_get_rtmp_meta (message);
636   if (!meta) {
637     GST_ELEMENT_ERROR (self, CORE, FAILED,
638         ("Internal error: No RTMP meta on buffer"),
639         ("No RTMP meta on %" GST_PTR_FORMAT, message));
640     gst_buffer_unref (message);
641     return GST_FLOW_ERROR;
642   }
643
644   if (GST_BUFFER_DTS_IS_VALID (message)) {
645     GstClockTime last_ts = self->last_ts, ts = GST_BUFFER_DTS (message);
646
647     if (GST_CLOCK_TIME_IS_VALID (last_ts) && last_ts > ts) {
648       GST_LOG_OBJECT (self, "Timestamp regression: %" GST_TIME_FORMAT
649           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (last_ts), GST_TIME_ARGS (ts));
650     }
651
652     self->last_ts = ts;
653     timestamp = ts / GST_MSECOND;
654   }
655
656   buffer = gst_buffer_copy_region (message, GST_BUFFER_COPY_MEMORY, 0, -1);
657
658   {
659     guint8 *tag_header = g_malloc (11);
660     GstMemory *memory =
661         gst_memory_new_wrapped (0, tag_header, 11, 0, 11, tag_header, g_free);
662     GST_WRITE_UINT8 (tag_header, meta->type);
663     GST_WRITE_UINT24_BE (tag_header + 1, meta->size);
664     GST_WRITE_UINT24_BE (tag_header + 4, timestamp);
665     GST_WRITE_UINT8 (tag_header + 7, timestamp >> 24);
666     GST_WRITE_UINT24_BE (tag_header + 8, 0);
667     gst_buffer_prepend_memory (buffer, memory);
668   }
669
670   {
671     guint8 *tag_footer = g_malloc (4);
672     GstMemory *memory =
673         gst_memory_new_wrapped (0, tag_footer, 4, 0, 4, tag_footer, g_free);
674     GST_WRITE_UINT32_BE (tag_footer, meta->size + 11);
675     gst_buffer_append_memory (buffer, memory);
676   }
677
678   if (!self->sent_header) {
679     GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
680         (guint8 *) flv_header_data, sizeof flv_header_data, 0,
681         sizeof flv_header_data, NULL, NULL);
682     gst_buffer_prepend_memory (buffer, memory);
683     self->sent_header = TRUE;
684   }
685
686   GST_BUFFER_DTS (buffer) = self->last_ts;
687
688   *outbuf = buffer;
689
690   gst_buffer_unref (message);
691   return GST_FLOW_OK;
692
693 out:
694   if (timeout) {
695     g_source_destroy (timeout);
696     g_source_unref (timeout);
697   }
698   /* Keep the unlock after the destruction of the timeout source to workaround
699    * https://gitlab.gnome.org/GNOME/glib/-/issues/803
700    */
701   g_mutex_unlock (&self->lock);
702
703   return ret;
704 }
705
706 static gboolean
707 gst_rtmp2_src_query (GstBaseSrc * basesrc, GstQuery * query)
708 {
709   gboolean ret = FALSE;
710
711   switch (GST_QUERY_TYPE (query)) {
712     case GST_QUERY_SCHEDULING:{
713       gst_query_set_scheduling (query,
714           GST_SCHEDULING_FLAG_SEQUENTIAL |
715           GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
716       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
717
718       ret = TRUE;
719       break;
720     }
721     default:
722       ret = FALSE;
723       break;
724   }
725
726   if (!ret)
727     ret =
728         GST_BASE_SRC_CLASS (gst_rtmp2_src_parent_class)->query (basesrc, query);
729
730   return ret;
731 }
732
733 static gboolean
734 main_loop_running_cb (GstRtmp2Src * self)
735 {
736   GST_TRACE_OBJECT (self, "Main loop running now");
737
738   g_mutex_lock (&self->lock);
739   self->started = TRUE;
740   g_cond_broadcast (&self->cond);
741   g_mutex_unlock (&self->lock);
742
743   return G_SOURCE_REMOVE;
744 }
745
746 /* Mainloop task */
747 static void
748 gst_rtmp2_src_task_func (gpointer user_data)
749 {
750   GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
751   GMainContext *context;
752   GMainLoop *loop;
753   GTask *connector;
754   GSource *source;
755
756   GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
757   g_mutex_lock (&self->lock);
758
759   context = self->context = g_main_context_new ();
760   g_main_context_push_thread_default (context);
761   loop = self->loop = g_main_loop_new (context, TRUE);
762
763   source = g_idle_source_new ();
764   g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
765       NULL);
766   g_source_attach (source, self->context);
767   g_source_unref (source);
768
769   connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
770
771   g_clear_pointer (&self->stats, gst_structure_free);
772
773   GST_OBJECT_LOCK (self);
774   gst_rtmp_client_connect_async (&self->location, self->cancellable,
775       client_connect_done, connector);
776   GST_OBJECT_UNLOCK (self);
777
778   /* Run loop */
779   g_mutex_unlock (&self->lock);
780   g_main_loop_run (loop);
781   g_mutex_lock (&self->lock);
782
783   if (self->connection) {
784     self->stats = gst_rtmp_connection_get_stats (self->connection);
785   }
786
787   g_clear_pointer (&self->loop, g_main_loop_unref);
788   g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
789   g_cond_broadcast (&self->cond);
790
791   /* Run loop cleanup */
792   g_mutex_unlock (&self->lock);
793   while (g_main_context_pending (context)) {
794     GST_DEBUG_OBJECT (self, "iterating main context to clean up");
795     g_main_context_iteration (context, FALSE);
796   }
797   g_main_context_pop_thread_default (context);
798   g_mutex_lock (&self->lock);
799
800   g_clear_pointer (&self->context, g_main_context_unref);
801   gst_buffer_replace (&self->message, NULL);
802
803   g_mutex_unlock (&self->lock);
804   GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task exiting");
805 }
806
807 static void
808 client_connect_done (GObject * source, GAsyncResult * result,
809     gpointer user_data)
810 {
811   GTask *task = user_data;
812   GstRtmp2Src *self = g_task_get_source_object (task);
813   GError *error = NULL;
814   GstRtmpConnection *connection;
815
816   connection = gst_rtmp_client_connect_finish (result, &error);
817   if (!connection) {
818     g_task_return_error (task, error);
819     g_object_unref (task);
820     return;
821   }
822
823   g_task_set_task_data (task, connection, g_object_unref);
824
825   if (g_task_return_error_if_cancelled (task)) {
826     g_object_unref (task);
827     return;
828   }
829
830   GST_OBJECT_LOCK (self);
831   gst_rtmp_client_start_play_async (connection, self->location.stream,
832       g_task_get_cancellable (task), start_play_done, task);
833   GST_OBJECT_UNLOCK (self);
834 }
835
836 static void
837 start_play_done (GObject * source, GAsyncResult * result, gpointer user_data)
838 {
839   GTask *task = G_TASK (user_data);
840   GstRtmp2Src *self = g_task_get_source_object (task);
841   GstRtmpConnection *connection = g_task_get_task_data (task);
842   GError *error = NULL;
843
844   if (g_task_return_error_if_cancelled (task)) {
845     g_object_unref (task);
846     return;
847   }
848
849   if (gst_rtmp_client_start_play_finish (connection, result,
850           &self->stream_id, &error)) {
851     g_task_return_pointer (task, g_object_ref (connection),
852         gst_rtmp_connection_close_and_unref);
853   } else {
854     g_task_return_error (task, error);
855   }
856
857   g_task_set_task_data (task, NULL, NULL);
858   g_object_unref (task);
859 }
860
861 static void
862 got_message (GstRtmpConnection * connection, GstBuffer * buffer,
863     gpointer user_data)
864 {
865   GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
866   GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
867   guint32 min_size = 1;
868
869   g_return_if_fail (meta);
870
871   if (meta->mstream != self->stream_id) {
872     GST_DEBUG_OBJECT (self, "Ignoring %s message with stream %" G_GUINT32_FORMAT
873         " != %" G_GUINT32_FORMAT, gst_rtmp_message_type_get_nick (meta->type),
874         meta->mstream, self->stream_id);
875     return;
876   }
877
878   switch (meta->type) {
879     case GST_RTMP_MESSAGE_TYPE_VIDEO:
880       min_size = 6;
881       break;
882
883     case GST_RTMP_MESSAGE_TYPE_AUDIO:
884       min_size = 2;
885       break;
886
887     case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
888       break;
889
890     default:
891       GST_DEBUG_OBJECT (self, "Ignoring %s message, wrong type",
892           gst_rtmp_message_type_get_nick (meta->type));
893       return;
894   }
895
896   if (meta->size < min_size) {
897     GST_DEBUG_OBJECT (self, "Ignoring too small %s message (%" G_GUINT32_FORMAT
898         " < %" G_GUINT32_FORMAT ")",
899         gst_rtmp_message_type_get_nick (meta->type), meta->size, min_size);
900     return;
901   }
902
903   g_mutex_lock (&self->lock);
904   while (self->message) {
905     if (!self->running) {
906       goto out;
907     }
908     g_cond_wait (&self->cond, &self->lock);
909   }
910
911   self->message = gst_buffer_ref (buffer);
912   g_cond_signal (&self->cond);
913
914 out:
915   g_mutex_unlock (&self->lock);
916   return;
917 }
918
919 static void
920 error_callback (GstRtmpConnection * connection, GstRtmp2Src * self)
921 {
922   g_mutex_lock (&self->lock);
923   if (self->cancellable) {
924     g_cancellable_cancel (self->cancellable);
925   } else if (self->loop) {
926     GST_INFO_OBJECT (self, "Connection error");
927     stop_task (self);
928   }
929   g_mutex_unlock (&self->lock);
930 }
931
932 static void
933 control_callback (GstRtmpConnection * connection, gint uc_type,
934     guint stream_id, GstRtmp2Src * self)
935 {
936   GST_INFO_OBJECT (self, "stream %u got %s", stream_id,
937       gst_rtmp_user_control_type_get_nick (uc_type));
938
939   if (uc_type == GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF && stream_id == 1) {
940     GST_INFO_OBJECT (self, "went EOS");
941     stop_task (self);
942   }
943 }
944
945 static void
946 send_connect_error (GstRtmp2Src * self, GError * error)
947 {
948   if (!error) {
949     GST_ERROR_OBJECT (self, "Connect failed with NULL error");
950     GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
951     return;
952   }
953
954   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
955     GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
956         GST_STR_NULL (error->message));
957     return;
958   }
959
960   GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
961       g_quark_to_string (error->domain), error->code,
962       GST_STR_NULL (error->message));
963
964   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
965     GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
966         ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
967   } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
968     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
969         ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
970   } else {
971     GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
972         ("Failed to connect"),
973         ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
974             GST_STR_NULL (error->message)));
975   }
976 }
977
978 static void
979 connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
980 {
981   GstRtmp2Src *self = GST_RTMP2_SRC (object);
982   GTask *task = G_TASK (result);
983   GError *error = NULL;
984
985   g_mutex_lock (&self->lock);
986
987   g_warn_if_fail (g_task_is_valid (task, object));
988
989   if (self->cancellable == g_task_get_cancellable (task)) {
990     g_clear_object (&self->cancellable);
991   }
992
993   self->connection = g_task_propagate_pointer (task, &error);
994   if (self->connection) {
995     gst_rtmp_connection_set_input_handler (self->connection,
996         got_message, g_object_ref (self), g_object_unref);
997     g_signal_connect_object (self->connection, "error",
998         G_CALLBACK (error_callback), self, 0);
999     g_signal_connect_object (self->connection, "stream-control",
1000         G_CALLBACK (control_callback), self, 0);
1001   } else {
1002     send_connect_error (self, error);
1003     stop_task (self);
1004     g_error_free (error);
1005   }
1006
1007   g_cond_broadcast (&self->cond);
1008   g_mutex_unlock (&self->lock);
1009 }
1010
1011 static GstStructure *
1012 gst_rtmp2_src_get_stats (GstRtmp2Src * self)
1013 {
1014   GstStructure *s;
1015
1016   g_mutex_lock (&self->lock);
1017
1018   if (self->connection) {
1019     s = gst_rtmp_connection_get_stats (self->connection);
1020   } else if (self->stats) {
1021     s = gst_structure_copy (self->stats);
1022   } else {
1023     s = gst_rtmp_connection_get_null_stats ();
1024   }
1025
1026   g_mutex_unlock (&self->lock);
1027
1028   return s;
1029 }