rtmp2src: Add idle-timeout property
authorSeungha Yang <seungha@centricular.com>
Fri, 27 Mar 2020 06:40:00 +0000 (15:40 +0900)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 27 Mar 2020 10:25:37 +0000 (10:25 +0000)
Add new property to signalling that there is no incoming data
from peer. This can be useful if users want to stop the streaming
when the connection is alive but no packet is arriving.

gst/rtmp2/gstrtmp2src.c

index 4ed4f98..dff4eac 100644 (file)
@@ -60,6 +60,7 @@ typedef struct
   GstRtmpLocation location;
   gboolean async_connect;
   GstStructure *stats;
+  guint idle_timeout;
 
   /* If both self->lock and OBJECT_LOCK are needed,
    * self->lock must be taken first */
@@ -67,6 +68,8 @@ typedef struct
   GCond cond;
 
   gboolean running, flushing;
+  gboolean timeout;
+  gboolean started;
 
   GstTask *task;
   GRecMutex task_lock;
@@ -133,8 +136,11 @@ enum
   PROP_FLASH_VERSION,
   PROP_ASYNC_CONNECT,
   PROP_STATS,
+  PROP_IDLE_TIMEOUT,
 };
 
+#define DEFAULT_IDLE_TIMEOUT 0
+
 /* pad templates */
 
 static GstStaticPadTemplate gst_rtmp2_src_src_template =
@@ -200,6 +206,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
+      g_param_spec_uint ("idle-timeout", "Idle timeout",
+          "The maximum allowed time in seconds for valid packets not to arrive "
+          "from the peer (0 = no timeout)",
+          0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
       "debug category for rtmp2src element");
 }
@@ -208,6 +221,7 @@ static void
 gst_rtmp2_src_init (GstRtmp2Src * self)
 {
   self->async_connect = TRUE;
+  self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
 
   g_mutex_init (&self->lock);
   g_cond_init (&self->cond);
@@ -306,6 +320,11 @@ gst_rtmp2_src_set_property (GObject * object, guint property_id,
       self->async_connect = g_value_get_boolean (value);
       GST_OBJECT_UNLOCK (self);
       break;
+    case PROP_IDLE_TIMEOUT:
+      GST_OBJECT_LOCK (self);
+      self->idle_timeout = g_value_get_uint (value);
+      GST_OBJECT_UNLOCK (self);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
       break;
@@ -393,6 +412,11 @@ gst_rtmp2_src_get_property (GObject * object, guint property_id,
     case PROP_STATS:
       g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
       break;
+    case PROP_IDLE_TIMEOUT:
+      GST_OBJECT_LOCK (self);
+      g_value_set_uint (value, self->idle_timeout);
+      GST_OBJECT_UNLOCK (self);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
       break;
@@ -440,6 +464,8 @@ gst_rtmp2_src_start (GstBaseSrc * src)
   self->stream_id = 0;
   self->sent_header = FALSE;
   self->last_ts = GST_CLOCK_TIME_NONE;
+  self->timeout = FALSE;
+  self->started = FALSE;
 
   if (async) {
     gst_task_start (self->task);
@@ -521,6 +547,17 @@ gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
   return TRUE;
 }
 
+static gboolean
+on_timeout (GstRtmp2Src * self)
+{
+  g_mutex_lock (&self->lock);
+  self->timeout = TRUE;
+  g_cond_broadcast (&self->cond);
+  g_mutex_unlock (&self->lock);
+
+  return G_SOURCE_REMOVE;
+}
+
 static GstFlowReturn
 gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
     GstBuffer ** outbuf)
@@ -529,6 +566,8 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
   GstBuffer *message, *buffer;
   GstRtmpMeta *meta;
   guint32 timestamp = 0;
+  GSource *timeout = NULL;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   static const guint8 flv_header_data[] = {
     0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
@@ -543,18 +582,46 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
     gst_task_start (self->task);
   }
 
+  /* wait until GMainLoop begins running so that we can attach
+   * timeout source safely.
+   * If the task stopped meanwhile, "running" will be FALSE
+   * than stop_task() will wake up us as well
+   */
+  while ((!self->started && self->running) && (!self->loop
+          || !g_main_loop_is_running (self->loop)))
+    g_cond_wait (&self->cond, &self->lock);
+
+  GST_OBJECT_LOCK (self);
+  if (self->idle_timeout && self->context) {
+    timeout = g_timeout_source_new_seconds (self->idle_timeout);
+
+    g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
+    g_source_attach (timeout, self->context);
+  }
+  GST_OBJECT_UNLOCK (self);
+
   while (!self->message) {
     if (!self->running) {
-      g_mutex_unlock (&self->lock);
-      return GST_FLOW_EOS;
+      ret = GST_FLOW_EOS;
+      goto out;
     }
     if (self->flushing) {
-      g_mutex_unlock (&self->lock);
-      return GST_FLOW_FLUSHING;
+      ret = GST_FLOW_FLUSHING;
+      goto out;
+    }
+    if (self->timeout) {
+      GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
+      ret = GST_FLOW_EOS;
+      goto out;
     }
     g_cond_wait (&self->cond, &self->lock);
   }
 
+  if (timeout) {
+    g_source_destroy (timeout);
+    g_source_unref (timeout);
+  }
+
   message = self->message;
   self->message = NULL;
   g_cond_signal (&self->cond);
@@ -615,6 +682,28 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
 
   gst_buffer_unref (message);
   return GST_FLOW_OK;
+
+out:
+  g_mutex_unlock (&self->lock);
+  if (timeout) {
+    g_source_destroy (timeout);
+    g_source_unref (timeout);
+  }
+
+  return ret;
+}
+
+static gboolean
+main_loop_running_cb (GstRtmp2Src * self)
+{
+  GST_TRACE_OBJECT (self, "Main loop running now");
+
+  g_mutex_lock (&self->lock);
+  self->started = TRUE;
+  g_cond_broadcast (&self->cond);
+  g_mutex_unlock (&self->lock);
+
+  return G_SOURCE_REMOVE;
 }
 
 /* Mainloop task */
@@ -625,6 +714,7 @@ gst_rtmp2_src_task_func (gpointer user_data)
   GMainContext *context;
   GMainLoop *loop;
   GTask *connector;
+  GSource *source;
 
   GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
   g_mutex_lock (&self->lock);
@@ -632,6 +722,13 @@ gst_rtmp2_src_task_func (gpointer user_data)
   context = self->context = g_main_context_new ();
   g_main_context_push_thread_default (context);
   loop = self->loop = g_main_loop_new (context, TRUE);
+
+  source = g_idle_source_new ();
+  g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
+      NULL);
+  g_source_attach (source, self->context);
+  g_source_unref (source);
+
   connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
 
   g_clear_pointer (&self->stats, gst_structure_free);