concat: Add new element that concatenates multiple streams
authorSebastian Dröge <sebastian@centricular.com>
Thu, 7 Aug 2014 12:42:44 +0000 (14:42 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 12 Aug 2014 12:38:56 +0000 (15:38 +0300)
https://bugzilla.gnome.org/show_bug.cgi?id=734470

plugins/elements/Makefile.am
plugins/elements/gstconcat.c [new file with mode: 0644]
plugins/elements/gstconcat.h [new file with mode: 0644]
plugins/elements/gstelements.c

index 3821b3c..6d3857c 100644 (file)
@@ -4,6 +4,7 @@ plugin_LTLIBRARIES = libgstcoreelements.la
 libgstcoreelements_la_DEPENDENCIES = $(top_builddir)/gst/libgstreamer-@GST_API_VERSION@.la
 libgstcoreelements_la_SOURCES =        \
        gstcapsfilter.c         \
+       gstconcat.c             \
        gstdownloadbuffer.c     \
        gstelements.c           \
        gstelements_private.c   \
@@ -34,6 +35,7 @@ libgstcoreelements_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
 
 noinst_HEADERS =               \
        gstcapsfilter.h         \
+       gstconcat.h             \
        gstdownloadbuffer.h     \
        gstelements_private.h   \
        gstfakesink.h           \
diff --git a/plugins/elements/gstconcat.c b/plugins/elements/gstconcat.c
new file mode 100644 (file)
index 0000000..006b190
--- /dev/null
@@ -0,0 +1,673 @@
+/* GStreamer concat element
+ *
+ *  Copyright (c) 2014 Sebastian Dröge <sebastian@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstconcat.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_concat_debug);
+#define GST_CAT_DEFAULT gst_concat_debug
+
+G_GNUC_INTERNAL GType gst_concat_pad_get_type (void);
+
+#define GST_TYPE_CONCAT_PAD (gst_concat_pad_get_type())
+#define GST_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_CONCAT_PAD, GstConcatPad))
+#define GST_CONCAT_PAD_CAST(obj) ((GstConcatPad *)(obj))
+#define GST_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_CONCAT_PAD, GstConcatPadClass))
+#define GST_IS_CONCAT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_CONCAT_PAD))
+#define GST_IS_CONCAT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_CONCAT_PAD))
+
+typedef struct _GstConcatPad GstConcatPad;
+typedef struct _GstConcatPadClass GstConcatPadClass;
+
+struct _GstConcatPad
+{
+  GstPad parent;
+
+  GstSegment segment;
+
+  /* Protected by the concat lock */
+  gboolean flushing;
+};
+
+struct _GstConcatPadClass
+{
+  GstPadClass parent;
+};
+
+G_DEFINE_TYPE (GstConcatPad, gst_concat_pad, GST_TYPE_PAD);
+
+static void
+gst_concat_pad_class_init (GstConcatPadClass * klass)
+{
+}
+
+static void
+gst_concat_pad_init (GstConcatPad * self)
+{
+  gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
+  self->flushing = FALSE;
+}
+
+static GstStaticPadTemplate concat_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink_%u",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS_ANY);
+
+static GstStaticPadTemplate concat_src_template =
+GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+#define _do_init \
+  GST_DEBUG_CATEGORY_INIT (gst_concat_debug, "concat", 0, "concat element");
+#define gst_concat_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstConcat, gst_concat, GST_TYPE_ELEMENT, _do_init);
+
+static void gst_concat_dispose (GObject * object);
+static void gst_concat_finalize (GObject * object);
+
+static GstStateChangeReturn gst_concat_change_state (GstElement * element,
+    GstStateChange transition);
+static GstPad *gst_concat_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
+static void gst_concat_release_pad (GstElement * element, GstPad * pad);
+
+static GstFlowReturn gst_concat_sink_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
+static gboolean gst_concat_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_concat_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
+
+static gboolean gst_concat_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_concat_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
+
+static gboolean gst_concat_switch_pad (GstConcat * self);
+
+static void
+gst_concat_class_init (GstConcatClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+
+  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_concat_dispose);
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_concat_finalize);
+
+  gst_element_class_set_static_metadata (gstelement_class,
+      "Concat", "Generic", "Concatenate multiple streams",
+      "Sebastian Dröge <sebastian@centricular.com>");
+
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&concat_sink_template));
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&concat_src_template));
+
+  gstelement_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_concat_request_new_pad);
+  gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_concat_release_pad);
+  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_concat_change_state);
+}
+
+static void
+gst_concat_init (GstConcat * self)
+{
+  g_mutex_init (&self->lock);
+  g_cond_init (&self->cond);
+
+  self->srcpad = gst_pad_new_from_static_template (&concat_src_template, "src");
+  gst_pad_set_event_function (self->srcpad,
+      GST_DEBUG_FUNCPTR (gst_concat_src_event));
+  gst_pad_set_query_function (self->srcpad,
+      GST_DEBUG_FUNCPTR (gst_concat_src_query));
+  gst_pad_use_fixed_caps (self->srcpad);
+
+  gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
+}
+
+static void
+gst_concat_dispose (GObject * object)
+{
+  GstConcat *self = GST_CONCAT (object);
+  GList *item;
+
+  gst_object_replace ((GstObject **) & self->current_sinkpad, NULL);
+
+restart:
+  for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
+    GstPad *pad = GST_PAD (item->data);
+
+    if (GST_PAD_IS_SINK (pad)) {
+      gst_element_release_request_pad (GST_ELEMENT (object), pad);
+      goto restart;
+    }
+  }
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
+gst_concat_finalize (GObject * object)
+{
+  GstConcat *self = GST_CONCAT (object);
+
+  g_mutex_clear (&self->lock);
+  g_cond_clear (&self->cond);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static GstPad *
+gst_concat_request_new_pad (GstElement * element, GstPadTemplate * templ,
+    const gchar * name, const GstCaps * caps)
+{
+  GstConcat *self = GST_CONCAT (element);
+  GstPad *sinkpad;
+  gchar *pad_name;
+
+  GST_DEBUG_OBJECT (element, "requesting pad");
+
+  g_mutex_lock (&self->lock);
+  pad_name = g_strdup_printf ("sink_%u", self->pad_count);
+  self->pad_count++;
+  g_mutex_unlock (&self->lock);
+
+  sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_CONCAT_PAD,
+          "name", pad_name, "direction", templ->direction, "template", templ,
+          NULL));
+  g_free (pad_name);
+
+  gst_pad_set_chain_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_concat_sink_chain));
+  gst_pad_set_event_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_concat_sink_event));
+  gst_pad_set_query_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_concat_sink_query));
+  GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
+  GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
+
+  gst_pad_set_active (sinkpad, TRUE);
+
+  g_mutex_lock (&self->lock);
+  self->sinkpads = g_list_prepend (self->sinkpads, gst_object_ref (sinkpad));
+  if (!self->current_sinkpad)
+    self->current_sinkpad = gst_object_ref (sinkpad);
+  g_mutex_unlock (&self->lock);
+
+  gst_element_add_pad (element, sinkpad);
+
+  return sinkpad;
+}
+
+static void
+gst_concat_release_pad (GstElement * element, GstPad * pad)
+{
+  GstConcat *self = GST_CONCAT (element);
+  GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
+  GList *l;
+  gboolean current_pad_removed = FALSE;
+  gboolean eos = FALSE;
+
+  GST_DEBUG_OBJECT (self, "releasing pad");
+
+  g_mutex_lock (&self->lock);
+  spad->flushing = TRUE;
+  g_cond_broadcast (&self->cond);
+  g_mutex_unlock (&self->lock);
+
+  gst_pad_set_active (pad, FALSE);
+
+  /* Now the pad is definitely not running anymore */
+
+  g_mutex_lock (&self->lock);
+  if (self->current_sinkpad == GST_PAD_CAST (spad)) {
+    eos = ! !gst_concat_switch_pad (self);
+    current_pad_removed = TRUE;
+  }
+
+  for (l = self->sinkpads; l; l = l->next) {
+    if ((gpointer) spad == l->data) {
+      gst_object_unref (spad);
+      self->sinkpads = g_list_delete_link (self->sinkpads, l);
+      break;
+    }
+  }
+  g_mutex_unlock (&self->lock);
+
+  gst_element_remove_pad (GST_ELEMENT_CAST (self), pad);
+
+  if (GST_STATE (self) > GST_STATE_READY) {
+    if (current_pad_removed && !eos)
+      gst_element_post_message (GST_ELEMENT_CAST (self),
+          gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
+
+    /* FIXME: Sending EOS from application thread */
+    if (eos)
+      gst_pad_push_event (self->srcpad, gst_event_new_eos ());
+  }
+}
+
+/* Returns FALSE if flushing
+ * Must be called from the pad's streaming thread
+ */
+static gboolean
+gst_concat_pad_wait (GstConcatPad * spad, GstConcat * self)
+{
+  g_mutex_lock (&self->lock);
+  if (spad->flushing) {
+    g_mutex_unlock (&self->lock);
+    GST_DEBUG_OBJECT (spad, "Flushing");
+    return FALSE;
+  }
+
+  while (spad != GST_CONCAT_PAD_CAST (self->current_sinkpad)) {
+    GST_TRACE_OBJECT (spad, "Not the current sinkpad - waiting");
+    g_cond_wait (&self->cond, &self->lock);
+    if (spad->flushing) {
+      g_mutex_unlock (&self->lock);
+      GST_DEBUG_OBJECT (spad, "Flushing");
+      return FALSE;
+    }
+  }
+  /* This pad can only become not the current sinkpad from
+   * a) This streaming thread (we hold the stream lock)
+   * b) Releasing the pad (takes the stream lock, see above)
+   *
+   * Unlocking here is thus safe and we can safely push
+   * serialized data to our srcpad
+   */
+  GST_DEBUG_OBJECT (spad, "Now the current sinkpad");
+  g_mutex_unlock (&self->lock);
+
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_concat_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  GstFlowReturn ret;
+  GstConcat *self = GST_CONCAT (parent);
+  GstConcatPad *spad = GST_CONCAT_PAD (pad);
+
+  GST_LOG_OBJECT (pad, "received buffer %p", buffer);
+
+  if (!gst_concat_pad_wait (spad, self))
+    return GST_FLOW_FLUSHING;
+
+  if (self->last_stop == GST_CLOCK_TIME_NONE)
+    self->last_stop = spad->segment.start;
+
+  if (self->format == GST_FORMAT_TIME) {
+    GstClockTime start_time = GST_BUFFER_TIMESTAMP (buffer);
+    GstClockTime end_time = GST_CLOCK_TIME_NONE;
+
+    if (start_time != GST_CLOCK_TIME_NONE)
+      end_time = start_time;
+    if (GST_BUFFER_DURATION_IS_VALID (buffer))
+      end_time += GST_BUFFER_DURATION (buffer);
+
+    if (end_time != GST_CLOCK_TIME_NONE && end_time > self->last_stop)
+      self->last_stop = end_time;
+  } else {
+    self->last_stop += gst_buffer_get_size (buffer);
+  }
+
+  ret = gst_pad_push (self->srcpad, buffer);
+
+  GST_LOG_OBJECT (pad, "handled buffer %s", gst_flow_get_name (ret));
+
+  return ret;
+}
+
+/* Returns FALSE if no further pad, must be called with concat lock */
+static gboolean
+gst_concat_switch_pad (GstConcat * self)
+{
+  GList *l;
+  gboolean next;
+  GstSegment segment;
+  gint64 last_stop;
+
+  segment = GST_CONCAT_PAD (self->current_sinkpad)->segment;
+
+  last_stop = self->last_stop;
+  if (last_stop == GST_CLOCK_TIME_NONE)
+    last_stop = segment.stop;
+  if (last_stop == GST_CLOCK_TIME_NONE)
+    last_stop = segment.start;
+  g_assert (last_stop != GST_CLOCK_TIME_NONE);
+
+  if (last_stop > segment.stop)
+    last_stop = segment.stop;
+
+  if (segment.format == GST_FORMAT_TIME)
+    last_stop =
+        gst_segment_to_running_time (&segment, segment.format, last_stop);
+  else
+    last_stop += segment.start;
+
+  self->current_start_offset += last_stop;
+
+  for (l = self->sinkpads; l; l = l->next) {
+    if ((gpointer) self->current_sinkpad == l->data) {
+      l = l->prev;
+      GST_DEBUG_OBJECT (self,
+          "Switching from pad %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT,
+          self->current_sinkpad, l ? l->data : NULL);
+      gst_object_unref (self->current_sinkpad);
+      self->current_sinkpad = l ? gst_object_ref (l->data) : NULL;
+      g_cond_broadcast (&self->cond);
+      break;
+    }
+  }
+
+  next = self->current_sinkpad != NULL;
+
+  self->last_stop = GST_CLOCK_TIME_NONE;
+
+  return next;
+}
+
+static gboolean
+gst_concat_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  GstConcat *self = GST_CONCAT (parent);
+  GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
+  gboolean ret = TRUE;
+
+  GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_STREAM_START:{
+      if (!gst_concat_pad_wait (spad, self)) {
+        ret = FALSE;
+        gst_event_unref (event);
+      } else {
+        ret = gst_pad_event_default (pad, parent, event);
+      }
+      break;
+    }
+    case GST_EVENT_SEGMENT:{
+      /* Drop segment event, we create our own one */
+      gst_event_copy_segment (event, &spad->segment);
+      gst_event_unref (event);
+
+      g_mutex_lock (&self->lock);
+      if (self->format == GST_FORMAT_UNDEFINED) {
+        if (spad->segment.format != GST_FORMAT_TIME
+            && spad->segment.format != GST_FORMAT_BYTES) {
+          g_mutex_unlock (&self->lock);
+          GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
+              ("Can only operate in TIME or BYTES format"));
+          ret = FALSE;
+          break;
+        }
+        self->format = spad->segment.format;
+        GST_DEBUG_OBJECT (self, "Operating in %s format",
+            gst_format_get_name (self->format));
+        g_mutex_unlock (&self->lock);
+      } else if (self->format != spad->segment.format) {
+        g_mutex_unlock (&self->lock);
+        GST_ELEMENT_ERROR (self, CORE, FAILED, (NULL),
+            ("Operating in %s format but new pad has %s",
+                gst_format_get_name (self->format),
+                gst_format_get_name (spad->segment.format)));
+        ret = FALSE;
+      } else {
+        g_mutex_unlock (&self->lock);
+      }
+
+      if (!gst_concat_pad_wait (spad, self)) {
+        ret = FALSE;
+      } else {
+        GstSegment segment = spad->segment;
+
+        /* We know no duration */
+        segment.duration = -1;
+
+        /* Update segment values to be contiguous with last stream */
+        if (self->format == GST_FORMAT_TIME) {
+          segment.base += self->current_start_offset;
+        } else {
+          /* Shift start/stop byte position */
+          segment.start += self->current_start_offset;
+          if (segment.stop != -1)
+            segment.stop += self->current_start_offset;
+        }
+        gst_pad_push_event (self->srcpad, gst_event_new_segment (&segment));
+      }
+      break;
+    }
+    case GST_EVENT_EOS:{
+      gst_event_unref (event);
+      if (!gst_concat_pad_wait (spad, self)) {
+        ret = FALSE;
+      } else {
+        gboolean next;
+
+        g_mutex_lock (&self->lock);
+        next = gst_concat_switch_pad (self);
+        g_mutex_unlock (&self->lock);
+        ret = TRUE;
+
+        if (!next) {
+          gst_pad_push_event (self->srcpad, gst_event_new_eos ());
+        } else {
+          gst_element_post_message (GST_ELEMENT_CAST (self),
+              gst_message_new_duration_changed (GST_OBJECT_CAST (self)));
+        }
+      }
+      break;
+    }
+    case GST_EVENT_FLUSH_START:{
+      g_mutex_lock (&self->lock);
+      spad->flushing = TRUE;
+      g_cond_broadcast (&self->cond);
+      g_mutex_unlock (&self->lock);
+      break;
+    }
+    case GST_EVENT_FLUSH_STOP:{
+      gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
+      spad->flushing = FALSE;
+      break;
+    }
+    default:{
+      /* Wait for other serialized events before forwarding */
+      if (GST_EVENT_IS_SERIALIZED (event) && !gst_concat_pad_wait (spad, self)) {
+        gst_event_unref (event);
+        ret = FALSE;
+      } else {
+        ret = gst_pad_event_default (pad, parent, event);
+      }
+      break;
+    }
+  }
+
+  return ret;
+}
+
+static gboolean
+gst_concat_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+  GstConcat *self = GST_CONCAT (parent);
+  GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
+  gboolean ret = TRUE;
+
+  GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
+
+  switch (GST_QUERY_TYPE (query)) {
+    default:
+      /* Wait for other serialized queries before forwarding */
+      if (GST_QUERY_IS_SERIALIZED (query) && !gst_concat_pad_wait (spad, self)) {
+        ret = FALSE;
+      } else {
+        ret = gst_pad_query_default (pad, parent, query);
+      }
+      break;
+  }
+
+  return ret;
+}
+
+static gboolean
+gst_concat_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  GstConcat *self = GST_CONCAT (parent);
+  gboolean ret = TRUE;
+
+  GST_LOG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:{
+      /* We don't support seeking */
+      gst_event_unref (event);
+      ret = FALSE;
+      break;
+    }
+    case GST_EVENT_QOS:{
+      GstQOSType type;
+      GstClockTimeDiff diff;
+      GstClockTime timestamp;
+      gdouble proportion;
+
+      gst_event_parse_qos (event, &type, &proportion, &diff, &timestamp);
+      gst_event_unref (event);
+
+      if (timestamp != GST_CLOCK_TIME_NONE
+          && timestamp > self->current_start_offset) {
+        timestamp -= self->current_start_offset;
+        event = gst_event_new_qos (type, proportion, diff, timestamp);
+        ret = gst_pad_push_event (self->current_sinkpad, event);
+      } else {
+        ret = FALSE;
+      }
+      break;
+    }
+    default:
+      ret = gst_pad_event_default (pad, parent, event);
+      break;
+  }
+
+  return ret;
+}
+
+static gboolean
+gst_concat_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+  gboolean ret = TRUE;
+
+  GST_LOG_OBJECT (pad, "received query %" GST_PTR_FORMAT, query);
+
+  switch (GST_QUERY_TYPE (query)) {
+    default:
+      ret = gst_pad_query_default (pad, parent, query);
+      break;
+  }
+
+  return ret;
+}
+
+static void
+reset_pad (const GValue * data, gpointer user_data)
+{
+  GstPad *pad = g_value_get_object (data);
+  GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
+
+  gst_segment_init (&spad->segment, GST_FORMAT_UNDEFINED);
+  spad->flushing = FALSE;
+}
+
+static void
+unblock_pad (const GValue * data, gpointer user_data)
+{
+  GstPad *pad = g_value_get_object (data);
+  GstConcatPad *spad = GST_CONCAT_PAD_CAST (pad);
+
+  spad->flushing = TRUE;
+}
+
+static GstStateChangeReturn
+gst_concat_change_state (GstElement * element, GstStateChange transition)
+{
+  GstConcat *self = GST_CONCAT (element);
+  GstStateChangeReturn ret;
+
+  switch (transition) {
+    case GST_STATE_CHANGE_READY_TO_PAUSED:{
+      self->format = GST_FORMAT_UNDEFINED;
+      self->current_start_offset = 0;
+      self->last_stop = GST_CLOCK_TIME_NONE;
+      break;
+    }
+    case GST_STATE_CHANGE_PAUSED_TO_READY:{
+      GstIterator *iter = gst_element_iterate_sink_pads (element);
+      GstIteratorResult res;
+
+      g_mutex_lock (&self->lock);
+      do {
+        res = gst_iterator_foreach (iter, unblock_pad, NULL);
+      } while (res == GST_ITERATOR_RESYNC);
+
+      gst_iterator_free (iter);
+      g_cond_broadcast (&self->cond);
+      g_mutex_unlock (&self->lock);
+
+      if (res == GST_ITERATOR_ERROR)
+        return GST_STATE_CHANGE_FAILURE;
+
+      break;
+    }
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    return ret;
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_READY:{
+      GstIterator *iter = gst_element_iterate_sink_pads (element);
+      GstIteratorResult res;
+
+      do {
+        res = gst_iterator_foreach (iter, reset_pad, NULL);
+      } while (res == GST_ITERATOR_RESYNC);
+
+      gst_iterator_free (iter);
+
+      if (res == GST_ITERATOR_ERROR)
+        return GST_STATE_CHANGE_FAILURE;
+
+      break;
+    }
+    default:
+      break;
+  }
+
+  return ret;
+}
diff --git a/plugins/elements/gstconcat.h b/plugins/elements/gstconcat.h
new file mode 100644 (file)
index 0000000..176e22a
--- /dev/null
@@ -0,0 +1,73 @@
+/* GStreamer concat element
+ *
+ *  Copyright (c) 2014 Sebastian Dröge <sebastian@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ *
+ */
+
+#ifndef __GST_CONCAT_H__
+#define __GST_CONCAT_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_CONCAT (gst_concat_get_type())
+#define GST_CONCAT(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_CONCAT, GstConcat))
+#define GST_CONCAT_CAST(obj) ((GstConcat*)obj)
+#define GST_CONCAT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CONCAT,GstConcatClass))
+#define GST_IS_CONCAT(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CONCAT))
+#define GST_IS_CONCAT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_CONCAT))
+
+typedef struct _GstConcat GstConcat;
+typedef struct _GstConcatClass GstConcatClass;
+
+/**
+ * GstConcat:
+ *
+ * The private concat structure
+ */
+struct _GstConcat
+{
+  /*< private >*/
+  GstElement parent;
+
+  GMutex lock;
+  GCond cond;
+  GList *sinkpads; /* Last is earliest */
+  GstPad *current_sinkpad;
+  GstPad *srcpad;
+  guint pad_count;
+
+  /* Format we're operating in */
+  GstFormat format;
+  /* In format, running time or accumulated byte offset */
+  guint64 current_start_offset;
+  /* Between current pad's segment start and stop */
+  guint64 last_stop;
+};
+
+struct _GstConcatClass
+{
+  GstElementClass parent_class;
+};
+
+G_GNUC_INTERNAL GType gst_concat_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_CONCAT_H__ */
index 93d3a9a..2b1781d 100644 (file)
@@ -28,6 +28,7 @@
 #include <gst/gst.h>
 
 #include "gstcapsfilter.h"
+#include "gstconcat.h"
 #include "gstdownloadbuffer.h"
 #include "gstfakesink.h"
 #include "gstfakesrc.h"
@@ -52,6 +53,9 @@ plugin_init (GstPlugin * plugin)
   if (!gst_element_register (plugin, "capsfilter", GST_RANK_NONE,
           gst_capsfilter_get_type ()))
     return FALSE;
+  if (!gst_element_register (plugin, "concat", GST_RANK_NONE,
+          gst_concat_get_type ()))
+    return FALSE;
   if (!gst_element_register (plugin, "downloadbuffer", GST_RANK_NONE,
           gst_download_buffer_get_type ()))
     return FALSE;