srt: Accumulate total bytes sent/received over all connections/callers
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 18 Mar 2020 16:58:52 +0000 (17:58 +0100)
committerJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 15 Apr 2020 08:42:48 +0000 (10:42 +0200)
So we don't lose them. Split gst_srt_object_open_internal for internal
reconnections that don't reset the accumulated bytes.

ext/srt/gstsrtobject.c
ext/srt/gstsrtobject.h

index 23566ce..6970474 100644 (file)
@@ -59,6 +59,9 @@ typedef struct
   gboolean sent_headers;
 } SRTCaller;
 
+static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject,
+    SRTSOCKET srtsock);
+
 static SRTCaller *
 srt_caller_new (void)
 {
@@ -92,6 +95,14 @@ srt_caller_free (SRTCaller * caller)
 static void
 srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
 {
+  GstStructure *stats;
+
+  stats = gst_srt_object_accumulate_stats (srtobject, caller->sock);
+
+  /* FIXME: These are the final statistics for the caller before we close its
+   * socket. Deliver the stats to the app before we throw them away. */
+  gst_structure_free (stats);
+
   g_signal_emit_by_name (srtobject->element, "caller-removed", caller->sock,
       caller->sockaddr);
 }
@@ -966,9 +977,9 @@ gst_srt_object_open_connection (GstSRTObject * srtobject,
   return ret;
 }
 
-gboolean
-gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
-    GError ** error)
+static gboolean
+gst_srt_object_open_internal (GstSRTObject * srtobject,
+    GCancellable * cancellable, GError ** error)
 {
   GSocketAddress *socket_address = NULL;
   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
@@ -1044,6 +1055,15 @@ out:
   return ret;
 }
 
+gboolean
+gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
+    GError ** error)
+{
+  srtobject->previous_bytes = 0;
+
+  return gst_srt_object_open_internal (srtobject, cancellable, error);
+}
+
 void
 gst_srt_object_close (GstSRTObject * srtobject)
 {
@@ -1053,6 +1073,13 @@ gst_srt_object_close (GstSRTObject * srtobject)
   }
 
   if (srtobject->sock != SRT_INVALID_SOCK) {
+    GstStructure *stats;
+
+    stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock);
+
+    /* FIXME: These are the final statistics for the socket before we close it.
+     * Deliver the stats to the app before we throw them away. */
+    gst_structure_free (stats);
 
     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
         srtobject->sock);
@@ -1192,7 +1219,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
           GST_WARNING_OBJECT (srtobject->element,
               "Invalid SRT socket. Trying to reconnect");
           gst_srt_object_close (srtobject);
-          if (!gst_srt_object_open (srtobject, cancellable, error)) {
+          if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
             return -1;
           }
           continue;
@@ -1402,7 +1429,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
         GST_WARNING_OBJECT (srtobject->element,
             "Invalid SRT socket. Trying to reconnect");
         gst_srt_object_close (srtobject);
-        if (!gst_srt_object_open (srtobject, cancellable, error)) {
+        if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
           return -1;
         }
         continue;
@@ -1465,7 +1492,7 @@ gst_srt_object_write (GstSRTObject * srtobject,
 }
 
 static GstStructure *
-get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
+get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
 {
   GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
   int ret;
@@ -1474,7 +1501,7 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
   ret = srt_bstats (srtsock, &stats, 0);
 
   if (ret >= 0) {
-    if (is_sender)
+    if (is_sender) {
       gst_structure_set (s,
           /* number of sent data packets, including retransmissions */
           "packets-sent", G_TYPE_INT64, stats.pktSent,
@@ -1501,7 +1528,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
           /* busy sending time (i.e., idle time exclusive) */
           "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
           "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
-    else
+      *bytes += stats.byteSent;
+    } else {
       gst_structure_set (s,
           "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
           "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
@@ -1513,6 +1541,8 @@ get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
           "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
           "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
           "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
+      *bytes += stats.byteRecvTotal;
+    }
 
     gst_structure_set (s,
         /* estimated bandwidth, in Mb/s */
@@ -1529,10 +1559,14 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
 {
   GstStructure *s = NULL;
   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
+  guint64 bytes;
 
   g_mutex_lock (&srtobject->sock_lock);
+
+  bytes = srtobject->previous_bytes;
+
   if (srtobject->sock != SRT_INVALID_SOCK) {
-    s = get_stats_for_srtsock (srtobject->sock, is_sender);
+    s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes);
     goto done;
   }
 
@@ -1545,9 +1579,11 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
 
     for (item = srtobject->callers; item; item = item->next) {
       SRTCaller *caller = item->data;
-      GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender);
+      GstStructure *tmp;
       GValue *v;
 
+      tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes);
+
       g_value_array_append (callers_stats, NULL);
       v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
       g_value_init (v, GST_TYPE_STRUCTURE);
@@ -1560,7 +1596,23 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
   }
 
 done:
+  gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total",
+      G_TYPE_UINT64, bytes, NULL);
+
   g_mutex_unlock (&srtobject->sock_lock);
 
   return s;
 }
+
+static GstStructure *
+gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock)
+{
+  gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
+  GstStructure *stats;
+  guint64 bytes = 0;
+
+  stats = get_stats_for_srtsock (srtsock, is_sender, &bytes);
+  srtobject->previous_bytes += bytes;
+
+  return stats;
+}
index 2bc800a..2e0e2cd 100644 (file)
@@ -71,6 +71,8 @@ struct _GstSRTObject
   gchar                        *passphrase;
 
   gboolean                     wait_for_connection;
+
+  guint64                      previous_bytes;
 };
 
 GstSRTObject   *gst_srt_object_new              (GstElement *element);