plugins/elements/gstqueue.*: Be smarter when calculating the current amount of data...
authorWim Taymans <wim.taymans@gmail.com>
Thu, 10 May 2007 15:21:20 +0000 (15:21 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 10 May 2007 15:21:20 +0000 (15:21 +0000)
Original commit message from CVS:
* plugins/elements/gstqueue.c: (gst_queue_class_init),
(update_time_level), (gst_queue_locked_flush),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_push_one), (gst_queue_loop):
* plugins/elements/gstqueue.h:
Be smarter when calculating the current amount of data in the queue by
measuring the difference between start and end timestamps (in running
time) inside the queue. Fixes #432876.
API: GstQueue::pushing to notify elements that we are pushing data again
since the running signal is rather broken for this purpose.

ChangeLog
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index 60a9734..6b36fb5 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,16 @@
+2007-05-10  Wim Taymans  <wim@fluendo.com>
+
+       * plugins/elements/gstqueue.c: (gst_queue_class_init),
+       (update_time_level), (gst_queue_locked_flush),
+       (gst_queue_handle_sink_event), (gst_queue_chain),
+       (gst_queue_push_one), (gst_queue_loop):
+       * plugins/elements/gstqueue.h:
+       Be smarter when calculating the current amount of data in the queue by
+       measuring the difference between start and end timestamps (in running
+       time) inside the queue. Fixes #432876.
+       API: GstQueue::pushing to notify elements that we are pushing data again
+       since the running signal is rather broken for this purpose.
+
 2007-05-10  Stefan Kost  <ensonic@users.sf.net>
 
        * plugins/elements/gstqueue.c (_do_init, gst_queue_signals,
index ac65eb7..e19b5b8 100644 (file)
@@ -106,6 +106,7 @@ enum
   SIGNAL_UNDERRUN,
   SIGNAL_RUNNING,
   SIGNAL_OVERRUN,
+  SIGNAL_PUSHING,
   LAST_SIGNAL
 };
 
@@ -277,6 +278,17 @@ gst_queue_class_init (GstQueueClass * klass)
       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+  /**
+   * GstQueue::pushing:
+   * @queue: the queue instance
+   *
+   * Reports when the queue has enough data to start pushing data again on the
+   * source pad.
+   */
+  gst_queue_signals[SIGNAL_PUSHING] =
+      g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+      G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
+      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
   /* properties */
   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
@@ -490,6 +502,24 @@ gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
 }
 
 static void
+update_time_level (GstQueue * queue)
+{
+  gint64 sink_time, src_time;
+
+  sink_time =
+      gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+      queue->sink_segment.last_stop);
+
+  src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+      queue->src_segment.last_stop);
+
+  if (sink_time >= src_time)
+    queue->cur_level.time = sink_time - src_time;
+  else
+    queue->cur_level.time = 0;
+}
+
+static void
 gst_queue_locked_flush (GstQueue * queue)
 {
   while (!g_queue_is_empty (queue->queue)) {
@@ -505,6 +535,8 @@ gst_queue_locked_flush (GstQueue * queue)
   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
   queue->min_threshold.time = queue->orig_min_threshold.time;
+  gst_segment_init (&queue->sink_segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&queue->src_segment, GST_FORMAT_UNDEFINED);
 
   /* we deleted something... */
   g_cond_signal (queue->item_del);
@@ -559,6 +591,24 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       STATUS (queue, pad, "received EOS");
       have_eos = TRUE;
       break;
+    case GST_EVENT_NEWSEGMENT:
+    {
+      gboolean update;
+      GstFormat format;
+      gdouble rate, arate;
+      gint64 start, stop, time;
+
+      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+          &start, &stop, &time);
+
+      GST_DEBUG_OBJECT (queue, "received NEWSEGMENT in %s",
+          gst_format_get_name (format));
+
+      /* now configure the values */
+      gst_segment_set_newsegment_full (&queue->sink_segment, update,
+          rate, arate, format, start, stop, time);
+      break;
+    }
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* we put the event in the queue, we don't have to act ourselves */
@@ -611,19 +661,24 @@ gst_queue_is_filled (GstQueue * queue)
               queue->cur_level.time >= queue->max_size.time)));
 }
 
-
 static GstFlowReturn
 gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 {
   GstQueue *queue;
+  GstClockTime duration, timestamp;
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  duration = GST_BUFFER_DURATION (buffer);
+
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
+      "adding buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+      GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". Note that this only applies to buffers.
@@ -679,9 +734,29 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
          * to make things read-only. Also keep our list uptodate. */
         queue->cur_level.bytes -= GST_BUFFER_SIZE (leak);
         queue->cur_level.buffers--;
-        if (GST_BUFFER_DURATION (leak) != GST_CLOCK_TIME_NONE)
-          queue->cur_level.time -= GST_BUFFER_DURATION (leak);
 
+        timestamp = GST_BUFFER_TIMESTAMP (buffer);
+        duration = GST_BUFFER_DURATION (buffer);
+
+        /* update start time in queue */
+        if (queue->src_segment.format == GST_FORMAT_TIME) {
+          gint64 last_stop;
+
+          if (timestamp != GST_CLOCK_TIME_NONE)
+            last_stop = timestamp;
+          else
+            last_stop = queue->src_segment.last_stop;
+
+          gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
+              last_stop);
+
+          update_time_level (queue);
+        } else if (duration != GST_CLOCK_TIME_NONE) {
+          if (queue->cur_level.time > duration)
+            queue->cur_level.time -= duration;
+          else
+            queue->cur_level.time = 0;
+        }
         gst_buffer_unref (leak);
         break;
       }
@@ -724,9 +799,26 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   /* add buffer to the statistics */
   queue->cur_level.buffers++;
   queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
-  if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
-    queue->cur_level.time += GST_BUFFER_DURATION (buffer);
 
+  /* update start time in queue */
+  if (queue->sink_segment.format == GST_FORMAT_TIME) {
+    gint64 last_stop;
+
+    if (timestamp != GST_CLOCK_TIME_NONE)
+      last_stop = timestamp;
+    else
+      last_stop = queue->sink_segment.last_stop;
+
+    if (duration != GST_CLOCK_TIME_NONE)
+      last_stop += duration;
+
+    gst_segment_set_last_stop (&queue->sink_segment, GST_FORMAT_TIME,
+        last_stop);
+
+    update_time_level (queue);
+  } else if (duration != GST_CLOCK_TIME_NONE) {
+    queue->cur_level.time += duration;
+  }
   STATUS (queue, pad, "+ level");
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
@@ -771,15 +863,38 @@ gst_queue_push_one (GstQueue * queue)
 
   if (GST_IS_BUFFER (data)) {
     GstFlowReturn result;
+    GstClockTime timestamp, duration;
+    GstBuffer *buffer = GST_BUFFER (data);
 
     /* Update statistics */
     queue->cur_level.buffers--;
-    queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
-    if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
-      queue->cur_level.time -= GST_BUFFER_DURATION (data);
+    queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
+
+    timestamp = GST_BUFFER_TIMESTAMP (buffer);
+    duration = GST_BUFFER_DURATION (buffer);
+
+    /* update start time in queue */
+    if (queue->src_segment.format == GST_FORMAT_TIME) {
+      gint64 last_stop;
+
+      if (timestamp != GST_CLOCK_TIME_NONE)
+        last_stop = timestamp;
+      else
+        last_stop = queue->src_segment.last_stop;
+
+      gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
+          last_stop);
+
+      update_time_level (queue);
+    } else if (duration != GST_CLOCK_TIME_NONE) {
+      if (queue->cur_level.time > duration)
+        queue->cur_level.time -= duration;
+      else
+        queue->cur_level.time = 0;
+    }
 
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (queue->srcpad, GST_BUFFER (data));
+    result = gst_pad_push (queue->srcpad, buffer);
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     /* else result of push indicates what happens */
@@ -794,19 +909,40 @@ gst_queue_push_one (GstQueue * queue)
       gst_pad_pause_task (queue->srcpad);
     }
   } else if (GST_IS_EVENT (data)) {
-    if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
-      queue->cur_level.buffers = 0;
-      queue->cur_level.bytes = 0;
-      queue->cur_level.time = 0;
-      /* all incomming data is now unexpected */
-      queue->srcresult = GST_FLOW_UNEXPECTED;
-      /* and we don't need to process anymore */
-      GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
-      gst_pad_pause_task (queue->srcpad);
-      restart = FALSE;
+    GstEvent *event = GST_EVENT (data);
+
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_EOS:
+        queue->cur_level.buffers = 0;
+        queue->cur_level.bytes = 0;
+        queue->cur_level.time = 0;
+        /* all incomming data is now unexpected */
+        queue->srcresult = GST_FLOW_UNEXPECTED;
+        /* and we don't need to process anymore */
+        GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
+        gst_pad_pause_task (queue->srcpad);
+        restart = FALSE;
+        break;
+      case GST_EVENT_NEWSEGMENT:
+      {
+        gboolean update;
+        GstFormat format;
+        gdouble rate, arate;
+        gint64 start, stop, time;
+
+        gst_event_parse_new_segment_full (event, &update, &rate, &arate,
+            &format, &start, &stop, &time);
+
+        /* now configure the values */
+        gst_segment_set_newsegment_full (&queue->src_segment, update,
+            rate, arate, format, start, stop, time);
+        break;
+      }
+      default:
+        break;
     }
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_pad_push_event (queue->srcpad, GST_EVENT (data));
+    gst_pad_push_event (queue->srcpad, event);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     if (restart == TRUE)
       return TRUE;
@@ -869,6 +1005,7 @@ restart:
     STATUS (queue, pad, "post-empty wait");
     GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+    g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_PUSHING], 0);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
   }
 
index 0de1218..05ca904 100644 (file)
@@ -75,6 +75,10 @@ struct _GstQueue {
   GstPad *sinkpad;
   GstPad *srcpad;
 
+  /* segments to keep track of timestamps */
+  GstSegment sink_segment;
+  GstSegment src_segment;
+
   /* flowreturn when srcpad is paused */
   GstFlowReturn srcresult;
 
@@ -106,6 +110,8 @@ struct _GstQueueClass {
   void (*running)      (GstQueue *queue);
   void (*overrun)      (GstQueue *queue);
 
+  void (*pushing)      (GstQueue *queue);
+
   gpointer _gst_reserved[GST_PADDING];
 };