Copy seqnums from events to messages so that they can all be related back to eachother.
authorWim Taymans <wim.taymans@gmail.com>
Tue, 4 Nov 2008 15:56:55 +0000 (15:56 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 4 Nov 2008 15:56:55 +0000 (15:56 +0000)
Original commit message from CVS:
* gst/gstbin.c: (bin_handle_async_start),
(gst_bin_handle_message_func), (gst_bin_query):
* libs/gst/base/gstbasesink.c: (gst_base_sink_render_object),
(gst_base_sink_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek),
(gst_base_src_loop), (gst_base_src_change_state):
Copy seqnums from events to messages so that they can all be related
back to eachother.

ChangeLog
gst/gstbin.c
libs/gst/base/gstbasesink.c
libs/gst/base/gstbasesrc.c

index d468628..94a0ac8 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,16 @@
 2008-11-04  Wim Taymans  <wim.taymans@collabora.co.uk>
 
+       * gst/gstbin.c: (bin_handle_async_start),
+       (gst_bin_handle_message_func), (gst_bin_query):
+       * libs/gst/base/gstbasesink.c: (gst_base_sink_render_object),
+       (gst_base_sink_event), (gst_base_sink_change_state):
+       * libs/gst/base/gstbasesrc.c: (gst_base_src_perform_seek),
+       (gst_base_src_loop), (gst_base_src_change_state):
+       Copy seqnums from events to messages so that they can all be related
+       back to eachother.
+
+2008-11-04  Wim Taymans  <wim.taymans@collabora.co.uk>
+
        * tools/gst-launch.c: (event_loop):
        Print the message seqnums.
 
index d522489..0591570 100644 (file)
@@ -2519,7 +2519,6 @@ bin_handle_async_start (GstBin * bin, gboolean new_base_time)
   if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_NO_PREROLL)
     goto was_no_preroll;
 
-
   old_state = GST_STATE (bin);
 
   /* when we PLAYING we go back to PAUSED, when preroll happens, we go back to
@@ -2763,6 +2762,8 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
 {
   GstObject *src;
   GstMessageType type;
+  GstMessage *tmessage;
+  guint32 seqnum;
 
   src = GST_MESSAGE_SRC (message);
   type = GST_MESSAGE_TYPE (message);
@@ -2784,9 +2785,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
 
       /* if we are completely EOS, we forward an EOS message */
       if (eos) {
-        GST_DEBUG_OBJECT (bin, "all sinks posted EOS");
-        gst_element_post_message (GST_ELEMENT_CAST (bin),
-            gst_message_new_eos (GST_OBJECT_CAST (bin)));
+        seqnum = gst_message_get_seqnum (message);
+        tmessage = gst_message_new_eos (GST_OBJECT_CAST (bin));
+        gst_message_set_seqnum (tmessage, seqnum);
+
+        GST_DEBUG_OBJECT (bin,
+            "all sinks posted EOS, posting seqnum #%" G_GUINT32_FORMAT, seqnum);
+        gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
       }
       break;
     }
@@ -2827,10 +2832,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
       }
       GST_OBJECT_UNLOCK (bin);
       if (post) {
+        seqnum = gst_message_get_seqnum (message);
+        tmessage = gst_message_new_segment_done (GST_OBJECT_CAST (bin),
+            format, position);
+        gst_message_set_seqnum (tmessage, seqnum);
+
         /* post segment done with latest format and position. */
-        gst_element_post_message (GST_ELEMENT_CAST (bin),
-            gst_message_new_segment_done (GST_OBJECT_CAST (bin),
-                format, position));
+        gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
       }
       break;
     }
@@ -3245,6 +3253,7 @@ gst_bin_query (GstElement * element, GstQuery * query)
 
   fold_data.query = query;
 
+  /* set the result of the query to FALSE initially */
   g_value_init (&ret, G_TYPE_BOOLEAN);
   g_value_set_boolean (&ret, res);
 
index fb11e94..f77c46c 100644 (file)
@@ -215,9 +215,13 @@ struct _GstBaseSinkPrivate
   /* caps for pull based scheduling */
   GstCaps *pull_caps;
 
+  /* blocksize for pulling */
   guint blocksize;
 
   gboolean discont;
+
+  /* seqnum of the stream */
+  guint32 seqnum;
 };
 
 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
@@ -2350,8 +2354,16 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
       goto flushing;
 
     if (G_LIKELY (event_res)) {
+      guint32 seqnum;
+
+      seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
+      GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
+
       switch (type) {
         case GST_EVENT_EOS:
+        {
+          GstMessage *message;
+
           /* the EOS event is completely handled so we mark
            * ourselves as being in the EOS state. eos is also 
            * protected by the object lock so we can read it when 
@@ -2359,11 +2371,15 @@ gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
           GST_OBJECT_LOCK (basesink);
           basesink->eos = TRUE;
           GST_OBJECT_UNLOCK (basesink);
+
           /* ok, now we can post the message */
           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
-          gst_element_post_message (GST_ELEMENT_CAST (basesink),
-              gst_message_new_eos (GST_OBJECT_CAST (basesink)));
+
+          message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
+          gst_message_set_seqnum (message, seqnum);
+          gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
           break;
+        }
         case GST_EVENT_NEWSEGMENT:
           /* configure the segment */
           gst_base_sink_configure_segment (basesink, pad, event,
@@ -2665,7 +2681,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
         gst_event_unref (event);
       } else {
         /* we set the received EOS flag here so that we can use it when testing if
-         * we are prerolled and to refure more buffers. */
+         * we are prerolled and to refuse more buffers. */
         basesink->priv->received_eos = TRUE;
 
         /* EOS is a prerollable object, we call the unlocked version because it
@@ -3998,10 +4014,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
         basesink->playing_async = FALSE;
         basesink->need_preroll = FALSE;
         if (basesink->eos) {
+          GstMessage *message;
+
           /* need to post EOS message here */
           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
-          gst_element_post_message (GST_ELEMENT_CAST (basesink),
-              gst_message_new_eos (GST_OBJECT_CAST (basesink)));
+          message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
+          gst_message_set_seqnum (message, basesink->priv->seqnum);
+          gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
         } else {
           GST_DEBUG_OBJECT (basesink, "signal preroll");
           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
index e28cc1a..4c3b4e7 100644 (file)
@@ -229,6 +229,9 @@ struct _GstBaseSrcPrivate
   GstClockTimeDiff ts_offset;
 
   gboolean do_timestamp;
+
+  /* stream sequence number */
+  guint32 seqnum;
 };
 
 static GstElementClass *parent_class = NULL;
@@ -1154,6 +1157,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
   gboolean relative_seek = FALSE;
   gboolean seekseg_configured = FALSE;
   GstSegment seeksegment;
+  guint32 seqnum;
+  GstEvent *tevent;
 
   GST_DEBUG_OBJECT (src, "doing seek");
 
@@ -1180,14 +1185,19 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
     }
 
     flush = flags & GST_SEEK_FLAG_FLUSH;
+    seqnum = gst_event_get_seqnum (event);
   } else {
     flush = FALSE;
+    /* get next seqnum */
+    seqnum = gst_util_seqnum_next ();
   }
 
   /* send flush start */
-  if (flush)
-    gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
-  else
+  if (flush) {
+    tevent = gst_event_new_flush_start ();
+    gst_event_set_seqnum (tevent, seqnum);
+    gst_pad_push_event (src->srcpad, tevent);
+  } else
     gst_pad_pause_task (src->srcpad);
 
   /* unblock streaming thread. */
@@ -1197,6 +1207,14 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
    * because the task is paused, our streaming thread stopped 
    * or because our peer is flushing. */
   GST_PAD_STREAM_LOCK (src->srcpad);
+  if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
+    /* we have seen this event before, issue a warning for now */
+    GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
+        seqnum);
+  } else {
+    src->priv->seqnum = seqnum;
+    GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
+  }
 
   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
 
@@ -1239,9 +1257,11 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 
   /* and prepare to continue streaming */
   if (flush) {
+    tevent = gst_event_new_flush_stop ();
+    gst_event_set_seqnum (tevent, seqnum);
     /* send flush stop, peer will accept data and events again. We
      * are not yet providing data as we still have the STREAM_LOCK. */
-    gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
+    gst_pad_push_event (src->srcpad, tevent);
   } else if (res && src->data.ABI.running) {
     /* we are running the current segment and doing a non-flushing seek, 
      * close the segment first based on the last_stop. */
@@ -1255,6 +1275,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
         gst_event_new_new_segment_full (TRUE,
         src->segment.rate, src->segment.applied_rate, src->segment.format,
         src->segment.start, src->segment.last_stop, src->segment.time);
+    gst_event_set_seqnum (src->priv->close_segment, seqnum);
   }
 
   /* The subclass must have converted the segment to the processing format 
@@ -1271,9 +1292,13 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
 
     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
-      gst_element_post_message (GST_ELEMENT (src),
-          gst_message_new_segment_start (GST_OBJECT (src),
-              src->segment.format, src->segment.last_stop));
+      GstMessage *message;
+
+      message = gst_message_new_segment_start (GST_OBJECT (src),
+          src->segment.format, src->segment.last_stop);
+      gst_message_set_seqnum (message, seqnum);
+
+      gst_element_post_message (GST_ELEMENT (src), message);
     }
 
     /* for deriving a stop position for the playback segment from the seek
@@ -1301,6 +1326,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
           src->segment.rate, src->segment.applied_rate, src->segment.format,
           src->segment.start, src->segment.last_stop, src->segment.time);
     }
+    gst_event_set_seqnum (src->priv->start_segment, seqnum);
   }
 
   src->priv->discont = TRUE;
@@ -2273,6 +2299,7 @@ flushing:
 pause:
   {
     const gchar *reason = gst_flow_get_name (ret);
+    GstEvent *event;
 
     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
     src->data.ABI.running = FALSE;
@@ -2281,20 +2308,27 @@ pause:
       if (ret == GST_FLOW_UNEXPECTED) {
         /* perform EOS logic */
         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
-          gst_element_post_message (GST_ELEMENT_CAST (src),
-              gst_message_new_segment_done (GST_OBJECT_CAST (src),
-                  src->segment.format, src->segment.last_stop));
+          GstMessage *message;
+
+          message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
+              src->segment.format, src->segment.last_stop);
+          gst_message_set_seqnum (message, src->priv->seqnum);
+          gst_element_post_message (GST_ELEMENT_CAST (src), message);
         } else {
-          gst_pad_push_event (pad, gst_event_new_eos ());
+          event = gst_event_new_eos ();
+          gst_event_set_seqnum (event, src->priv->seqnum);
+          gst_pad_push_event (pad, event);
           src->priv->last_sent_eos = TRUE;
         }
       } else {
+        event = gst_event_new_eos ();
+        gst_event_set_seqnum (event, src->priv->seqnum);
         /* for fatal errors we post an error message, post the error
          * first so the app knows about the error first. */
         GST_ELEMENT_ERROR (src, STREAM, FAILED,
             (_("Internal data flow error.")),
             ("streaming task paused, reason %s (%d)", reason, ret));
-        gst_pad_push_event (pad, gst_event_new_eos ());
+        gst_pad_push_event (pad, event);
         src->priv->last_sent_eos = TRUE;
       }
     }
@@ -2793,7 +2827,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     {
-      GstEvent **event_p;
+      GstEvent **event_p, *event;
 
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
@@ -2803,7 +2837,9 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
        * the EOS event to the element */
       if (!basesrc->priv->last_sent_eos) {
         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
-        gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
+        event = gst_event_new_eos ();
+        gst_event_set_seqnum (event, basesrc->priv->seqnum);
+        gst_pad_push_event (basesrc->srcpad, event);
         basesrc->priv->last_sent_eos = TRUE;
       }
       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);