gboolean first_buffer;
GQueue buffers;
+ GstBuffer *clipped_buffer;
guint num_buffers;
GstClockTime head_position;
GstClockTime tail_position;
GstClockTime head_time;
GstClockTime tail_time;
GstClockTime time_level;
+ GstSegment head_segment; /* segment before the queue */
gboolean eos;
aggpad->priv->flow_return = GST_FLOW_OK;
GST_OBJECT_LOCK (aggpad);
gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
- gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
+ gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (aggpad);
aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
aggpad->priv->time_level = 0;
+ aggpad->priv->first_buffer = TRUE;
}
static gboolean
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
- return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
+ return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+ pad->priv->clipped_buffer == NULL);
}
static gboolean
{
GstAggregatorPad *pad;
GList *l, *sinkpads;
- gboolean have_data = TRUE;
+ gboolean have_buffer = TRUE;
+ gboolean have_event = FALSE;
GST_LOG_OBJECT (self, "checking pads");
PAD_LOCK (pad);
- if (gst_aggregator_pad_queue_is_empty (pad)) {
+ if (pad->priv->num_buffers == 0) {
+ if (!gst_aggregator_pad_queue_is_empty (pad))
+ have_event = TRUE;
if (!pad->priv->eos) {
- have_data = FALSE;
+ have_buffer = FALSE;
/* If not live we need data on all pads, so leave the loop */
if (!self->priv->peer_latency_live) {
PAD_UNLOCK (pad);
}
- if (!have_data)
+ if (!have_buffer && !have_event)
goto pad_not_ready;
- self->priv->first_buffer = FALSE;
+ if (have_buffer)
+ self->priv->first_buffer = FALSE;
GST_OBJECT_UNLOCK (self);
GST_LOG_OBJECT (self, "pads are ready");
}
pad_not_ready:
{
- GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+ if (have_event)
+ GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
+ " but waking up for serialized event");
+ else
+ GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
GST_OBJECT_UNLOCK (self);
- return FALSE;
+ return have_event;
}
}
event = NULL;
PAD_LOCK (pad);
- if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+ if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE;
}
- if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
+ if (pad->priv->clipped_buffer == NULL &&
+ GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
event = g_queue_pop_tail (&pad->priv->buffers);
PAD_BROADCAST_EVENT (pad);
}
item = next;
}
aggpad->priv->num_buffers = 0;
+ gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
{
if (head) {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
- aggpad->clip_segment.format == GST_FORMAT_TIME)
+ aggpad->priv->head_segment.format == GST_FORMAT_TIME)
aggpad->priv->head_time =
- gst_segment_to_running_time (&aggpad->clip_segment,
+ gst_segment_to_running_time (&aggpad->priv->head_segment,
GST_FORMAT_TIME, aggpad->priv->head_position);
else
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+
+ if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
+ aggpad->priv->tail_time = aggpad->priv->head_time;
} else {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
aggpad->segment.format == GST_FORMAT_TIME)
GST_OBJECT_UNLOCK (self);
}
- aggpad->priv->first_buffer = TRUE;
-
/* We never forward the event */
goto eat;
}
*/
SRC_LOCK (self);
PAD_LOCK (aggpad);
- if (gst_aggregator_pad_queue_is_empty (aggpad)) {
+ if (aggpad->priv->num_buffers == 0) {
aggpad->priv->eos = TRUE;
} else {
aggpad->priv->pending_eos = TRUE;
gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
{
/* Empty queue always has space */
- if (g_queue_get_length (&aggpad->priv->buffers) == 0)
+ if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
return TRUE;
/* We also want at least two buffers, one is being processed and one is ready
gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
- GstBuffer *actual_buf = buffer;
- GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
GstFlowReturn flow_return;
GstClockTime buf_pts;
PAD_UNLOCK (aggpad);
- if (aggclass->clip && head) {
- aggclass->clip (self, aggpad, buffer, &actual_buf);
- }
-
- if (actual_buf == NULL) {
- GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
- goto done;
- }
-
- buf_pts = GST_BUFFER_PTS (actual_buf);
+ buf_pts = GST_BUFFER_PTS (buffer);
aggpad->priv->first_buffer = FALSE;
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
- g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+ g_queue_push_head (&aggpad->priv->buffers, buffer);
else
- g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
- apply_buffer (aggpad, actual_buf, head);
+ g_queue_push_tail (&aggpad->priv->buffers, buffer);
+ apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++;
- actual_buf = buffer = NULL;
+ buffer = NULL;
SRC_BROADCAST (self);
break;
}
break;
case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
GST_OBJECT_LOCK (aggpad);
- if (aggpad->segment.format == GST_FORMAT_TIME) {
+ if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
start_time = buf_pts;
if (start_time != -1) {
- start_time = MAX (start_time, aggpad->segment.start);
+ start_time = MAX (start_time, aggpad->priv->head_segment.start);
start_time =
- gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
- start_time);
+ gst_segment_to_running_time (&aggpad->priv->head_segment,
+ GST_FORMAT_TIME, start_time);
}
} else {
start_time = 0;
GST_OBJECT_UNLOCK (self);
SRC_UNLOCK (self);
-done:
-
PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Done chaining");
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
- gst_event_copy_segment (event, &aggpad->clip_segment);
- aggpad->priv->head_position = aggpad->clip_segment.position;
+ gst_event_copy_segment (event, &aggpad->priv->head_segment);
+ aggpad->priv->head_position = aggpad->priv->head_segment.position;
update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad);
}
- if (!gst_aggregator_pad_queue_is_empty (aggpad) &&
- GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
+ if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
event);
g_queue_push_head (&aggpad->priv->buffers, event);
g_mutex_init (&pad->priv->flush_lock);
g_mutex_init (&pad->priv->lock);
- pad->priv->first_buffer = TRUE;
gst_aggregator_pad_reset_unlocked (pad);
}
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+{
+ pad->priv->num_buffers--;
+ GST_TRACE_OBJECT (pad, "Consuming buffer");
+ if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+ pad->priv->pending_eos = FALSE;
+ pad->priv->eos = TRUE;
+ }
+ PAD_BROADCAST_EVENT (pad);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
+{
+ GstAggregator *self = NULL;
+ GstAggregatorClass *aggclass;
+ GstBuffer *buffer = NULL;
+
+ while (pad->priv->clipped_buffer == NULL &&
+ GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
+ buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+ apply_buffer (pad, buffer, FALSE);
+
+ /* We only take the parent here so that it's not taken if the buffer is
+ * already clipped or if the queue is empty.
+ */
+ if (self == NULL) {
+ self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
+ if (self == NULL) {
+ gst_buffer_unref (buffer);
+ return;
+ }
+
+ aggclass = GST_AGGREGATOR_GET_CLASS (self);
+ }
+
+ if (aggclass->clip) {
+ GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
+
+ buffer = aggclass->clip (self, pad, buffer);
+
+ if (buffer == NULL) {
+ gst_aggregator_pad_buffer_consumed (pad);
+ GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
+ }
+ }
+
+ pad->priv->clipped_buffer = buffer;
+ }
+
+ if (self)
+ gst_object_unref (self);
+}
+
/**
* gst_aggregator_pad_steal_buffer:
* @pad: the pad to get buffer from
GstBuffer *
gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
{
- GstBuffer *buffer = NULL;
+ GstBuffer *buffer;
PAD_LOCK (pad);
- if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
- buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+ gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+ buffer = pad->priv->clipped_buffer;
+ pad->priv->clipped_buffer = NULL;
if (buffer) {
- apply_buffer (pad, buffer, FALSE);
- pad->priv->num_buffers--;
- GST_TRACE_OBJECT (pad, "Consuming buffer");
- if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
- pad->priv->pending_eos = FALSE;
- pad->priv->eos = TRUE;
- }
- PAD_BROADCAST_EVENT (pad);
+ gst_aggregator_pad_buffer_consumed (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
+
PAD_UNLOCK (pad);
return buffer;
GstBuffer *
gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
{
- GstBuffer *buffer = NULL;
+ GstBuffer *buffer;
PAD_LOCK (pad);
- buffer = g_queue_peek_tail (&pad->priv->buffers);
- /* The tail should always be a buffer, because if it is an event,
- * it will be consumed immeditaly in gst_aggregator_steal_buffer */
- if (GST_IS_BUFFER (buffer))
- gst_buffer_ref (buffer);
- else
+ gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+ if (pad->priv->clipped_buffer) {
+ buffer = gst_buffer_ref (pad->priv->clipped_buffer);
+ } else {
buffer = NULL;
+ }
PAD_UNLOCK (pad);
return buffer;