if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_NO_PREROLL)
goto was_no_preroll;
-
old_state = GST_STATE (bin);
/* when we PLAYING we go back to PAUSED, when preroll happens, we go back to
{
GstObject *src;
GstMessageType type;
+ GstMessage *tmessage;
+ guint32 seqnum;
src = GST_MESSAGE_SRC (message);
type = GST_MESSAGE_TYPE (message);
/* if we are completely EOS, we forward an EOS message */
if (eos) {
- GST_DEBUG_OBJECT (bin, "all sinks posted EOS");
- gst_element_post_message (GST_ELEMENT_CAST (bin),
- gst_message_new_eos (GST_OBJECT_CAST (bin)));
+ seqnum = gst_message_get_seqnum (message);
+ tmessage = gst_message_new_eos (GST_OBJECT_CAST (bin));
+ gst_message_set_seqnum (tmessage, seqnum);
+
+ GST_DEBUG_OBJECT (bin,
+ "all sinks posted EOS, posting seqnum #%" G_GUINT32_FORMAT, seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
}
break;
}
}
GST_OBJECT_UNLOCK (bin);
if (post) {
+ seqnum = gst_message_get_seqnum (message);
+ tmessage = gst_message_new_segment_done (GST_OBJECT_CAST (bin),
+ format, position);
+ gst_message_set_seqnum (tmessage, seqnum);
+
/* post segment done with latest format and position. */
- gst_element_post_message (GST_ELEMENT_CAST (bin),
- gst_message_new_segment_done (GST_OBJECT_CAST (bin),
- format, position));
+ gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
}
break;
}
fold_data.query = query;
+ /* set the result of the query to FALSE initially */
g_value_init (&ret, G_TYPE_BOOLEAN);
g_value_set_boolean (&ret, res);
/* caps for pull based scheduling */
GstCaps *pull_caps;
+ /* blocksize for pulling */
guint blocksize;
gboolean discont;
+
+ /* seqnum of the stream */
+ guint32 seqnum;
};
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
goto flushing;
if (G_LIKELY (event_res)) {
+ guint32 seqnum;
+
+ seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
+ GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
+
switch (type) {
case GST_EVENT_EOS:
+ {
+ GstMessage *message;
+
/* the EOS event is completely handled so we mark
* ourselves as being in the EOS state. eos is also
* protected by the object lock so we can read it when
GST_OBJECT_LOCK (basesink);
basesink->eos = TRUE;
GST_OBJECT_UNLOCK (basesink);
+
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
- gst_element_post_message (GST_ELEMENT_CAST (basesink),
- gst_message_new_eos (GST_OBJECT_CAST (basesink)));
+
+ message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
+ gst_message_set_seqnum (message, seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
break;
+ }
case GST_EVENT_NEWSEGMENT:
/* configure the segment */
gst_base_sink_configure_segment (basesink, pad, event,
gst_event_unref (event);
} else {
/* we set the received EOS flag here so that we can use it when testing if
- * we are prerolled and to refure more buffers. */
+ * we are prerolled and to refuse more buffers. */
basesink->priv->received_eos = TRUE;
/* EOS is a prerollable object, we call the unlocked version because it
basesink->playing_async = FALSE;
basesink->need_preroll = FALSE;
if (basesink->eos) {
+ GstMessage *message;
+
/* need to post EOS message here */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
- gst_element_post_message (GST_ELEMENT_CAST (basesink),
- gst_message_new_eos (GST_OBJECT_CAST (basesink)));
+ message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
+ gst_message_set_seqnum (message, basesink->priv->seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
} else {
GST_DEBUG_OBJECT (basesink, "signal preroll");
GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
GstClockTimeDiff ts_offset;
gboolean do_timestamp;
+
+ /* stream sequence number */
+ guint32 seqnum;
};
static GstElementClass *parent_class = NULL;
gboolean relative_seek = FALSE;
gboolean seekseg_configured = FALSE;
GstSegment seeksegment;
+ guint32 seqnum;
+ GstEvent *tevent;
GST_DEBUG_OBJECT (src, "doing seek");
}
flush = flags & GST_SEEK_FLAG_FLUSH;
+ seqnum = gst_event_get_seqnum (event);
} else {
flush = FALSE;
+ /* get next seqnum */
+ seqnum = gst_util_seqnum_next ();
}
/* send flush start */
- if (flush)
- gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
- else
+ if (flush) {
+ tevent = gst_event_new_flush_start ();
+ gst_event_set_seqnum (tevent, seqnum);
+ gst_pad_push_event (src->srcpad, tevent);
+ } else
gst_pad_pause_task (src->srcpad);
/* unblock streaming thread. */
* because the task is paused, our streaming thread stopped
* or because our peer is flushing. */
GST_PAD_STREAM_LOCK (src->srcpad);
+ if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
+ /* we have seen this event before, issue a warning for now */
+ GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
+ seqnum);
+ } else {
+ src->priv->seqnum = seqnum;
+ GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
+ }
gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
/* and prepare to continue streaming */
if (flush) {
+ tevent = gst_event_new_flush_stop ();
+ gst_event_set_seqnum (tevent, seqnum);
/* send flush stop, peer will accept data and events again. We
* are not yet providing data as we still have the STREAM_LOCK. */
- gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
+ gst_pad_push_event (src->srcpad, tevent);
} else if (res && src->data.ABI.running) {
/* we are running the current segment and doing a non-flushing seek,
* close the segment first based on the last_stop. */
gst_event_new_new_segment_full (TRUE,
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
+ gst_event_set_seqnum (src->priv->close_segment, seqnum);
}
/* The subclass must have converted the segment to the processing format
memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
- gst_element_post_message (GST_ELEMENT (src),
- gst_message_new_segment_start (GST_OBJECT (src),
- src->segment.format, src->segment.last_stop));
+ GstMessage *message;
+
+ message = gst_message_new_segment_start (GST_OBJECT (src),
+ src->segment.format, src->segment.last_stop);
+ gst_message_set_seqnum (message, seqnum);
+
+ gst_element_post_message (GST_ELEMENT (src), message);
}
/* for deriving a stop position for the playback segment from the seek
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
}
+ gst_event_set_seqnum (src->priv->start_segment, seqnum);
}
src->priv->discont = TRUE;
pause:
{
const gchar *reason = gst_flow_get_name (ret);
+ GstEvent *event;
GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
src->data.ABI.running = FALSE;
if (ret == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
- gst_element_post_message (GST_ELEMENT_CAST (src),
- gst_message_new_segment_done (GST_OBJECT_CAST (src),
- src->segment.format, src->segment.last_stop));
+ GstMessage *message;
+
+ message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
+ src->segment.format, src->segment.last_stop);
+ gst_message_set_seqnum (message, src->priv->seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (src), message);
} else {
- gst_pad_push_event (pad, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, src->priv->seqnum);
+ gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
} else {
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, src->priv->seqnum);
/* for fatal errors we post an error message, post the error
* first so the app knows about the error first. */
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")),
("streaming task paused, reason %s (%d)", reason, ret));
- gst_pad_push_event (pad, gst_event_new_eos ());
+ gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
}
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
{
- GstEvent **event_p;
+ GstEvent **event_p, *event;
/* we don't need to unblock anything here, the pad deactivation code
* already did this */
* the EOS event to the element */
if (!basesrc->priv->last_sent_eos) {
GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
- gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, basesrc->priv->seqnum);
+ gst_pad_push_event (basesrc->srcpad, event);
basesrc->priv->last_sent_eos = TRUE;
}
g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);