mpegtsmux: Add SCTE-35 support
authorEdward Hervey <edward@centricular.com>
Thu, 26 Sep 2019 15:45:31 +0000 (17:45 +0200)
committerEdward Hervey <bilboed@bilboed.com>
Thu, 31 Oct 2019 12:31:27 +0000 (12:31 +0000)
This adds two properties:
* scte-35-pid: If not 0, enables the SCTE-35 support for the current
  program. This will write the proper PMT and send SCTE-35 NULL
  commands (i.e. heartbeats) at a regular interval
* scte-35-null-interval: This specifies the interval at which the
  NULL commands should be sent

Sending SCTE-35 commands is done by creating the appropriate SCTE-35
GstMpegtsSection and then sending them on the muxer. See the
associated example

gst-libs/gst/mpegts/gst-scte-section.h
gst/mpegtsmux/gstbasetsmux.c
gst/mpegtsmux/gstbasetsmux.h
gst/mpegtsmux/tsmux/tsmux.c
gst/mpegtsmux/tsmux/tsmux.h
gst/mpegtsmux/tsmux/tsmuxcommon.h
tests/examples/mpegts/meson.build
tests/examples/mpegts/ts-scte-writer.c [new file with mode: 0644]

index 8a753c7..aee220d 100644 (file)
@@ -1,9 +1,11 @@
 /*
  * gst-scte-section.h -
  * Copyright (C) 2013, CableLabs, Louisville, CO 80027
+ *           (c) 2019, Centricular ltd
  *
  * Authors:
  *   RUIH Team <ruih@cablelabs.com>
+ *   Edward Hervey <edward@centricular.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
index bcb11ed..6088790 100644 (file)
@@ -195,8 +195,12 @@ enum
   PROP_SI_INTERVAL,
   PROP_BITRATE,
   PROP_PCR_INTERVAL,
+  PROP_SCTE_35_PID,
+  PROP_SCTE_35_NULL_INTERVAL
 };
 
+#define DEFAULT_SCTE_35_PID 0
+
 #define BASETSMUX_DEFAULT_ALIGNMENT    -1
 
 #define CLOCK_BASE 9LL
@@ -726,8 +730,11 @@ gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
       if (ts_pad->prog == NULL)
         goto no_program;
       tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
-      g_hash_table_insert (mux->programs,
-          GINT_TO_POINTER (ts_pad->prog_id), ts_pad->prog);
+      tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
+      tsmux_program_set_scte35_interval (ts_pad->prog,
+          mux->scte35_null_interval);
+      g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
+          ts_pad->prog);
     }
 
     if (ts_pad->stream == NULL) {
@@ -1052,6 +1059,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
   gint64 dts = GST_CLOCK_STIME_NONE;
   gboolean delta = TRUE, header = FALSE;
   StreamData *stream_data;
+  GstMpegtsSection *scte_section = NULL;
 
   GST_DEBUG_OBJECT (mux, "Pads collected");
 
@@ -1128,6 +1136,16 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
 
   GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
 
+  GST_OBJECT_LOCK (mux);
+  scte_section = mux->pending_scte35_section;
+  mux->pending_scte35_section = NULL;
+  GST_OBJECT_UNLOCK (mux);
+  if (G_UNLIKELY (scte_section)) {
+    GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
+    if (!tsmux_send_section (mux->tsmux, scte_section))
+      GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
+  }
+
   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
     pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
     GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
@@ -1249,8 +1267,18 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
   if (section) {
     GST_DEBUG ("Received event with mpegts section");
 
-    /* TODO: Check that the section type is supported */
-    tsmux_add_mpegts_si_section (mux->tsmux, section);
+    if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
+      /* Will be sent from the streaming threads */
+      GST_DEBUG_OBJECT (mux, "Storing SCTE event");
+      GST_OBJECT_LOCK (element);
+      if (mux->pending_scte35_section)
+        gst_mpegts_section_unref (mux->pending_scte35_section);
+      mux->pending_scte35_section = section;
+      GST_OBJECT_UNLOCK (element);
+    } else {
+      /* TODO: Check that the section type is supported */
+      tsmux_add_mpegts_si_section (mux->tsmux, section);
+    }
 
     gst_event_unref (event);
 
@@ -1710,6 +1738,12 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
       if (mux->tsmux)
         tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
       break;
+    case PROP_SCTE_35_PID:
+      mux->scte35_pid = g_value_get_uint (value);
+      break;
+    case PROP_SCTE_35_NULL_INTERVAL:
+      mux->scte35_null_interval = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1744,6 +1778,12 @@ gst_base_ts_mux_get_property (GObject * object, guint prop_id,
     case PROP_PCR_INTERVAL:
       g_value_set_uint (value, mux->pcr_interval);
       break;
+    case PROP_SCTE_35_PID:
+      g_value_set_uint (value, mux->scte35_pid);
+      break;
+    case PROP_SCTE_35_NULL_INTERVAL:
+      g_value_set_uint (value, mux->scte35_null_interval);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1878,6 +1918,20 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
           1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
 
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
+      g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
+          "PID to use for inserting SCTE-35 packets (0: unused)",
+          0, G_MAXUINT, DEFAULT_SCTE_35_PID,
+          (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass),
+      PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
+          "SCTE-35 NULL packet interval",
+          "Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
+          " (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
+          TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
+          (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
       &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
 }
@@ -1895,6 +1949,8 @@ gst_base_ts_mux_init (GstBaseTsMux * mux)
   mux->prog_map = NULL;
   mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
   mux->bitrate = TSMUX_DEFAULT_BITRATE;
+  mux->scte35_pid = DEFAULT_SCTE_35_PID;
+  mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
 
   mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
   mux->automatic_alignment = 0;
index 759dd03..89bf7e0 100644 (file)
@@ -174,11 +174,14 @@ struct GstBaseTsMux {
   guint si_interval;
   guint64 bitrate;
   guint pcr_interval;
-
+  guint scte35_pid;
+  guint scte35_null_interval;
+  
   /* state */
   gboolean first;
   GstClockTime pending_key_unit_ts;
   GstEvent *force_key_unit_event;
+  GstMpegtsSection *pending_scte35_section;
 
   /* write callback handling/state */
   GstFlowReturn last_flow_ret;
index db4ac29..52dd559 100644 (file)
 
 static gboolean tsmux_write_pat (TsMux * mux);
 static gboolean tsmux_write_pmt (TsMux * mux, TsMuxProgram * program);
+static gboolean tsmux_write_scte_null (TsMux * mux, TsMuxProgram * program);
 static void
 tsmux_section_free (TsMuxSection * section)
 {
@@ -361,6 +362,7 @@ tsmux_add_mpegts_si_section (TsMux * mux, GstMpegtsSection * section)
   return TRUE;
 }
 
+
 /**
  * tsmux_free:
  * @mux: a #TsMux
@@ -451,6 +453,11 @@ tsmux_program_new (TsMux * mux, gint prog_id)
   program->pmt_pid = mux->next_pmt_pid++;
   program->pcr_stream = NULL;
 
+  /* SCTE35 is disabled by default */
+  program->scte35_pid = 0;
+  program->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
+  program->next_scte35_pcr = -1;
+
   program->streams = g_array_sized_new (FALSE, TRUE, sizeof (TsMuxStream *), 1);
 
   mux->programs = g_list_prepend (mux->programs, program);
@@ -496,6 +503,22 @@ tsmux_get_pmt_interval (TsMuxProgram * program)
 }
 
 /**
+ * tsmux_program_set_scte35_interval:
+ * @program: a #TsMuxProgram
+ * @freq: a new SCTE-35 NULL interval
+ *
+ * Set the interval (in cycles of the 90kHz clock) for sending out the SCTE-35
+ * NULL command. This is only effective is the SCTE-35 PID is not 0.
+ */
+void
+tsmux_program_set_scte35_interval (TsMuxProgram * program, guint interval)
+{
+  g_return_if_fail (program != NULL);
+
+  program->scte35_null_interval = interval;
+}
+
+/**
  * tsmux_resend_pmt:
  * @program: a #TsMuxProgram
  *
@@ -510,6 +533,55 @@ tsmux_resend_pmt (TsMuxProgram * program)
 }
 
 /**
+ * tsmux_program_set_scte35_pid:
+ * @program: a #TsMuxProgram
+ * @pid: The pid to use, or 0 to deactivate usage.
+ *
+ * Set the @pid to use for sending SCTE-35 packets on the given
+ * @program.
+ *
+ * This needs to be called as early as possible if SCTE-35 sections
+ * are even going to be used with the given @program so that the PMT
+ * can be properly configured.
+ */
+void
+tsmux_program_set_scte35_pid (TsMuxProgram * program, guint16 pid)
+{
+  TsMuxSection *section;
+  GstMpegtsSCTESIT *sit;
+  g_return_if_fail (program != NULL);
+
+  program->scte35_pid = pid;
+  /* Create/Update the section */
+  if (program->scte35_null_section) {
+    tsmux_section_free (program->scte35_null_section);
+    program->scte35_null_section = NULL;
+  }
+  if (pid != 0) {
+    program->scte35_null_section = section = g_slice_new0 (TsMuxSection);
+    section->pi.pid = pid;
+    sit = gst_mpegts_scte_null_new ();
+    section->section = gst_mpegts_section_from_scte_sit (sit, pid);
+  }
+}
+
+/**
+ * tsmux_program_get_scte35_pid:
+ * @program: a #TsMuxProgram
+ *
+ * Get the PID configured for sending SCTE-35 packets.
+ *
+ * Returns: the configured SCTE-35 PID, or 0 if not active.
+ */
+guint16
+tsmux_program_get_scte35_pid (TsMuxProgram * program)
+{
+  g_return_val_if_fail (program != NULL, 0);
+
+  return program->scte35_pid;
+}
+
+/**
  * tsmux_program_add_stream:
  * @program: a #TsMuxProgram
  * @stream: a #TsMuxStream
@@ -931,8 +1003,9 @@ tsmux_write_ts_header (TsMux * mux, guint8 * buf, TsMuxPacketInfo * pi,
   return TRUE;
 }
 
+/* The unused_arg is needed for g_hash_table_foreach() */
 static gboolean
-tsmux_section_write_packet (GstMpegtsSectionType * type,
+tsmux_section_write_packet (gpointer unused_arg,
     TsMuxSection * section, TsMux * mux)
 {
   GstBuffer *section_buffer;
@@ -1056,6 +1129,38 @@ fail:
   return FALSE;
 }
 
+/**
+ * tsmux_send_section:
+ * @mux: a #TsMux
+ * @section: (transfer full): a #GstMpegtsSection to add
+ *
+ * Send a @section immediately on the stream.
+ *
+ * Returns: %TRUE on success, %FALSE otherwise
+ */
+gboolean
+tsmux_send_section (TsMux * mux, GstMpegtsSection * section)
+{
+  gboolean ret;
+  TsMuxSection tsmux_section;
+
+  g_return_val_if_fail (mux != NULL, FALSE);
+  g_return_val_if_fail (section != NULL, FALSE);
+
+  memset (&tsmux_section, 0, sizeof (tsmux_section));
+
+  GST_DEBUG ("Sending mpegts section with type %d to mux",
+      section->section_type);
+
+  tsmux_section.section = section;
+  tsmux_section.pi.pid = section->pid;
+
+  ret = tsmux_section_write_packet (NULL, &tsmux_section, mux);
+  gst_mpegts_section_unref (section);
+
+  return ret;
+}
+
 static gboolean
 tsmux_write_si (TsMux * mux)
 {
@@ -1183,6 +1288,31 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
 
       cur_pcr = get_current_pcr (mux, cur_ts);
     }
+
+    if (program->scte35_pid != 0) {
+      gboolean write_scte_null = FALSE;
+      if (program->next_scte35_pcr == -1)
+        write_scte_null = TRUE;
+      else if (cur_pcr > program->next_scte35_pcr)
+        write_scte_null = TRUE;
+
+      if (write_scte_null) {
+        GST_DEBUG ("next scte35 pcr %" G_GINT64_FORMAT,
+            program->next_scte35_pcr);
+        if (program->next_scte35_pcr == -1)
+          program->next_scte35_pcr =
+              cur_pcr + program->scte35_null_interval * 300;
+        else
+          program->next_scte35_pcr += program->scte35_null_interval * 300;
+        GST_DEBUG ("next scte35 NOW pcr %" G_GINT64_FORMAT,
+            program->next_scte35_pcr);
+
+        if (!tsmux_write_scte_null (mux, program))
+          return FALSE;
+
+        cur_pcr = get_current_pcr (mux, cur_ts);
+      }
+    }
   }
 
   return TRUE;
@@ -1358,6 +1488,8 @@ tsmux_program_free (TsMuxProgram * program)
   /* Free PMT section */
   if (program->pmt.section)
     gst_mpegts_section_unref (program->pmt.section);
+  if (program->scte35_null_section)
+    tsmux_section_free (program->scte35_null_section);
 
   g_array_free (program->streams, TRUE);
   g_slice_free (TsMuxProgram, program);
@@ -1403,8 +1535,7 @@ tsmux_write_pat (TsMux * mux)
     mux->pat_changed = FALSE;
   }
 
-  return tsmux_section_write_packet (GINT_TO_POINTER (GST_MPEGTS_SECTION_PAT),
-      &mux->pat, mux);
+  return tsmux_section_write_packet (NULL, &mux->pat, mux);
 }
 
 static gboolean
@@ -1459,6 +1590,12 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
     g_ptr_array_add (pmt->descriptors, descriptor);
 #endif
 
+    /* Will SCTE-35 be potentially used ? */
+    if (program->scte35_pid != 0) {
+      descriptor = gst_mpegts_descriptor_from_registration ("CUEI", NULL, 0);
+      g_ptr_array_add (pmt->descriptors, descriptor);
+    }
+
     /* Write out the entries */
     for (i = 0; i < program->streams->len; i++) {
       GstMpegtsPMTStream *pmt_stream;
@@ -1475,6 +1612,14 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
       g_ptr_array_add (pmt->streams, pmt_stream);
     }
 
+    /* Will SCTE-35 be potentially used ? */
+    if (program->scte35_pid != 0) {
+      GstMpegtsPMTStream *pmt_stream = gst_mpegts_pmt_stream_new ();
+      pmt_stream->stream_type = GST_MPEGTS_STREAM_TYPE_SCTE_SIT;
+      pmt_stream->pid = program->scte35_pid;
+      g_ptr_array_add (pmt->streams, pmt_stream);
+    }
+
     TS_DEBUG ("PMT for program %d has %d streams",
         program->pgm_number, program->streams->len);
 
@@ -1490,8 +1635,15 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
     program->pmt.section->version_number = program->pmt_version++;
   }
 
-  return tsmux_section_write_packet (GINT_TO_POINTER (GST_MPEGTS_SECTION_PMT),
-      &program->pmt, mux);
+  return tsmux_section_write_packet (NULL, &program->pmt, mux);
+}
+
+static gboolean
+tsmux_write_scte_null (TsMux * mux, TsMuxProgram * program)
+{
+  /* SCTE-35 NULL section is created when PID is set */
+  GST_LOG ("Writing SCTE NULL packet");
+  return tsmux_section_write_packet (NULL, program->scte35_null_section, mux);
 }
 
 void
index a981914..a7e62a5 100644 (file)
@@ -128,6 +128,14 @@ struct TsMuxProgram {
   /* PID to write the PMT */
   guint16 pmt_pid;
 
+  TsMuxSection *scte35_null_section;
+  /* SCTE-35 pid (0 if inactive/unused) */
+  guint16 scte35_pid;
+  /* Interval between SCTE-35 NULL packets in MPEG PTS clock time */
+  guint   scte35_null_interval;
+  /* Next SCTE-35 position, 27 MHz */
+  gint64  next_scte35_pcr;
+
   /* stream which carries the PCR */
   TsMuxStream *pcr_stream;
 
@@ -214,6 +222,10 @@ void               tsmux_program_free              (TsMuxProgram *program);
 void           tsmux_set_pmt_interval          (TsMuxProgram *program, guint interval);
 guint          tsmux_get_pmt_interval          (TsMuxProgram *program);
 void           tsmux_resend_pmt                (TsMuxProgram *program);
+void            tsmux_program_set_scte35_pid    (TsMuxProgram *program, guint16 pid);
+guint16         tsmux_program_get_scte35_pid    (TsMuxProgram *program);
+void            tsmux_program_set_scte35_interval (TsMuxProgram *mux, guint interval);
+
 
 /* SI table management */
 void            tsmux_set_si_interval           (TsMux *mux, guint interval);
@@ -221,6 +233,9 @@ guint           tsmux_get_si_interval           (TsMux *mux);
 void            tsmux_resend_si                 (TsMux *mux);
 gboolean        tsmux_add_mpegts_si_section     (TsMux * mux, GstMpegtsSection * section);
 
+/* One-time sections */
+gboolean        tsmux_send_section              (TsMux *mux, GstMpegtsSection *section);
+
 /* stream management */
 TsMuxStream *  tsmux_create_stream             (TsMux *mux, guint stream_type, guint16 pid, gchar *language);
 TsMuxStream *  tsmux_find_stream               (TsMux *mux, guint16 pid);
index 7bfc15c..eea4216 100644 (file)
@@ -123,6 +123,8 @@ G_BEGIN_DECLS
 #define TSMUX_DEFAULT_SI_INTERVAL  (TSMUX_CLOCK_FREQ / 10)
 /* PCR interval (1/25th sec) */
 #define TSMUX_DEFAULT_PCR_INTERVAL  (TSMUX_CLOCK_FREQ / 25)
+/* SCTE-35 NULL Interval (5mins) */
+#define TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL (TSMUX_CLOCK_FREQ * 300)
 /* Bitrate (bits per second) */
 #define TSMUX_DEFAULT_BITRATE      0
 
index 3a38c84..d32aeff 100644 (file)
@@ -1,4 +1,4 @@
-foreach fname : ['ts-parser.c', 'ts-section-writer.c']
+foreach fname : ['ts-parser.c', 'ts-section-writer.c', 'ts-scte-writer.c']
   exe_name = fname.split('.').get(0).underscorify()
 
   executable(exe_name,
diff --git a/tests/examples/mpegts/ts-scte-writer.c b/tests/examples/mpegts/ts-scte-writer.c
new file mode 100644 (file)
index 0000000..00c1fe8
--- /dev/null
@@ -0,0 +1,101 @@
+#include <gst/gst.h>
+#include <gst/mpegts/mpegts.h>
+
+/* 45s stream
+ * Send scte-35 NULL packets every 5s
+ * Use PID 123 for SCTE-35 */
+#define PIPELINE_STR "videotestsrc is-live=True num-buffers=1350 ! video/x-raw,framerate=30/1 ! x264enc tune=zerolatency ! queue ! mpegtsmux name=mux scte-35-pid=123 scte-35-null-interval=450000 ! filesink location=test-scte.ts"
+
+static void
+_on_bus_message (GstBus * bus, GstMessage * message, GMainLoop * mainloop)
+{
+  switch (GST_MESSAGE_TYPE (message)) {
+    case GST_MESSAGE_ERROR:
+    case GST_MESSAGE_EOS:
+      g_main_loop_quit (mainloop);
+      break;
+    default:
+      break;
+  }
+}
+
+static void
+send_splice (GstElement * mux, gboolean out)
+{
+  GstMpegtsSCTESIT *sit;
+  GstMpegtsSection *section;
+
+  g_print ("Sending Splice %s event\n", out ? "Out" : "In");
+
+  /* Splice is at 5s for 30s */
+  if (out)
+    sit = gst_mpegts_scte_splice_out_new (1, 5 * 90000, 30 * 90000);
+  else
+    sit = gst_mpegts_scte_splice_in_new (2, 35 * 90000);
+
+  section = gst_mpegts_section_from_scte_sit (sit, 123);
+  gst_mpegts_section_send_event (section, mux);
+  gst_mpegts_section_unref (section);
+}
+
+static gboolean
+send_splice_in (GstElement * mux)
+{
+  send_splice (mux, FALSE);
+
+  return G_SOURCE_REMOVE;
+}
+
+static gboolean
+send_splice_out (GstElement * mux)
+{
+  send_splice (mux, TRUE);
+
+  /* In 30s send the splice-in one */
+  g_timeout_add_seconds (30, (GSourceFunc) send_splice_in, mux);
+
+  return G_SOURCE_REMOVE;
+}
+
+int
+main (int argc, char **argv)
+{
+  GstElement *pipeline = NULL;
+  GError *error = NULL;
+  GstBus *bus;
+  GMainLoop *mainloop;
+  GstElement *mux;
+
+  gst_init (&argc, &argv);
+  gst_mpegts_initialize ();
+
+  pipeline = gst_parse_launch (PIPELINE_STR, &error);
+  if (error) {
+    g_print ("pipeline could not be constructed: %s\n", error->message);
+    g_clear_error (&error);
+    return 1;
+  }
+
+  mainloop = g_main_loop_new (NULL, FALSE);
+
+  mux = gst_bin_get_by_name (GST_BIN (pipeline), "mux");
+  /* Send splice-out 1s in */
+  g_timeout_add_seconds (1, (GSourceFunc) send_splice_out, mux);
+  gst_object_unref (mux);
+
+  /* Put a bus handler */
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  gst_bus_add_signal_watch (bus);
+  g_signal_connect (bus, "message", (GCallback) _on_bus_message, mainloop);
+
+  /* Start pipeline */
+  gst_element_set_state (pipeline, GST_STATE_PLAYING);
+  g_main_loop_run (mainloop);
+
+  gst_element_set_state (pipeline, GST_STATE_NULL);
+
+  gst_object_unref (pipeline);
+  gst_object_unref (bus);
+
+  return 0;
+}