pad->priv->clipped_buffer == NULL);
}
+/* Will return FALSE if there's no buffer available on every non-EOS pad, or
+ * if at least one of the pads has an event or query at the top of its queue.
+ *
+ * Only returns TRUE if all non-EOS pads have a buffer available at the top of
+ * their queue or a clipped buffer already.
+ */
static gboolean
-gst_aggregator_check_pads_ready (GstAggregator * self)
+gst_aggregator_check_pads_ready (GstAggregator * self,
+ gboolean * have_event_or_query_ret)
{
GstAggregatorPad *pad = NULL;
GList *l, *sinkpads;
PAD_LOCK (pad);
- if (pad->priv->num_buffers == 0) {
- if (!gst_aggregator_pad_queue_is_empty (pad))
- have_event_or_query = TRUE;
- if (!pad->priv->eos) {
- have_buffer = FALSE;
+ /* If there's an event or query at the top of the queue and we don't yet
+ * have taken the top buffer out and stored it as clip_buffer, remember
+ * that and exit the loop. We first have to handle all events/queries
+ * before we handle any buffers. */
+ if (!pad->priv->clipped_buffer
+ && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
+ || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
+ PAD_UNLOCK (pad);
+ have_event_or_query = TRUE;
+ break;
+ }
- /* If not live we need data on all pads, so leave the loop */
- if (!self->priv->peer_latency_live) {
- PAD_UNLOCK (pad);
- goto pad_not_ready;
- }
- }
+ /* Otherwise check if we have a clipped buffer or a buffer at the top of
+ * the queue, and if not then this pad is not ready unless it is also EOS */
+ if (!pad->priv->clipped_buffer
+ && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+ /* We must not have any buffers at all in this pad then as otherwise we
+ * would've had an event/query at the top of the queue */
+ g_assert (pad->priv->num_buffers == 0);
+
+ /* Only consider this pad as worth waiting for if it's not already EOS.
+ * There's no point in waiting for buffers on EOS pads */
+ if (!pad->priv->eos)
+ have_buffer = FALSE;
} else if (self->priv->peer_latency_live) {
/* In live mode, having a single pad with buffers is enough to
* generate a start time from it. In non-live mode all pads need
PAD_UNLOCK (pad);
}
- if (!have_buffer && !have_event_or_query)
+ if (have_event_or_query)
+ goto pad_not_ready_but_event_or_query;
+
+ if (!have_buffer)
goto pad_not_ready;
if (have_buffer)
GST_OBJECT_UNLOCK (self);
GST_LOG_OBJECT (self, "pads are ready");
+
+ if (have_event_or_query_ret)
+ *have_event_or_query_ret = have_event_or_query;
+
return TRUE;
no_sinkpads:
{
GST_LOG_OBJECT (self, "pads not ready: no sink pads");
GST_OBJECT_UNLOCK (self);
+
+ if (have_event_or_query_ret)
+ *have_event_or_query_ret = have_event_or_query;
+
return FALSE;
}
pad_not_ready:
{
- if (have_event_or_query)
- GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
- " but waking up for serialized event");
- else
- GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+ GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
GST_OBJECT_UNLOCK (self);
- return have_event_or_query;
+
+ if (have_event_or_query_ret)
+ *have_event_or_query_ret = have_event_or_query;
+
+ return FALSE;
+ }
+pad_not_ready_but_event_or_query:
+ {
+ GST_LOG_OBJECT (pad,
+ "pad not ready to be aggregated yet, need to handle serialized event or query first");
+ GST_OBJECT_UNLOCK (self);
+
+ if (have_event_or_query_ret)
+ *have_event_or_query_ret = have_event_or_query;
+
+ return FALSE;
}
}
GstClockTime latency;
GstClockTime start;
gboolean res;
+ gboolean have_event_or_query = FALSE;
*timeout = FALSE;
latency = gst_aggregator_get_latency_unlocked (self);
- if (gst_aggregator_check_pads_ready (self)) {
+ if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
GST_DEBUG_OBJECT (self, "all pads have data");
SRC_UNLOCK (self);
return TRUE;
}
+ /* If we have an event or query, immediately return FALSE instead of waiting
+ * and handle it immediately */
+ if (have_event_or_query) {
+ GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
+ SRC_UNLOCK (self);
+ return FALSE;
+ }
+
/* Before waiting, check if we're actually still running */
if (!self->priv->running || !self->priv->send_eos) {
SRC_UNLOCK (self);
}
}
- res = gst_aggregator_check_pads_ready (self);
+ res = gst_aggregator_check_pads_ready (self, NULL);
SRC_UNLOCK (self);
return res;
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
- events_query_data.processed_event = FALSE;
- events_query_data.flow_ret = GST_FLOW_OK;
- gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
- gst_aggregator_do_events_and_queries, &events_query_data);
-
- if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
- goto handle_error;
-
- if (events_query_data.processed_event)
- continue;
-
if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
if (!gst_aggregator_negotiate_unlocked (self)) {
gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));