#define PAD_WAIT_EVENT(pad) G_STMT_START { \
- GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \
+ GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \
g_thread_self()); \
g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \
(&((GstAggregatorPad*)pad)->priv->lock)); \
- GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \
+ GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
g_thread_self()); \
} G_STMT_END
#define PAD_BROADCAST_EVENT(pad) G_STMT_START { \
- GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p", \
+ GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \
g_thread_self()); \
g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
} G_STMT_END
struct _GstAggregatorPadPrivate
{
- /* To always be used atomically */
- gboolean flushing;
-
/* Following fields are protected by the PAD_LOCK */
+ gboolean flushing;
gboolean pending_flush_start;
gboolean pending_flush_stop;
gboolean pending_eos;
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad)
{
PAD_LOCK (aggpad);
- g_atomic_int_set (&aggpad->priv->flushing, TRUE);
+ aggpad->priv->flushing = TRUE;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
PAD_FLUSH_LOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
- goto flushing;
-
PAD_LOCK (aggpad);
if (aggpad->priv->pending_eos == TRUE)
goto eos;
- while (aggpad->priv->buffer
- && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
- GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+ while (aggpad->priv->buffer && !aggpad->priv->flushing)
PAD_WAIT_EVENT (aggpad);
- }
- PAD_UNLOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
+ if (aggpad->priv->flushing)
goto flushing;
+ PAD_UNLOCK (aggpad);
+
if (aggclass->clip) {
aggclass->clip (self, aggpad, buffer, &actual_buf);
}
return flow_return;
flushing:
+ PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer);
if (GST_QUERY_IS_SERIALIZED (query)) {
PAD_LOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
- PAD_UNLOCK (aggpad);
- goto flushing;
- }
-
- while (aggpad->priv->buffer
- && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
- GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+ while (aggpad->priv->buffer && !aggpad->priv->flushing)
PAD_WAIT_EVENT (aggpad);
- }
- PAD_UNLOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
+ if (aggpad->priv->flushing)
goto flushing;
+
+ PAD_UNLOCK (aggpad);
}
return klass->sink_query (GST_AGGREGATOR (parent),
GST_AGGREGATOR_PAD (pad), query);
flushing:
+ PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
return FALSE;
}
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
PAD_LOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
- && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
- PAD_UNLOCK (aggpad);
- goto flushing;
- }
- while (aggpad->priv->buffer
- && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
- GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+ while (aggpad->priv->buffer && !aggpad->priv->flushing)
PAD_WAIT_EVENT (aggpad);
- }
- PAD_UNLOCK (aggpad);
- if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
+ if (aggpad->priv->flushing
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto flushing;
+
+ PAD_UNLOCK (aggpad);
}
return klass->sink_event (GST_AGGREGATOR (parent),
GST_AGGREGATOR_PAD (pad), event);
flushing:
+ PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event);
gst_aggregator_pad_set_flushing (aggpad);
} else {
PAD_LOCK (aggpad);
- g_atomic_int_set (&aggpad->priv->flushing, FALSE);
+ aggpad->priv->flushing = FALSE;
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}