rusage: implement windowing of cpuload
authorStefan Sauer <ensonic@users.sf.net>
Wed, 10 Sep 2014 05:55:33 +0000 (07:55 +0200)
committerStefan Sauer <ensonic@users.sf.net>
Mon, 5 Oct 2015 18:59:39 +0000 (20:59 +0200)
Add a local help to the rusage plugin that supports windowing of values. We want
to generalize this for use in other plugins.

plugins/tracers/gstrusage.c
tools/gst-stats.c

index 71d6e4d..398a78c 100644 (file)
  *
  * A tracing module that take rusage() snapshots and logs them. 
  */
-/* TODO: log more items, cpuload is calculated as an aggregated value
- * - in many cases a windowed value would be more interesting to see local
- *   cpu-load spikes
- */
 
 #ifdef HAVE_CONFIG_H
 #  include "config.h"
@@ -48,24 +44,119 @@ GST_DEBUG_CATEGORY_STATIC (gst_rusage_debug);
 G_DEFINE_TYPE_WITH_CODE (GstRUsageTracer, gst_rusage_tracer, GST_TYPE_TRACER,
     _do_init);
 
+/* we remember x measurements per self->window */
+#define WINDOW_SUBDIV 100
+
 /* for ts calibration */
 static gpointer main_thread_id = NULL;
 static guint64 tproc_base = G_GINT64_CONSTANT (0);
 
 typedef struct
 {
+  GstClockTime ts;
+  GstClockTime val;
+} GstTraceValue;
+
+typedef struct
+{
+  GstClockTime window;
+  GMutex lock;
+  GQueue values;                /* GstTraceValue* */
+} GstTraceValues;
+
+typedef struct
+{
   /* time spend in this thread */
   GstClockTime tthread;
+  GstTraceValues *tvs_thread;
 } GstThreadStats;
 
+static GstTraceValues *tvs_proc;
+
 static void gst_rusage_tracer_invoke (GstTracer * self, GstTracerHookId id,
     GstTracerMessageId mid, va_list var_args);
 
 /* data helper */
 
 static void
+free_trace_value (gpointer data)
+{
+  g_slice_free (GstTraceValue, data);
+}
+
+static GstTraceValues *
+make_trace_values (GstClockTime window)
+{
+  GstTraceValues *self = g_slice_new0 (GstTraceValues);
+  self->window = window;
+  g_mutex_init (&self->lock);
+  g_queue_init (&self->values);
+  return self;
+}
+
+static void
+free_trace_values (GstTraceValues * self)
+{
+  g_queue_free_full (&self->values, free_trace_value);
+  g_mutex_clear (&self->lock);
+  g_slice_free (GstTraceValues, self);
+}
+
+static gboolean
+update_trace_value (GstTraceValues * self, GstClockTime nts,
+    GstClockTime nval, GstClockTime * dts, GstClockTime * dval)
+{
+  GstTraceValue *lv;
+  GstClockTimeDiff dt;
+  GstClockTime window = self->window;
+  GQueue *q = &self->values;
+  GList *node = q->tail;
+  gboolean ret = FALSE;
+
+
+  /* search from the tail of the queue for a good GstTraceValue */
+  while (node) {
+    lv = node->data;
+    dt = GST_CLOCK_DIFF (lv->ts, nts);
+    if (dt < window) {
+      break;
+    } else {
+      node = g_list_previous (node);
+    }
+  }
+
+  if (node) {
+    /* calculate the windowed value */
+    *dts = dt;
+    *dval = GST_CLOCK_DIFF (lv->val, nval);
+
+    /* drop all older measurements */
+    while (q->tail != node) {
+      free_trace_value (g_queue_pop_tail (q));
+    }
+    ret = TRUE;
+  } else {
+    *dts = nts;
+    *dval = nval;
+  }
+
+  /* don't push too many data items */
+  lv = q->head ? q->head->data : NULL;
+  if (!lv || (GST_CLOCK_DIFF (lv->ts, nts) > (window / WINDOW_SUBDIV))) {
+    /* push the new measurement */
+    lv = g_slice_new0 (GstTraceValue);
+    lv->ts = nts;
+    lv->val = nval;
+    g_queue_push_head (q, lv);
+  }
+  return ret;
+}
+
+
+static void
 free_thread_stats (gpointer data)
 {
+  free_trace_values (((GstThreadStats *) data)->tvs_thread);
   g_slice_free (GstThreadStats, data);
 }
 
@@ -84,6 +175,7 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
   self->threads = g_hash_table_new_full (NULL, NULL, NULL, free_thread_stats);
 
   main_thread_id = g_thread_self ();
+  tvs_proc = make_trace_values (GST_SECOND);
   GST_DEBUG ("rusage: main thread=%p", main_thread_id);
 
   /* announce trace formats */
@@ -92,11 +184,19 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
       "thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope",
           "related-to", G_TYPE_STRING, "thread",  // use genum
           NULL),
-      "cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
+      "average-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
           "type", G_TYPE_GTYPE, G_TYPE_UINT,
-          "description", G_TYPE_STRING, "cpu usage per thread",
+          "description", G_TYPE_STRING, "average cpu usage per thread",
           "flags", G_TYPE_STRING, "aggregated",  // use gflags 
-          "min", G_TYPE_UINT, 0, "max", G_TYPE_UINT, 100,
+          "min", G_TYPE_UINT, 0, 
+          "max", G_TYPE_UINT, 100,
+          NULL),
+      "current-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
+          "type", G_TYPE_GTYPE, G_TYPE_UINT,
+          "description", G_TYPE_STRING, "current cpu usage per thread",
+          "flags", G_TYPE_STRING, "windowed",  // use gflags 
+          "min", G_TYPE_UINT, 0, 
+          "max", G_TYPE_UINT, 100,
           NULL),
       "time", GST_TYPE_STRUCTURE, gst_structure_new ("value",
           "type", G_TYPE_GTYPE, G_TYPE_UINT64,
@@ -110,11 +210,19 @@ gst_rusage_tracer_init (GstRUsageTracer * self)
       "thread-id", GST_TYPE_STRUCTURE, gst_structure_new ("scope",
           "related-to", G_TYPE_STRING, "process",  // use genum
           NULL),
-      "cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
+      "average-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
           "type", G_TYPE_GTYPE, G_TYPE_UINT,
-          "description", G_TYPE_STRING, "cpu usage per process",
+          "description", G_TYPE_STRING, "average cpu usage per process",
           "flags", G_TYPE_STRING, "aggregated",  // use gflags 
-          "min", G_TYPE_UINT, 0, "max", G_TYPE_UINT, 100,
+          "min", G_TYPE_UINT, 0, 
+          "max", G_TYPE_UINT, 100,
+          NULL),
+      "current-cpuload", GST_TYPE_STRUCTURE, gst_structure_new ("value",
+          "type", G_TYPE_GTYPE, G_TYPE_UINT,
+          "description", G_TYPE_STRING, "current cpu usage per process",
+          "flags", G_TYPE_STRING, "windowed",  // use gflags 
+          "min", G_TYPE_UINT, 0, 
+          "max", G_TYPE_UINT, 100,
           NULL),
       "time", GST_TYPE_STRUCTURE, gst_structure_new ("value",
           "type", G_TYPE_GTYPE, G_TYPE_UINT64,
@@ -135,11 +243,10 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
   guint64 ts = va_arg (var_args, guint64);
   GstThreadStats *stats;
   gpointer thread_id = g_thread_self ();
-  guint cpuload = 0;
+  guint avg_cpuload, cur_cpuload;
   struct rusage ru;
   GstClockTime tproc = G_GUINT64_CONSTANT (0);
-
-  // FIXME(ensonic): not threadsafe
+  GstClockTime dts, dtproc;
   static GstClockTime last_ts = G_GUINT64_CONSTANT (0);
 
   getrusage (RUSAGE_SELF, &ru);
@@ -147,6 +254,7 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
   /* get stats record for current thread */
   if (!(stats = g_hash_table_lookup (self->threads, thread_id))) {
     stats = g_slice_new0 (GstThreadStats);
+    stats->tvs_thread = make_trace_values (GST_SECOND);
     g_hash_table_insert (self->threads, thread_id, stats);
   }
 #ifdef HAVE_CLOCK_GETTIME
@@ -175,9 +283,6 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
   stats->tthread += GST_CLOCK_DIFF (last_ts, ts);
 #endif
 
-  /* remember last timestamp for fallback calculations */
-  last_ts = ts;
-
   /* Calibrate ts for the process and main thread. For tthread[main] and tproc
    * the time is larger than ts, as our base-ts is taken after the process has
    * started.
@@ -195,7 +300,7 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
       stats->tthread -= tproc_base;
     }
   }
-  /* we always need to corect proc time */
+  /* we always need to correct proc time */
   tproc -= tproc_base;
 
   /* FIXME: how can we take cpu-frequency scaling into account?
@@ -206,18 +311,29 @@ gst_rusage_tracer_invoke (GstTracer * obj, GstTracerHookId hid,
    *   cpufreq-selector -g ondemand
    */
   /* *INDENT-OFF* */
-  cpuload = (guint) gst_util_uint64_scale (stats->tthread,
+  avg_cpuload = (guint) gst_util_uint64_scale (stats->tthread,
       G_GINT64_CONSTANT (100), ts);
+  update_trace_value (stats->tvs_thread, ts, stats->tthread, &dts, &dtproc);
+  cur_cpuload = (guint) gst_util_uint64_scale (dtproc, G_GINT64_CONSTANT (100), dts);
   gst_tracer_log_trace (gst_structure_new ("thread-rusage", 
       "ts", G_TYPE_UINT64, ts, 
       "thread-id", G_TYPE_UINT, GPOINTER_TO_UINT (thread_id), 
-      "cpuload", G_TYPE_UINT, cpuload,
+      "average-cpuload", G_TYPE_UINT, MIN (avg_cpuload, 1000),
+      "current-cpuload", G_TYPE_UINT, MIN (cur_cpuload, 1000),
       "time", G_TYPE_UINT64, stats->tthread,
       NULL));
-  cpuload = (guint) gst_util_uint64_scale (tproc, G_GINT64_CONSTANT (100), ts);
+
+  avg_cpuload = (guint) gst_util_uint64_scale (tproc, G_GINT64_CONSTANT (100), ts);
+  g_mutex_lock (&tvs_proc->lock);
+  update_trace_value (tvs_proc, ts, tproc, &dts, &dtproc);
+  /* remember last timestamp for fallback calculations */
+  last_ts = ts;
+  g_mutex_unlock (&tvs_proc->lock);
+  cur_cpuload = (guint) gst_util_uint64_scale (dtproc, G_GINT64_CONSTANT (100), dts);
   gst_tracer_log_trace (gst_structure_new ("proc-rusage", 
       "ts", G_TYPE_UINT64, ts, 
-      "cpuload", G_TYPE_UINT, cpuload,
+      "average-cpuload", G_TYPE_UINT, MIN (avg_cpuload, 1000),
+      "current-cpuload", G_TYPE_UINT, MIN (cur_cpuload, 1000),
       "time", G_TYPE_UINT64, tproc,
       NULL));
   /* *INDENT-ON* */
index bc65cbd..b2a71f8 100644 (file)
@@ -385,7 +385,8 @@ do_thread_rusage_stats (GstStructure * s)
 
   gst_structure_get (s, "ts", G_TYPE_UINT64, &ts,
       "thread-id", G_TYPE_UINT, &thread_id,
-      "cpuload", G_TYPE_UINT, &cpuload, "time", G_TYPE_UINT64, &tthread, NULL);
+      "average-cpuload", G_TYPE_UINT, &cpuload, "time", G_TYPE_UINT64, &tthread,
+      NULL);
   thread_stats = get_thread_stats (thread_id);
   thread_stats->cpuload = cpuload;
   thread_stats->tthread = tthread;
@@ -398,7 +399,7 @@ do_proc_rusage_stats (GstStructure * s)
   guint64 ts;
 
   gst_structure_get (s, "ts", G_TYPE_UINT64, &ts,
-      "cpuload", G_TYPE_UINT, &total_cpuload, NULL);
+      "average-cpuload", G_TYPE_UINT, &total_cpuload, NULL);
   last_ts = MAX (last_ts, ts);
 }