g_thread_self()); \
} G_STMT_END
+#define PAD_STREAM_LOCK(pad) G_STMT_START { \
+ GST_LOG_OBJECT (pad, "Taking lock from thread %p", \
+ g_thread_self()); \
+ g_mutex_lock(&pad->priv->stream_lock); \
+ GST_LOG_OBJECT (pad, "Took lock from thread %p", \
+ g_thread_self()); \
+ } G_STMT_END
+
+#define PAD_STREAM_UNLOCK(pad) G_STMT_START { \
+ GST_LOG_OBJECT (pad, "Releasing lock from thread %p", \
+ g_thread_self()); \
+ g_mutex_unlock(&pad->priv->stream_lock); \
+ GST_LOG_OBJECT (pad, "Release lock from thread %p", \
+ g_thread_self()); \
+ } G_STMT_END
+
struct _GstAggregatorPadPrivate
{
gboolean pending_flush_start;
GMutex event_lock;
GCond event_cond;
+
+ GMutex stream_lock;
};
static gboolean
return TRUE;
}
+static void
+_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
+{
+ GstBuffer *tmpbuf;
+ GstAggregatorPrivate *priv = self->priv;
+ GstAggregatorPadPrivate *padpriv = aggpad->priv;
+
+ g_atomic_int_set (&aggpad->priv->flushing, TRUE);
+ /* Remove pad buffer and wake up the streaming thread */
+ tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
+ gst_buffer_replace (&tmpbuf, NULL);
+ PAD_STREAM_LOCK (aggpad);
+ if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
+ TRUE, FALSE) == TRUE) {
+ GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
+ g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
+ }
+
+ if (g_atomic_int_get (&priv->flush_seeking)) {
+ /* If flush_seeking we forward the first FLUSH_START */
+ if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
+ TRUE, FALSE) == TRUE) {
+
+ GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
+ _stop_srcpad_task (self, event);
+ priv->flow_return = GST_FLOW_OK;
+
+ GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
+ GST_PAD_STREAM_LOCK (self->srcpad);
+ GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
+ event = NULL;
+ }
+ } else {
+ gst_event_unref (event);
+ }
+ PAD_STREAM_UNLOCK (aggpad);
+
+ tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
+ gst_buffer_replace (&tmpbuf, NULL);
+}
+
/* GstAggregator vmethods default implementations */
static gboolean
_sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
gboolean res = TRUE;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
- GstAggregatorPadPrivate *padpriv = aggpad->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
- GstBuffer *tmpbuf;
-
- g_atomic_int_set (&aggpad->priv->flushing, TRUE);
- /* Remove pad buffer and wake up the streaming thread */
- tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
- gst_buffer_replace (&tmpbuf, NULL);
- if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
- TRUE, FALSE) == TRUE) {
- GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
- g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
- }
-
- if (g_atomic_int_get (&priv->flush_seeking)) {
- /* If flush_seeking we forward the first FLUSH_START */
- if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
- TRUE, FALSE) == TRUE) {
-
- GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task");
- _stop_srcpad_task (self, event);
- priv->flow_return = GST_FLOW_OK;
-
- GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
- GST_PAD_STREAM_LOCK (self->srcpad);
- GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
- event = NULL;
- goto eat;
- }
- }
-
+ _flush_start (self, aggpad, event);
/* We forward only in one case: right after flush_seeking */
+ event = NULL;
goto eat;
}
case GST_EVENT_FLUSH_STOP:
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
+ PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
-
if (aggclass->clip) {
aggclass->clip (self, aggpad, buffer, &actual_buf);
}
gst_buffer_unref (aggpad->buffer);
aggpad->buffer = actual_buf;
PAD_UNLOCK_EVENT (aggpad);
+ PAD_STREAM_UNLOCK (aggpad);
_add_aggregate_gsource (self);
return priv->flow_return;
flushing:
+ PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "We are flushing");
return GST_FLOW_FLUSHING;
eos:
+ PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "We are EOS already...");
g_mutex_clear (&pad->priv->event_lock);
g_cond_clear (&pad->priv->event_cond);
+ g_mutex_clear (&pad->priv->stream_lock);
G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
}
g_mutex_init (&pad->priv->event_lock);
g_cond_init (&pad->priv->event_cond);
+ g_mutex_init (&pad->priv->stream_lock);
}
/**