GMutex blocking_push_mutex;
GMutex priv_mutex;
+ GCond buf_or_eos_cond;
+ GMutex buf_or_eos_mutex;
+ gboolean eos_received;
+
GPtrArray *stress;
};
g_mutex_lock (&priv->blocking_push_mutex);
g_atomic_int_inc (&priv->recv_buffers);
- if (priv->drop_buffers)
+ if (priv->drop_buffers) {
gst_buffer_unref (buffer);
- else
+ } else {
+ g_mutex_lock (&priv->buf_or_eos_mutex);
g_async_queue_push (priv->buffer_queue, buffer);
+ g_cond_signal (&priv->buf_or_eos_cond);
+ g_mutex_unlock (&priv->buf_or_eos_mutex);
+ }
+
if (priv->blocking_push_mode) {
g_cond_wait (&priv->blocking_push_cond, &priv->blocking_push_mutex);
HARNESS_LOCK (h);
} else {
g_async_queue_push (priv->sink_event_queue, event);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ g_mutex_lock (&priv->buf_or_eos_mutex);
+ priv->eos_received = TRUE;
+ g_cond_signal (&priv->buf_or_eos_cond);
+ g_mutex_unlock (&priv->buf_or_eos_mutex);
+ }
}
HARNESS_UNLOCK (h);
g_cond_init (&priv->blocking_push_cond);
g_mutex_init (&priv->priv_mutex);
+ g_mutex_init (&priv->buf_or_eos_mutex);
+ g_cond_init (&priv->buf_or_eos_cond);
+ priv->eos_received = FALSE;
+
priv->stress = g_ptr_array_new_with_free_func (
(GDestroyNotify) gst_harness_stress_free);
g_mutex_clear (&priv->blocking_push_mutex);
g_mutex_clear (&priv->priv_mutex);
+ g_mutex_clear (&priv->buf_or_eos_mutex);
+ g_cond_clear (&priv->buf_or_eos_cond);
+ priv->eos_received = FALSE;
+
g_async_queue_unref (priv->buffer_queue);
g_async_queue_unref (priv->src_event_queue);
g_async_queue_unref (priv->sink_event_queue);
}
/**
+ * gst_harness_pull_until_eos:
+ * @h: a #GstHarness
+ * @buf: (out) (transfer full): A #GstBuffer, or %NULL if EOS or timeout occures
+ * first.
+ *
+ * Pulls a #GstBuffer from the #GAsyncQueue on the #GstHarness sinkpad. The pull
+ * will block until an EOS event is received, or timeout in 60 seconds.
+ * MT safe.
+ *
+ * Returns: %TRUE on success, %FALSE on timeout.
+ *
+ * Since: 1.18
+ */
+gboolean
+gst_harness_pull_until_eos (GstHarness * h, GstBuffer ** buf)
+{
+ GstHarnessPrivate *priv = h->priv;
+ gboolean success = TRUE;
+ gint64 end_time = g_get_monotonic_time () + 60 * G_TIME_SPAN_SECOND;
+
+ g_mutex_lock (&priv->buf_or_eos_mutex);
+ while (success) {
+ *buf = g_async_queue_try_pop (priv->buffer_queue);
+ if (*buf || priv->eos_received)
+ break;
+ success = g_cond_wait_until (&priv->buf_or_eos_cond,
+ &priv->buf_or_eos_mutex, end_time);
+ }
+ g_mutex_unlock (&priv->buf_or_eos_mutex);
+
+ return success;
+}
+
+/**
* gst_harness_try_pull:
* @h: a #GstHarness
*