dashdemux: Replace GQueue by GstDataQueue
authorThiago Santos <thiago.sousa.santos@collabora.com>
Mon, 28 Jan 2013 21:52:04 +0000 (18:52 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 21:14:34 +0000 (18:14 -0300)
GstDataQueue has proper locking and provides functions to limit the
size of the queue. Also has blocking calls that are useful to
our multithread scenario in Dash.

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h
ext/dash/gstfragment.c
ext/dash/gstfragment.h
ext/dash/gstmpdparser.c

index dcc47dc..4c62f4e 100644 (file)
@@ -216,7 +216,6 @@ static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux);
 static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
 static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
 static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
-static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
 static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
     GstActiveStream * stream);
 static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
@@ -463,7 +462,7 @@ gst_dash_demux_all_queues_have_data (GstDashDemux * demux)
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    if (g_queue_is_empty (stream->queue)) {
+    if (gst_data_queue_is_empty (stream->queue)) {
       return FALSE;
     }
   }
@@ -477,15 +476,43 @@ gst_dash_demux_clear_queues (GstDashDemux * demux)
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
-    while (!g_queue_is_empty (stream->queue)) {
-      GstFragment *fragment = g_queue_pop_head (stream->queue);
-      g_object_unref (fragment);
-    }
-    g_queue_clear (stream->queue);
+
+    gst_data_queue_flush (stream->queue);
   }
 }
 
 static gboolean
+_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
+    GstDashDemuxStream * stream)
+{
+  /* TODO add limits */
+  return FALSE;
+}
+
+static void
+_data_queue_item_destroy (GstDataQueueItem * item)
+{
+  gst_mini_object_unref (item->object);
+  g_free (item);
+}
+
+static void
+gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
+    GstBuffer * fragment)
+{
+  GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
+
+  item->object = GST_MINI_OBJECT_CAST (fragment);
+  item->duration = GST_BUFFER_DURATION (fragment);
+  item->visible = TRUE;
+  item->size = GST_BUFFER_SIZE (fragment);
+
+  item->destroy = (GDestroyNotify) _data_queue_item_destroy;
+
+  gst_data_queue_push (stream->queue, item);
+}
+
+static gboolean
 gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
 {
   GstDashDemux *demux;
@@ -563,8 +590,8 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
         chunk = list->data;
         current_pos = chunk->start_time;
         //current_sequence = chunk->number;
-        GST_WARNING_OBJECT (demux, "%i <= %i (%i)", current_pos, target_pos,
-            chunk->duration);
+        GST_WARNING_OBJECT (demux, "%llu <= %llu (%llu)", current_pos,
+            target_pos, chunk->duration);
         if (current_pos <= target_pos
             && target_pos < current_pos + chunk->duration) {
           break;
@@ -626,6 +653,10 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
       /* Restart the demux */
       demux->cancelled = FALSE;
       demux->end_of_manifest = FALSE;
+      for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+        GstDashDemuxStream *stream = iter->data;
+        gst_data_queue_set_flushing (stream->queue, FALSE);
+      }
       gst_dash_demux_resume_download_task (demux);
       gst_dash_demux_resume_stream_task (demux);
       g_static_rec_mutex_unlock (&demux->stream_lock);
@@ -686,7 +717,9 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
 
     stream = g_new0 (GstDashDemuxStream, 1);
     caps = gst_dash_demux_get_input_caps (demux, active_stream);
-    stream->queue = g_queue_new ();
+    stream->queue =
+        gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
+        stream);
 
     stream->index = i;
     stream->input_caps = caps;
@@ -878,7 +911,14 @@ gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
 static void
 gst_dash_demux_stop (GstDashDemux * demux)
 {
+  GSList *iter;
+
   gst_uri_downloader_cancel (demux->downloader);
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+
+    gst_data_queue_set_flushing (stream->queue, TRUE);
+  }
 
   if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
     GST_TASK_SIGNAL (demux->download_task);
@@ -969,16 +1009,18 @@ needs_pad_switch (GstDashDemux * demux)
   GSList *iter;
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDataQueueItem *item;
     GstDashDemuxStream *stream = iter->data;
-    GstFragment *newFragment = g_queue_peek_head (stream->queue);
     GstCaps *srccaps = NULL;
+    GstBuffer *buffer;
 
-    if (newFragment == NULL) {
+    if (!gst_data_queue_peek (stream->queue, &item))
       continue;
-    }
-    if (stream->output_caps)
-      gst_caps_unref (stream->output_caps);
-    stream->output_caps = gst_fragment_get_caps (newFragment);
+
+    buffer = GST_BUFFER_CAST (item->object);
+
+    gst_caps_replace (&stream->output_caps, GST_BUFFER_CAPS (buffer));
+
     if (G_LIKELY (stream->pad))
       srccaps = gst_pad_get_negotiated_caps (stream->pad);
     if (G_UNLIKELY (!srccaps
@@ -1017,7 +1059,6 @@ static void
 gst_dash_demux_stream_loop (GstDashDemux * demux)
 {
   GstFlowReturn ret;
-  GstBufferList *buffer_list;
   guint nb_adaptation_set = 0;
   GstActiveStream *active_stream;
   gboolean switch_pad;
@@ -1057,15 +1098,18 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
   }
 
   for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
+    GstDataQueueItem *item;
+    GstBuffer *buffer;
     GstDashDemuxStream *stream = iter->data;
-    GstFragment *fragment = g_queue_pop_head (stream->queue);
-
-    if (!fragment)
+    if (!gst_data_queue_pop (stream->queue, &item))
       continue;
 
+    buffer = GST_BUFFER_CAST (item->object);
+
     active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
     if (demux->need_segment) {
-      GstClockTime start = fragment->start_time + demux->position_shift;
+      GstClockTime start =
+          GST_BUFFER_TIMESTAMP (buffer) + demux->position_shift;
       /* And send a newsegment */
       GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
           GST_TIME_FORMAT, GST_TIME_ARGS (start));
@@ -1074,11 +1118,13 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
               start, GST_CLOCK_TIME_NONE, start));
     }
 
-    GST_DEBUG_OBJECT (demux, "Pushing fragment #%d (stream %i)",
-        fragment->index, i);
-    buffer_list = gst_fragment_get_buffer_list (fragment);
-    g_object_unref (fragment);
-    ret = gst_pad_push_list (stream->pad, buffer_list);
+    GST_DEBUG_OBJECT (demux,
+        "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
+        GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer), i,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+    ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
+    item->destroy (item);
     if ((ret != GST_FLOW_OK) && (active_stream->mimeType == GST_STREAM_VIDEO))
       goto error_pushing;
   }
@@ -1121,7 +1167,7 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
     gst_object_unref (stream->pad);
 
   /* TODO flush the queue */
-  g_queue_free (stream->queue);
+  g_object_unref (stream->queue);
 
   g_free (stream);
 }
@@ -1182,22 +1228,11 @@ gst_dash_demux_get_buffering_time (GstDashDemux * demux)
 static GstClockTime
 gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
 {
-  GstFragment *first_fragment, *last_fragment;
-  GstClockTime buffer_time = 0;
-
-  /* get first fragment */
-  first_fragment = g_queue_peek_head (stream->queue);
-  /* get last fragment */
-  last_fragment = g_queue_peek_tail (stream->queue);
-
-  if (!first_fragment && !last_fragment)
-    return 0;
+  GstDataQueueSize level;
 
-  if (first_fragment && last_fragment) {
-    buffer_time = last_fragment->stop_time - first_fragment->start_time;
-  }
+  gst_data_queue_get_level (stream->queue, &level);
 
-  return buffer_time;
+  return (GstClockTime) level.time;
 }
 
 static float
@@ -1210,34 +1245,6 @@ gst_dash_demux_get_buffering_ratio (GstDashDemux * demux)
     return buffering_time / demux->min_buffering_time;
 }
 
-static GstBuffer *
-gst_dash_demux_merge_buffer_list (GstFragment * fragment)
-{
-  GstBufferList *list;
-  GstBufferListIterator *it;
-  GstBuffer *buffer, *ret = NULL;
-  GstAdapter *adapter;
-  gsize size;
-
-  adapter = gst_adapter_new ();
-  list = gst_fragment_get_buffer_list (fragment);
-  it = gst_buffer_list_iterate (list);
-  while (gst_buffer_list_iterator_next_group (it)) {
-    while ((buffer = gst_buffer_list_iterator_next (it)) != NULL) {
-      gst_adapter_push (adapter, gst_buffer_ref (buffer));
-    }
-  }
-  gst_buffer_list_iterator_free (it);
-  gst_buffer_list_unref (list);
-  size = gst_adapter_available (adapter);
-  if (size > 0)
-    ret = gst_adapter_take_buffer (adapter, size);
-  GST_DEBUG ("Extracted a buffer of size %d from the fragment", size);
-  g_object_unref (adapter);
-
-  return ret;
-}
-
 /* gst_dash_demux_download_loop:
  * 
  * Loop for the "download' task that fetches fragments based on the 
@@ -1301,7 +1308,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
             "Failed to update the manifest file from URL %s",
             demux->client->mpd_uri);
       } else {
-        buffer = gst_dash_demux_merge_buffer_list (download);
+        buffer = gst_fragment_get_buffer (download);
         g_object_unref (download);
         /* parse the manifest file */
         if (buffer == NULL) {
@@ -1554,44 +1561,6 @@ gst_dash_demux_get_next_header (GstDashDemux * demux, guint stream_idx)
   return fragment;
 }
 
-static GstBufferListItem
-gst_dash_demux_add_buffer_cb (GstBuffer ** buffer,
-    guint group, guint idx, gpointer user_data)
-{
-  GstFragment *frag = GST_FRAGMENT (user_data);
-  /* This buffer still belongs to the original fragment */
-  /* so we need to increase refcount */
-  gst_fragment_add_buffer (frag, gst_buffer_ref (*buffer));
-  return GST_BUFFER_LIST_CONTINUE;
-}
-
-/* Since we cannot add headers after the chunk has been downloaded, we have to recreate a new fragment */
-static GstFragment *
-gst_dash_demux_prepend_header (GstDashDemux * demux,
-    GstFragment * frag, GstFragment * header)
-{
-  GstBufferList *list;
-  GstFragment *res = gst_fragment_new ();
-  res->name = g_strdup (frag->name);
-  res->download_start_time = frag->download_start_time;
-  res->download_stop_time = frag->download_stop_time;
-  res->start_time = frag->start_time;
-  res->stop_time = frag->stop_time;
-  res->index = frag->index;
-  res->discontinuous = frag->discontinuous;
-
-  list = gst_fragment_get_buffer_list (header);
-  gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
-  gst_buffer_list_unref (list);
-  list = gst_fragment_get_buffer_list (frag);
-  gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
-  gst_buffer_list_unref (list);
-
-  res->completed = TRUE;
-
-  return res;
-}
-
 static GstCaps *
 gst_dash_demux_get_video_input_caps (GstDashDemux * demux,
     GstActiveStream * stream)
@@ -1749,6 +1718,7 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
     guint stream_idx = stream->index;
+    GstBuffer *buffer;
 
     if (!gst_mpd_client_get_next_fragment (demux->client,
             stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
@@ -1758,7 +1728,10 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
     }
 
     GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
-    GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
+    GST_INFO_OBJECT (demux,
+        "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
+        GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
+        GST_TIME_ARGS (duration));
 
     download = gst_uri_downloader_fetch_uri (demux->downloader,
         next_fragment_uri);
@@ -1767,33 +1740,36 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
     if (download == NULL)
       return FALSE;
 
-    download->start_time = timestamp;
-    download->stop_time = timestamp + duration;
+    buffer = gst_fragment_get_buffer (download);
 
     active_stream =
         gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
-    if (stream == NULL)
+    if (stream == NULL)         /* TODO unref fragments */
       return FALSE;
 
-    download->index = gst_mpd_client_get_segment_index (active_stream) - 1;
-
     if (need_header) {
       /* We need to fetch a new header */
       if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
         GST_INFO_OBJECT (demux, "Unable to fetch header");
       } else {
+        GstBuffer *header_buffer;
         /* Replace fragment with a new one including the header */
-        GstFragment *new_fragment =
-            gst_dash_demux_prepend_header (demux, download, header);
-        g_object_unref (header);
-        g_object_unref (download);
-        download = new_fragment;
+
+        header_buffer = gst_fragment_get_buffer (header);
+        buffer = gst_buffer_join (header_buffer, buffer);
       }
     }
 
-    gst_fragment_set_caps (download, stream->input_caps);
-    g_queue_push_tail (stream->queue, download);
-    size_buffer += gst_fragment_get_buffer_size (download);
+    buffer = gst_buffer_make_metadata_writable (buffer);
+
+    GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+    GST_BUFFER_DURATION (buffer) = duration;
+    GST_BUFFER_OFFSET (buffer) =
+        gst_mpd_client_get_segment_index (active_stream) - 1;
+
+    gst_buffer_set_caps (buffer, stream->input_caps);
+    gst_dash_demux_stream_push_data (stream, buffer);
+    size_buffer += GST_BUFFER_SIZE (buffer);
   }
 
   /* Wake the download task up */
index 3fd8a3f..095f941 100644 (file)
@@ -32,8 +32,7 @@
 
 #include <gst/gst.h>
 #include <gst/base/gstadapter.h>
-#include <gst/gst.h>
-#include <gst/base/gstadapter.h>
+#include <gst/base/gstdataqueue.h>
 #include "gstmpdparser.h"
 #include "gstfragmented.h"
 #include "gsturidownloader.h"
@@ -53,7 +52,6 @@ G_BEGIN_DECLS
 typedef struct _GstDashDemuxStream GstDashDemuxStream;
 typedef struct _GstDashDemux GstDashDemux;
 typedef struct _GstDashDemuxClass GstDashDemuxClass;
-#define MAX_LANGUAGES 20
 
 struct _GstDashDemuxStream
 {
@@ -64,7 +62,7 @@ struct _GstDashDemuxStream
   GstCaps *output_caps;
   GstCaps *input_caps;
 
-  GQueue *queue;
+  GstDataQueue *queue;
 };
 
 /**
index f76f7d1..4be9630 100644 (file)
@@ -20,7 +20,7 @@
  */
 
 #include <glib.h>
-#include <gst/base/gsttypefindhelper.h>
+#include <gst/base/gstadapter.h>
 #include "gstfragmented.h"
 #include "gstfragment.h"
 
@@ -35,18 +35,13 @@ enum
   PROP_NAME,
   PROP_DURATION,
   PROP_DISCONTINOUS,
-  PROP_BUFFER_LIST,
-  PROP_CAPS,
   PROP_LAST
 };
 
 struct _GstFragmentPrivate
 {
-  GstBufferList *buffer_list;
-  guint64 size;
-  GstBufferListIterator *buffer_iterator;
-  GstCaps *caps;
-  GMutex lock;
+  GstAdapter *adapter;
+  GstBuffer *buffer;
 };
 
 G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT);
@@ -55,24 +50,6 @@ static void gst_fragment_dispose (GObject * object);
 static void gst_fragment_finalize (GObject * object);
 
 static void
-gst_fragment_set_property (GObject * object,
-    guint property_id, const GValue * value, GParamSpec * pspec)
-{
-  GstFragment *fragment = GST_FRAGMENT (object);
-
-  switch (property_id) {
-    case PROP_CAPS:
-      gst_fragment_set_caps (fragment, g_value_get_boxed (value));
-      break;
-
-    default:
-      /* We don't have any other property... */
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
-      break;
-  }
-}
-
-static void
 gst_fragment_get_property (GObject * object,
     guint property_id, GValue * value, GParamSpec * pspec)
 {
@@ -95,14 +72,6 @@ gst_fragment_get_property (GObject * object,
       g_value_set_boolean (value, fragment->discontinuous);
       break;
 
-    case PROP_BUFFER_LIST:
-      g_value_set_object (value, gst_fragment_get_buffer_list (fragment));
-      break;
-
-    case PROP_CAPS:
-      g_value_set_boxed (value, gst_fragment_get_caps (fragment));
-      break;
-
     default:
       /* We don't have any other property... */
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
@@ -110,8 +79,6 @@ gst_fragment_get_property (GObject * object,
   }
 }
 
-
-
 static void
 gst_fragment_class_init (GstFragmentClass * klass)
 {
@@ -119,7 +86,6 @@ gst_fragment_class_init (GstFragmentClass * klass)
 
   g_type_class_add_private (klass, sizeof (GstFragmentPrivate));
 
-  gobject_class->set_property = gst_fragment_set_property;
   gobject_class->get_property = gst_fragment_get_property;
   gobject_class->dispose = gst_fragment_dispose;
   gobject_class->finalize = gst_fragment_finalize;
@@ -140,16 +106,6 @@ gst_fragment_class_init (GstFragmentClass * klass)
   g_object_class_install_property (gobject_class, PROP_DURATION,
       g_param_spec_uint64 ("duration", "Fragment duration",
           "Duration of the fragment", 0, G_MAXUINT64, 0, G_PARAM_READABLE));
-
-  g_object_class_install_property (gobject_class, PROP_BUFFER_LIST,
-      g_param_spec_object ("buffer-list", "Buffer List",
-          "A list with the fragment's buffers", GST_TYPE_FRAGMENT,
-          G_PARAM_READABLE));
-
-  g_object_class_install_property (gobject_class, PROP_CAPS,
-      g_param_spec_boxed ("caps", "Fragment caps",
-          "The caps of the fragment's buffer. (NULL = detect)", GST_TYPE_CAPS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -159,12 +115,8 @@ gst_fragment_init (GstFragment * fragment)
 
   fragment->priv = priv = GST_FRAGMENT_GET_PRIVATE (fragment);
 
-  g_mutex_init (&fragment->priv->lock);
-  priv->buffer_list = gst_buffer_list_new ();
-  priv->size = 0;
-  priv->buffer_iterator = gst_buffer_list_iterate (priv->buffer_list);
-  gst_buffer_list_iterator_add_group (priv->buffer_iterator);
-  fragment->download_start_time = g_get_real_time ();
+  priv->adapter = gst_adapter_new ();
+  fragment->download_start_time = gst_util_get_timestamp ();
   fragment->start_time = 0;
   fragment->stop_time = 0;
   fragment->index = 0;
@@ -185,7 +137,6 @@ gst_fragment_finalize (GObject * gobject)
   GstFragment *fragment = GST_FRAGMENT (gobject);
 
   g_free (fragment->name);
-  g_mutex_clear (&fragment->priv->lock);
 
   G_OBJECT_CLASS (gst_fragment_parent_class)->finalize (gobject);
 }
@@ -195,74 +146,34 @@ gst_fragment_dispose (GObject * object)
 {
   GstFragmentPrivate *priv = GST_FRAGMENT (object)->priv;
 
-  if (priv->buffer_list != NULL) {
-    gst_buffer_list_iterator_free (priv->buffer_iterator);
-    gst_buffer_list_unref (priv->buffer_list);
-    priv->buffer_list = NULL;
-    priv->size = 0;
+  if (priv->adapter) {
+    gst_object_unref (priv->adapter);
+    priv->adapter = NULL;
   }
-
-  if (priv->caps != NULL) {
-    gst_caps_unref (priv->caps);
-    priv->caps = NULL;
+  if (priv->buffer) {
+    gst_buffer_unref (priv->buffer);
+    priv->buffer = NULL;
   }
 
   G_OBJECT_CLASS (gst_fragment_parent_class)->dispose (object);
 }
 
-GstBufferList *
-gst_fragment_get_buffer_list (GstFragment * fragment)
-{
-  g_return_val_if_fail (fragment != NULL, NULL);
-
-  if (!fragment->completed)
-    return NULL;
-
-  gst_buffer_list_ref (fragment->priv->buffer_list);
-  return fragment->priv->buffer_list;
-}
-
-void
-gst_fragment_set_caps (GstFragment * fragment, GstCaps * caps)
-{
-  g_return_if_fail (fragment != NULL);
-
-  g_mutex_lock (&fragment->priv->lock);
-  gst_caps_replace (&fragment->priv->caps, caps);
-  g_mutex_unlock (&fragment->priv->lock);
-}
-
-GstCaps *
-gst_fragment_get_caps (GstFragment * fragment)
+GstBuffer *
+gst_fragment_get_buffer (GstFragment * fragment)
 {
   g_return_val_if_fail (fragment != NULL, NULL);
 
   if (!fragment->completed)
     return NULL;
 
-  g_mutex_lock (&fragment->priv->lock);
-  if (fragment->priv->caps == NULL) {
-    GstBuffer *buf = gst_buffer_list_get (fragment->priv->buffer_list, 0, 0);
-    fragment->priv->caps = gst_type_find_helper_for_buffer (NULL, buf, NULL);
+  if (!fragment->priv->buffer) {
+    fragment->priv->buffer = gst_adapter_take_buffer (fragment->priv->adapter,
+        gst_adapter_available (fragment->priv->adapter));
   }
-  gst_caps_ref (fragment->priv->caps);
-  g_mutex_unlock (&fragment->priv->lock);
 
-  return fragment->priv->caps;
+  return gst_buffer_ref (fragment->priv->buffer);
 }
 
-guint64
-gst_fragment_get_buffer_size (GstFragment * fragment)
-{
-  g_return_val_if_fail (fragment != NULL, 0);
-
-  if (!fragment->completed)
-    return 0;
-  return fragment->priv->size;
-}
-
-
-
 gboolean
 gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer)
 {
@@ -274,8 +185,20 @@ gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer)
     return FALSE;
   }
 
-  gst_buffer_list_iterator_add (fragment->priv->buffer_iterator, buffer);
-  fragment->priv->size = fragment->priv->size + GST_BUFFER_SIZE (buffer);
+  GST_DEBUG ("Adding new buffer to the fragment");
+  /* We steal the buffers you pass in */
+  gst_adapter_push (fragment->priv->adapter, buffer);
+
   return TRUE;
 }
 
+gsize
+gst_fragment_get_total_size (GstFragment * fragment)
+{
+  g_return_val_if_fail (GST_IS_FRAGMENT (fragment), 0);
+
+  if (fragment->priv->buffer)
+    return GST_BUFFER_SIZE (fragment->priv->buffer);
+
+  return gst_adapter_available (fragment->priv->adapter);
+}
index 9078c25..637600d 100644 (file)
@@ -60,11 +60,10 @@ struct _GstFragmentClass
 
 GType gst_fragment_get_type (void);
 
-guint64 gst_fragment_get_buffer_size (GstFragment * fragment);
-GstBufferList * gst_fragment_get_buffer_list (GstFragment *fragment);
-void gst_fragment_set_caps (GstFragment * fragment, GstCaps * caps);
-GstCaps * gst_fragment_get_caps (GstFragment * fragment);
+GstBuffer * gst_fragment_get_buffer (GstFragment *fragment);
+gboolean gst_fragment_set_headers (GstFragment *fragment, GstBuffer **buffer, guint count);
 gboolean gst_fragment_add_buffer (GstFragment *fragment, GstBuffer *buffer);
+gsize gst_fragment_get_total_size (GstFragment * fragment);
 GstFragment * gst_fragment_new (void);
 
 G_END_DECLS
index 25f6c1b..3d464c5 100644 (file)
@@ -3256,7 +3256,7 @@ gst_mpd_client_get_next_fragment_duration (GstMpdClient * client)
   GstActiveStream *stream;
   GstMediaSegment *media_segment;
 
-  GST_WARNING ("Stream index: %i", client->stream_idx);
+  GST_DEBUG ("Stream index: %i", client->stream_idx);
   stream = g_list_nth_data (client->active_streams, client->stream_idx);
   g_return_val_if_fail (stream != NULL, 0);