appsrc/sink: Fix optimization for only signalling waiters if someone is actually...
authorSebastian Dröge <sebastian@centricular.com>
Wed, 2 May 2018 15:11:58 +0000 (18:11 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 2 May 2018 15:11:58 +0000 (18:11 +0300)
It is possible that both application and the stream are waiting
currently, if for example the following happens:
  1) app is waiting because no buffer in appsink
  2) appsink providing a buffer and waking up app
  3) appsink getting another buffer and waiting because it's full now
  4) app thread getting back control

Previously step 4 would overwrite that the appsink is currently waiting,
so it would never be signalled again.

https://bugzilla.gnome.org/show_bug.cgi?id=795551

gst-libs/gst/app/gstappsink.c
gst-libs/gst/app/gstappsrc.c

index ab662f5..4d8be0d 100644 (file)
@@ -75,9 +75,9 @@
 
 typedef enum
 {
-  NOONE_WAITING,
-  STREAM_WAITING,               /* streaming thread is waiting for application thread */
-  APP_WAITING,                  /* application thread is waiting for streaming thread */
+  NOONE_WAITING = 0,
+  STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
+  APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
 } GstAppSinkWaitStatus;
 
 struct _GstAppSinkPrivate
@@ -731,9 +731,9 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
        * consumed, which is a bit confusing for the application
        */
       while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) {
-        priv->wait_status = STREAM_WAITING;
+        priv->wait_status |= STREAM_WAITING;
         g_cond_wait (&priv->cond, &priv->mutex);
-        priv->wait_status = NOONE_WAITING;
+        priv->wait_status &= ~STREAM_WAITING;
       }
       if (priv->flushing)
         emit = FALSE;
@@ -781,7 +781,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
   GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer);
   gst_buffer_replace (&priv->preroll_buffer, buffer);
 
-  if (priv->wait_status == APP_WAITING)
+  if ((priv->wait_status & APP_WAITING))
     g_cond_signal (&priv->cond);
 
   emit = priv->emit_signals;
@@ -902,9 +902,9 @@ restart:
       }
 
       /* wait for a buffer to be removed or flush */
-      priv->wait_status = STREAM_WAITING;
+      priv->wait_status |= STREAM_WAITING;
       g_cond_wait (&priv->cond, &priv->mutex);
-      priv->wait_status = NOONE_WAITING;
+      priv->wait_status &= ~STREAM_WAITING;
 
       if (priv->flushing)
         goto flushing;
@@ -914,7 +914,7 @@ restart:
   gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data));
   priv->num_buffers++;
 
-  if (priv->wait_status == APP_WAITING)
+  if ((priv->wait_status & APP_WAITING))
     g_cond_signal (&priv->cond);
 
   emit = priv->emit_signals;
@@ -1012,9 +1012,9 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query)
       g_mutex_lock (&priv->mutex);
       GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed");
       while (priv->num_buffers > 0 || priv->preroll_buffer) {
-        priv->wait_status = STREAM_WAITING;
+        priv->wait_status |= STREAM_WAITING;
         g_cond_wait (&priv->cond, &priv->mutex);
-        priv->wait_status = NOONE_WAITING;
+        priv->wait_status &= ~STREAM_WAITING;
       }
       g_mutex_unlock (&priv->mutex);
       ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
@@ -1528,14 +1528,14 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout)
 
     /* nothing to return, wait */
     GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer");
-    priv->wait_status = APP_WAITING;
+    priv->wait_status |= APP_WAITING;
     if (timeout_valid) {
       if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time))
         goto expired;
     } else {
       g_cond_wait (&priv->cond, &priv->mutex);
     }
-    priv->wait_status = NOONE_WAITING;
+    priv->wait_status &= ~APP_WAITING;
   }
   sample =
       gst_sample_new (priv->preroll_buffer, priv->preroll_caps,
@@ -1550,7 +1550,7 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout)
 expired:
   {
     GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL");
-    priv->wait_status = NOONE_WAITING;
+    priv->wait_status &= ~APP_WAITING;
     g_mutex_unlock (&priv->mutex);
     return NULL;
   }
@@ -1626,14 +1626,14 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
 
     /* nothing to return, wait */
     GST_DEBUG_OBJECT (appsink, "waiting for a buffer");
-    priv->wait_status = APP_WAITING;
+    priv->wait_status |= APP_WAITING;
     if (timeout_valid) {
       if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time))
         goto expired;
     } else {
       g_cond_wait (&priv->cond, &priv->mutex);
     }
-    priv->wait_status = NOONE_WAITING;
+    priv->wait_status &= ~APP_WAITING;
   }
 
   obj = dequeue_buffer (appsink);
@@ -1652,7 +1652,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
   }
   gst_mini_object_unref (obj);
 
-  if (priv->wait_status == STREAM_WAITING)
+  if ((priv->wait_status & STREAM_WAITING))
     g_cond_signal (&priv->cond);
 
   g_mutex_unlock (&priv->mutex);
@@ -1663,7 +1663,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
 expired:
   {
     GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL");
-    priv->wait_status = NOONE_WAITING;
+    priv->wait_status &= ~APP_WAITING;
     g_mutex_unlock (&priv->mutex);
     return NULL;
   }
index 1328893..541eb60 100644 (file)
 
 typedef enum
 {
-  NOONE_WAITING,
-  STREAM_WAITING,               /* streaming thread is waiting for application thread */
-  APP_WAITING,                  /* application thread is waiting for streaming thread */
+  NOONE_WAITING = 0,
+  STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
+  APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
 } GstAppSrcWaitStatus;
 
 struct _GstAppSrcPrivate
@@ -1242,7 +1242,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
         priv->offset += buf_size;
 
       /* signal that we removed an item */
-      if (priv->wait_status == APP_WAITING)
+      if ((priv->wait_status & APP_WAITING))
         g_cond_broadcast (&priv->cond);
 
       /* see if we go lower than the empty-percent */
@@ -1277,9 +1277,9 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
       goto eos;
 
     /* nothing to return, wait a while for new data or flushing. */
-    priv->wait_status = STREAM_WAITING;
+    priv->wait_status |= STREAM_WAITING;
     g_cond_wait (&priv->cond, &priv->mutex);
-    priv->wait_status = NOONE_WAITING;
+    priv->wait_status &= ~STREAM_WAITING;
   }
   g_mutex_unlock (&priv->mutex);
   return ret;
@@ -1840,9 +1840,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
         /* we are filled, wait until a buffer gets popped or when we
          * flush. */
-        priv->wait_status = APP_WAITING;
+        priv->wait_status |= APP_WAITING;
         g_cond_wait (&priv->cond, &priv->mutex);
-        priv->wait_status = NOONE_WAITING;
+        priv->wait_status &= ~APP_WAITING;
       } else {
         /* no need to wait for free space, we just pump more data into the
          * queue hoping that the caller reacts to the enough-data signal and
@@ -1867,7 +1867,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
     priv->queued_bytes += gst_buffer_get_size (buffer);
   }
 
-  if (priv->wait_status == STREAM_WAITING)
+  if ((priv->wait_status & STREAM_WAITING))
     g_cond_broadcast (&priv->cond);
 
   g_mutex_unlock (&priv->mutex);