Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / gst / adder / gstadder.c
index 10fa22f..3c4b0f0 100644 (file)
@@ -44,6 +44,7 @@
 #include "config.h"
 #endif
 #include "gstadder.h"
+#include <gst/audio/audio.h>
 #include <string.h>             /* strcmp */
 #include "gstadderorc.h"
 
@@ -122,10 +123,13 @@ static void gst_adder_release_pad (GstElement * element, GstPad * pad);
 static GstStateChangeReturn gst_adder_change_state (GstElement * element,
     GstStateChange transition);
 
-static GstBuffer *gst_adder_do_clip (GstCollectPads * pads,
-    GstCollectData * data, GstBuffer * buffer, gpointer user_data);
-static GstFlowReturn gst_adder_collected (GstCollectPads * pads,
+static GstFlowReturn gst_adder_do_clip (GstCollectPads2 * pads,
+    GstCollectData2 * data, GstBuffer * buffer, GstBuffer ** out,
     gpointer user_data);
+static GstFlowReturn gst_adder_collected (GstCollectPads2 * pads,
+    gpointer user_data);
+static gboolean gst_adder_event (GstCollectPads2 * pads, GstCollectData2 * pad,
+    GstEvent * event, gpointer user_data);
 
 /* non-clipping versions (for float) */
 #define MAKE_FUNC_NC(name,type)                                 \
@@ -678,12 +682,14 @@ gst_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       /* check if we are flushing */
       if (flush) {
-        /* make sure we accept nothing anymore and return WRONG_STATE */
-        gst_collect_pads_set_flushing (adder->collect, TRUE);
-
         /* flushing seek, start flush downstream, the flush will be done
-         * when all pads received a FLUSH_STOP. */
+         * when all pads received a FLUSH_STOP.
+         * Make sure we accept nothing anymore and return WRONG_STATE.
+         * We send a flush-start before, to ensure no streaming is done
+         * as we need to take the stream lock.
+         */
         gst_pad_push_event (adder->srcpad, gst_event_new_flush_start ());
+        gst_collect_pads2_set_flushing (adder->collect, TRUE);
 
         /* We can't send FLUSH_STOP here since upstream could start pushing data
          * after we unlock adder->collect.
@@ -698,7 +704,7 @@ gst_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* now wait for the collected to be finished and mark a new
        * segment. After we have the lock, no collect function is running and no
        * new collect function will be called for as long as we're flushing. */
-      GST_OBJECT_LOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_LOCK (adder->collect);
       adder->segment.rate = rate;
       if (curtype == GST_SEEK_TYPE_SET)
         adder->segment.start = cur;
@@ -711,9 +717,9 @@ gst_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (flush) {
         /* Yes, we need to call _set_flushing again *WHEN* the streaming threads
          * have stopped so that the cookie gets properly updated. */
-        gst_collect_pads_set_flushing (adder->collect, TRUE);
+        gst_collect_pads2_set_flushing (adder->collect, TRUE);
       }
-      GST_OBJECT_UNLOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_UNLOCK (adder->collect);
       GST_DEBUG_OBJECT (adder, "forwarding seek event: %" GST_PTR_FORMAT,
           event);
 
@@ -776,29 +782,27 @@ gst_adder_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       goto beach;
     }
     case GST_EVENT_FLUSH_STOP:
-      /* we received a flush-stop. The collect_event function will push the
-       * event past our element. We simply forward all flush-stop events, even
-       * when no flush-stop was pending, this is required because collectpads
-       * does not provide an API to handle-but-not-forward the flush-stop.
-       * We unset the pending flush-stop flag so that we don't send anymore
-       * flush-stop from the collect function later.
+      /* we received a flush-stop. The collect_event function will call the
+       * gst_adder_event function we have set on the GstCollectPads2, so we
+       * have control over whether the event is sent past our element.
+       * We will only forward it when flush_stop_pending is set, and we will
+       * unset it then.
        */
-      GST_OBJECT_LOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_LOCK (adder->collect);
       g_atomic_int_set (&adder->new_segment_pending, TRUE);
-      g_atomic_int_set (&adder->flush_stop_pending, FALSE);
       /* Clear pending tags */
       if (adder->pending_events) {
         g_list_foreach (adder->pending_events, (GFunc) gst_event_unref, NULL);
         g_list_free (adder->pending_events);
         adder->pending_events = NULL;
       }
-      GST_OBJECT_UNLOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_UNLOCK (adder->collect);
       break;
     case GST_EVENT_TAG:
-      GST_OBJECT_LOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_LOCK (adder->collect);
       /* collect tags here so we can push them out when we collect data */
       adder->pending_events = g_list_append (adder->pending_events, event);
-      GST_OBJECT_UNLOCK (adder->collect);
+      GST_COLLECT_PADS2_STREAM_UNLOCK (adder->collect);
       goto beach;
     case GST_EVENT_SEGMENT:
       if (g_atomic_int_compare_and_exchange (&adder->wait_for_new_segment,
@@ -812,7 +816,7 @@ gst_adder_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
   }
 
-  /* now GstCollectPads can take care of the rest, e.g. EOS */
+  /* now GstCollectPads2 can take care of the rest, e.g. EOS */
   ret = adder->collect_event (pad, parent, event);
 
 beach:
@@ -879,11 +883,13 @@ gst_adder_init (GstAdder * adder)
   adder->filter_caps = NULL;
 
   /* keep track of the sinkpads requested */
-  adder->collect = gst_collect_pads_new ();
-  gst_collect_pads_set_function (adder->collect,
+  adder->collect = gst_collect_pads2_new ();
+  gst_collect_pads2_set_function (adder->collect,
       GST_DEBUG_FUNCPTR (gst_adder_collected), adder);
-  gst_collect_pads_set_clip_function (adder->collect,
+  gst_collect_pads2_set_clip_function (adder->collect,
       GST_DEBUG_FUNCPTR (gst_adder_do_clip), adder);
+  gst_collect_pads2_set_event_function (adder->collect,
+      GST_DEBUG_FUNCPTR (gst_adder_event), adder);
 }
 
 static void
@@ -985,11 +991,10 @@ gst_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
   g_free (name);
 
   gst_pad_set_query_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_sink_query));
-  gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData),
-      NULL);
+  gst_collect_pads2_add_pad (adder->collect, newpad, sizeof (GstCollectData2));
 
   /* FIXME: hacked way to override/extend the event function of
-   * GstCollectPads; because it sets its own event function giving the
+   * GstCollectPads2; because it sets its own event function giving the
    * element no access to events */
   adder->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
   gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_sink_event));
@@ -1009,7 +1014,7 @@ not_sink:
 could_not_add:
   {
     GST_DEBUG_OBJECT (adder, "could not add pad");
-    gst_collect_pads_remove_pad (adder->collect, newpad);
+    gst_collect_pads2_remove_pad (adder->collect, newpad);
     gst_object_unref (newpad);
     return NULL;
   }
@@ -1024,13 +1029,13 @@ gst_adder_release_pad (GstElement * element, GstPad * pad)
 
   GST_DEBUG_OBJECT (adder, "release pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
-  gst_collect_pads_remove_pad (adder->collect, pad);
+  gst_collect_pads2_remove_pad (adder->collect, pad);
   gst_element_remove_pad (element, pad);
 }
 
-static GstBuffer *
-gst_adder_do_clip (GstCollectPads * pads, GstCollectData * data,
-    GstBuffer * buffer, gpointer user_data)
+static GstFlowReturn
+gst_adder_do_clip (GstCollectPads2 * pads, GstCollectData2 * data,
+    GstBuffer * buffer, GstBuffer ** out, gpointer user_data)
 {
   GstAdder *adder = GST_ADDER (user_data);
   gint rate, bpf;
@@ -1040,11 +1045,12 @@ gst_adder_do_clip (GstCollectPads * pads, GstCollectData * data,
 
   buffer = gst_audio_buffer_clip (buffer, &data->segment, rate, bpf);
 
-  return buffer;
+  *out = buffer;
+  return GST_FLOW_OK;
 }
 
 static GstFlowReturn
-gst_adder_collected (GstCollectPads * pads, gpointer user_data)
+gst_adder_collected (GstCollectPads2 * pads, gpointer user_data)
 {
   /*
    * combine streams by adding data values
@@ -1086,7 +1092,7 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
 
   /* get available bytes for reading, this can be 0 which could mean empty
    * buffers or EOS, which we will catch when we loop over the pads. */
-  outsize = gst_collect_pads_available (pads);
+  outsize = gst_collect_pads2_available (pads);
   /* can only happen when no pads to collect or all EOS */
   if (outsize == 0)
     goto eos;
@@ -1100,18 +1106,18 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
       outsize, bps, bpf);
 
   for (collected = pads->data; collected; collected = next) {
-    GstCollectData *collect_data;
+    GstCollectData2 *collect_data;
     GstBuffer *inbuf;
     gboolean is_gap;
 
     /* take next to see if this is the last collectdata */
     next = g_slist_next (collected);
 
-    collect_data = (GstCollectData *) collected->data;
+    collect_data = (GstCollectData2 *) collected->data;
 
     /* get a buffer of size bytes, if we get a buffer, it is at least outsize
      * bytes big. */
-    inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
+    inbuf = gst_collect_pads2_take_buffer (pads, collect_data, outsize);
     /* NULL means EOS or an empty buffer so we still need to flush in
      * case of an empty buffer. */
     if (inbuf == NULL) {
@@ -1286,6 +1292,24 @@ eos:
   }
 }
 
+static gboolean
+gst_adder_event (GstCollectPads2 * pads, GstCollectData2 * pad,
+    GstEvent * event, gpointer user_data)
+{
+  GstAdder *adder = GST_ADDER (user_data);
+  if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
+    if (g_atomic_int_compare_and_exchange (&adder->flush_stop_pending,
+            TRUE, FALSE)) {
+      return FALSE;
+    } else {
+      gst_event_unref (event);
+      return TRUE;
+    }
+  } else {
+    return FALSE;
+  }
+}
+
 static GstStateChangeReturn
 gst_adder_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1304,14 +1328,14 @@ gst_adder_change_state (GstElement * element, GstStateChange transition)
       adder->new_segment_pending = TRUE;
       adder->wait_for_new_segment = FALSE;
       gst_segment_init (&adder->segment, GST_FORMAT_TIME);
-      gst_collect_pads_start (adder->collect);
+      gst_collect_pads2_start (adder->collect);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      /* need to unblock the collectpads before calling the
+      /* need to unblock the collectpads2 before calling the
        * parent change_state so that streaming can finish */
-      gst_collect_pads_stop (adder->collect);
+      gst_collect_pads2_stop (adder->collect);
       break;
     default:
       break;