baseparse: modify reverse playback handling
authorMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Tue, 14 Feb 2012 18:33:46 +0000 (19:33 +0100)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Tue, 14 Feb 2012 18:33:46 +0000 (19:33 +0100)
... so as to allow the push-mode case to provide data to subclass
on a buffer by buffer basis (as in regular forward case), rather
than all buffers of a fragment chucked together.

Also refactor buffer handling some more, and add some debug.

libs/gst/base/gstbaseparse.c

index fa3c849..8767e99 100644 (file)
@@ -314,6 +314,7 @@ struct _GstBaseParsePrivate
 
   /* reverse playback */
   GSList *buffers_pending;
+  GSList *buffers_head;
   GSList *buffers_queued;
   GSList *buffers_send;
   GstClockTime last_ts;
@@ -435,8 +436,11 @@ static gint64 gst_base_parse_find_offset (GstBaseParse * parse,
 static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse,
     GstClockTime * _time, gint64 * _offset);
 
-static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
-    gboolean push_only);
+static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse);
+static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse,
+    gboolean prev_head);
+
+static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse);
 
 static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
 
@@ -450,6 +454,9 @@ gst_base_parse_clear_queues (GstBaseParse * parse)
       NULL);
   g_slist_free (parse->priv->buffers_pending);
   parse->priv->buffers_pending = NULL;
+  g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL);
+  g_slist_free (parse->priv->buffers_head);
+  parse->priv->buffers_head = NULL;
   g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL);
   g_slist_free (parse->priv->buffers_send);
   parse->priv->buffers_send = NULL;
@@ -1049,7 +1056,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
       if (in_segment->rate > 0.0)
         gst_base_parse_drain (parse);
       else
-        gst_base_parse_process_fragment (parse, FALSE);
+        gst_base_parse_finish_fragment (parse, FALSE);
       gst_adapter_clear (parse->priv->adapter);
 
       parse->priv->offset = offset;
@@ -1088,7 +1095,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
       if (parse->segment.rate > 0.0)
         gst_base_parse_drain (parse);
       else
-        gst_base_parse_process_fragment (parse, FALSE);
+        gst_base_parse_finish_fragment (parse, TRUE);
 
       /* If we STILL have zero frames processed, fire an error */
       if (parse->priv->framecount == 0) {
@@ -1734,7 +1741,9 @@ gst_base_parse_unprepare_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
   gst_base_parse_frame_update (parse, frame, NULL);
 }
 
-/* takes ownership of @buffer */
+/* Wraps buffer in a frame and dispatches to subclass.
+ * Also manages data skipping and offset handling (including adapter flushing).
+ * Takes ownership of @buffer */
 static GstFlowReturn
 gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
     gint * skip, gint * flushed)
@@ -1745,6 +1754,12 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
 
   g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR);
 
+  GST_LOG_OBJECT (parse,
+      "handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT
+      ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
   /* track what is being flushed during this single round of frame processing */
   parse->priv->flushed = 0;
   *skip = 0;
@@ -1759,15 +1774,16 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
   ret = klass->handle_frame (parse, frame, skip);
   gst_base_parse_unprepare_frame (parse, frame);
 
-  if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
-    gst_adapter_clear (parse->priv->adapter);
-  }
-
   *flushed = parse->priv->flushed;
 
   GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d",
       *skip, *flushed);
 
+  if (ret != GST_FLOW_OK) {
+    GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
+    goto exit;
+  }
+
   /* subclass can only do one of these, or semantics are too unclear */
   g_assert (*skip == 0 || *flushed == 0);
 
@@ -1779,13 +1795,41 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
     parse->priv->prev_frame = NULL;
   }
 
+  /* track skipping */
+  if (*skip > 0) {
+    GstClockTime timestamp;
+    GstBuffer *outbuf;
+
+    GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip);
+    if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
+      /* reverse playback, and no frames found yet, so we are skipping
+       * the leading part of a fragment, which may form the tail of
+       * fragment coming later, hopefully subclass skips efficiently ... */
+      timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
+      outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip);
+      outbuf = gst_buffer_make_writable (outbuf);
+      GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
+      parse->priv->buffers_head =
+          g_slist_prepend (parse->priv->buffers_head, outbuf);
+      outbuf = NULL;
+    } else {
+      gst_adapter_flush (parse->priv->adapter, *skip);
+    }
+    if (!parse->priv->discont)
+      parse->priv->sync_offset = parse->priv->offset;
+    parse->priv->offset += *skip;
+    parse->priv->discont = TRUE;
+    /* check for indefinite skipping */
+    if (ret == GST_FLOW_OK)
+      ret = gst_base_parse_check_sync (parse);
+  }
+
   parse->priv->offset += *flushed;
 
-#ifndef GST_DISABLE_GST_DEBUG
-  if (ret != GST_FLOW_OK) {
-    GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
+exit:
+  if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
+    gst_adapter_clear (parse->priv->adapter);
   }
-#endif
 
   return ret;
 }
@@ -2281,7 +2325,34 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
   return ret;
 }
 
-/* gst_base_parse_process_fragment:
+/* gst_base_parse_start_fragment:
+ *
+ * Prepares for processing a reverse playback (forward) fragment
+ * by (re)setting proper state variables.
+ */
+static GstFlowReturn
+gst_base_parse_start_fragment (GstBaseParse * parse)
+{
+  GST_LOG_OBJECT (parse, "starting fragment");
+
+  /* invalidate so no fall-back timestamping is performed;
+   * ok if taken from subclass or upstream */
+  parse->priv->next_ts = GST_CLOCK_TIME_NONE;
+  parse->priv->prev_ts = GST_CLOCK_TIME_NONE;
+  /* prevent it hanging around stop all the time */
+  parse->segment.position = GST_CLOCK_TIME_NONE;
+  /* mark next run */
+  parse->priv->discont = TRUE;
+
+  /* head of previous fragment is now pending tail of current fragment */
+  parse->priv->buffers_pending = parse->priv->buffers_head;
+  parse->priv->buffers_head = NULL;
+
+  return GST_FLOW_OK;
+}
+
+
+/* gst_base_parse_finish_fragment:
  *
  * Processes a reverse playback (forward) fragment:
  * - append head of last fragment that was skipped to current fragment data
@@ -2290,40 +2361,35 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
  * - push queued data
  */
 static GstFlowReturn
-gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
+gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head)
 {
   GstBuffer *buf;
   GstFlowReturn ret = GST_FLOW_OK;
   gboolean seen_key = FALSE, seen_delta = FALSE;
 
-  if (push_only)
-    goto push;
+  GST_LOG_OBJECT (parse, "finishing fragment");
 
   /* restore order */
   parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
   while (parse->priv->buffers_pending) {
     buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
-    GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
-        gst_buffer_get_size (buf));
-    gst_adapter_push (parse->priv->adapter, buf);
+    if (prev_head) {
+      GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
+          gst_buffer_get_size (buf));
+      gst_adapter_push (parse->priv->adapter, buf);
+    } else {
+      GST_LOG_OBJECT (parse, "discarding head buffer");
+      gst_buffer_unref (buf);
+    }
     parse->priv->buffers_pending =
         g_slist_delete_link (parse->priv->buffers_pending,
         parse->priv->buffers_pending);
   }
 
-  /* invalidate so no fall-back timestamping is performed;
-   * ok if taken from subclass or upstream */
-  parse->priv->next_ts = GST_CLOCK_TIME_NONE;
-  /* prevent it hanging around stop all the time */
-  parse->segment.position = GST_CLOCK_TIME_NONE;
-  /* mark next run */
-  parse->priv->discont = TRUE;
-
   /* chain looks for frames and queues resulting ones (in stead of pushing) */
   /* initial skipped data is added to buffers_pending */
   gst_base_parse_drain (parse);
 
-push:
   if (parse->priv->buffers_send) {
     buf = GST_BUFFER_CAST (parse->priv->buffers_send->data);
     seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
@@ -2416,7 +2482,6 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   GstBaseParseClass *bclass;
   GstBaseParse *parse;
   GstFlowReturn ret = GST_FLOW_OK;
-  GstBuffer *outbuf = NULL;
   GstBuffer *tmpbuf = NULL;
   guint fsize = 1;
   gint skip = -1;
@@ -2516,14 +2581,13 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       return ret;
     }
     /* upstream feeding us in reverse playback;
-     * gather each fragment, then process it in single run */
+     * finish previous fragment and start new upon DISCONT */
     if (parse->segment.rate < 0.0) {
       if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
         GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
-        ret = gst_base_parse_process_fragment (parse, FALSE);
+        ret = gst_base_parse_finish_fragment (parse, TRUE);
+        gst_base_parse_start_fragment (parse);
       }
-      gst_adapter_push (parse->priv->adapter, buffer);
-      return ret;
     }
     gst_adapter_push (parse->priv->adapter, buffer);
   }
@@ -2577,38 +2641,17 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush);
       tmpbuf = NULL;
 
+      /* probably already implicitly unmapped due to adapter operation,
+       * but for good measure ... */
       gst_adapter_unmap (parse->priv->adapter);
       if (ret != GST_FLOW_OK) {
         goto done;
       }
-      if (skip > 0) {
-        GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
-        if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
-          /* reverse playback, and no frames found yet, so we are skipping
-           * the leading part of a fragment, which may form the tail of
-           * fragment coming later, hopefully subclass skips efficiently ... */
-          timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
-          outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip);
-          outbuf = gst_buffer_make_writable (outbuf);
-          GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
-          parse->priv->buffers_pending =
-              g_slist_prepend (parse->priv->buffers_pending, outbuf);
-          outbuf = NULL;
-        } else {
-          gst_adapter_flush (parse->priv->adapter, skip);
-        }
-        if (!parse->priv->discont)
-          parse->priv->sync_offset = parse->priv->offset;
-        parse->priv->offset += skip;
-        parse->priv->discont = TRUE;
-      } else if (!flush) {
+      if (skip == 0 && flush == 0) {
         GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, "
             "breaking to get more data");
         goto done;
       }
-      if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
-        goto done;
-      }
     }
 
     /* Grab lock to prevent a race with FLUSH_START handler */
@@ -2752,8 +2795,9 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
   /* offset will increase again as fragment is processed/parsed */
   parse->priv->last_offset = offset;
 
+  gst_base_parse_start_fragment (parse);
   gst_adapter_push (parse->priv->adapter, buffer);
-  ret = gst_base_parse_process_fragment (parse, FALSE);
+  ret = gst_base_parse_finish_fragment (parse, TRUE);
   if (ret != GST_FLOW_OK)
     goto exit;
 
@@ -2771,7 +2815,7 @@ static GstFlowReturn
 gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
     gboolean full)
 {
-  GstBuffer *buffer, *outbuf;
+  GstBuffer *buffer;
   GstFlowReturn ret = GST_FLOW_OK;
   guint fsize, min_size;
   gint flushed = 0;
@@ -2789,14 +2833,18 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
   while (TRUE) {
     min_size = MAX (parse->priv->min_frame_size, fsize);
 
+    GST_LOG_OBJECT (parse, "reading buffer size %u", min_size);
+
     ret = gst_base_parse_pull_range (parse, min_size, &buffer);
     if (ret != GST_FLOW_OK)
       goto done;
 
     /* if we got a short read, inform subclass we are draining leftover
      * and no more is to be expected */
-    if (gst_buffer_get_size (buffer) < min_size)
+    if (gst_buffer_get_size (buffer) < min_size) {
+      GST_LOG_OBJECT (parse, "... but did not get that; marked draining");
       parse->priv->drain = TRUE;
+    }
 
     if (parse->priv->detecting) {
       ret = klass->detect (parse, buffer);
@@ -2822,34 +2870,11 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
       /* Else handle this buffer normally */
     }
 
-    /* might need it later on */
-    gst_buffer_ref (buffer);
     ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed);
     if (ret != GST_FLOW_OK)
       break;
 
-    if (skip > 0) {
-      GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
-      if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
-        /* reverse playback, and no frames found yet, so we are skipping
-         * the leading part of a fragment, which may form the tail of
-         * fragment coming later, hopefully subclass skips efficiently ... */
-        outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip);
-        parse->priv->buffers_pending =
-            g_slist_prepend (parse->priv->buffers_pending, outbuf);
-        outbuf = NULL;
-      }
-      if (!parse->priv->discont)
-        parse->priv->sync_offset = parse->priv->offset;
-      parse->priv->offset += skip;
-      parse->priv->discont = TRUE;
-    } else {
-      /* default to reasonable increase */
-      fsize += 64 * 1024;
-    }
-    /* no longer needed */
-    gst_buffer_unref (buffer);
-    /* changed offset means something happened,
+    /* something flushed means something happened,
      * and we should bail out of this loop so as not to occupy
      * the task thread indefinitely */
     if (flushed) {
@@ -2863,10 +2888,13 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
       ret = GST_FLOW_EOS;
       break;
     }
-    parse->priv->drain = FALSE;
-    if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
-      goto done;
+    /* otherwise, get some more data
+     * note that is checked this does not happen indefinitely */
+    if (!skip) {
+      GST_LOG_OBJECT (parse, "getting some more data");
+      fsize += 64 * 1024;
     }
+    parse->priv->drain = FALSE;
   }
 
 done:
@@ -2905,7 +2933,7 @@ gst_base_parse_loop (GstPad * pad)
       parse->segment.position >= parse->segment.stop) {
     GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
     /* push what was accumulated during loop run */
-    gst_base_parse_process_fragment (parse, TRUE);
+    gst_base_parse_finish_fragment (parse, FALSE);
     /* force previous fragment */
     parse->priv->offset = -1;
     ret = GST_FLOW_OK;