{
/* Following fields are protected by the PAD_LOCK */
GstFlowReturn flow_return;
- gboolean pending_flush_start;
- gboolean pending_flush_stop;
+
+ guint32 last_flush_start_seqnum;
+ guint32 last_flush_stop_seqnum;
gboolean first_buffer;
/* Our state is >= PAUSED */
gboolean running; /* protected by src_lock */
- /* seqnum from seek or segment,
- * to be applied to synthetic segment/eos events */
- gint seqnum;
+ /* seqnum from last seek or common seqnum to flush start events received
+ * on all pads, for flushing without a seek */
+ guint32 next_seqnum;
+ /* seqnum to apply to synthetic segment/eos events */
+ guint32 seqnum;
gboolean send_stream_start; /* protected by srcpad stream lock */
gboolean send_segment;
- gboolean flush_seeking;
- gboolean pending_flush_start;
+ gboolean flushing;
gboolean send_eos; /* protected by srcpad stream lock */
GstCaps *srccaps; /* protected by the srcpad stream lock */
}
GST_OBJECT_LOCK (self);
- if (self->priv->send_segment && !self->priv->flush_seeking) {
+ if (self->priv->send_segment && !self->priv->flushing) {
segment =
gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
}
- if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
+ if (priv->tags && priv->tags_changed && !self->priv->flushing) {
tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
priv->tags_changed = FALSE;
}
gst_aggregator_push_mandatory_events (self);
GST_OBJECT_LOCK (self);
- if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
+ if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
GST_OBJECT_UNLOCK (self);
return gst_pad_push (self->srcpad, buffer);
} else {
GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
- self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
+ self->priv->flushing, gst_pad_is_active (self->srcpad));
GST_OBJECT_UNLOCK (self);
gst_buffer_unref (buffer);
return GST_FLOW_OK;
continue;
GST_OBJECT_LOCK (self);
- if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
+ if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
/* We don't want to set the pads to flushing, but we want to
* stop the thread, so just break here */
GST_OBJECT_UNLOCK (self);
}
static gboolean
-_check_pending_flush_stop (GstAggregatorPad * pad)
-{
- gboolean res;
-
- PAD_LOCK (pad);
- res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
- PAD_UNLOCK (pad);
-
- return res;
-}
-
-static gboolean
gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
{
gboolean res = TRUE;
GST_DEBUG_OBJECT (self, "Flushing everything");
GST_OBJECT_LOCK (self);
priv->send_segment = TRUE;
- priv->flush_seeking = FALSE;
+ priv->flushing = FALSE;
priv->tags_changed = FALSE;
GST_OBJECT_UNLOCK (self);
if (klass->flush)
/* Called with GstAggregator's object lock held */
static gboolean
-gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
+gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
+{
+ GList *tmp;
+ GstAggregatorPad *tmppad;
+
+ for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
+ tmppad = (GstAggregatorPad *) tmp->data;
+
+ if (tmppad->priv->last_flush_stop_seqnum != seqnum)
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/* Called with GstAggregator's object lock held */
+
+static gboolean
+gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
{
GList *tmp;
GstAggregatorPad *tmppad;
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
tmppad = (GstAggregatorPad *) tmp->data;
- if (_check_pending_flush_stop (tmppad) == FALSE) {
- GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
- tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
+ if (tmppad->priv->last_flush_start_seqnum != seqnum) {
return FALSE;
}
}
{
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
+ guint32 seqnum = gst_event_get_seqnum (event);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad);
- if (padpriv->pending_flush_start) {
- GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
-
- padpriv->pending_flush_start = FALSE;
- padpriv->pending_flush_stop = TRUE;
- }
+ padpriv->last_flush_start_seqnum = seqnum;
PAD_UNLOCK (aggpad);
GST_OBJECT_LOCK (self);
- if (priv->flush_seeking) {
- /* If flush_seeking we forward the first FLUSH_START */
- if (priv->pending_flush_start) {
- priv->pending_flush_start = FALSE;
- GST_OBJECT_UNLOCK (self);
- GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
- gst_aggregator_stop_srcpad_task (self, event);
+ if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
+ /* Make sure we don't forward more than one FLUSH_START */
+ priv->flushing = TRUE;
+ priv->next_seqnum = seqnum;
+ GST_OBJECT_UNLOCK (self);
- GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
- GST_PAD_STREAM_LOCK (self->srcpad);
- GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
- event = NULL;
- } else {
- GST_OBJECT_UNLOCK (self);
- gst_event_unref (event);
- }
+ GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
+ gst_aggregator_stop_srcpad_task (self, event);
+
+ GST_INFO_OBJECT (self, "Getting STREAM_LOCK while flushing");
+ GST_PAD_STREAM_LOCK (self->srcpad);
+ GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
+
+ event = NULL;
} else {
- GST_OBJECT_UNLOCK (self);
gst_event_unref (event);
+ GST_OBJECT_UNLOCK (self);
}
+
PAD_FLUSH_UNLOCK (aggpad);
}
case GST_EVENT_FLUSH_START:
{
gst_aggregator_flush_start (self, aggpad, event);
- /* We forward only in one case: right after flush_seeking */
+ /* We forward only in one case: right after flushing */
event = NULL;
goto eat;
}
case GST_EVENT_FLUSH_STOP:
{
+ guint32 seqnum = gst_event_get_seqnum (event);
+
+ PAD_FLUSH_LOCK (aggpad);
+ PAD_LOCK (aggpad);
+ aggpad->priv->last_flush_stop_seqnum = seqnum;
+ PAD_UNLOCK (aggpad);
+
+ /* aggregate might be running if this FLUSH_STOP was not
+ * sent following a flushing seek, let's make sure we don't
+ * flush the pad's current buffer before aggregate has returned
+ */
+ GST_PAD_STREAM_LOCK (self->srcpad);
gst_aggregator_pad_flush (aggpad, self);
+ GST_PAD_STREAM_UNLOCK (self->srcpad);
+
GST_OBJECT_LOCK (self);
- if (priv->flush_seeking) {
- g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
- if (gst_aggregator_all_flush_stop_received_locked (self)) {
- GST_OBJECT_UNLOCK (self);
- /* That means we received FLUSH_STOP/FLUSH_STOP on
- * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
- gst_aggregator_flush (self);
- gst_pad_push_event (self->srcpad, event);
- event = NULL;
- SRC_LOCK (self);
- priv->send_eos = TRUE;
- SRC_BROADCAST (self);
- SRC_UNLOCK (self);
-
- GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
- GST_PAD_STREAM_UNLOCK (self->srcpad);
- gst_aggregator_start_srcpad_task (self);
- } else {
- GST_OBJECT_UNLOCK (self);
- }
+ if (priv->flushing
+ && gst_aggregator_all_flush_stop_received (self, seqnum)) {
+ GST_OBJECT_UNLOCK (self);
+ /* That means we received FLUSH_STOP/FLUSH_STOP on
+ * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
+ gst_aggregator_flush (self);
+ gst_pad_push_event (self->srcpad, event);
+ event = NULL;
+ SRC_LOCK (self);
+ priv->send_eos = TRUE;
+ SRC_BROADCAST (self);
+ SRC_UNLOCK (self);
+
+ GST_INFO_OBJECT (self,
+ "Flush stopped, releasing source pad STREAM_LOCK");
+ GST_PAD_STREAM_UNLOCK (self->srcpad);
+
+ gst_aggregator_start_srcpad_task (self);
} else {
GST_OBJECT_UNLOCK (self);
}
+ PAD_FLUSH_UNLOCK (aggpad);
+
/* We never forward the event */
goto eat;
}
gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
+ if (agg->priv->running) {
+ /* As sinkpads get deactivated after the src pad, we
+ * may have restarted the source pad task after receiving
+ * flush events on one of our sinkpads. Stop our src pad
+ * task again if that is the case */
+ gst_aggregator_stop_srcpad_task (agg, NULL);
+ }
+
return result;
}
GST_OBJECT_LOCK (self);
gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
flags, start_type, start, stop_type, stop, NULL);
- self->priv->seqnum = gst_event_get_seqnum (event);
+ self->priv->next_seqnum = gst_event_get_seqnum (event);
self->priv->first_buffer = FALSE;
GST_OBJECT_UNLOCK (self);
}
GST_STATE_UNLOCK (element);
-
return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
event);
}
gst_query_unref (seeking);
}
-
- if (evdata->flush) {
- PAD_LOCK (aggpad);
- aggpad->priv->pending_flush_start = FALSE;
- aggpad->priv->pending_flush_stop = FALSE;
- PAD_UNLOCK (aggpad);
- }
} else {
evdata->one_actually_seeked = TRUE;
}
evdata->result = TRUE;
evdata->one_actually_seeked = FALSE;
- /* We first need to set all pads as flushing in a first pass
- * as flush_start flush_stop is sometimes sent synchronously
- * while we send the seek event */
- if (evdata->flush) {
- GList *l;
-
- GST_OBJECT_LOCK (self);
- for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
- GstAggregatorPad *pad = l->data;
-
- PAD_LOCK (pad);
- pad->priv->pending_flush_start = TRUE;
- pad->priv->pending_flush_stop = FALSE;
- PAD_UNLOCK (pad);
- }
- GST_OBJECT_UNLOCK (self);
- }
-
gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
gst_event_unref (evdata->event);
flush = flags & GST_SEEK_FLAG_FLUSH;
GST_OBJECT_LOCK (self);
- if (flush) {
- priv->pending_flush_start = TRUE;
- priv->flush_seeking = TRUE;
- }
+ self->priv->next_seqnum = gst_event_get_seqnum (event);
gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
flags, start_type, start, stop_type, stop, NULL);
/* Seeking sets a position */
self->priv->first_buffer = FALSE;
+
+ if (flush)
+ priv->flushing = TRUE;
+
GST_OBJECT_UNLOCK (self);
+ if (flush) {
+ GstEvent *event = gst_event_new_flush_start ();
+
+ gst_event_set_seqnum (event, self->priv->next_seqnum);
+ gst_aggregator_stop_srcpad_task (self, event);
+ }
+
/* forward the seek upstream */
evdata.event = event;
evdata.flush = flush;
if (!evdata.result || !evdata.one_actually_seeked) {
GST_OBJECT_LOCK (self);
- priv->flush_seeking = FALSE;
- priv->pending_flush_start = FALSE;
+ priv->flushing = FALSE;
GST_OBJECT_UNLOCK (self);
}
/* deactivating */
GST_INFO_OBJECT (self, "Deactivating srcpad");
+
gst_aggregator_stop_srcpad_task (self, FALSE);
return TRUE;
ChainData data2 = { 0, };
TestData test = { 0, };
GstBuffer *buf;
+ guint32 seqnum;
_test_data_init (&test, TRUE);
/* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+ seqnum = gst_event_get_seqnum (event);
fail_unless (gst_pad_send_event (test.srcpad, event));
- /* flushing starts once one of the upstream elements sends the first
- * FLUSH_START */
- fail_unless_equals_int (test.flush_start_events, 0);
+ /* flushing starts when a flushing seek is received, and stops
+ * when all sink pads have received FLUSH_STOP */
+ fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0);
- /* send a first FLUSH_START on agg:sink_0, will be sent downstream */
+ /* send a first FLUSH_START on agg:sink_0, nothing will be sent
+ * downstream */
GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start");
- fail_unless (gst_pad_push_event (data2.srcpad, gst_event_new_flush_start ()));
+ event = gst_event_new_flush_start ();
+ gst_event_set_seqnum (event, seqnum);
+ fail_unless (gst_pad_push_event (data2.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0);
data2.expected_result = GST_FLOW_FLUSHING;
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
- /* this should send not additional flush_start */
+ /* this should send no additional flush_start */
GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start");
- fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ()));
+ event = gst_event_new_flush_start ();
+ gst_event_set_seqnum (event, seqnum);
+ fail_unless (gst_pad_push_event (data1.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0);
/* the first FLUSH_STOP is not forwarded downstream */
GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop");
- fail_unless (gst_pad_push_event (data1.srcpad,
- gst_event_new_flush_stop (TRUE)));
+ event = gst_event_new_flush_stop (TRUE);
+ gst_event_set_seqnum (event, seqnum);
+ fail_unless (gst_pad_push_event (data1.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0);
/* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
* sent downstream */
GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop");
- gst_pad_push_event (data2.srcpad, gst_event_new_flush_stop (TRUE));
+ event = gst_event_new_flush_stop (TRUE);
+ gst_event_set_seqnum (event, seqnum);
+ gst_pad_push_event (data2.srcpad, event);
/* and the last FLUSH_STOP is forwarded downstream */
fail_unless_equals_int (test.flush_stop_events, 1);