rtmp2: Replace stats queue with stats lock
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
Tue, 1 Sep 2020 11:28:44 +0000 (13:28 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 9 Sep 2020 06:34:51 +0000 (06:34 +0000)
Making the thread receiving the stats wait on the loop to respond was
not a good idea, as the latter can get blocked on the streaming thread.

Have get_stats read the values directly, adding a lock to ensure we
don't read garbage.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1550>

gst/rtmp2/gstrtmp2sink.c
gst/rtmp2/gstrtmp2src.c
gst/rtmp2/rtmp/rtmpconnection.c

index 30b51d7..72026eb 100644 (file)
@@ -1216,18 +1216,7 @@ gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
   g_mutex_lock (&self->lock);
 
   if (self->connection) {
-    GstRtmpConnection *connection = g_object_ref (self->connection);
-
-    g_mutex_unlock (&self->lock);
-
-    /* We need to do this without holding the lock as the g_async_queue_pop
-     * waits on the loop thread to deliver the stats. The loop thread might
-     * attempt to take the lock as well, leading to a deadlock. */
-    s = gst_rtmp_connection_get_stats (connection);
-
-    g_mutex_lock (&self->lock);
-
-    g_object_unref (connection);
+    s = gst_rtmp_connection_get_stats (self->connection);
   } else if (self->stats) {
     s = gst_structure_copy (self->stats);
   } else {
index 2b7a05f..f5c356b 100644 (file)
@@ -1008,18 +1008,7 @@ gst_rtmp2_src_get_stats (GstRtmp2Src * self)
   g_mutex_lock (&self->lock);
 
   if (self->connection) {
-    GstRtmpConnection *connection = g_object_ref (self->connection);
-
-    g_mutex_unlock (&self->lock);
-
-    /* We need to do this without holding the lock as the g_async_queue_pop
-     * waits on the loop thread to deliver the stats. The loop thread might
-     * attempt to take the lock as well, leading to a deadlock. */
-    s = gst_rtmp_connection_get_stats (connection);
-
-    g_mutex_lock (&self->lock);
-
-    g_object_unref (connection);
+    s = gst_rtmp_connection_get_stats (self->connection);
   } else if (self->stats) {
     s = gst_structure_copy (self->stats);
   } else {
index b9fe767..22d1e69 100644 (file)
@@ -52,7 +52,7 @@ struct _GstRtmpConnection
   GSocketConnection *connection;
   GCancellable *cancellable;
   GSocketClient *socket_client;
-  GAsyncQueue *output_queue, *stats_queue;
+  GAsyncQueue *output_queue;
   GMainContext *main_context;
 
   GSource *input_source;
@@ -73,6 +73,12 @@ struct _GstRtmpConnection
 
   gboolean writing;
 
+  /* Protects the values below during concurrent access.
+   * - Taken by the loop thread when writing, but not reading.
+   * - Taken by other threads when reading (calling get_stats).
+   */
+  GMutex stats_lock;
+
   /* RTMP configuration */
   guint32 in_chunk_size;
   guint32 out_chunk_size, out_chunk_size_pending;
@@ -248,8 +254,6 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
   rtmpconnection->cancellable = g_cancellable_new ();
   rtmpconnection->output_queue =
       g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
-  rtmpconnection->stats_queue =
-      g_async_queue_new_full ((GDestroyNotify) gst_structure_free);
   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
 
@@ -258,6 +262,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
 
   rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
   rtmpconnection->input_needed_bytes = 1;
+
+  g_mutex_init (&rtmpconnection->stats_lock);
 }
 
 void
@@ -284,10 +290,10 @@ gst_rtmp_connection_finalize (GObject * object)
 
   /* clean up object here */
 
+  g_mutex_clear (&rtmpconnection->stats_lock);
   g_clear_object (&rtmpconnection->cancellable);
   g_clear_object (&rtmpconnection->connection);
   g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
-  g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref);
   g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
   g_clear_pointer (&rtmpconnection->output_streams,
       gst_rtmp_chunk_streams_free);
@@ -468,7 +474,10 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
 
   GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
 
+  g_mutex_lock (&sc->stats_lock);
   sc->in_bytes_total += ret;
+  g_mutex_unlock (&sc->stats_lock);
+
   bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
   if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
     gst_rtmp_connection_send_ack (sc);
@@ -569,7 +578,9 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
   res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
       &bytes_written, &error);
 
+  g_mutex_lock (&self->stats_lock);
   self->out_bytes_total += bytes_written;
+  g_mutex_unlock (&self->stats_lock);
 
   if (!res) {
     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -923,7 +934,9 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
         "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
   }
 
+  g_mutex_lock (&self->stats_lock);
   self->in_chunk_size = chunk_size;
+  g_mutex_unlock (&self->stats_lock);
 }
 
 static void
@@ -948,7 +961,9 @@ gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
   GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
       new_ack - last_ack);
 
+  g_mutex_lock (&self->stats_lock);
   self->out_bytes_acked = new_ack;
+  g_mutex_unlock (&self->stats_lock);
 }
 
 static void
@@ -961,7 +976,9 @@ gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
         window_ack_size);
   }
 
+  g_mutex_lock (&self->stats_lock);
   self->in_window_ack_size = window_ack_size;
+  g_mutex_unlock (&self->stats_lock);
 }
 
 static gboolean
@@ -1173,7 +1190,9 @@ gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
   gst_rtmp_connection_queue_message (connection,
       gst_rtmp_message_new_protocol_control (&pc));
 
+  g_mutex_lock (&connection->stats_lock);
   connection->in_bytes_acked = in_bytes_total;
+  g_mutex_unlock (&connection->stats_lock);
 }
 
 static void
@@ -1301,15 +1320,23 @@ gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
 
   chunk_size = self->out_chunk_size_pending;
   if (chunk_size) {
-    self->out_chunk_size = chunk_size;
     self->out_chunk_size_pending = 0;
+
+    g_mutex_lock (&self->stats_lock);
+    self->out_chunk_size = chunk_size;
+    g_mutex_unlock (&self->stats_lock);
+
     GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
   }
 
   window_ack_size = self->out_window_ack_size_pending;
   if (window_ack_size) {
-    self->out_window_ack_size = window_ack_size;
     self->out_window_ack_size_pending = 0;
+
+    g_mutex_lock (&self->stats_lock);
+    self->out_window_ack_size = window_ack_size;
+    g_mutex_unlock (&self->stats_lock);
+
     GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
         window_ack_size);
   }
@@ -1335,14 +1362,6 @@ gst_rtmp_connection_get_null_stats (void)
   return get_stats (NULL);
 }
 
-static gboolean
-get_stats_invoker (gpointer ptr)
-{
-  GstRtmpConnection *self = ptr;
-  g_async_queue_push (self->stats_queue, get_stats (self));
-  return G_SOURCE_REMOVE;
-}
-
 GstStructure *
 gst_rtmp_connection_get_stats (GstRtmpConnection * self)
 {
@@ -1350,14 +1369,9 @@ gst_rtmp_connection_get_stats (GstRtmpConnection * self)
 
   g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
 
-  if (g_main_context_acquire (self->main_context)) {
-    s = get_stats (self);
-    g_main_context_release (self->main_context);
-  } else {
-    g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH,
-        get_stats_invoker, g_object_ref (self), g_object_unref);
-    s = g_async_queue_pop (self->stats_queue);
-  }
+  g_mutex_lock (&self->stats_lock);
+  s = get_stats (self);
+  g_mutex_unlock (&self->stats_lock);
 
   return s;
 }