Port mplex element to 0.10. Fixes bug #520329.
authorMark Nauwelaerts <manauw@skynet.be>
Wed, 5 Mar 2008 06:03:03 +0000 (06:03 +0000)
committerSebastian Dröge <slomo@circular-chaos.org>
Wed, 5 Mar 2008 06:03:03 +0000 (06:03 +0000)
Original commit message from CVS:
Patch by: Mark Nauwelaerts <manauw at skynet dot be>
* configure.ac:
* ext/Makefile.am:
* ext/mplex/Makefile.am:
* ext/mplex/gstmplex.cc:
* ext/mplex/gstmplex.hh:
* ext/mplex/gstmplexibitstream.cc:
* ext/mplex/gstmplexibitstream.hh:
* ext/mplex/gstmplexjob.cc:
* ext/mplex/gstmplexjob.hh:
* ext/mplex/gstmplexoutputstream.cc:
* ext/mplex/gstmplexoutputstream.hh:
Port mplex element to 0.10. Fixes bug #520329.
* tests/check/Makefile.am:
* tests/check/elements/mplex.c: (test_sink_event), (setup_src_pad),
(teardown_src_pad), (setup_mplex), (cleanup_mplex),
(GST_START_TEST), (mplex_suite), (main):
Add unit test for the mplex element.

14 files changed:
ChangeLog
configure.ac
ext/Makefile.am
ext/mplex/Makefile.am
ext/mplex/gstmplex.cc
ext/mplex/gstmplex.hh
ext/mplex/gstmplexibitstream.cc
ext/mplex/gstmplexibitstream.hh
ext/mplex/gstmplexjob.cc
ext/mplex/gstmplexjob.hh
ext/mplex/gstmplexoutputstream.cc
ext/mplex/gstmplexoutputstream.hh
tests/check/Makefile.am
tests/check/elements/mplex.c [new file with mode: 0644]

index bc4c240..0b33c56 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,29 @@
        Patch by: Mark Nauwelaerts <manauw at skynet dot be>
 
        * configure.ac:
+       * ext/Makefile.am:
+       * ext/mplex/Makefile.am:
+       * ext/mplex/gstmplex.cc:
+       * ext/mplex/gstmplex.hh:
+       * ext/mplex/gstmplexibitstream.cc:
+       * ext/mplex/gstmplexibitstream.hh:
+       * ext/mplex/gstmplexjob.cc:
+       * ext/mplex/gstmplexjob.hh:
+       * ext/mplex/gstmplexoutputstream.cc:
+       * ext/mplex/gstmplexoutputstream.hh:
+       Port mplex element to 0.10. Fixes bug #520329.
+
+       * tests/check/Makefile.am:
+       * tests/check/elements/mplex.c: (test_sink_event), (setup_src_pad),
+       (teardown_src_pad), (setup_mplex), (cleanup_mplex),
+       (GST_START_TEST), (mplex_suite), (main):
+       Add unit test for the mplex element.
+
+2008-03-05  Sebastian Dröge  <slomo@circular-chaos.org>
+
+       Patch by: Mark Nauwelaerts <manauw at skynet dot be>
+
+       * configure.ac:
        Clean up detection of different mjpegtoolsAPI versions.
 
        * ext/mpeg2enc/gstmpeg2enc.cc:
index 6967dd0..a7ac689 100644 (file)
@@ -710,6 +710,67 @@ AG_GST_CHECK_FEATURE(MPEG2ENC, [mpeg2enc], mpeg2enc, [
   fi
 ])
 
+dnl *** mplex ***
+translit(dnm, m, l) AM_CONDITIONAL(USE_MPLEX, true)
+AG_GST_CHECK_FEATURE(MPLEX, [mplex], mplex, [
+  HAVE_MPLEX="no"
+  dnl we require a c++ compiler for this one
+  if [ test x$HAVE_CXX = xyes ]; then
+    dnl libmplex was first included in mjpegtools-1.6.2-rc4 (1.6.1.93)
+    dnl since many distros include mjpegtools specifically without mplex
+    dnl and mpeg2enc, we check for mplex on its own, too.
+    dnl libmplex < 1.9rc? has fuzzy ABI, valgrind and other problems
+    PKG_CHECK_MODULES(MPLEX, mjpegtools >= 1.9.0, [
+      dnl switch over to c++ to test things
+      AC_LANG_CPLUSPLUS
+      OLD_CPPFLAGS="$CPPFLAGS"
+      CPPFLAGS="$CPPFLAGS $MPLEX_CFLAGS"
+      AC_CHECK_HEADER(interact.hpp, [
+        MPLEX_LIBS="$MPLEX_LIBS -lmplex2 -lm"
+        OLD_LIBS="$LIBS"
+        LIBS="$LIBS $MPLEX_LIBS"
+        dnl older libmplex uses off_t SegmentSize (), which leads to fuzzy ABI;
+        dnl don't want this here
+        AC_MSG_CHECKING([for valid mplex objects])
+        AC_TRY_RUN([
+
+#include <interact.hpp>
+#include <outputstrm.hpp>
+#include <multiplexor.hpp>
+int
+main (int   argc,
+      char *argv[])
+{
+  class TestOutputStream : public OutputStream {
+  public:
+    TestOutputStream () : OutputStream () { }
+    void Write (uint8_t *a, unsigned int b) { }
+    void NextSegment () { }
+    uint64_t SegmentSize () { }
+    void Close () { }
+    int Open () { }
+};
+  MultiplexJob *job = new MultiplexJob ();
+  vector<IBitStream *> inputs;
+  job->SetupInputStreams (inputs);
+  TestOutputStream *out = new TestOutputStream ();
+  Multiplexor *mux = new Multiplexor(*job, *out, NULL);
+  return 0;
+}
+        ],[
+          HAVE_MPLEX="yes"
+          AC_SUBST(MPLEX_CFLAGS)
+          AC_SUBST(MPLEX_LIBS)
+          AC_MSG_RESULT(yes)
+        ], AC_MSG_RESULT(no))
+        LIBS="$OLD_LIBS"
+      ])
+      CPPFLAGS="$OLD_CPPFLAGS"
+      AC_LANG_C
+    ], HAVE_MPLEX="no")
+  fi
+])
+
 dnl *** musepack ***
 translit(dnm, m, l) AM_CONDITIONAL(USE_MUSEPACK, true)
 AG_GST_CHECK_FEATURE(MUSEPACK, [musepackdec], musepack, [
@@ -961,6 +1022,7 @@ AM_CONDITIONAL(USE_JACK, false)
 AM_CONDITIONAL(USE_LADSPA, false)
 AM_CONDITIONAL(USE_LIBMMS, false)
 AM_CONDITIONAL(USE_MPEG2ENC, false)
+AM_CONDITIONAL(USE_MPLEX, false)
 AM_CONDITIONAL(USE_MUSEPACK, false)
 AM_CONDITIONAL(USE_MUSICBRAINZ, false)
 AM_CONDITIONAL(USE_MYTHTV, false)
@@ -1108,6 +1170,7 @@ ext/libmms/Makefile
 ext/Makefile
 ext/nas/Makefile
 ext/mpeg2enc/Makefile
+ext/mplex/Makefile
 ext/musepack/Makefile
 ext/musicbrainz/Makefile
 ext/mythtv/Makefile
index ef399c5..af82344 100644 (file)
@@ -154,11 +154,11 @@ else
 METADATA_DIR=
 endif
 
-if USE_MPLEX
-MPLEX_DIR=mplex
-else
+if USE_MPLEX
+MPLEX_DIR=mplex
+else
 MPLEX_DIR=
-endif
+endif
 
 if USE_MUSEPACK
 MUSEPACK_DIR=musepack
@@ -355,6 +355,7 @@ DIST_SUBDIRS = \
        divx \
        metadata \
        mpeg2enc \
+       mplex \
        musepack \
        musicbrainz \
        mythtv \
index d99b5b7..aa51a99 100644 (file)
@@ -6,8 +6,10 @@ libgstmplex_la_SOURCES = \
        gstmplexjob.cc \
        gstmplexoutputstream.cc
 
-libgstmplex_la_CXXFLAGS = $(GST_CFLAGS) $(MPLEX_CFLAGS)
-libgstmplex_la_LIBADD = $(MPLEX_LIBS)
+libgstmplex_la_CXXFLAGS = \
+       $(GST_PLUGINS_BASE_CFLAGS) $(GST_CXXFLAGS) $(MPLEX_CFLAGS)
+libgstmplex_la_LIBADD = \
+       $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(MPLEX_LIBS)
 libgstmplex_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
 
 noinst_HEADERS = \
index a4c92f9..5368823 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer mplex (mjpegtools) wrapper
  * (c) 2003 Ronald Bultje <rbultje@ronald.bitfreak.net>
+ * (c) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
  *
  * gstmplex.cc: gstreamer mplex wrapper
  *
  * Boston, MA 02111-1307, USA.
  */
 
+/**
+ * SECTION:element-mplex
+ * @see_also: mpeg2enc
+ *
+ * <refsect2>
+ * <para>
+ * This element is an audio/video multiplexer for MPEG-1/2 video streams
+ * and (un)compressed audio streams such as AC3, MPEG layer I/II/III.
+ * It is based on the <ulink url="http://mjpeg.sourceforge.net/">mjpegtools</ulink> library.
+ * Documentation on creating MPEG videos in general can be found in the
+ * <ulink url="https://sourceforge.net/docman/display_doc.php?docid=3456&group_id=5776#s7">MJPEG Howto</ulink>
+ * and the man-page of the mplex tool documents the properties of this element,
+ * which are shared with the mplex tool.
+ * </para>
+ * <title>Example pipeline</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v videotestsrc num-buffers=1000 ! mpeg2enc ! mplex ! filesink location=videotestsrc.mpg
+ * </programlisting>
+ * This example pipeline will encode a test video source to a an
+ * MPEG1 elementary stream and multiplexes this to an MPEG system stream.
+ * </para>
+ * <para>
+ * If several streams are being multiplexed, there should (as usual) be
+ * a queue in each stream, and due to mplex' buffering the capacities of these
+ * may have to be set to a few times the default settings to prevent the
+ * pipeline stalling.
+ * </para>
+ * </refsect2>
+ */
+
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 #include "gstmplexibitstream.hh"
 #include "gstmplexjob.hh"
 
+GST_DEBUG_CATEGORY (mplex_debug);
+
 static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("video/mpeg, " "systemstream = (boolean) true")
+    GST_STATIC_CAPS ("video/mpeg, systemstream = (boolean) true ")
     );
 
 static GstStaticPadTemplate video_sink_templ =
@@ -39,20 +73,29 @@ GST_STATIC_PAD_TEMPLATE ("video_%d",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
     GST_STATIC_CAPS ("video/mpeg, "
-        "mpegversion = (int) [ 1, 2 ], " "systemstream = (boolean) false")
+        "mpegversion = (int) { 1, 2 }, "
+        "systemstream = (boolean) false, "
+        "width = (int) [ 16, 4096 ], "
+        "height = (int) [ 16, 4096 ], framerate = (fraction) [ 0, MAX ]")
     );
 
+#define COMMON_AUDIO_CAPS \
+  "channels = (int) [ 1, 8 ], " \
+  "rate = (int) [ 8000, 96000 ]"
+
 static GstStaticPadTemplate audio_sink_templ =
     GST_STATIC_PAD_TEMPLATE ("audio_%d",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
     GST_STATIC_CAPS ("audio/mpeg, "
         "mpegversion = (int) 1, "
-        "layer = (int) [ 1, 2 ]; "
-        "audio/x-ac3; "
+        "layer = (int) [ 1, 3 ], "
+        COMMON_AUDIO_CAPS "; "
+        "audio/x-ac3, "
+        COMMON_AUDIO_CAPS "; "
         "audio/x-dts; "
         "audio/x-raw-int, "
-        "endianness = (int) BYTE_ORDER, "
+        "endianness = (int) BIG_ENDIAN, "
         "signed = (boolean) TRUE, "
         "width = (int) { 16, 20, 24 }, "
         "depth = (int) { 16, 20, 24 }, "
@@ -61,16 +104,13 @@ static GstStaticPadTemplate audio_sink_templ =
 
 /* FIXME: subtitles */
 
-static void gst_mplex_base_init (GstMplexClass * klass);
-static void gst_mplex_class_init (GstMplexClass * klass);
-static void gst_mplex_init (GstMplex * enc);
-static void gst_mplex_dispose (GObject * object);
-
-static void gst_mplex_loop (GstElement * element);
-
+static void gst_mplex_finalize (GObject * object);
+static void gst_mplex_reset (GstMplex * mplex);
+static void gst_mplex_loop (GstMplex * mplex);
 static GstPad *gst_mplex_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name);
-
+static void gst_mplex_release_pad (GstElement * element, GstPad * pad);
+static gboolean gst_mplex_src_activate_push (GstPad * pad, gboolean active);
 static GstStateChangeReturn gst_mplex_change_state (GstElement * element,
     GstStateChange transition);
 
@@ -79,53 +119,26 @@ static void gst_mplex_get_property (GObject * object,
 static void gst_mplex_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
 
-static GstElementClass *parent_class = NULL;
-
-GType
-gst_mplex_get_type (void)
-{
-  static GType gst_mplex_type = 0;
-
-  if (!gst_mplex_type) {
-    static const GTypeInfo gst_mplex_info = {
-      sizeof (GstMplexClass),
-      (GBaseInitFunc) gst_mplex_base_init,
-      NULL,
-      (GClassInitFunc) gst_mplex_class_init,
-      NULL,
-      NULL,
-      sizeof (GstMplex),
-      0,
-      (GInstanceInitFunc) gst_mplex_init,
-    };
-
-    gst_mplex_type =
-        g_type_register_static (GST_TYPE_ELEMENT,
-        "GstMplex", &gst_mplex_info, (GTypeFlags) 0);
-  }
-
-  return gst_mplex_type;
-}
+GST_BOILERPLATE (GstMplex, gst_mplex, GstElement, GST_TYPE_ELEMENT);
 
 static void
-gst_mplex_base_init (GstMplexClass * klass)
+gst_mplex_base_init (gpointer klass)
 {
-  static GstElementDetails gst_mplex_details = {
-    "mplex video multiplexer",
-    "Codec/Muxer",
-    "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer",
-    "Andrew Stevens <andrew.stevens@nexgo.de>\n"
-        "Ronald Bultje <rbultje@ronald.bitfreak.net>"
-  };
   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
 
+  gst_element_class_set_details_simple (element_class,
+      "mplex video multiplexer", "Codec/Muxer",
+      "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer",
+      "Andrew Stevens <andrew.stevens@nexgo.de>\n"
+      "Ronald Bultje <rbultje@ronald.bitfreak.net>\n"
+      "Mark Nauwelaerts <mnauw@users.sourceforge.net");
+
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&src_templ));
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&video_sink_templ));
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&audio_sink_templ));
-  gst_element_class_set_details (element_class, &gst_mplex_details);
 }
 
 static void
@@ -134,169 +147,458 @@ gst_mplex_class_init (GstMplexClass * klass)
   GObjectClass *object_class = G_OBJECT_CLASS (klass);
   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
 
-  parent_class = GST_ELEMENT_CLASS (g_type_class_ref (GST_TYPE_ELEMENT));
-
-  /* register arguments */
-  mjpeg_default_handler_verbosity (0);
-  GstMplexJob::initProperties (object_class);
+  GST_DEBUG_CATEGORY_INIT (mplex_debug, "mplex", 0, "MPEG video/audio muxer");
 
   object_class->set_property = gst_mplex_set_property;
   object_class->get_property = gst_mplex_get_property;
 
-  object_class->dispose = gst_mplex_dispose;
+  /* register properties */
+  GstMplexJob::initProperties (object_class);
+
+  object_class->finalize = GST_DEBUG_FUNCPTR (gst_mplex_finalize);
 
-  element_class->change_state = gst_mplex_change_state;
-  element_class->request_new_pad = gst_mplex_request_new_pad;
+  element_class->change_state = GST_DEBUG_FUNCPTR (gst_mplex_change_state);
+  element_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_mplex_request_new_pad);
+  element_class->release_pad = GST_DEBUG_FUNCPTR (gst_mplex_release_pad);
 }
 
 static void
-gst_mplex_dispose (GObject * object)
+gst_mplex_finalize (GObject * object)
 {
   GstMplex *mplex = GST_MPLEX (object);
+  GSList *walk;
 
-  if (mplex->mux) {
-    delete mplex->mux;
+  /* release all pads */
+  walk = mplex->pads;
+  while (walk) {
+    GstMplexPad *mpad = (GstMplexPad *) walk->data;
 
-    mplex->mux = NULL;
+    gst_object_unref (mpad->pad);
+    mpad->pad = NULL;
+    walk = walk->next;
   }
+
+  /* clean up what's left of them */
+  gst_mplex_reset (mplex);
+
+  /* ... and of the rest */
   delete mplex->job;
+
+  g_mutex_free (mplex->tlock);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static void
-gst_mplex_init (GstMplex * mplex)
+gst_mplex_init (GstMplex * mplex, GstMplexClass * g_class)
 {
   GstElement *element = GST_ELEMENT (mplex);
-
-  GST_FLAG_SET (element, GST_ELEMENT_EVENT_AWARE);
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
   mplex->srcpad =
-      gst_pad_new_from_template (gst_element_get_pad_template (element, "src"),
-      "src");
+      gst_pad_new_from_template (gst_element_class_get_pad_template
+      (element_class, "src"), "src");
   gst_element_add_pad (element, mplex->srcpad);
+  gst_pad_use_fixed_caps (mplex->srcpad);
+  gst_pad_set_activatepush_function (mplex->srcpad,
+      GST_DEBUG_FUNCPTR (gst_mplex_src_activate_push));
 
   mplex->job = new GstMplexJob ();
-  mplex->mux = NULL;
   mplex->num_apads = 0;
   mplex->num_vpads = 0;
 
-  gst_element_set_loop_function (element, gst_mplex_loop);
+  mplex->tlock = g_mutex_new ();
+
+  gst_mplex_reset (mplex);
 }
 
 static void
-gst_mplex_loop (GstElement * element)
+gst_mplex_reset (GstMplex * mplex)
 {
-  GstMplex *mplex = GST_MPLEX (element);
+  GSList *walk;
+  GSList *nlist = NULL;
 
-  if (!mplex->mux) {
-    GstMplexOutputStream *out;
-    const GList *item;
+  mplex->eos = FALSE;
+  mplex->srcresult = GST_FLOW_CUSTOM_SUCCESS;
 
-    for (item = gst_element_get_pad_list (element);
-        item != NULL; item = item->next) {
-      StreamKind type;
-      GstMplexIBitStream *inputstream;
-      JobStream *jobstream;
-      GstPad *pad = GST_PAD (item->data);
-      GstStructure *structure;
-      const GstCaps *caps;
-      const gchar *mime;
+  /* reset existing streams */
+  walk = mplex->pads;
+  while (walk != NULL) {
+    GstMplexPad *mpad;
 
-      /* skip our source pad */
-      if (GST_PAD_DIRECTION (pad) == GST_PAD_SRC)
-        continue;
+    mpad = (GstMplexPad *) walk->data;
 
-      /* create inputstream, assure we've got caps */
-      inputstream = new GstMplexIBitStream (pad);
+    mpad->needed = 0;
+    mpad->eos = FALSE;
+    gst_adapter_clear (mpad->adapter);
+    if (mpad->bs) {
+      delete mpad->bs;
 
-      /* skip unnegotiated pads */
-      if (!(caps = GST_PAD_CAPS (pad))) {
-        delete inputstream;
+      mpad->bs = NULL;
+    }
 
-        continue;
-      }
+    if (!mpad->pad) {
+      g_cond_free (mpad->cond);
+      gst_object_unref (mpad->adapter);
+      g_free (mpad);
+    } else
+      nlist = g_slist_append (nlist, mpad);
 
-      /* get format */
-      structure = gst_caps_get_structure (caps, 0);
-      mime = gst_structure_get_name (structure);
-
-      if (!strcmp (mime, "video/mpeg")) {
-        VideoParams *params;
-
-        type = MPEG_VIDEO;
-
-        params = VideoParams::Default (mplex->job->mux_format);
-        mplex->job->video_param.push_back (params);
-        mplex->job->video_tracks++;
-      } else if (!strcmp (mime, "audio/mpeg")) {
-        type = MPEG_AUDIO;
-        mplex->job->audio_tracks++;
-      } else if (!strcmp (mime, "audio/x-ac3")) {
-        type = AC3_AUDIO;
-        mplex->job->audio_tracks++;
-      } else if (!strcmp (mime, "audio/x-dts")) {
-        type = DTS_AUDIO;
-        mplex->job->audio_tracks++;
-      } else if (!strcmp (mime, "audio/x-raw-int")) {
-        LpcmParams *params;
-        gint bits, chans, rate;
-
-        type = LPCM_AUDIO;
-
-        /* set LPCM params */
-        gst_structure_get_int (structure, "depth", &bits);
-        gst_structure_get_int (structure, "rate", &rate);
-        gst_structure_get_int (structure, "channels", &chans);
-        params = LpcmParams::Checked (rate, chans, bits);
-
-        mplex->job->lpcm_param.push_back (params);
-        mplex->job->audio_tracks++;
-        mplex->job->lpcm_tracks++;
-      } else {
-        delete inputstream;
-
-        continue;
-      }
+    walk = walk->next;
+  }
 
-      jobstream = new JobStream (inputstream, type);
-      mplex->job->streams.push_back (jobstream);
-    }
+  g_slist_free (mplex->pads);
+  mplex->pads = nlist;
 
-    if (!mplex->job->video_tracks && !mplex->job->audio_tracks) {
-      GST_ELEMENT_ERROR (element, CORE, NEGOTIATION, (NULL),
-          ("no input video or audio tracks set up before loop function"));
-      return;
-    }
+  /* clear mplex stuff */
+  /* clean up stream settings */
+  while (!mplex->job->streams.empty ()) {
+    delete mplex->job->streams.back ();
+
+    mplex->job->streams.pop_back ();
+  }
+  while (!mplex->job->video_param.empty ()) {
+    delete mplex->job->video_param.back ();
 
-    /* create new encoder with inputs/output */
-    out = new GstMplexOutputStream (element, mplex->srcpad);
-    mplex->mux = new Multiplexor (*mplex->job, *out);
+    mplex->job->video_param.pop_back ();
   }
+  while (!mplex->job->lpcm_param.empty ()) {
+    delete mplex->job->lpcm_param.back ();
 
-  mplex->mux->Multiplex ();
+    mplex->job->lpcm_param.pop_back ();
+  }
+  mplex->job->audio_tracks = 0;
+  mplex->job->video_tracks = 0;
+  mplex->job->lpcm_tracks = 0;
 }
 
-static GstPadLinkReturn
-gst_mplex_sink_link (GstPad * pad, const GstCaps * caps)
+static gboolean
+gst_mplex_setcaps (GstPad * pad, GstCaps * caps)
 {
-  GstStructure *structure = gst_caps_get_structure (caps, 0);
-  const gchar *mime = gst_structure_get_name (structure);
+  GstMplex *mplex;
+  const gchar *mime;
+  GstStructure *structure;
+  StreamKind type;
+  JobStream *jobstream;
+  GstMplexIBitStream *inputstream;
+  GstMplexPad *mpad;
+  GstCaps *othercaps;
+  gboolean ret = TRUE;
+
+  mplex = GST_MPLEX (GST_PAD_PARENT (pad));
+
+  /* does not go well to negotiate when started */
+  if (mplex->srcresult != GST_FLOW_CUSTOM_SUCCESS)
+    goto refuse_renegotiation;
+
+  /* since muxer does not really check much ... */
+  othercaps = gst_caps_intersect (caps, gst_pad_get_pad_template_caps (pad));
+  if (othercaps)
+    gst_caps_unref (othercaps);
+  else
+    goto refuse_caps;
+
+  /* set the fixed template caps on the srcpad, should accept without objection */
+  othercaps = gst_caps_copy (gst_pad_get_pad_template_caps (mplex->srcpad));
+  ret = gst_pad_set_caps (mplex->srcpad, othercaps);
+  gst_caps_unref (othercaps);
+  if (!ret)
+    goto refuse_caps;
+
+  structure = gst_caps_get_structure (caps, 0);
+  mime = gst_structure_get_name (structure);
+
+  if (!strcmp (mime, "video/mpeg")) {   /* video */
+    VideoParams *params;
+
+    type = MPEG_VIDEO;
+    if (mplex->job->bufsize)
+      params = VideoParams::Checked (mplex->job->bufsize);
+    else
+      params = VideoParams::Default (mplex->job->mux_format);
+    /* set standard values if forced by the selected profile */
+    if (params->Force (mplex->job->mux_format))
+      GST_WARNING_OBJECT (mplex,
+          "overriding non-standard option due to selected profile");
+
+    mplex->job->video_param.push_back (params);
+    mplex->job->video_tracks++;
+  } else {                      /* audio */
+    if (!strcmp (mime, "audio/mpeg")) {
+      type = MPEG_AUDIO;
+    } else if (!strcmp (mime, "audio/x-ac3")) {
+      type = AC3_AUDIO;
+    } else if (!strcmp (mime, "audio/x-dts")) {
+      type = DTS_AUDIO;
+    } else if (!strcmp (mime, "audio/x-raw-int")) {
+      LpcmParams *params;
+      gint bits, chans, rate;
+      gboolean result = TRUE;
+
+      type = LPCM_AUDIO;
+
+      /* set LPCM params */
+      result &= gst_structure_get_int (structure, "depth", &bits);
+      result &= gst_structure_get_int (structure, "rate", &rate);
+      result &= gst_structure_get_int (structure, "channels", &chans);
+      if (!result)
+        goto refuse_caps;
+
+      params = LpcmParams::Checked (rate, chans, bits);
+
+      mplex->job->lpcm_param.push_back (params);
+      mplex->job->lpcm_tracks++;
+    } else
+      goto refuse_caps;
+
+    mplex->job->audio_tracks++;
+  }
 
-  /* raw audio caps needs to be fixed */
-  if (!strcmp (mime, "audio/x-raw-int")) {
-    gint width, depth;
+  mpad = (GstMplexPad *) gst_pad_get_element_private (pad);
+  g_return_val_if_fail (mpad, FALSE);
+  inputstream = new GstMplexIBitStream (mpad);
+  mpad->bs = inputstream;
+  jobstream = new JobStream (inputstream, type);
+  mplex->job->streams.push_back (jobstream);
 
-    if (!gst_caps_is_fixed (caps))
-      return GST_PAD_LINK_DELAYED;
+  return TRUE;
 
-    gst_structure_get_int (structure, "width", &width);
-    gst_structure_get_int (structure, "depth", &depth);
+refuse_caps:
+  {
+    GST_WARNING_OBJECT (mplex, "refused caps %" GST_PTR_FORMAT, caps);
 
-    if (depth != width)
-      return GST_PAD_LINK_REFUSED;
+    /* undo if we were a bit too fast/confident */
+    if (GST_PAD_CAPS (mplex->srcpad))
+      gst_pad_set_caps (mplex->srcpad, NULL);
+
+    return FALSE;
   }
+refuse_renegotiation:
+  {
+    GST_WARNING_OBJECT (mplex, "already started; "
+        "refused (re)negotiation (to %" GST_PTR_FORMAT ")", caps);
 
-  /* we do the actual inputstream setup in our first loopfunc cycle */
-  return GST_PAD_LINK_OK;
+    return FALSE;
+  }
+}
+
+static void
+gst_mplex_loop (GstMplex * mplex)
+{
+  GstMplexOutputStream *out = NULL;
+  Multiplexor *mux = NULL;
+  GSList *walk;
+
+  /* do not try to resume muxing after it finished
+   * this can be relevant mainly/only in case of forced state change */
+  if (mplex->eos)
+    goto eos;
+
+  /* inform downstream about what's coming */
+  gst_pad_push_event (mplex->srcpad, gst_event_new_new_segment (FALSE, 1.0,
+          GST_FORMAT_BYTES, 0, -1, 0));
+
+  /* hm (!) each inputstream really needs an initial read
+   * so that all is internally in the proper state */
+  walk = mplex->pads;
+  while (walk != NULL) {
+    GstMplexPad *mpad;
+
+    mpad = (GstMplexPad *) walk->data;
+    mpad->bs->ReadBuffer ();
+
+    walk = walk->next;
+  }
+
+  /* create new multiplexer with inputs/output */
+  out = new GstMplexOutputStream (mplex, mplex->srcpad);
+#if GST_MJPEGTOOLS_API >= 10900
+  mux = new Multiplexor (*mplex->job, *out, NULL);
+#else
+  mux = new Multiplexor (*mplex->job, *out);
+#endif
+
+  if (mux) {
+    mux->Multiplex ();
+    delete mux;
+    delete out;
+
+    /* if not well and truly eos, something strange happened  */
+    if (!mplex->eos) {
+      GST_ERROR_OBJECT (mplex, "muxing task ended without being eos");
+      /* notify there is no point in collecting any more */
+      GST_MPLEX_MUTEX_LOCK (mplex);
+      mplex->srcresult = GST_FLOW_ERROR;
+      GST_MPLEX_SIGNAL_ALL (mplex);
+      GST_MPLEX_MUTEX_UNLOCK (mplex);
+    } else
+      goto eos;
+  } else {
+    GST_WARNING_OBJECT (mplex, "failed to create Multiplexor");
+  }
+
+  /* fall-through */
+done:
+  {
+    /* no need to run wildly, stopped elsewhere, e.g. state change */
+    GST_DEBUG_OBJECT (mplex, "pausing muxing task");
+    gst_pad_pause_task (mplex->srcpad);
+
+    return;
+  }
+eos:
+  {
+    GST_DEBUG_OBJECT (mplex, "encoding task reached eos");
+    goto done;
+  }
+}
+
+static gboolean
+gst_mplex_sink_event (GstPad * sinkpad, GstEvent * event)
+{
+  GstMplex *mplex;
+  GstMplexPad *mpad;
+  gboolean result = TRUE;
+
+  mplex = (GstMplex *) (GST_PAD_PARENT (sinkpad));
+  mpad = (GstMplexPad *) gst_pad_get_element_private (sinkpad);
+  g_return_val_if_fail (mpad, FALSE);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_START:
+      /* forward event */
+      gst_pad_event_default (sinkpad, event);
+
+      /* now unblock the chain function */
+      GST_MPLEX_MUTEX_LOCK (mplex);
+      mplex->srcresult = GST_FLOW_WRONG_STATE;
+      GST_MPLEX_SIGNAL (mplex, mpad);
+      GST_MPLEX_MUTEX_UNLOCK (mplex);
+      /* no way to pause/restart loop task */
+      goto done;
+    case GST_EVENT_FLUSH_STOP:
+      /* forward event */
+      gst_pad_event_default (sinkpad, event);
+
+      /* clear state and resume */
+      GST_MPLEX_MUTEX_LOCK (mplex);
+      gst_adapter_clear (mpad->adapter);
+      mplex->srcresult = GST_FLOW_OK;
+      GST_MPLEX_MUTEX_UNLOCK (mplex);
+      goto done;
+    case GST_EVENT_NEWSEGMENT:
+      /* eat segments; we make our own (byte)stream */
+      gst_event_unref (event);
+      goto done;
+    case GST_EVENT_EOS:
+      /* inform this pad that it can stop now */
+      GST_MPLEX_MUTEX_LOCK (mplex);
+      mpad->eos = TRUE;
+      GST_MPLEX_SIGNAL (mplex, mpad);
+      GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+      /* eat this event for now, task will send eos when finished */
+      gst_event_unref (event);
+      goto done;
+    default:
+      /* for a serialized event, wait until earlier data is gone,
+       * though this is no guarantee as to when task is done with it.
+       * Only wait if loop has been started already */
+      if (GST_EVENT_IS_SERIALIZED (event)) {
+        GST_MPLEX_MUTEX_LOCK (mplex);
+        while (mplex->srcresult == GST_FLOW_OK && !mpad->needed)
+          GST_MPLEX_WAIT (mplex, mpad);
+        GST_MPLEX_MUTEX_UNLOCK (mplex);
+      }
+      break;
+  }
+
+  result = gst_pad_event_default (sinkpad, event);
+
+done:
+  return result;
+}
+
+/* starts task if conditions are right for it
+ * must be called with mutex_lock held */
+static void
+gst_mplex_start_task (GstMplex * mplex)
+{
+  /* start task to create multiplexor and start muxing */
+  if (G_UNLIKELY (mplex->srcresult == GST_FLOW_CUSTOM_SUCCESS)
+      && mplex->job->video_tracks == mplex->num_vpads
+      && mplex->job->audio_tracks == mplex->num_apads) {
+    gst_pad_start_task (mplex->srcpad, (GstTaskFunction) gst_mplex_loop, mplex);
+    mplex->srcresult = GST_FLOW_OK;
+  }
+}
+
+static GstFlowReturn
+gst_mplex_chain (GstPad * sinkpad, GstBuffer * buffer)
+{
+  GstMplex *mplex;
+  GstMplexPad *mpad;
+
+  mplex = (GstMplex *) (GST_PAD_PARENT (sinkpad));
+  mpad = (GstMplexPad *) gst_pad_get_element_private (sinkpad);
+  g_return_val_if_fail (mpad, GST_FLOW_ERROR);
+
+  /* check if pad were properly negotiated and set up */
+  if (G_UNLIKELY (!mpad->bs)) {
+    GST_ELEMENT_ERROR (mplex, CORE, NEGOTIATION, (NULL),
+        ("input pad has not been set up prior to chain function"));
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+
+  GST_MPLEX_MUTEX_LOCK (mplex);
+
+  gst_mplex_start_task (mplex);
+
+  if (G_UNLIKELY (mpad->eos))
+    goto eos;
+
+  if (G_UNLIKELY (!GST_FLOW_IS_SUCCESS (mplex->srcresult)))
+    goto ignore;
+
+  gst_adapter_push (mpad->adapter, buffer);
+  buffer = NULL;
+  while (gst_adapter_available (mpad->adapter) >= mpad->needed) {
+    GST_MPLEX_SIGNAL (mplex, mpad);
+    GST_MPLEX_WAIT (mplex, mpad);
+    /* may have become flushing or in error */
+    if (G_UNLIKELY (mplex->srcresult != GST_FLOW_OK))
+      goto ignore;
+    /* or been removed */
+    if (G_UNLIKELY (mpad->eos))
+      goto eos;
+  }
+
+  GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+  return GST_FLOW_OK;
+
+/* special cases */
+eos:
+  {
+    GST_DEBUG_OBJECT (mplex, "ignoring buffer at end-of-stream");
+    GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+    gst_buffer_unref (buffer);
+    return GST_FLOW_UNEXPECTED;
+  }
+ignore:
+  {
+    GstFlowReturn ret = mplex->srcresult;
+
+    GST_DEBUG_OBJECT (mplex, "ignoring buffer because src task encountered %s",
+        gst_flow_get_name (ret));
+    GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+    if (buffer)
+      gst_buffer_unref (buffer);
+    return ret;
+  }
 }
 
 static GstPad *
@@ -307,25 +609,73 @@ gst_mplex_request_new_pad (GstElement * element,
   GstMplex *mplex = GST_MPLEX (element);
   gchar *padname;
   GstPad *newpad;
+  GstMplexPad *mpad;
 
   if (templ == gst_element_class_get_pad_template (klass, "audio_%d")) {
+    GST_DEBUG_OBJECT (mplex, "request pad audio %d", mplex->num_apads);
     padname = g_strdup_printf ("audio_%d", mplex->num_apads++);
   } else if (templ == gst_element_class_get_pad_template (klass, "video_%d")) {
+    GST_DEBUG_OBJECT (mplex, "request pad video %d", mplex->num_vpads);
     padname = g_strdup_printf ("video_%d", mplex->num_vpads++);
   } else {
-    g_warning ("mplex: this is not our template!");
+    GST_WARNING_OBJECT (mplex, "This is not our template!");
     return NULL;
   }
 
   newpad = gst_pad_new_from_template (templ, padname);
-  gst_pad_set_link_function (newpad, gst_mplex_sink_link);
-  gst_element_add_pad (element, newpad);
   g_free (padname);
 
+  mpad = g_new0 (GstMplexPad, 1);
+  mpad->adapter = gst_adapter_new ();
+  mpad->cond = g_cond_new ();
+  gst_object_ref (newpad);
+  mpad->pad = newpad;
+
+  gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_setcaps));
+  gst_pad_set_chain_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_chain));
+  gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_sink_event));
+  gst_pad_set_element_private (newpad, mpad);
+  gst_element_add_pad (element, newpad);
+  mplex->pads = g_slist_append (mplex->pads, mpad);
+
   return newpad;
 }
 
 static void
+gst_mplex_release_pad (GstElement * element, GstPad * pad)
+{
+  GstMplex *mplex = GST_MPLEX (element);
+  GstMplexPad *mpad;
+
+  g_return_if_fail (pad);
+  mpad = (GstMplexPad *) gst_pad_get_element_private (pad);
+  g_return_if_fail (mpad);
+
+  if (gst_element_remove_pad (element, pad)) {
+    gchar *padname;
+
+    GST_MPLEX_MUTEX_LOCK (mplex);
+    mpad->eos = TRUE;
+    gst_object_unref (mpad->pad);
+    mpad->pad = NULL;
+    /* wake up if waiting on this pad */
+    GST_MPLEX_SIGNAL (mplex, mpad);
+
+    padname = gst_object_get_name (GST_OBJECT (pad));
+    if (strstr (padname, "audio")) {
+      mplex->num_apads--;
+    } else {
+      mplex->num_vpads--;
+    }
+    g_free (padname);
+
+    /* may now be up to us to get things going */
+    gst_mplex_start_task (mplex);
+    GST_MPLEX_MUTEX_UNLOCK (mplex);
+  }
+}
+
+static void
 gst_mplex_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec)
 {
@@ -339,33 +689,118 @@ gst_mplex_set_property (GObject * object,
   GST_MPLEX (object)->job->setProperty (prop_id, value);
 }
 
+static gboolean
+gst_mplex_src_activate_push (GstPad * pad, gboolean active)
+{
+  gboolean result = TRUE;
+  GstMplex *mplex;
+
+  mplex = GST_MPLEX (GST_PAD_PARENT (pad));
+
+  if (active) {
+    /* chain will start task once all streams have been setup */
+  } else {
+    /* end the muxing loop by forcing eos and unblock chains */
+    GST_MPLEX_MUTEX_LOCK (mplex);
+    mplex->eos = TRUE;
+    mplex->srcresult = GST_FLOW_WRONG_STATE;
+    GST_MPLEX_SIGNAL_ALL (mplex);
+    GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+    /* muxing loop should have ended now and can be joined */
+    result = gst_pad_stop_task (pad);
+  }
+
+  return result;
+}
+
 static GstStateChangeReturn
 gst_mplex_change_state (GstElement * element, GstStateChange transition)
 {
   GstMplex *mplex = GST_MPLEX (element);
+  GstStateChangeReturn ret;
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    goto done;
 
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      delete mplex->mux;
-      mplex->mux = NULL;
-      mplex->num_apads = 0;
-      mplex->num_vpads = 0;
+      gst_mplex_reset (mplex);
       break;
     default:
       break;
   }
 
-  if (parent_class->change_state)
-    return parent_class->change_state (element, transition);
+done:
+  return ret;
+}
+
+#ifndef GST_DISABLE_GST_DEBUG
+
+static mjpeg_log_handler_t old_handler = NULL;
+
+/* note that this will affect all mjpegtools elements/threads */
+static void
+gst_mplex_log_callback (log_level_t level, const char *message)
+{
+  GstDebugLevel gst_level;
+
+#if GST_MJPEGTOOLS_API >= 10903
+  static const gint mjpeg_log_error = mjpeg_loglev_t ("error");
+  static const gint mjpeg_log_warn = mjpeg_loglev_t ("warn");
+  static const gint mjpeg_log_info = mjpeg_loglev_t ("info");
+  static const gint mjpeg_log_debug = mjpeg_loglev_t ("debug");
+#else
+  static const gint mjpeg_log_error = LOG_ERROR;
+  static const gint mjpeg_log_warn = LOG_WARN;
+  static const gint mjpeg_log_info = LOG_INFO;
+  static const gint mjpeg_log_debug = LOG_DEBUG;
+#endif
 
-  return GST_STATE_CHANGE_SUCCESS;
+  if (level == mjpeg_log_error) {
+    gst_level = GST_LEVEL_ERROR;
+  } else if (level == mjpeg_log_warn) {
+    gst_level = GST_LEVEL_WARNING;
+  } else if (level == mjpeg_log_info) {
+    gst_level = GST_LEVEL_INFO;
+  } else if (level == mjpeg_log_debug) {
+    gst_level = GST_LEVEL_DEBUG;
+  } else {
+    gst_level = GST_LEVEL_INFO;
+  }
+
+  /* message could have a % in it, do not segfault in such case */
+  gst_debug_log (mplex_debug, gst_level, "", "", 0, NULL, "%s", message);
+
+  /* chain up to the old handler;
+   * this could actually be a handler from another mjpegtools based
+   * gstreamer element; in which case messages can come out double or from
+   * the wrong element ... */
+  old_handler (level, message);
 }
+#endif
 
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
-  if (!gst_library_load ("gstbytestream"))
-    return FALSE;
+#ifndef GST_DISABLE_GST_DEBUG
+  old_handler = mjpeg_log_set_handler (gst_mplex_log_callback);
+  g_assert (old_handler != NULL);
+#endif
+  /* in any case, we do not want default handler output */
+  mjpeg_default_handler_verbosity (0);
 
   return gst_element_register (plugin, "mplex", GST_RANK_NONE, GST_TYPE_MPLEX);
 }
@@ -374,4 +809,4 @@ GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
     GST_VERSION_MINOR,
     "mplex",
     "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer",
-    plugin_init, VERSION, "GPL", GST_PACKAGE, GST_ORIGIN)
+    plugin_init, VERSION, "GPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
index e8497ed..c519f1d 100644 (file)
@@ -23,7 +23,9 @@
 #define __GST_MPLEX_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstadapter.h>
 #include <multiplexor.hpp>
+#include "gstmplexibitstream.hh"
 #include "gstmplexjob.hh"
 
 G_BEGIN_DECLS
@@ -39,18 +41,74 @@ G_BEGIN_DECLS
 #define GST_IS_MPLEX_CLASS(obj) \
   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_MPLEX))
 
+GST_DEBUG_CATEGORY_EXTERN (mplex_debug);
+#define GST_CAT_DEFAULT mplex_debug
+
+#define GST_MPLEX_MUTEX_LOCK(m) G_STMT_START {                          \
+  GST_LOG_OBJECT (m, "locking tlock from thread %p", g_thread_self ()); \
+  g_mutex_lock ((m)->tlock);                                            \
+  GST_LOG_OBJECT (m, "locked tlock from thread %p", g_thread_self ());  \
+} G_STMT_END
+
+#define GST_MPLEX_MUTEX_UNLOCK(m) G_STMT_START {                          \
+  GST_LOG_OBJECT (m, "unlocking tlock from thread %p", g_thread_self ()); \
+  g_mutex_unlock ((m)->tlock);                                            \
+} G_STMT_END
+
+#define GST_MPLEX_WAIT(m, p) G_STMT_START {                          \
+  GST_LOG_OBJECT (m, "thread %p waiting", g_thread_self ());         \
+  g_cond_wait ((p)->cond, (m)->tlock);                               \
+} G_STMT_END
+
+#define GST_MPLEX_SIGNAL(m, p) G_STMT_START {                           \
+  GST_LOG_OBJECT (m, "signalling from thread %p", g_thread_self ());    \
+  g_cond_signal ((p)->cond);                                            \
+} G_STMT_END
+
+#define GST_MPLEX_SIGNAL_ALL(m) G_STMT_START {                        \
+  GST_LOG_OBJECT (m, "signalling all from thread %p", g_thread_self ());    \
+  GSList *walk = m->pads;                                                   \
+  while (walk) {                                                            \
+       GST_MPLEX_SIGNAL (m, (GstMplexPad *) walk->data);                          \
+       walk = walk->next;                                                      \
+  }                                                                         \
+} G_STMT_END
+
+typedef struct _GstMplexPad
+{
+  /* associated pad */
+  GstPad *pad;
+  /* with mplex TLOCK */
+  /* adapter collecting buffers for this pad */
+  GstAdapter *adapter;
+  /* no more to expect on this pad */
+  gboolean eos;
+  /* signals counterpart thread to have a look */
+  GCond *cond;
+  /* amount needed by mplex on this stream */
+  guint needed;
+  /* bitstream for this pad */
+  GstMplexIBitStream *bs;
+} GstMplexPad;
+
 typedef struct _GstMplex {
   GstElement parent;
 
   /* pads */
-  GstPad *sinkpad, *srcpad;
+  GSList *pads;
+  GstPad *srcpad;
   guint num_apads, num_vpads;
 
   /* options wrapper */
   GstMplexJob *job;
 
-  /* general muxing object (contains rest) */
-  Multiplexor *mux;
+  /* lock for syncing */
+  GMutex *tlock;
+  /* with TLOCK */
+  /* muxer writer generated eos */
+  gboolean eos;
+  /* flowreturn obtained by muxer task */
+  GstFlowReturn srcresult;
 } GstMplex;
 
 typedef struct _GstMplexClass {
index 0f51770..4249aea 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer mplex (mjpegtools) wrapper
  * (c) 2003 Ronald Bultje <rbultje@ronald.bitfreak.net>
+ * (c) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
  *
  * gstmplexibitstream.hh: gstreamer/mplex input bitstream wrapper
  *
 
 #include <string.h>
 
+#include "gstmplex.hh"
 #include "gstmplexibitstream.hh"
 
 /*
  * Class init/exit functions.
  */
 
-GstMplexIBitStream::GstMplexIBitStream (GstPad * _pad, guint buf_size):
+GstMplexIBitStream::GstMplexIBitStream (GstMplexPad * _data, guint buf_size):
 IBitStream ()
 {
-  guint8 *data;
-
-  pad = _pad;
-  bs = gst_bytestream_new (pad);
+  mpad = _data;
+  mplex = GST_MPLEX (GST_PAD_PARENT (mpad->pad));
   eos = FALSE;
 
-  streamname = g_strdup (gst_pad_get_name (_pad));
-
   SetBufSize (buf_size);
   eobs = false;
   byteidx = 0;
-
-  /* we peek 1 byte (not even caring about the return value) so we
-   * are sure that we have data and thus capsnego must be completed
-   * when we return. */
-  gst_bytestream_peek_bytes (bs, &data, 1);
-
-  if (!ReadIntoBuffer () && buffered == 0) {
-    GST_ELEMENT_ERROR (gst_pad_get_parent (_pad), RESOURCE, READ, (NULL),
-        ("Failed to read from input pad %s", gst_pad_get_name (pad)));
-  }
-}
-
-GstMplexIBitStream::~GstMplexIBitStream (void)
-{
-  gst_bytestream_destroy (bs);
 }
 
 /*
@@ -67,47 +50,45 @@ GstMplexIBitStream::~GstMplexIBitStream (void)
  */
 
 size_t
-GstMplexIBitStream::ReadStreamBytes (uint8_t * buf, size_t size)
+    GstMplexIBitStream::ReadStreamBytes (uint8_t * buf, size_t size =
+    BUFFER_SIZE)
 {
   guint8 *data;
 
-  guint read = 0;
-
-  if (eos)
-    return 0;
+  GST_MPLEX_MUTEX_LOCK (mplex);
 
-  while (!eos && (read = gst_bytestream_peek_bytes (bs, &data, size)) != size) {
-    GstEvent *event;
+  GST_DEBUG_OBJECT (mplex, "needing %d bytes", (guint) size);
 
-    guint pending;
-
-    gst_bytestream_get_status (bs, &pending, &event);
-    if (event) {
-      switch (GST_EVENT_TYPE (event)) {
-        case GST_EVENT_EOS:
-          eos = TRUE;
-          break;
-        default:
-          break;
-      }
-      gst_event_unref (event);
-    }
+  while (gst_adapter_available (mpad->adapter) < size
+      && !mplex->eos && !mpad->eos) {
+    mpad->needed = size;
+    GST_MPLEX_SIGNAL (mplex, mpad);
+    GST_MPLEX_WAIT (mplex, mpad);
   }
 
-  if (read > 0) {
-    memcpy (buf, data, read);
-    gst_bytestream_flush_fast (bs, read);
+  mpad->needed = 0;
+  size = MIN (size, gst_adapter_available (mpad->adapter));
+  if (size) {
+    data = gst_adapter_take (mpad->adapter, size);
+    memcpy (buf, data, size);
+    g_free (data);
   }
 
-  return read;
+  GST_MPLEX_MUTEX_UNLOCK (mplex);
+
+  return size;
 }
 
 /*
  * Are we at EOS?
  */
 
-bool
-GstMplexIBitStream::EndOfStream (void)
+bool GstMplexIBitStream::EndOfStream (void)
 {
   return eos;
 }
+
+bool GstMplexIBitStream::ReadBuffer ()
+{
+  return ReadIntoBuffer (BUFFER_SIZE);
+}
index 504835a..533160d 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer mplex (mjpegtools) wrapper
  * (c) 2003 Ronald Bultje <rbultje@ronald.bitfreak.net>
+ * (c) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
  *
  * gstmplexibitstream.hh: gstreamer/mplex input bitstream wrapper
  *
 #define __GST_MPLEXIBITSTREAM_H__
 
 #include <gst/gst.h>
-#include <gst/bytestream/bytestream.h>
 #include <mjpeg_types.h>
 #include <bits.hpp>
 
+#include "gstmplex.hh"
+
+/* forward declaration; break circular referencing */
+typedef struct _GstMplex GstMplex;
+typedef struct _GstMplexPad GstMplexPad;
+
 class GstMplexIBitStream : public IBitStream {
 public:
-  GstMplexIBitStream (GstPad *pad, 
-                     guint   buf_size = BUFFER_SIZE);
-  ~GstMplexIBitStream (void);
+  GstMplexIBitStream (GstMplexPad *pad, guint buf_size = BUFFER_SIZE);
+  bool ReadBuffer ();
 
 protected:
   /* read data */
-  size_t ReadStreamBytes (uint8_t *buf,
-                         size_t   number);
+  size_t ReadStreamBytes (uint8_t *buf, size_t number);
 
   /* are we at EOS? */
   bool EndOfStream (void);
 
 private:
-  GstPad *pad;
-  GstByteStream *bs;
+  GstMplex *mplex;
+  GstMplexPad *mpad;
   gboolean eos;
 };
 
index f13aa36..33597c7 100644 (file)
@@ -36,7 +36,8 @@ enum
   ARG_SPLIT_SEQUENCE,
   ARG_SEGMENT_SIZE,
   ARG_PACKETS_PER_PACK,
-  ARG_SECTOR_SIZE
+  ARG_SECTOR_SIZE,
+  ARG_BUFSIZE
       /* FILL ME */
 };
 
@@ -54,16 +55,16 @@ gst_mplex_format_get_type (void)
 
   if (!mplex_format_type) {
     static const GEnumValue mplex_formats[] = {
-      {0, "0", "Generic MPEG-1"},
-      {1, "1", "Standard VCD"},
-      {2, "2", "User VCD"},
-      {3, "3", "Generic MPEG-2"},
-      {4, "4", "Standard SVCD"},
-      {5, "5", "User SVCD"},
-      {6, "6", "VCD Stills sequences"},
-      {7, "7", "SVCD Stills sequences"},
-      {8, "8", "DVD MPEG-2 for dvdauthor"},
-      {9, "9", "DVD MPEG-2"},
+      {0, "Generic MPEG-1", "0"},
+      {1, "Standard VCD", "1"},
+      {2, "User VCD", "2"},
+      {3, "Generic MPEG-2", "3"},
+      {4, "Standard SVCD", "4"},
+      {5, "User SVCD", "5"},
+      {6, "VCD Stills sequences", "6"},
+      {7, "SVCD Stills sequences", "7"},
+      {8, "DVD MPEG-2 for dvdauthor", "8"},
+      {9, "DVD MPEG-2", "9"},
       {0, NULL, NULL},
     };
 
@@ -82,6 +83,7 @@ GstMplexJob::GstMplexJob (void):
 MultiplexJob ()
 {
   /* blabla */
+  bufsize = 0;
 }
 
 /*
@@ -104,13 +106,15 @@ GstMplexJob::initProperties (GObjectClass * klass)
           "Bitrate of output stream in kbps (0 = autodetect)",
           0, 15 * 1024, 0, (GParamFlags) G_PARAM_READWRITE));
 
-#if 0
-  {
-  "video-buffer", 1, 0, 'b'}
-  ,
-#endif
-      /* some boolean stuff for headers */
-      g_object_class_install_property (klass, ARG_VBR,
+  /* override decode buffer size otherwise determined by format */
+  g_object_class_install_property (klass, ARG_BUFSIZE,
+      g_param_spec_int ("bufsize", "Decoder buf. size",
+          "Target decoders video buffer size (kB) "
+          "[default determined by format if not explicitly set]",
+          20, 4000, 46, (GParamFlags) G_PARAM_READWRITE));
+
+  /* some boolean stuff for headers */
+  g_object_class_install_property (klass, ARG_VBR,
       g_param_spec_boolean ("vbr", "VBR",
           "Whether the input video stream is variable bitrate",
           FALSE, (GParamFlags) G_PARAM_READWRITE));
@@ -118,17 +122,19 @@ GstMplexJob::initProperties (GObjectClass * klass)
       g_param_spec_boolean ("system-headers", "System headers",
           "Create system header in every pack for generic formats",
           FALSE, (GParamFlags) G_PARAM_READWRITE));
+#if 0                           /* not supported */
   g_object_class_install_property (klass, ARG_SPLIT_SEQUENCE,
       g_param_spec_boolean ("split-sequence", "Split sequence",
           "Simply split a sequence across files "
           "(rather than building run-out/run-in)",
           FALSE, (GParamFlags) G_PARAM_READWRITE));
 
-  /* size of a segment (followed by EOS) */
+  /* size of a segment */
   g_object_class_install_property (klass, ARG_SEGMENT_SIZE,
       g_param_spec_int ("max-segment-size", "Max. segment size",
           "Max. size per segment/file in MB (0 = unlimited)",
           0, 10 * 1024, 0, (GParamFlags) G_PARAM_READWRITE));
+#endif
 
   /* packets per pack (generic formats) */
   g_object_class_install_property (klass, ARG_PACKETS_PER_PACK,
@@ -155,7 +161,8 @@ GstMplexJob::getProperty (guint prop_id, GValue * value)
       g_value_set_enum (value, mux_format);
       break;
     case ARG_MUX_BITRATE:
-      g_value_set_int (value, data_rate / 1000);
+      /* convert from bytes back to bits */
+      g_value_set_int (value, (data_rate * 8) / 1000);
       break;
     case ARG_VBR:
       g_value_set_boolean (value, VBR);
@@ -175,6 +182,9 @@ GstMplexJob::getProperty (guint prop_id, GValue * value)
     case ARG_SECTOR_SIZE:
       g_value_set_int (value, sector_size);
       break;
+    case ARG_BUFSIZE:
+      g_value_set_int (value, bufsize);
+      break;
     default:
       break;
   }
@@ -211,6 +221,9 @@ GstMplexJob::setProperty (guint prop_id, const GValue * value)
     case ARG_SECTOR_SIZE:
       sector_size = g_value_get_int (value);
       break;
+    case ARG_BUFSIZE:
+      bufsize = g_value_get_int (value);
+      break;
     default:
       break;
   }
index 8aa5b14..8e96070 100644 (file)
@@ -37,6 +37,8 @@ public:
                    GValue       *value);
   void setProperty (guint         prop_id,
                    const GValue *value);
+
+  int bufsize;
 };
 
 #endif /* __GST_MPLEXJOB_H__ */
index 206c13e..5a0cc5a 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer mplex (mjpegtools) wrapper
  * (c) 2003 Ronald Bultje <rbultje@ronald.bitfreak.net>
+ * (c) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
  *
  * gstmplexoutputstream.hh: gstreamer/mplex output stream wrapper
  *
 
 #include <string.h>
 
+#include "gstmplex.hh"
 #include "gstmplexoutputstream.hh"
 
 /*
  * Class init functions.
  */
 
-GstMplexOutputStream::GstMplexOutputStream (GstElement * _element, GstPad * _pad):
+GstMplexOutputStream::GstMplexOutputStream (GstMplex * _element, GstPad * _pad):
 OutputStream ()
 {
-  element = _element;
+  mplex = _element;
   pad = _pad;
   size = 0;
 }
@@ -54,21 +56,32 @@ GstMplexOutputStream::Open (void)
 void
 GstMplexOutputStream::Close (void)
 {
-  gst_pad_push (pad, GST_DATA (gst_event_new (GST_EVENT_EOS)));
-  gst_element_set_eos (element);
+  GST_MPLEX_MUTEX_LOCK (mplex);
+  GST_DEBUG_OBJECT (mplex, "closing stream and sending eos");
+  gst_pad_push_event (pad, gst_event_new_eos ());
+  /* notify chain there is no more need to supply buffers */
+  mplex->eos = TRUE;
+  GST_MPLEX_SIGNAL_ALL (mplex);
+  GST_MPLEX_MUTEX_UNLOCK (mplex);
 }
 
 /*
  * Get size of current segment.
  */
 
-off_t GstMplexOutputStream::SegmentSize (void)
+#if GST_MJPEGTOOLS_API >= 10900
+uint64_t
+GstMplexOutputStream::SegmentSize (void)
+#else
+off_t
+GstMplexOutputStream::SegmentSize (void)
+#endif
 {
   return size;
 }
 
 /*
- * Next segment.
+ * Next segment; not really supported.
  */
 
 void
@@ -76,9 +89,8 @@ GstMplexOutputStream::NextSegment (void)
 {
   size = 0;
 
-  /* send EOS. The filesink (or whatever) handles that
-   * and opens a new file. */
-  gst_pad_push (pad, GST_DATA (gst_event_new (GST_EVENT_EOS)));
+  GST_WARNING_OBJECT (mplex, "multiple file output is not supported");
+  /* FIXME: no such filesink behaviour to be expected */
 }
 
 /*
@@ -94,5 +106,8 @@ GstMplexOutputStream::Write (guint8 * data, guint len)
   memcpy (GST_BUFFER_DATA (buf), data, len);
 
   size += len;
-  gst_pad_push (pad, GST_DATA (buf));
+  GST_MPLEX_MUTEX_LOCK (mplex);
+  gst_buffer_set_caps (buf, GST_PAD_CAPS (pad));
+  mplex->srcresult = gst_pad_push (pad, buf);
+  GST_MPLEX_MUTEX_UNLOCK (mplex);
 }
index c67040b..f93c2f9 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer mplex (mjpegtools) wrapper
  * (c) 2003 Ronald Bultje <rbultje@ronald.bitfreak.net>
+ * (c) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
  *
  * gstmplexoutputstream.hh: gstreamer/mplex output stream wrapper
  *
 #include <mjpeg_types.h>
 #include <outputstrm.hpp>
 
+#include "gstmplex.hh"
+
 class GstMplexOutputStream : public OutputStream {
 public:
-  GstMplexOutputStream (GstElement *element,
-                       GstPad     *pad);
+  GstMplexOutputStream (GstMplex *element, GstPad *pad);
 
   /* open/close. Basically 'no-op's (close() sets EOS). */
   int  Open  (void);
   void Close (void);
 
   /* get size of current segment */
+#if GST_MJPEGTOOLS_API >= 10900
+  uint64_t SegmentSize (void);
+#else
   off_t SegmentSize (void);
+#endif
 
   /* next segment */
   void NextSegment (void);
 
   /* write data */
-  void Write (guint8 *data,
-             guint   len);
+  void Write (guint8 *data, guint len);
 
 private:
-  GstElement *element;
+  GstMplex *mplex;
   GstPad *pad;
   guint64 size;
 };
index 0cc6574..abaf230 100644 (file)
@@ -35,6 +35,12 @@ else
 check_mpeg2enc =
 endif
 
+if USE_MPLEX
+check_mplex = elements/mplex
+else
+check_mplex =
+endif
+
 if USE_NEON
 check_neon = elements/neonhttpsrc
 else
@@ -55,7 +61,8 @@ endif
 
 
 VALGRIND_TO_FIX = \
-       elements/mpeg2enc
+       elements/mpeg2enc \
+       elements/mplex
 
 # valgrind testing
 VALGRIND_TESTS_DISABLE = \
@@ -64,6 +71,7 @@ VALGRIND_TESTS_DISABLE = \
 check_PROGRAMS = \
        generic/states \
        $(check_mpeg2enc)  \
+       $(check_mplex)     \ 
        $(check_neon)      \
        $(check_soup)      \
        $(check_timidity)  \
diff --git a/tests/check/elements/mplex.c b/tests/check/elements/mplex.c
new file mode 100644 (file)
index 0000000..06fc949
--- /dev/null
@@ -0,0 +1,315 @@
+/* GStreamer
+ *
+ * unit test for mplex
+ *
+ * Copyright (C) <2008> Mark Nauwelaerts <mnauw@users.sourceforge.net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <unistd.h>
+
+#include <gst/check/gstcheck.h>
+
+/* For ease of programming we use globals to keep refs for our floating
+ * src and sink pads we create; otherwise we always have to do get_pad,
+ * get_peer, and then remove references in every test function */
+static GstPad *mysrcpad, *mysinkpad;
+
+#define AUDIO_CAPS_STRING "audio/mpeg, " \
+                           "mpegversion = (int) 1, " \
+                           "layer = (int) 2, " \
+                           "rate = (int) 48000, " \
+                           "channels = (int) 1, " \
+                           "framerate = (fraction) 25/1"
+
+#define MPEG_CAPS_STRING "video/mpeg, " \
+                           "systemstream = (bool) true"
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS (MPEG_CAPS_STRING));
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS (AUDIO_CAPS_STRING));
+
+
+/* some global vars, makes it easy as for the ones above */
+static GMutex *mplex_mutex;
+static GCond *mplex_cond;
+static gboolean arrived_eos;
+
+/* another easy hack, some mp2 audio data that should please mplex
+ * perhaps less would also do, but anyway ...
+ */
+/* begin binary data: */
+guint8 mp2_data[] =             /* 384 */
+{
+  0xFF, 0xFD, 0x84, 0xC4, 0x75, 0x56, 0x46, 0x54, 0x54, 0x5B, 0x2E, 0xB0,
+  0x80, 0x00, 0x00, 0xAB, 0xAA, 0xAE, 0x8A, 0xAC, 0xB4, 0xD7, 0x9D, 0xB6,
+  0xDB, 0x5D, 0xB3, 0xDB, 0x8C, 0xF5, 0xCF, 0x8D, 0x38, 0xD2, 0xFB, 0xF3,
+  0x66, 0x59, 0x6C, 0x62, 0x49, 0x16, 0x59, 0x65, 0xAC, 0xE8, 0x8C, 0x6F,
+  0x18, 0x48, 0x6B, 0x96, 0xD0, 0xD2, 0x68, 0xA6, 0xC5, 0x42, 0x45, 0xA1,
+  0x28, 0x42, 0xBC, 0xA3, 0x99, 0x39, 0x53, 0x20, 0xBA, 0x4A, 0x56, 0x30,
+  0xC5, 0x81, 0xE6, 0x16, 0x6B, 0x77, 0x67, 0x24, 0x29, 0xA9, 0x11, 0x7E,
+  0xA9, 0xA8, 0x41, 0xE1, 0x11, 0x48, 0x79, 0xB1, 0xC2, 0x30, 0x39, 0x2D,
+  0x40, 0x9A, 0xEC, 0x12, 0x65, 0xC5, 0xDD, 0x68, 0x8D, 0x6A, 0xF4, 0x63,
+  0x02, 0xAE, 0xE5, 0x1B, 0xAA, 0xA3, 0x87, 0x1B, 0xDE, 0xB8, 0x6B, 0x7A,
+  0x9B, 0xAF, 0xF7, 0x1A, 0x39, 0x33, 0x9A, 0x17, 0x56, 0x64, 0x0D, 0xDC,
+  0xE2, 0x15, 0xEF, 0x93, 0x24, 0x9A, 0x8E, 0x59, 0x49, 0x7D, 0x45, 0x68,
+  0x2D, 0x9F, 0x85, 0x71, 0xA8, 0x99, 0xC4, 0x6D, 0x26, 0x46, 0x40, 0xBA,
+  0x9A, 0xD6, 0x3D, 0xCF, 0x45, 0xB2, 0xC6, 0xF3, 0x16, 0x21, 0x8B, 0xA8,
+  0xD5, 0x59, 0x78, 0x87, 0xB7, 0x42, 0x9A, 0x65, 0x59, 0x9A, 0x99, 0x58,
+  0x71, 0x26, 0x20, 0x33, 0x76, 0xEE, 0x96, 0x70, 0xF2, 0xBC, 0xB3, 0x7D,
+  0x6B, 0x35, 0x48, 0x37, 0x59, 0x21, 0xC4, 0x87, 0x8A, 0xD8, 0x05, 0x36,
+  0xA5, 0x1A, 0x5C, 0x0A, 0x4F, 0x4B, 0x39, 0x40, 0x39, 0x9A, 0x17, 0xD9,
+  0xAD, 0x21, 0xBE, 0x64, 0xB4, 0x6B, 0x13, 0x03, 0x20, 0x95, 0xDA, 0x18,
+  0x89, 0x88, 0xB5, 0x44, 0xE2, 0x5D, 0x4F, 0x12, 0x19, 0xC4, 0x1A, 0x1A,
+  0x07, 0x07, 0x91, 0xA8, 0x4C, 0x66, 0xB4, 0x81, 0x33, 0xDE, 0xDB, 0xD6,
+  0x24, 0x17, 0xD2, 0x9A, 0x4E, 0xC9, 0x88, 0xAB, 0x44, 0xAA, 0x25, 0x4A,
+  0x79, 0xA9, 0x39, 0x39, 0x0D, 0x2D, 0x20, 0x76, 0x68, 0x5F, 0x65, 0x25,
+  0xCF, 0x29, 0x27, 0x67, 0xB3, 0x68, 0x6C, 0xE5, 0xDC, 0xA5, 0x79, 0xC9,
+  0xAB, 0x46, 0x9D, 0x21, 0x35, 0x82, 0x98, 0xBA, 0x0E, 0x26, 0x39, 0x20,
+  0xAE, 0x1B, 0x92, 0x3D, 0xF7, 0x9F, 0x29, 0xB5, 0xF3, 0xB6, 0x38, 0x68,
+  0x65, 0x99, 0xAD, 0xD8, 0x98, 0x56, 0x5A, 0x61, 0x8D, 0xCB, 0x4A, 0x29,
+  0x43, 0x0E, 0x2D, 0x33, 0x40, 0x6A, 0xB7, 0x5F, 0x49, 0xC9, 0x81, 0xE4,
+  0x0D, 0x6F, 0x15, 0x58, 0x1B, 0x9E, 0x74, 0x20, 0x5D, 0x97, 0x5B, 0x5A,
+  0xDF, 0x92, 0x2D, 0x5A, 0x98, 0xCE, 0x50, 0x20, 0x1A, 0x33, 0x6A, 0x67,
+  0xE2, 0x18, 0x94, 0xA4, 0x70, 0x8F, 0x5F, 0x11, 0x85, 0xB0, 0xE5, 0xD8,
+  0xD4, 0xAA, 0x86, 0xAE, 0x1C, 0x0D, 0xA1, 0x6B, 0x21, 0xB9, 0xC2, 0x17
+};
+
+/* end binary data. size = 384 bytes */
+
+gboolean
+test_sink_event (GstPad * pad, GstEvent * event)
+{
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      g_mutex_lock (mplex_mutex);
+      arrived_eos = TRUE;
+      g_cond_signal (mplex_cond);
+      g_mutex_unlock (mplex_mutex);
+      break;
+    default:
+      break;
+  }
+
+  return gst_pad_event_default (pad, event);
+}
+
+/* setup and teardown needs some special handling for muxer */
+GstPad *
+setup_src_pad (GstElement * element,
+    GstStaticPadTemplate * template, GstCaps * caps, gchar * sinkname)
+{
+  GstPad *srcpad, *sinkpad;
+
+  GST_DEBUG_OBJECT (element, "setting up sending pad");
+  /* sending pad */
+  srcpad = gst_pad_new_from_static_template (template, "src");
+  fail_if (srcpad == NULL, "Could not create a srcpad");
+  ASSERT_OBJECT_REFCOUNT (srcpad, "srcpad", 1);
+
+  sinkpad = gst_element_get_pad (element, sinkname);
+  fail_if (sinkpad == NULL, "Could not get sink pad from %s",
+      GST_ELEMENT_NAME (element));
+  /* references are owned by: 1) us, 2) mplex, 3) mplex list */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  if (caps)
+    fail_unless (gst_pad_set_caps (srcpad, caps));
+  fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
+      "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
+  gst_object_unref (sinkpad);   /* because we got it higher up */
+
+  /* references are owned by: 1) mplex, 2) mplex list */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
+
+  return srcpad;
+}
+
+void
+teardown_src_pad (GstElement * element, gchar * sinkname)
+{
+  GstPad *srcpad, *sinkpad;
+  gchar *padname;
+
+  /* clean up floating src pad */
+  padname = g_strdup (sinkname);
+  memcpy (strchr (padname, '%'), "0", 2);
+  sinkpad = gst_element_get_pad (element, padname);
+  g_free (padname);
+  /* pad refs held by 1) mplex 2) mplex list and 3) us (through _get) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  srcpad = gst_pad_get_peer (sinkpad);
+
+  gst_pad_unlink (srcpad, sinkpad);
+
+  /* after unlinking, pad refs still held by
+   * 1) mplex and 2) mplex list and 3) us (through _get) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  gst_object_unref (sinkpad);
+  /* one more ref is held by element itself */
+
+  /* pad refs held by both creator and this function (through _get_peer) */
+  ASSERT_OBJECT_REFCOUNT (srcpad, "srcpad", 2);
+  gst_object_unref (srcpad);
+  gst_object_unref (srcpad);
+
+}
+
+GstElement *
+setup_mplex ()
+{
+  GstElement *mplex;
+
+  GST_DEBUG ("setup_mplex");
+  mplex = gst_check_setup_element ("mplex");
+  mysrcpad = setup_src_pad (mplex, &srctemplate, NULL, "audio_%d");
+  mysinkpad = gst_check_setup_sink_pad (mplex, &sinktemplate, NULL);
+  gst_pad_set_active (mysrcpad, TRUE);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  /* need to know when we are eos */
+  gst_pad_set_event_function (mysinkpad, test_sink_event);
+
+  /* and notify the test run */
+  mplex_mutex = g_mutex_new ();
+  mplex_cond = g_cond_new ();
+
+  return mplex;
+}
+
+void
+cleanup_mplex (GstElement * mplex)
+{
+  GST_DEBUG ("cleanup_mplex");
+  gst_element_set_state (mplex, GST_STATE_NULL);
+
+  gst_pad_set_active (mysrcpad, FALSE);
+  gst_pad_set_active (mysinkpad, FALSE);
+  teardown_src_pad (mplex, "audio_%d");
+  gst_check_teardown_sink_pad (mplex);
+  gst_check_teardown_element (mplex);
+
+  g_mutex_free (mplex_mutex);
+  g_cond_free (mplex_cond);
+
+  gst_deinit ();
+}
+
+GST_START_TEST (test_audio_pad)
+{
+  GstElement *mplex;
+  GstBuffer *inbuffer, *outbuffer;
+  GstCaps *caps;
+  int i, num_buffers;
+
+  /* PES pack_start_code */
+  guint8 data0[] = { 0x00, 0x00, 0x01, 0xba };
+  /* MPEG_program_end_code */
+  guint8 data1[] = { 0x00, 0x00, 0x01, 0xb9 };
+
+  mplex = setup_mplex ();
+  fail_unless (gst_element_set_state (mplex,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+      "could not set to playing");
+
+  /* corresponds to I420 buffer for the size mentioned in the caps */
+  inbuffer = gst_buffer_new ();
+  GST_BUFFER_DATA (inbuffer) = mp2_data;
+  GST_BUFFER_SIZE (inbuffer) = sizeof (mp2_data);
+  caps = gst_caps_from_string (AUDIO_CAPS_STRING);
+  gst_buffer_set_caps (inbuffer, caps);
+  gst_caps_unref (caps);
+  GST_BUFFER_TIMESTAMP (inbuffer) = 0;
+  ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1);
+  fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+
+  /* need to force eos and state change to make sure the encoding task ends */
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()) == TRUE);
+  /* need to wait a bit to make sure mplex task digested all this */
+  g_mutex_lock (mplex_mutex);
+  while (!arrived_eos)
+    g_cond_wait (mplex_cond, mplex_mutex);
+  g_mutex_unlock (mplex_mutex);
+
+  num_buffers = g_list_length (buffers);
+  /* well, we do not really know much with mplex, but at least something ... */
+  fail_unless (num_buffers >= 1);
+
+  /* clean up buffers */
+  for (i = 0; i < num_buffers; ++i) {
+    outbuffer = GST_BUFFER (buffers->data);
+    fail_if (outbuffer == NULL);
+
+    if (i == 0) {
+      fail_unless (GST_BUFFER_SIZE (outbuffer) >= sizeof (data0));
+      fail_unless (memcmp (data0, GST_BUFFER_DATA (outbuffer),
+              sizeof (data0)) == 0);
+    }
+    if (i == num_buffers - 1) {
+      fail_unless (GST_BUFFER_SIZE (outbuffer) >= sizeof (data1));
+      fail_unless (memcmp (data1, GST_BUFFER_DATA (outbuffer) +
+              GST_BUFFER_SIZE (outbuffer) - sizeof (data1),
+              sizeof (data1)) == 0);
+    }
+    buffers = g_list_remove (buffers, outbuffer);
+
+    ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 1);
+    gst_buffer_unref (outbuffer);
+    outbuffer = NULL;
+  }
+
+  cleanup_mplex (mplex);
+  g_list_free (buffers);
+  buffers = NULL;
+}
+
+GST_END_TEST;
+
+Suite *
+mplex_suite (void)
+{
+  Suite *s = suite_create ("mplex");
+  TCase *tc_chain = tcase_create ("general");
+
+  suite_add_tcase (s, tc_chain);
+  tcase_add_test (tc_chain, test_audio_pad);
+
+  return s;
+}
+
+int
+main (int argc, char **argv)
+{
+  int nf;
+
+  Suite *s = mplex_suite ();
+  SRunner *sr = srunner_create (s);
+
+  gst_check_init (&argc, &argv);
+
+  srunner_run_all (sr, CK_NORMAL);
+  nf = srunner_ntests_failed (sr);
+  srunner_free (sr);
+
+  return nf;
+}