inputselector: Fix waiting on sync-mode=clock
authorVivia Nikolaidou <vivia@ahiru.eu>
Mon, 24 Oct 2022 13:49:47 +0000 (16:49 +0300)
committerVivia Nikolaidou <vivia@ahiru.eu>
Tue, 22 Nov 2022 19:21:40 +0000 (21:21 +0200)
Basically copy over what clocksync does, but taking into account that we
have multiple upstream latencies.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3256>

subprojects/gstreamer/plugins/elements/gstinputselector.c
subprojects/gstreamer/plugins/elements/gstinputselector.h

index 069e732..f673c46 100644 (file)
@@ -165,6 +165,8 @@ struct _GstSelectorPad
 
   gboolean sending_cached_buffers;
   GQueue *cached_buffers;
+
+  GstClockID clock_id;
 };
 
 struct _GstSelectorPadCachedBuffer
@@ -345,6 +347,11 @@ gst_selector_pad_reset (GstSelectorPad * pad)
   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
   pad->sending_cached_buffers = FALSE;
   gst_selector_pad_free_cached_buffers (pad);
+  if (pad->clock_id) {
+    gst_clock_id_unschedule (pad->clock_id);
+    gst_clock_id_unref (pad->clock_id);
+  }
+  pad->clock_id = NULL;
   GST_OBJECT_UNLOCK (pad);
 }
 
@@ -474,6 +481,10 @@ gst_input_selector_eos_wait (GstInputSelector * self, GstSelectorPad * pad,
 
       gst_pad_push_event (self->srcpad, gst_event_ref (eos_event));
       GST_INPUT_SELECTOR_LOCK (self);
+      if (pad->clock_id) {
+        GST_DEBUG_OBJECT (pad, "unlock clock wait");
+        gst_clock_id_unschedule (pad->clock_id);
+      }
       /* Wake up other pads so they can continue when syncing to
        * running time, as this pad just switched to EOS and
        * may enable others to progress */
@@ -558,6 +569,10 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
       selpad->flushing = TRUE;
       sel->eos = FALSE;
       selpad->group_done = FALSE;
+      if (selpad->clock_id) {
+        GST_DEBUG_OBJECT (selpad, "unlock clock wait");
+        gst_clock_id_unschedule (selpad->clock_id);
+      }
       GST_INPUT_SELECTOR_BROADCAST (sel);
       break;
     case GST_EVENT_FLUSH_STOP:
@@ -750,7 +765,6 @@ gst_input_selector_wait_running_time (GstInputSelector * sel,
   while (TRUE) {
     GstPad *active_sinkpad;
     GstSelectorPad *active_selpad;
-    GstClock *clock;
     gint64 cur_running_time;
     GstClockTime running_time;
 
@@ -774,21 +788,7 @@ gst_input_selector_wait_running_time (GstInputSelector * sel,
     }
 
     cur_running_time = GST_CLOCK_TIME_NONE;
-    if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
-      clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
-      if (clock) {
-        GstClockTime base_time;
-
-        cur_running_time = gst_clock_get_time (clock);
-        base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
-        if (base_time <= cur_running_time)
-          cur_running_time -= base_time;
-        else
-          cur_running_time = 0;
-
-        gst_object_unref (clock);
-      }
-    } else {
+    if (sel->sync_mode != GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
       GstSegment *active_seg;
 
       active_seg = &active_selpad->segment;
@@ -818,9 +818,61 @@ gst_input_selector_wait_running_time (GstInputSelector * sel,
       break;
     }
 
-    if (selpad != active_selpad && !sel->eos && !sel->flushing
-        && !selpad->flushing && (cur_running_time == GST_CLOCK_TIME_NONE
-            || running_time >= cur_running_time)) {
+    if (selpad == active_selpad || sel->eos || sel->flushing
+        || selpad->flushing) {
+      GST_DEBUG_OBJECT (selpad, "Waiting aborted. Unblocking");
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+      break;
+    }
+
+    if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK
+        && GST_CLOCK_TIME_IS_VALID (running_time)) {
+      GstClock *clock;
+      GstClockReturn cret;
+      GstClockTime base_time;
+      GstClockTimeDiff jitter;
+      GstClockID clock_id;
+
+      base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
+      if (!GST_CLOCK_TIME_IS_VALID (base_time)) {
+        GST_DEBUG_OBJECT (selpad, "sync-mode=clock but no base time. Blocking");
+        GST_INPUT_SELECTOR_WAIT (sel);
+        continue;
+      }
+
+      clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
+      if (!clock) {
+        GST_DEBUG_OBJECT (selpad, "sync-mode=clock but no clock. Blocking");
+        GST_INPUT_SELECTOR_WAIT (sel);
+        continue;
+      }
+
+      /* FIXME: If no upstream latency was queried yet, do one now */
+      clock_id =
+          gst_clock_new_single_shot_id (clock,
+          running_time + base_time + sel->upstream_latency);
+      selpad->clock_id = gst_clock_id_ref (clock_id);
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+
+      gst_object_unref (clock);
+      cret = gst_clock_id_wait (clock_id, &jitter);
+      gst_clock_id_unref (clock_id);
+
+      GST_DEBUG_OBJECT (sel, "Clock returned %d, jitter %" GST_STIME_FORMAT,
+          cret, GST_STIME_ARGS (jitter));
+
+      GST_INPUT_SELECTOR_LOCK (sel);
+      if (selpad->clock_id) {
+        gst_clock_id_unref (selpad->clock_id);
+        selpad->clock_id = NULL;
+      }
+      if (cret == GST_CLOCK_OK ||
+          cret == GST_CLOCK_EARLY || cret == GST_CLOCK_DONE) {
+        GST_INPUT_SELECTOR_UNLOCK (sel);
+        break;
+      }
+    } else if (!GST_CLOCK_TIME_IS_VALID (cur_running_time)
+        || running_time >= cur_running_time) {
       GST_DEBUG_OBJECT (selpad,
           "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
@@ -1333,6 +1385,8 @@ gst_input_selector_init (GstInputSelector * sel)
   g_cond_init (&sel->cond);
   sel->eos = FALSE;
 
+  sel->upstream_latency = 0;
+
   /* lets give a change for downstream to do something on
    * active-pad change before we start pushing new buffers */
   g_signal_connect_data (sel, "notify::active-pad",
@@ -1714,7 +1768,16 @@ retry:
       GST_ERROR_OBJECT (pad, "minimum latency bigger than maximum latency");
     }
 
-    gst_query_set_latency (query, fold_data.live, fold_data.min, fold_data.max);
+    GST_INPUT_SELECTOR_LOCK (sel);
+    if (fold_data.live)
+      sel->upstream_latency = fold_data.min;
+    else
+      sel->upstream_latency = 0;
+
+    gst_query_set_latency (query, fold_data.live
+        || (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK),
+        fold_data.min, fold_data.max);
+    GST_INPUT_SELECTOR_UNLOCK (sel);
   } else {
     GST_LOG_OBJECT (pad, "latency query failed");
   }
@@ -1876,6 +1939,7 @@ gst_input_selector_reset (GstInputSelector * sel)
     }
   }
   sel->have_group_id = TRUE;
+  sel->upstream_latency = 0;
   GST_INPUT_SELECTOR_UNLOCK (sel);
 }
 
@@ -1902,6 +1966,19 @@ gst_input_selector_change_state (GstElement * element,
       GST_INPUT_SELECTOR_BROADCAST (self);
       GST_INPUT_SELECTOR_UNLOCK (self);
       break;
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
+      GList *walk;
+
+      for (walk = GST_ELEMENT_CAST (self)->sinkpads; walk;
+          walk = g_list_next (walk)) {
+        GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
+        if (selpad->clock_id) {
+          GST_DEBUG_OBJECT (selpad, "unlock clock wait");
+          gst_clock_id_unschedule (selpad->clock_id);
+        }
+      }
+      break;
+    }
     default:
       break;
   }
index 9d2eb6a..79a5578 100644 (file)
@@ -80,6 +80,8 @@ struct _GstInputSelector {
   gboolean eos;
   gboolean eos_sent;
   gboolean flushing;
+
+  GstClockTime upstream_latency;
 };
 
 struct _GstInputSelectorClass {