#define GST_CAT_DEFAULT aggregator_debug
/* GstAggregatorPad definitions */
-#define PAD_LOCK_EVENT(pad) G_STMT_START { \
- GST_TRACE_OBJECT (pad, "Taking EVENT lock from thread %p", \
+#define PAD_LOCK(pad) G_STMT_START { \
+ GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \
g_thread_self()); \
- g_mutex_lock(&pad->priv->event_lock); \
- GST_TRACE_OBJECT (pad, "Took EVENT lock from thread %p", \
+ GST_OBJECT_LOCK (pad); \
+ GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \
g_thread_self()); \
} G_STMT_END
-#define PAD_UNLOCK_EVENT(pad) G_STMT_START { \
- GST_TRACE_OBJECT (pad, "Releasing EVENT lock from thread %p", \
+#define PAD_UNLOCK(pad) G_STMT_START { \
+ GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \
g_thread_self()); \
- g_mutex_unlock(&pad->priv->event_lock); \
- GST_TRACE_OBJECT (pad, "Release EVENT lock from thread %p", \
+ GST_OBJECT_UNLOCK (pad); \
+ GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \
g_thread_self()); \
} G_STMT_END
GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \
g_thread_self()); \
g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \
- &(pad->priv->event_lock)); \
+ GST_OBJECT_GET_LOCK (pad)); \
GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \
g_thread_self()); \
} G_STMT_END
gboolean pending_eos;
gboolean flushing;
- GMutex event_lock;
GCond event_cond;
GMutex stream_lock;
* called
*/
SRC_STREAM_LOCK (self);
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
if (!aggpad->buffer) {
aggpad->eos = TRUE;
} else {
aggpad->priv->pending_eos = TRUE;
}
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
SRC_STREAM_BROADCAST (self);
SRC_STREAM_UNLOCK (self);
}
case GST_EVENT_SEGMENT:
{
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->segment);
self->priv->seqnum = gst_event_get_seqnum (event);
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
goto eat;
}
case GST_EVENT_STREAM_START:
if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
goto eos;
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
}
SRC_STREAM_LOCK (self);
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
if (aggpad->buffer)
gst_buffer_unref (aggpad->buffer);
aggpad->buffer = actual_buf;
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
PAD_STREAM_UNLOCK (aggpad);
if (gst_aggregator_check_pads_ready (self))
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_QUERY_IS_SERIALIZED (query)) {
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
goto flushing;
}
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
goto flushing;
}
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (active == FALSE) {
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
g_atomic_int_set (&aggpad->priv->flushing, TRUE);
gst_buffer_replace (&aggpad->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
} else {
- PAD_LOCK_EVENT (aggpad);
+ PAD_LOCK (aggpad);
g_atomic_int_set (&aggpad->priv->flushing, FALSE);
PAD_BROADCAST_EVENT (aggpad);
- PAD_UNLOCK_EVENT (aggpad);
+ PAD_UNLOCK (aggpad);
}
return TRUE;
{
GstAggregatorPad *pad = (GstAggregatorPad *) object;
- g_mutex_clear (&pad->priv->event_lock);
g_cond_clear (&pad->priv->event_cond);
g_mutex_clear (&pad->priv->stream_lock);
GstAggregatorPadPrivate);
pad->buffer = NULL;
- g_mutex_init (&pad->priv->event_lock);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->stream_lock);
}
/**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_steal_buffer_unlocked:
* @pad: the pad to get buffer from
*
* Steal the ref to the buffer currently queued in @pad.
*
+ * MUST be called with the pad's object lock held (i.e. between
+ * PAD_LOCK() and PAD_UNLOCK() on @pad).
+ *
* Returns: (transfer full): The buffer in @pad or NULL if no buffer was
* queued. You should unref the buffer after usage.
*/
GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad)
{
GstBuffer *buffer = NULL;
- PAD_LOCK_EVENT (pad);
if (pad->buffer) {
GST_TRACE_OBJECT (pad, "Consuming buffer");
buffer = pad->buffer;
PAD_BROADCAST_EVENT (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
- PAD_UNLOCK_EVENT (pad);
+
+ return buffer;
+}
+
+/**
+ * gst_aggregator_pad_steal_buffer:
+ * @pad: the pad to get buffer from
+ *
+ * Steal the ref to the buffer currently queued in @pad. This function
+ * takes the pad's object lock itself; use
+ * gst_aggregator_pad_steal_buffer_unlocked() when the lock is already held.
+ *
+ * Returns: (transfer full): The buffer in @pad or %NULL if no buffer was
+ * queued. You should unref the buffer after usage.
+ */
+GstBuffer *
+gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+{
+ GstBuffer *buffer = NULL;
+
+ PAD_LOCK (pad);
+ buffer = gst_aggregator_pad_steal_buffer_unlocked (pad);
+ PAD_UNLOCK (pad);
return buffer;
}
{
GstBuffer *buffer = NULL;
- PAD_LOCK_EVENT (pad);
+ PAD_LOCK (pad);
if (pad->buffer)
buffer = gst_buffer_ref (pad->buffer);
- PAD_UNLOCK_EVENT (pad);
+ PAD_UNLOCK (pad);
return buffer;
}