ext/ogg/gstoggmux.c: Commit patch from James "Doc" Livingston, adds proper EOS handli...
authorJames Doc Livingston <doclivingston@gmail.com>
Fri, 29 Sep 2006 10:43:05 +0000 (10:43 +0000)
committerMichael Smith <msmith@xiph.org>
Fri, 29 Sep 2006 10:43:05 +0000 (10:43 +0000)
Original commit message from CVS:
* ext/ogg/gstoggmux.c: (gst_ogg_mux_request_new_pad),
(gst_ogg_mux_release_pad), (gst_ogg_mux_push_buffer),
(gst_ogg_mux_compare_pads), (gst_ogg_mux_queue_pads),
(gst_ogg_mux_send_headers), (gst_ogg_mux_process_best_pad),
(gst_ogg_mux_collected):
Commit patch from James "Doc" Livingston, adds proper EOS handling
in oggmux. GStreamer can, for the first time ever, create a valid
Ogg file! Yay!

* tests/check/pipelines/oggmux.c: (check_chain_final_state),
(oggmux_suite):
Reenable tests now that they pass.

ChangeLog
common
ext/ogg/gstoggmux.c
tests/check/pipelines/oggmux.c

index 96e8d13..1547ecb 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,18 @@
+2006-09-29  Michael Smith  <msmith@fluendo.com>
+
+       * ext/ogg/gstoggmux.c: (gst_ogg_mux_request_new_pad),
+       (gst_ogg_mux_release_pad), (gst_ogg_mux_push_buffer),
+       (gst_ogg_mux_compare_pads), (gst_ogg_mux_queue_pads),
+       (gst_ogg_mux_send_headers), (gst_ogg_mux_process_best_pad),
+       (gst_ogg_mux_collected):
+         Commit patch from James "Doc" Livingston, adds proper EOS handling
+         in oggmux. GStreamer can, for the first time ever, create a valid
+         Ogg file! Yay!
+
+       * tests/check/pipelines/oggmux.c: (check_chain_final_state),
+       (oggmux_suite):
+         Reenable tests now that they pass.
+
 2006-09-29  Wim Taymans  <wim@fluendo.com>
 
        * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_handle_clients):
diff --git a/common b/common
index bdd0108..9991f6f 160000 (submodule)
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit bdd0108b3540ffadeb82cee28b8867a8a6e7ae78
+Subproject commit 9991f6fa61ee11475c390dd6675ef7952f079e43
index 7034f37..ea9b3c7 100644 (file)
@@ -66,7 +66,10 @@ typedef struct
 {
   GstCollectData collect;       /* we extend the CollectData */
 
-  GstBuffer *buffer;            /* the queued buffer for this pad */
+  /* These two buffers make a very simple queue - they enter as 'next_buffer'
+   * and (usually) leave as 'buffer', except at EOS, when buffer will be NULL */
+  GstBuffer *buffer;            /* the first waiting buffer for the pad */
+  GstBuffer *next_buffer;       /* the second waiting buffer for the pad */
 
   gint serial;
   ogg_stream_state stream;
@@ -105,6 +108,9 @@ struct _GstOggMux
   /* sinkpads */
   GstCollectPads *collect;
 
+  /* number of pads which have not received EOS */
+  gint active_pads;
+
   /* the pad we are currently using to fill a page */
   GstOggPad *pulling;
 
@@ -397,6 +403,7 @@ gst_ogg_mux_request_new_pad (GstElement * element,
       oggpad = (GstOggPad *)
           gst_collect_pads_add_pad (ogg_mux->collect, newpad,
           sizeof (GstOggPad));
+      ogg_mux->active_pads++;
 
       oggpad->serial = serial;
       ogg_stream_init (&oggpad->stream, serial);
@@ -440,10 +447,6 @@ gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
 
   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
 
-  /* FIXME: When a request pad is released while paused or playing, 
-   * we probably need to do something to finalise its stream in the
-   * ogg data we're producing, but I'm not sure what */
-
   /* Find out GstOggPad in the collect pads info and clean it up */
 
   GST_OBJECT_LOCK (ogg_mux->collect);
@@ -705,18 +708,24 @@ gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * first,
 
   /* if the first pad doesn't contain anything or is even NULL, return
    * the second pad as best candidate and vice versa */
-  if (first == NULL || first->buffer == NULL)
+  if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
     return 1;
-  if (second == NULL || second->buffer == NULL)
+  if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
     return -1;
 
   /* no timestamp on first buffer, it must go first */
-  firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
+  if (first->buffer)
+    firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
+  else
+    firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
   if (firsttime == GST_CLOCK_TIME_NONE)
     return -1;
 
   /* no timestamp on second buffer, it must go first */
-  secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
+  if (second->buffer)
+    secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
+  else
+    secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
   if (secondtime == GST_CLOCK_TIME_NONE)
     return 1;
 
@@ -739,9 +748,19 @@ gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * first,
   return 0;
 }
 
-/* make sure a buffer is queued on all pads, returns a pointer to an oggpad
- * that holds the best buffer or NULL when no pad was usable.
- * "best" means the buffer marked with the lowest timestamp */
+/* make sure at least one buffer is queued on all pads, two if possible
+ * 
+ * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
+ *   we do not know if the buffer is the last or not
+ * if pad->buffer != NULL, pad->next_buffer != NULL, then
+ *   pad->buffer is not the last buffer for the pad
+ * if pad->buffer != NULL, pad->next_buffer == NULL, then
+ *   pad->buffer if the last buffer for the pad
+ * 
+ * returns a pointer to an oggpad that holds the best buffer, or
+ * NULL when no pad was usable. "best" means the buffer marked
+ * with the lowest timestamp. If best->buffer == NULL then nothing
+ * should be done until more data arrives */
 static GstOggPad *
 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
 {
@@ -766,6 +785,14 @@ gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
       GstBuffer *buf;
       gboolean incaps;
 
+      /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
+      if (pad->buffer == NULL) {
+        GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
+            pad->next_buffer);
+        pad->buffer = pad->next_buffer;
+        pad->next_buffer = NULL;
+      }
+
       buf = gst_collect_pads_pop (ogg_mux->collect, data);
       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
 
@@ -794,6 +821,9 @@ gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
           ogg_page page;
           GstFlowReturn ret;
 
+          /* it's no longer active */
+          ogg_mux->active_pads--;
+
           /* Just gone to EOS. Flush existing page(s) */
           pad->eos = TRUE;
 
@@ -809,14 +839,16 @@ gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
         }
       }
 
-      pad->buffer = buf;
+      pad->next_buffer = buf;
     }
 
     /* we should have a buffer now, see if it is the best pad to
      * pull on */
-    if (pad->buffer) {
+    if (pad->buffer || pad->next_buffer) {
       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
-        GST_LOG_OBJECT (data->pad, "new best pad");
+        GST_LOG_OBJECT (data->pad,
+            "new best pad, with buffers %" GST_PTR_FORMAT
+            " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
 
         bestpad = pad;
       }
@@ -960,7 +992,7 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
 
     /* if the pad has no buffer, we don't care */
-    if (pad->buffer == NULL)
+    if (pad->buffer == NULL && pad->next_buffer == NULL)
       continue;
 
     /* now figure out the headers */
@@ -996,6 +1028,9 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
     } else if (pad->buffer) {
       buf = pad->buffer;
       gst_buffer_ref (buf);
+    } else if (pad->next_buffer) {
+      buf = pad->next_buffer;
+      gst_buffer_ref (buf);
     } else {
       /* fixme -- should be caught in the previous list traversal. */
       GST_OBJECT_LOCK (pad);
@@ -1129,59 +1164,44 @@ gst_ogg_mux_send_headers (GstOggMux * mux)
   return ret;
 }
 
-/* this function is called when there is data on all pads.
+/* this function is called to process data on the best pending pad.
  *
  * basic idea:
  *
- * 1) find a pad to pull on, this is done by looking at the buffers
- *    to decide which one to use, we use the 'oldest' one first.
- * 2) store the selected pad and keep on pulling until we fill a
+ * 1) store the selected pad and keep on pulling until we fill a
  *    complete ogg page or the ogg page is filled above the max-delay
  *    threshold. This is needed because the ogg spec says that
  *    you should fill a complete page with data from the same logical
  *    stream. When the page is filled, go back to 1).
- * 3) before filling a page, read ahead one more buffer to see if this
+ * 2) before filling a page, read ahead one more buffer to see if this
  *    packet is the last of the stream. We need to do this because the ogg
  *    spec mandates that the last packet should have the EOS flag set before
- *    sending it to ogg. FIXME: Apparently we're allowed to send empty 'nil'
- *    pages with the EOS flag set for EOS, so we could do this. Not sure how
- *    that works, though. TODO: 'read ahead one more buffer' is a bit funky
- *    with collectpads. Rethink this.
- * 4) pages get queued on a per-pad queue. Every time a page is queued, a
+ *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
+ *    whether there are any more buffers.
+ * 3) pages get queued on a per-pad queue. Every time a page is queued, a
  *    dequeue is called, which will dequeue the oldest page on any pad, provided
  *    that ALL pads have at least one marked page in the queue (or remaining
- *    pad are at EOS)
+ *    pads are at EOS)
  */
 static GstFlowReturn
-gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
+gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPad * best)
 {
-  GstOggPad *best;
   gboolean delta_unit;
   GstFlowReturn ret;
   gint64 granulepos = 0;
   GstClockTime timestamp, gp_time;
 
-  GST_LOG_OBJECT (ogg_mux, "collected");
-
-  /* queue buffers on all pads; find a buffer with the lowest timestamp */
-  best = gst_ogg_mux_queue_pads (ogg_mux);
-  if (best && !best->buffer) {
-    GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
-    return GST_FLOW_OK;
-  }
-
-  if (!best) {
-    /* EOS : FIXME !! We need to handle EOS correctly, and set EOS
-       flags on the ogg pages. */
-    GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
-    gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
-    return GST_FLOW_WRONG_STATE;
-  }
-
   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
       ogg_mux->pulling);
 
+  /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
+   * buffer */
+  if (best->next_buffer == NULL && !best->eos) {
+    GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
+    return GST_FLOW_WRONG_STATE;
+  }
+
   /* if we were already pulling from one pad, but the new "best" buffer is
    * from another pad, we need to check if we have reason to flush a page
    * for the pad we were pulling from before */
@@ -1283,7 +1303,7 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
         packet.granulepos, packet.packetno, packet.bytes);
 
-    packet.e_o_s = 0;
+    packet.e_o_s = (pad->eos ? 1 : 0);
     tmpbuf = NULL;
 
     /* we flush when we see a new keyframe */
@@ -1447,6 +1467,54 @@ gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
   return GST_FLOW_OK;
 }
 
+/* This function is called when there is data on all pads.
+ * 
+ * It finds a pad to pull on, this is done by looking at the buffers
+ * to decide which one to use, and using the 'oldest' one first. It then calls
+ * gst_ogg_mux_process_best_pad() to process as much data as possible.
+ * 
+ * If all the pads have received EOS, it flushes out all data by continually
+ * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
+ * are all empty, and then sends EOS.
+ */
+static GstFlowReturn
+gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
+{
+  GstOggPad *best;
+  GstFlowReturn ret;
+
+  GST_LOG_OBJECT (ogg_mux, "collected");
+
+  /* queue buffers on all pads; find a buffer with the lowest timestamp */
+  best = gst_ogg_mux_queue_pads (ogg_mux);
+  if (best && !best->buffer) {
+    GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
+    return GST_FLOW_OK;
+  }
+
+  if (!best) {
+    return GST_FLOW_WRONG_STATE;
+  }
+
+  ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
+
+  /* if all the pads have been removed, flush all pending data */
+  if ((ret == GST_FLOW_OK) && (ogg_mux->active_pads == 0)) {
+    GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
+
+    do {
+      best = gst_ogg_mux_queue_pads (ogg_mux);
+      if (best)
+        ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
+    } while ((ret == GST_FLOW_OK) && (best != NULL));
+
+    GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
+    gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
+  }
+
+  return ret;
+}
+
 static void
 gst_ogg_mux_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec)
index 40223df..8a8acee 100644 (file)
@@ -80,8 +80,7 @@ get_page_codec (ogg_page * page)
 static gboolean
 check_chain_final_state (gpointer key, ChainState * state, gpointer data)
 {
-  /* FIXME: check disabled until oggmux is fixed (#337026) */
-  /* fail_unless (state->eos, "missing EOS flag on chain %u", state->serialno); */
+  fail_unless (state->eos, "missing EOS flag on chain %u", state->serialno);
 
   /* return TRUE to empty the chain table */
   return TRUE;
@@ -294,7 +293,6 @@ GST_START_TEST (test_theora_vorbis)
 
 GST_END_TEST;
 
-#if 0
 /* THIS TEST FAILS AT THE MOMENT (KILLED AFTER TIMEOUT): */
 GST_START_TEST (test_vorbis_theora)
 {
@@ -304,7 +302,6 @@ GST_START_TEST (test_vorbis_theora)
 }
 
 GST_END_TEST;
-#endif
 
 static Suite *
 oggmux_suite (void)
@@ -323,8 +320,7 @@ oggmux_suite (void)
 #endif
 
 #if (defined (HAVE_THEORA) && defined (HAVE_VORBIS))
-  /* THIS TEST FAILS AT THE MOMENT (KILLED AFTER TIMEOUT): */
-  /* tcase_add_test (tc_chain, test_vorbis_theora); */
+  tcase_add_test (tc_chain, test_vorbis_theora);
   tcase_add_test (tc_chain, test_theora_vorbis);
 #endif