rtmp2: Add gst_rtmp_connection_get_stats and _get_null_stats
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Fri, 14 Feb 2020 13:53:46 +0000 (14:53 +0100)
committerJan Alexander Steffens (heftig) <jsteffens@make.tv>
Fri, 21 Feb 2020 18:26:35 +0000 (19:26 +0100)
The former uses a thread-safe way of getting statistics from the
connection without having to protect the fields with a lock.

The latter produces a zeroed statistics structure for use when no
connection exists.

gst/rtmp2/rtmp/rtmpconnection.c
gst/rtmp2/rtmp/rtmpconnection.h

index 396d745..d85f6ff 100644 (file)
@@ -52,7 +52,7 @@ struct _GstRtmpConnection
   GSocketConnection *connection;
   GCancellable *cancellable;
   GSocketClient *socket_client;
-  GAsyncQueue *output_queue;
+  GAsyncQueue *output_queue, *stats_queue;
   GMainContext *main_context;
 
   GSource *input_source;
@@ -243,6 +243,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
   rtmpconnection->cancellable = g_cancellable_new ();
   rtmpconnection->output_queue =
       g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
+  rtmpconnection->stats_queue =
+      g_async_queue_new_full ((GDestroyNotify) gst_structure_free);
   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
 
@@ -280,6 +282,7 @@ gst_rtmp_connection_finalize (GObject * object)
   g_clear_object (&rtmpconnection->cancellable);
   g_clear_object (&rtmpconnection->connection);
   g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
+  g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref);
   g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
   g_clear_pointer (&rtmpconnection->output_streams,
       gst_rtmp_chunk_streams_free);
@@ -1225,3 +1228,50 @@ gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
         window_ack_size);
   }
 }
+
+static GstStructure *
+get_stats (GstRtmpConnection * self)
+{
+  return gst_structure_new ("GstRtmpConnectionStats",
+      "in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
+      "out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
+      "in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
+      "out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
+      "in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
+      "out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
+      "in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
+      "out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
+}
+
+GstStructure *
+gst_rtmp_connection_get_null_stats (void)
+{
+  return get_stats (NULL);
+}
+
+static gboolean
+get_stats_invoker (gpointer ptr)
+{
+  GstRtmpConnection *self = ptr;
+  g_async_queue_push (self->stats_queue, get_stats (self));
+  return G_SOURCE_REMOVE;
+}
+
+GstStructure *
+gst_rtmp_connection_get_stats (GstRtmpConnection * self)
+{
+  GstStructure *s;
+
+  g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
+
+  if (g_main_context_acquire (self->main_context)) {
+    s = get_stats (self);
+    g_main_context_release (self->main_context);
+  } else {
+    g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH,
+        get_stats_invoker, g_object_ref (self), g_object_unref);
+    s = g_async_queue_pop (self->stats_queue);
+  }
+
+  return s;
+}
index f7e78ce..b20bc19 100644 (file)
@@ -89,6 +89,9 @@ void gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
 void gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
     GstBuffer * buffer);
 
+GstStructure * gst_rtmp_connection_get_null_stats (void);
+GstStructure * gst_rtmp_connection_get_stats (GstRtmpConnection * connection);
+
 G_END_DECLS
 
 #endif