From 0a711700067583f496bf8eaa41949c95e199df1b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 4 Nov 2008 15:56:55 +0000 Subject: [PATCH] Copy seqnums from events to messages so that they can all be related back to eachother. Original commit message from CVS: * gst/gstbin.c: (bin_handle_async_start), (gst_bin_handle_message_func), (gst_bin_query): * libs/gst/base/gstbasesink.c: (gst_base_sink_render_object), (gst_base_sink_event), (gst_base_sink_change_state): * libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek), (gst_base_src_loop), (gst_base_src_change_state): Copy seqnums from events to messages so that they can all be related back to eachother. --- ChangeLog | 11 ++++++++ gst/gstbin.c | 23 +++++++++++----- libs/gst/base/gstbasesink.c | 29 ++++++++++++++++---- libs/gst/base/gstbasesrc.c | 64 +++++++++++++++++++++++++++++++++++---------- 4 files changed, 101 insertions(+), 26 deletions(-) diff --git a/ChangeLog b/ChangeLog index d468628..94a0ac8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,16 @@ 2008-11-04 Wim Taymans + * gst/gstbin.c: (bin_handle_async_start), + (gst_bin_handle_message_func), (gst_bin_query): + * libs/gst/base/gstbasesink.c: (gst_base_sink_render_object), + (gst_base_sink_event), (gst_base_sink_change_state): + * libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek), + (gst_base_src_loop), (gst_base_src_change_state): + Copy seqnums from events to messages so that they can all be related + back to eachother. + +2008-11-04 Wim Taymans + * tools/gst-launch.c: (event_loop): Print the message seqnums. diff --git a/gst/gstbin.c b/gst/gstbin.c index d522489..0591570 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -2519,7 +2519,6 @@ bin_handle_async_start (GstBin * bin, gboolean new_base_time) if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_NO_PREROLL) goto was_no_preroll; - old_state = GST_STATE (bin); /* when we PLAYING we go back to PAUSED, when preroll happens, we go back to @@ -2763,6 +2762,8 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message) { GstObject *src; GstMessageType type; + GstMessage *tmessage; + guint32 seqnum; src = GST_MESSAGE_SRC (message); type = GST_MESSAGE_TYPE (message); @@ -2784,9 +2785,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message) /* if we are completely EOS, we forward an EOS message */ if (eos) { - GST_DEBUG_OBJECT (bin, "all sinks posted EOS"); - gst_element_post_message (GST_ELEMENT_CAST (bin), - gst_message_new_eos (GST_OBJECT_CAST (bin))); + seqnum = gst_message_get_seqnum (message); + tmessage = gst_message_new_eos (GST_OBJECT_CAST (bin)); + gst_message_set_seqnum (tmessage, seqnum); + + GST_DEBUG_OBJECT (bin, + "all sinks posted EOS, posting seqnum #%" G_GUINT32_FORMAT, seqnum); + gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage); } break; } @@ -2827,10 +2832,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message) } GST_OBJECT_UNLOCK (bin); if (post) { + seqnum = gst_message_get_seqnum (message); + tmessage = gst_message_new_segment_done (GST_OBJECT_CAST (bin), + format, position); + gst_message_set_seqnum (tmessage, seqnum); + /* post segment done with latest format and position. */ - gst_element_post_message (GST_ELEMENT_CAST (bin), - gst_message_new_segment_done (GST_OBJECT_CAST (bin), - format, position)); + gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage); } break; } @@ -3245,6 +3253,7 @@ gst_bin_query (GstElement * element, GstQuery * query) fold_data.query = query; + /* set the result of the query to FALSE initially */ g_value_init (&ret, G_TYPE_BOOLEAN); g_value_set_boolean (&ret, res); diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index fb11e94..f77c46c 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -215,9 +215,13 @@ struct _GstBaseSinkPrivate /* caps for pull based scheduling */ GstCaps *pull_caps; + /* blocksize for pulling */ guint blocksize; gboolean discont; + + /* seqnum of the stream */ + guint32 seqnum; }; #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size)) @@ -2350,8 +2354,16 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad, goto flushing; if (G_LIKELY (event_res)) { + guint32 seqnum; + + seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event); + GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum); + switch (type) { case GST_EVENT_EOS: + { + GstMessage *message; + /* the EOS event is completely handled so we mark * ourselves as being in the EOS state. eos is also * protected by the object lock so we can read it when @@ -2359,11 +2371,15 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad, GST_OBJECT_LOCK (basesink); basesink->eos = TRUE; GST_OBJECT_UNLOCK (basesink); + /* ok, now we can post the message */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_eos (GST_OBJECT_CAST (basesink))); + + message = gst_message_new_eos (GST_OBJECT_CAST (basesink)); + gst_message_set_seqnum (message, seqnum); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); break; + } case GST_EVENT_NEWSEGMENT: /* configure the segment */ gst_base_sink_configure_segment (basesink, pad, event, @@ -2665,7 +2681,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) gst_event_unref (event); } else { /* we set the received EOS flag here so that we can use it when testing if - * we are prerolled and to refure more buffers. */ + * we are prerolled and to refuse more buffers. */ basesink->priv->received_eos = TRUE; /* EOS is a prerollable object, we call the unlocked version because it @@ -3998,10 +4014,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) basesink->playing_async = FALSE; basesink->need_preroll = FALSE; if (basesink->eos) { + GstMessage *message; + /* need to post EOS message here */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_eos (GST_OBJECT_CAST (basesink))); + message = gst_message_new_eos (GST_OBJECT_CAST (basesink)); + gst_message_set_seqnum (message, basesink->priv->seqnum); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); } else { GST_DEBUG_OBJECT (basesink, "signal preroll"); GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index e28cc1a..4c3b4e7 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -229,6 +229,9 @@ struct _GstBaseSrcPrivate GstClockTimeDiff ts_offset; gboolean do_timestamp; + + /* stream sequence number */ + guint32 seqnum; }; static GstElementClass *parent_class = NULL; @@ -1154,6 +1157,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) gboolean relative_seek = FALSE; gboolean seekseg_configured = FALSE; GstSegment seeksegment; + guint32 seqnum; + GstEvent *tevent; GST_DEBUG_OBJECT (src, "doing seek"); @@ -1180,14 +1185,19 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) } flush = flags & GST_SEEK_FLAG_FLUSH; + seqnum = gst_event_get_seqnum (event); } else { flush = FALSE; + /* get next seqnum */ + seqnum = gst_util_seqnum_next (); } /* send flush start */ - if (flush) - gst_pad_push_event (src->srcpad, gst_event_new_flush_start ()); - else + if (flush) { + tevent = gst_event_new_flush_start (); + gst_event_set_seqnum (tevent, seqnum); + gst_pad_push_event (src->srcpad, tevent); + } else gst_pad_pause_task (src->srcpad); /* unblock streaming thread. */ @@ -1197,6 +1207,14 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) * because the task is paused, our streaming thread stopped * or because our peer is flushing. */ GST_PAD_STREAM_LOCK (src->srcpad); + if (G_UNLIKELY (src->priv->seqnum == seqnum)) { + /* we have seen this event before, issue a warning for now */ + GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT, + seqnum); + } else { + src->priv->seqnum = seqnum; + GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum); + } gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL); @@ -1239,9 +1257,11 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) /* and prepare to continue streaming */ if (flush) { + tevent = gst_event_new_flush_stop (); + gst_event_set_seqnum (tevent, seqnum); /* send flush stop, peer will accept data and events again. We * are not yet providing data as we still have the STREAM_LOCK. */ - gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ()); + gst_pad_push_event (src->srcpad, tevent); } else if (res && src->data.ABI.running) { /* we are running the current segment and doing a non-flushing seek, * close the segment first based on the last_stop. */ @@ -1255,6 +1275,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) gst_event_new_new_segment_full (TRUE, src->segment.rate, src->segment.applied_rate, src->segment.format, src->segment.start, src->segment.last_stop, src->segment.time); + gst_event_set_seqnum (src->priv->close_segment, seqnum); } /* The subclass must have converted the segment to the processing format @@ -1271,9 +1292,13 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) memcpy (&src->segment, &seeksegment, sizeof (GstSegment)); if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - gst_element_post_message (GST_ELEMENT (src), - gst_message_new_segment_start (GST_OBJECT (src), - src->segment.format, src->segment.last_stop)); + GstMessage *message; + + message = gst_message_new_segment_start (GST_OBJECT (src), + src->segment.format, src->segment.last_stop); + gst_message_set_seqnum (message, seqnum); + + gst_element_post_message (GST_ELEMENT (src), message); } /* for deriving a stop position for the playback segment from the seek @@ -1301,6 +1326,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) src->segment.rate, src->segment.applied_rate, src->segment.format, src->segment.start, src->segment.last_stop, src->segment.time); } + gst_event_set_seqnum (src->priv->start_segment, seqnum); } src->priv->discont = TRUE; @@ -2273,6 +2299,7 @@ flushing: pause: { const gchar *reason = gst_flow_get_name (ret); + GstEvent *event; GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); src->data.ABI.running = FALSE; @@ -2281,20 +2308,27 @@ pause: if (ret == GST_FLOW_UNEXPECTED) { /* perform EOS logic */ if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - gst_element_post_message (GST_ELEMENT_CAST (src), - gst_message_new_segment_done (GST_OBJECT_CAST (src), - src->segment.format, src->segment.last_stop)); + GstMessage *message; + + message = gst_message_new_segment_done (GST_OBJECT_CAST (src), + src->segment.format, src->segment.last_stop); + gst_message_set_seqnum (message, src->priv->seqnum); + gst_element_post_message (GST_ELEMENT_CAST (src), message); } else { - gst_pad_push_event (pad, gst_event_new_eos ()); + event = gst_event_new_eos (); + gst_event_set_seqnum (event, src->priv->seqnum); + gst_pad_push_event (pad, event); src->priv->last_sent_eos = TRUE; } } else { + event = gst_event_new_eos (); + gst_event_set_seqnum (event, src->priv->seqnum); /* for fatal errors we post an error message, post the error * first so the app knows about the error first. */ GST_ELEMENT_ERROR (src, STREAM, FAILED, (_("Internal data flow error.")), ("streaming task paused, reason %s (%d)", reason, ret)); - gst_pad_push_event (pad, gst_event_new_eos ()); + gst_pad_push_event (pad, event); src->priv->last_sent_eos = TRUE; } } @@ -2793,7 +2827,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_READY: { - GstEvent **event_p; + GstEvent **event_p, *event; /* we don't need to unblock anything here, the pad deactivation code * already did this */ @@ -2803,7 +2837,9 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) * the EOS event to the element */ if (!basesrc->priv->last_sent_eos) { GST_DEBUG_OBJECT (basesrc, "Sending EOS event"); - gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ()); + event = gst_event_new_eos (); + gst_event_set_seqnum (event, basesrc->priv->seqnum); + gst_pad_push_event (basesrc->srcpad, event); basesrc->priv->last_sent_eos = TRUE; } g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); -- 2.7.4