#define GST_IS_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OGG_MUX))
#define GST_IS_OGG_MUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OGG_MUX))
+#define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
+
typedef struct _GstOggMux GstOggMux;
typedef struct _GstOggMuxClass GstOggMuxClass;
guint64 duration; /* duration of current page */
gboolean eos;
gint64 offset;
+ GstClockTime timestamp; /* timestamp for granulepos of last complete
+ packet on this page */
GstOggPadState state; /* state of the pad */
GList *headers;
+ GQueue *pagebuffers; /* List of pages in buffers ready for pushing */
+
gboolean new_page; /* starting a new page */
gboolean first_delta; /* was the first packet in the page a delta */
gboolean prev_delta; /* was the previous buffer a delta frame */
oggpad->new_page = TRUE;
oggpad->first_delta = FALSE;
oggpad->prev_delta = FALSE;
+ /* TODO: delete this queue (and the things contained within) later,
+ * possibly when doing gst_collectpads_remove_pad() (which we don't seem
+ * to do at all?)
+ */
+ oggpad->pagebuffers = g_queue_new ();
}
}
GST_BUFFER_TIMESTAMP (buffer) = mux->next_ts;
GST_BUFFER_OFFSET (buffer) = mux->offset;
mux->offset += GST_BUFFER_SIZE (buffer);
- GST_BUFFER_OFFSET_END (buffer) = mux->offset;
+ /* Here we set granulepos as our OFFSET_END to give easy direct access to
+ * this value later. Before we push it, we reset this to OFFSET + SIZE
+ * (see gst_ogg_mux_push_buffer). */
+ GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
if (delta)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
}
static GstFlowReturn
-gst_ogg_mux_push_page (GstOggMux * mux, ogg_page * page, gboolean delta)
+gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer)
{
- GstBuffer *buffer;
+ GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET (buffer) +
+ GST_BUFFER_SIZE (buffer);
+
+ return gst_pad_push (mux->srcpad, buffer);
+}
+
+/* if all queues have at least one page, dequeue the page with the lowest
+ * timestamp */
+static gboolean
+gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
+{
+ GSList *walk;
+ GstOggPad *opad = NULL; /* "oldest" pad */
+ GstClockTime oldest = GST_CLOCK_TIME_NONE;
+ GstBuffer *buf = NULL;
+ gboolean ret = FALSE;
+
+ *flowret = GST_FLOW_OK;
+
+ walk = mux->collect->data;
+ while (walk) {
+ GstOggPad *pad = (GstOggPad *) walk->data;
+
+ /* We need each queue to either be at EOS, or have one or more pages
+ * available with a set granulepos (i.e. not -1), otherwise we don't have
+ * enough data yet to determine which stream needs to go next for correct
+ * time ordering. */
+ if (pad->pagebuffers->length == 0) {
+ if (pad->eos) {
+ GST_LOG_OBJECT (pad, "pad is EOS, skipping for dequeue decision");
+ } else {
+ GST_LOG_OBJECT (pad, "no pages in this queue, can't dequeue");
+ return FALSE;
+ }
+ } else {
+ /* We then need to check for a non-negative granulepos */
+ int i;
+ gboolean valid = FALSE;
+
+ for (i = 0; i < pad->pagebuffers->length; i++) {
+ buf = g_queue_peek_nth (pad->pagebuffers, i);
+ /* Here we check the OFFSET_END, which is actually temporarily the
+ * granulepos value for this buffer */
+ if (GST_BUFFER_OFFSET_END (buf) != -1) {
+ valid = TRUE;
+ break;
+ }
+ }
+ if (!valid) {
+ GST_LOG_OBJECT (pad, "No page timestamps in queue, can't dequeue");
+ return FALSE;
+ }
+ }
+
+ walk = g_slist_next (walk);
+ }
+
+ walk = mux->collect->data;
+ while (walk) {
+ GstOggPad *pad = (GstOggPad *) walk->data;
+
+ /* any page with a granulepos of -1 can be pushed immediately.
+ * TODO: it CAN be, but it seems silly to do so? */
+ buf = g_queue_peek_head (pad->pagebuffers);
+ while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
+ GST_LOG_OBJECT (pad, GST_GP_FORMAT " pushing page", -1);
+ g_queue_pop_head (pad->pagebuffers);
+ *flowret = gst_ogg_mux_push_buffer (mux, buf);
+ buf = g_queue_peek_head (pad->pagebuffers);
+ ret = TRUE;
+ }
+
+ if (buf) {
+ /* if no oldest buffer yet, take this one */
+ if (oldest == GST_CLOCK_TIME_NONE) {
+ oldest = GST_BUFFER_TIMESTAMP (buf);
+ opad = pad;
+ } else {
+ /* if we have an oldest, compare with this one */
+ if (GST_BUFFER_TIMESTAMP (buf) < oldest) {
+ oldest = GST_BUFFER_TIMESTAMP (buf);
+ opad = pad;
+ }
+ }
+ }
+ walk = g_slist_next (walk);
+ }
+
+ if (oldest != GST_CLOCK_TIME_NONE) {
+ g_assert (opad);
+ buf = g_queue_pop_head (opad->pagebuffers);
+ GST_LOG_OBJECT (opad,
+ GST_GP_FORMAT " pushing oldest page (time %" GST_TIME_FORMAT ")",
+ GST_BUFFER_OFFSET_END (buf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+ *flowret = gst_ogg_mux_push_buffer (mux, buf);
+ ret = TRUE;
+ }
+
+ return ret;
+}
+
+/* put the given page on a per-pad queue, timestamping it correctly.
+ * after that, dequeue and push as many pages as possible
+ * before calling me, make sure that the the pad's timestamp matches
+ * the page's granulepos */
+static GstFlowReturn
+gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPad * pad, ogg_page * page,
+ gboolean delta)
+{
+ GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
GstFlowReturn ret;
- buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
+ /* take the timestamp of the last completed packet on this page */
+ GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
+ g_queue_push_tail (pad->pagebuffers, buffer);
+ GST_LOG_OBJECT (pad, GST_GP_FORMAT " queued buffer page (time %" GST_TIME_FORMAT "), %d page buffers queued", ogg_page_granulepos (page), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), pad->pagebuffers->length); /* no g_queue_get_length in 2.2 */
- ret = gst_pad_push (mux->srcpad, buffer);
+ while (gst_ogg_mux_dequeue_page (mux, &ret)) {
+ if (ret != GST_FLOW_OK)
+ break;
+ }
return ret;
}
}
/* make sure a buffer is queued on all pads, returns a pointer to an oggpad
- * that holds the best buffer or NULL when no pad was usable */
+ * that holds the best buffer or NULL when no pad was usable.
+ * "best" means the buffer marked with the lowest timestamp */
static GstOggPad *
gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
{
hwalk = hwalk->next;
- if ((ret = gst_pad_push (mux->srcpad, buf)) != GST_FLOW_OK)
+ if ((ret = gst_ogg_mux_push_buffer (mux, buf)) != GST_FLOW_OK)
break;
}
g_list_free (hbufs);
* basic idea:
*
* 1) find a pad to pull on, this is done by looking at the buffers
- * to decide which one should be muxed first.
+ * to decide which one to use, we use the 'oldest' one first.
* 2) store the selected pad and keep on pulling until we fill a
* complete ogg page or the ogg page is filled above the max-delay
* threshold. This is needed because the ogg spec says that
* you should fill a complete page with data from the same logical
* stream. When the page is filled, go back to 1).
- * 3) before filling a packet, read ahead one more buffer to see if this
+ * 3) before filling a page, read ahead one more buffer to see if this
* packet is the last of the stream. We need to do this because the ogg
* spec mandates that the last packet should have the EOS flag set before
- * sending it to ogg.
+ * sending it to ogg. FIXME: Apparently we're allowed to send empty 'nil'
+ * pages with the EOS flag set for EOS, so we could do this.
+ * 4) pages get queued on a per-pad queue. Every time a page is queued, a
+ * dequeue is called, which will dequeue the oldest page on any pad, provided
+ * that ALL pads have at least one marked page in the queue (TODO: or that
+ * pad is at EOS?)
*/
static GstFlowReturn
gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
GstOggPad *best;
gboolean delta_unit;
GstFlowReturn ret;
+ gint64 granulepos = 0;
+ GstClockTime timestamp;
GST_DEBUG ("collected");
+ /* queue buffers on all pads; find a buffer with the lowest timestamp */
best = gst_ogg_mux_queue_pads (ogg_mux);
if (best && !best->buffer)
return GST_FLOW_OK;
return GST_FLOW_WRONG_STATE;
}
- /* we're pulling a pad and there is a better one, see if we need
- * to flush the current page */
+ /* if we were already pulling from one pad, but the new "best" buffer is
+ * from another pad, we need to check if we have reason to flush a page
+ * for the pad we were pulling from before */
if (ogg_mux->pulling && best &&
ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
GstOggPad *pad = ogg_mux->pulling;
ogg_page page;
while (ogg_stream_flush (&pad->stream, &page)) {
- ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
+ /* Place page into the per-pad queue */
+ ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
+ pad->first_delta);
/* increment the page number counter */
pad->pageno++;
/* mark other pages as delta */
}
/* we are pulling from a pad, continue to do so until a page
- * has been filled and pushed */
+ * has been filled and queued */
if (ogg_mux->pulling != NULL) {
ogg_packet packet;
ogg_page page;
}
}
- /* force flush */
+ /* flush the currently built page if neccesary */
if (force_flush) {
while (ogg_stream_flush (&pad->stream, &page)) {
- ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
+ ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
+ pad->first_delta);
/* increment the page number counter */
pad->pageno++;
/* mark other pages as delta */
ogg_stream_packetin (&pad->stream, &packet);
+ granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
+ timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
+
/* don't need the old buffer anymore */
gst_buffer_unref (pad->buffer);
/* store new readahead buffer */
/* let ogg write out the pages now. The packet we got could end
* up in more than one page so we need to write them all */
if (ogg_stream_pageout (&pad->stream, &page) > 0) {
+ if (ogg_page_granulepos (&page) == granulepos) {
+ /* the packet we streamed in finishes on the page,
+ * because the page's granulepos is the granulepos of the last
+ * packet completed on that page,
+ * so update the timestamp that we will give to the page */
+ pad->timestamp = timestamp;
+ GST_DEBUG_OBJECT (pad, "Timestamp of pad is %" GST_TIME_FORMAT
+ ", granulepos is %lld", GST_TIME_ARGS (timestamp), granulepos);
+ }
+
/* push the page */
- ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
+ ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
pad->pageno++;
/* mark next pages as delta */
pad->first_delta = TRUE;
- /* use an inner loop her to flush the remaining pages and
+ /* use an inner loop here to flush the remaining pages and
* mark them as delta frames as well */
while (ogg_stream_pageout (&pad->stream, &page) > 0) {
+ if (ogg_page_granulepos (&page) == granulepos) {
+ /* the page has taken up the new packet completely, which means
+ * the packet ends the page and we can update the timestamp
+ * before pushing out */
+ pad->timestamp = timestamp;
+ }
+
/* we have a complete page now, we can push the page
* and make sure to pull on a new pad the next time around */
- ret = gst_ogg_mux_push_page (ogg_mux, &page, pad->first_delta);
+ ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
+ pad->first_delta);
/* increment the page number counter */
pad->pageno++;
}
* pad for pulling in the next iteration */
ogg_mux->pulling = NULL;
}
+
+ /* Update the timestamp, if neccesary, since and future page will have at
+ * least this timestamp.
+ */
+ if (pad->timestamp < timestamp) {
+ pad->timestamp = timestamp;
+ GST_DEBUG_OBJECT (pad, "Updated timestamp of pad to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timestamp));
+ }
}
return GST_FLOW_OK;