rndbuffersize: add push mode support
authorTim-Philipp Müller <tim@centricular.net>
Thu, 28 Jun 2012 19:03:05 +0000 (20:03 +0100)
committerTim-Philipp Müller <tim@centricular.net>
Thu, 28 Jun 2012 19:05:09 +0000 (20:05 +0100)
https://bugzilla.gnome.org/show_bug.cgi?id=656317

gst/debugutils/rndbuffersize.c

index 37a887b..243b1c7 100644 (file)
@@ -27,6 +27,7 @@
 #endif
 
 #include <gst/gst.h>
+#include <gst/base/gstadapter.h>
 
 GST_DEBUG_CATEGORY_STATIC (gst_rnd_buffer_size_debug);
 #define GST_CAT_DEFAULT gst_rnd_buffer_size_debug
@@ -53,6 +54,8 @@ struct _GstRndBufferSize
   guint64 offset;
 
   gboolean need_newsegment;
+
+  GstAdapter *adapter;
 };
 
 struct _GstRndBufferSizeClass
@@ -95,6 +98,10 @@ static GstStateChangeReturn gst_rnd_buffer_size_change_state (GstElement *
     element, GstStateChange transition);
 static gboolean gst_rnd_buffer_size_src_event (GstPad * pad,
     GstObject * parent, GstEvent * event);
+static gboolean gst_rnd_buffer_size_sink_event (GstPad * pad,
+    GstObject * parent, GstEvent * event);
+static GstFlowReturn gst_rnd_buffer_size_chain (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
 
 GType gst_rnd_buffer_size_get_type (void);
 #define gst_rnd_buffer_size_parent_class parent_class
@@ -148,6 +155,10 @@ gst_rnd_buffer_size_init (GstRndBufferSize * self)
       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_activate));
   gst_pad_set_activatemode_function (self->sinkpad,
       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_activate_mode));
+  gst_pad_set_event_function (self->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_sink_event));
+  gst_pad_set_chain_function (self->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_chain));
   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
 
   self->srcpad = gst_pad_new_from_static_template (&src_template, "src");
@@ -225,25 +236,19 @@ gst_rnd_buffer_size_activate (GstPad * pad, GstObject * parent)
 
   query = gst_query_new_scheduling ();
 
-  if (!gst_pad_peer_query (pad, query)) {
-    gst_query_unref (query);
-    goto no_pull;
-  }
+  if (gst_pad_peer_query (pad, query))
+    pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
+  else
+    pull_mode = FALSE;
 
-  pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
   gst_query_unref (query);
 
-  if (!pull_mode)
-    goto no_pull;
-
-  GST_DEBUG_OBJECT (pad, "activating pull");
-  return gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE);
-
-  /* ERRORS */
-no_pull:
-  {
-    GST_DEBUG_OBJECT (pad, "pull mode not supported");
-    return FALSE;
+  if (pull_mode) {
+    GST_DEBUG_OBJECT (pad, "activating pull");
+    return gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE);
+  } else {
+    GST_DEBUG_OBJECT (pad, "activating push");
+    return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
   }
 }
 
@@ -268,6 +273,10 @@ gst_rnd_buffer_size_activate_mode (GstPad * pad, GstObject * parent,
         res = gst_pad_stop_task (pad);
       }
       break;
+    case GST_PAD_MODE_PUSH:
+      GST_INFO_OBJECT (self, "%sactivating in push mode", (active) ? "" : "de");
+      res = TRUE;
+      break;
     default:
       res = FALSE;
       break;
@@ -329,6 +338,106 @@ gst_rnd_buffer_size_src_event (GstPad * pad, GstObject * parent,
   return TRUE;
 }
 
+static GstFlowReturn
+gst_rnd_buffer_size_drain_adapter (GstRndBufferSize * self, gboolean eos)
+{
+  GstFlowReturn flow;
+  GstBuffer *buf;
+  guint num_bytes, avail;
+
+  flow = GST_FLOW_OK;
+
+  if (G_UNLIKELY (self->min > self->max))
+    goto bogus_minmax;
+
+  do {
+    if (self->min != self->max) {
+      num_bytes = g_rand_int_range (self->rand, self->min, self->max);
+    } else {
+      num_bytes = self->min;
+    }
+
+    GST_LOG_OBJECT (self, "pulling %u bytes out of adapter", num_bytes);
+
+    buf = gst_adapter_take_buffer (self->adapter, num_bytes);
+
+    if (buf == NULL) {
+      if (!eos) {
+        GST_LOG_OBJECT (self, "not enough bytes in adapter");
+        break;
+      }
+
+      avail = gst_adapter_available (self->adapter);
+
+      if (avail == 0)
+        break;
+
+      if (avail < self->min) {
+        GST_WARNING_OBJECT (self, "discarding %u bytes at end (min=%u)",
+            avail, self->min);
+        gst_adapter_clear (self->adapter);
+        break;
+      }
+      buf = gst_adapter_take_buffer (self->adapter, avail);
+      g_assert (buf != NULL);
+    }
+
+    flow = gst_pad_push (self->srcpad, buf);
+  }
+  while (flow == GST_FLOW_OK);
+
+  return flow;
+
+/* ERRORS */
+bogus_minmax:
+  {
+    GST_ELEMENT_ERROR (self, LIBRARY, SETTINGS,
+        ("The minimum buffer size is smaller than the maximum buffer size."),
+        ("buffer sizes: max=%d, min=%d", self->min, self->max));
+    return GST_FLOW_ERROR;
+  }
+}
+
+static gboolean
+gst_rnd_buffer_size_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
+{
+  GstRndBufferSize *rnd = GST_RND_BUFFER_SIZE (parent);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      gst_rnd_buffer_size_drain_adapter (rnd, TRUE);
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      if (rnd->adapter != NULL)
+        gst_adapter_clear (rnd->adapter);
+      break;
+    default:
+      break;
+  }
+
+  return gst_pad_push_event (rnd->srcpad, event);
+}
+
+static GstFlowReturn
+gst_rnd_buffer_size_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+  GstRndBufferSize *rnd = GST_RND_BUFFER_SIZE (parent);
+  GstFlowReturn flow;
+
+  if (rnd->adapter == NULL)
+    rnd->adapter = gst_adapter_new ();
+
+  gst_adapter_push (rnd->adapter, buf);
+
+  flow = gst_rnd_buffer_size_drain_adapter (rnd, FALSE);
+
+  if (flow != GST_FLOW_OK)
+    GST_INFO_OBJECT (rnd, "flow: %s", gst_flow_get_name (flow));
+
+  return flow;
+}
+
 static void
 gst_rnd_buffer_size_loop (GstRndBufferSize * self)
 {
@@ -452,6 +561,10 @@ gst_rnd_buffer_size_change_state (GstElement * element,
       }
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
+      if (self->adapter) {
+        g_object_unref (self->adapter);
+        self->adapter = NULL;
+      }
       break;
     default:
       break;