rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpmux.c
index f6c40b8..4d16d5d 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
  * SECTION:element-rtpmux
+ * @title: rtpmux
  * @see_also: rtpdtmfmux
  *
  * The rtp muxer takes multiple RTP streams having the same clock-rate and
  * muxes into a single stream with a single SSRC.
  *
- * <refsect2>
- * <title>Example pipelines</title>
+ * ## Example pipelines
  * |[
- * gst-launch rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
+ * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
  *              alsasrc ! alawenc ! rtppcmapay !                        \
  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
  *              audiotestsrc is-live=1 !                                \
@@ -45,9 +45,7 @@
  * In this example, an audio stream is captured from ALSA and another is
  * generated, both are encoded into different payload types and muxed together
  * so they can be sent on the same port.
- * </refsect2>
  *
- * Last reviewed on 2010-09-30 (0.10.21)
  */
 
 #ifdef HAVE_CONFIG_H
@@ -65,7 +63,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
 
 enum
 {
-  ARG_0,
+  PROP_0,
   PROP_TIMESTAMP_OFFSET,
   PROP_SEQNUM_OFFSET,
   PROP_SEQNUM,
@@ -82,21 +80,25 @@ static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
-static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
+static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
-    GstPadTemplate * templ, const gchar * name);
+    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
-static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer);
-static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad,
+static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
+static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
     GstBufferList * bufferlist);
-static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps);
-static GstCaps *gst_rtp_mux_getcaps (GstPad * pad);
-static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
+    GstCaps * caps);
+static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 
 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
     element, GstStateChange transition);
@@ -107,22 +109,11 @@ static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 static void gst_rtp_mux_dispose (GObject * object);
 
-GST_BOILERPLATE (GstRTPMux, gst_rtp_mux, GstElement, GST_TYPE_ELEMENT);
+static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
+    GstEvent * event);
 
-static void
-gst_rtp_mux_base_init (gpointer g_class)
-{
-  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
 
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&src_factory));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&sink_factory));
-
-  gst_element_class_set_details_simple (element_class, "RTP muxer",
-      "Codec/Muxer",
-      "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
-}
 
 static void
 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
@@ -133,10 +124,20 @@ gst_rtp_mux_class_init (GstRTPMuxClass * klass)
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
+
+  gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+  gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
+
+  gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
+      "Codec/Muxer",
+      "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
+
   gobject_class->get_property = gst_rtp_mux_get_property;
   gobject_class->set_property = gst_rtp_mux_set_property;
   gobject_class->dispose = gst_rtp_mux_dispose;
 
+  klass->src_event = gst_rtp_mux_src_event_real;
+
   g_object_class_install_property (G_OBJECT_CLASS (klass),
       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
           "Timestamp Offset",
@@ -153,9 +154,10 @@ gst_rtp_mux_class_init (GstRTPMuxClass * klass)
           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
       g_param_spec_uint ("ssrc", "SSRC",
-          "The SSRC of the packets (-1 == random)",
+          "The SSRC of the packets (default == random)",
           0, G_MAXUINT, DEFAULT_SSRC,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
+          G_PARAM_STATIC_STRINGS));
 
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
@@ -166,8 +168,11 @@ gst_rtp_mux_class_init (GstRTPMuxClass * klass)
 static void
 gst_rtp_mux_dispose (GObject * object)
 {
+  GstRTPMux *rtp_mux = GST_RTP_MUX (object);
   GList *item;
 
+  g_clear_object (&rtp_mux->last_pad);
+
 restart:
   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
     GstPad *pad = GST_PAD (item->data);
@@ -177,83 +182,110 @@ restart:
     }
   }
 
-  G_OBJECT_CLASS (parent_class)->dispose (object);
+  G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
 }
 
 static gboolean
-gst_rtp_mux_src_event (GstPad * pad, GstEvent * event)
+gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
-  GstElement *rtp_mux;
-  GstIterator *iter;
-  GstPad *sinkpad;
-  gboolean result = FALSE;
-  gboolean done = FALSE;
-
-  rtp_mux = gst_pad_get_parent_element (pad);
-  g_return_val_if_fail (rtp_mux != NULL, FALSE);
-
-  iter = gst_element_iterate_sink_pads (rtp_mux);
-
-  while (!done) {
-    switch (gst_iterator_next (iter, (gpointer) & sinkpad)) {
-      case GST_ITERATOR_OK:
-        gst_event_ref (event);
-        result |= gst_pad_push_event (sinkpad, event);
-        gst_object_unref (sinkpad);
-        break;
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (iter);
-        result = FALSE;
-        break;
-      case GST_ITERATOR_ERROR:
-        GST_WARNING_OBJECT (rtp_mux, "Error iterating sinkpads");
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
+  GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
+  GstRTPMuxClass *klass;
+  gboolean ret;
+
+  klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
+
+  ret = klass->src_event (rtp_mux, event);
+
+  return ret;
+}
+
+static gboolean
+gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
+{
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CUSTOM_UPSTREAM:
+    {
+      const GstStructure *s = gst_event_get_structure (event);
+
+      if (gst_structure_has_name (s, "GstRTPCollision")) {
+        guint ssrc = 0;
+
+        if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+          ssrc = -1;
+
+        GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %x", ssrc);
+
+        /* choose another ssrc for our stream */
+        GST_OBJECT_LOCK (rtp_mux);
+        if (ssrc == rtp_mux->current_ssrc) {
+          GstCaps *caps;
+          guint suggested_ssrc = 0;
+          guint32 new_ssrc;
+
+          if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
+            rtp_mux->current_ssrc = suggested_ssrc;
+
+          while (ssrc == rtp_mux->current_ssrc)
+            rtp_mux->current_ssrc = g_random_int ();
+
+          new_ssrc = rtp_mux->current_ssrc;
+          GST_INFO_OBJECT (rtp_mux, "New ssrc after collision %x (was: %x)",
+              new_ssrc, ssrc);
+          GST_OBJECT_UNLOCK (rtp_mux);
+
+          caps = gst_pad_get_current_caps (rtp_mux->srcpad);
+          caps = gst_caps_make_writable (caps);
+          gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
+          gst_pad_set_caps (rtp_mux->srcpad, caps);
+          gst_caps_unref (caps);
+        } else {
+          GST_OBJECT_UNLOCK (rtp_mux);
+        }
+      }
+      break;
     }
+    default:
+      break;
   }
-  gst_iterator_free (iter);
-  gst_object_unref (rtp_mux);
-  gst_event_unref (event);
 
-  return result;
+
+  return gst_pad_event_default (rtp_mux->srcpad, GST_OBJECT (rtp_mux), event);
 }
 
 static void
-gst_rtp_mux_init (GstRTPMux * object, GstRTPMuxClass * g_class)
+gst_rtp_mux_init (GstRTPMux * rtp_mux)
 {
-  GstElementClass *klass = GST_ELEMENT_GET_CLASS (object);
+  GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
 
-  object->srcpad =
+  rtp_mux->srcpad =
       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
           "src"), "src");
-  gst_pad_set_event_function (object->srcpad,
+  gst_pad_set_event_function (rtp_mux->srcpad,
       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
-  gst_element_add_pad (GST_ELEMENT (object), object->srcpad);
+  gst_pad_use_fixed_caps (rtp_mux->srcpad);
+  gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
 
-  object->ssrc = DEFAULT_SSRC;
-  object->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
-  object->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
+  rtp_mux->ssrc = DEFAULT_SSRC;
+  rtp_mux->current_ssrc = DEFAULT_SSRC;
+  rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
+  rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
 
-  object->segment_pending = TRUE;
+  rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
 }
 
 static void
 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
 {
-  GstRTPMuxClass *klass;
   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
 
-  klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
-
   /* setup some pad functions */
-  gst_pad_set_setcaps_function (sinkpad, gst_rtp_mux_setcaps);
-  gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps);
   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
   gst_pad_set_chain_list_function (sinkpad,
       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
   gst_pad_set_event_function (sinkpad,
       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
+  gst_pad_set_query_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
 
 
   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
@@ -266,7 +298,7 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
 
 static GstPad *
 gst_rtp_mux_request_new_pad (GstElement * element,
-    GstPadTemplate * templ, const gchar * req_name)
+    GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
 {
   GstRTPMux *rtp_mux;
   GstPad *newpad;
@@ -303,74 +335,124 @@ gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
   gst_element_remove_pad (element, pad);
 
   if (padpriv) {
-    gst_caps_replace (&padpriv->out_caps, NULL);
     g_slice_free (GstRTPMuxPadPrivate, padpriv);
   }
 }
 
-/* Put our own clock-base on the buffer */
+/* Put our own timestamp-offset on the buffer */
 static void
 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
-    GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer)
+    GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
 {
   guint32 ts;
   guint32 sink_ts_base = 0;
 
-  if (padpriv && padpriv->have_clock_base)
-    sink_ts_base = padpriv->clock_base;
+  if (padpriv && padpriv->have_timestamp_offset)
+    sink_ts_base = padpriv->timestamp_offset;
 
-  ts = gst_rtp_buffer_get_timestamp (buffer) - sink_ts_base + rtp_mux->ts_base;
+  ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
+      rtp_mux->ts_base;
   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
-      gst_rtp_buffer_get_timestamp (buffer), ts);
-  gst_rtp_buffer_set_timestamp (buffer, ts);
+      gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
+  gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
 }
 
 static gboolean
 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
-    GstBuffer * buffer)
+    GstRTPBuffer * rtpbuffer)
 {
   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
 
   if (klass->accept_buffer_locked)
-    if (!klass->accept_buffer_locked (rtp_mux, padpriv, buffer))
+    if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
       return FALSE;
 
   rtp_mux->seqnum++;
-  gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
+  gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
 
-  gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc);
-  gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, buffer);
-  GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
-      GST_BUFFER_SIZE (buffer), rtp_mux->seqnum,
-      gst_rtp_buffer_get_timestamp (buffer));
+  gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
+  gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
+  GST_LOG_OBJECT (rtp_mux,
+      "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u, ssrc=%x",
+      rtpbuffer->map[0].size, rtp_mux->seqnum,
+      gst_rtp_buffer_get_timestamp (rtpbuffer), rtp_mux->current_ssrc);
 
   if (padpriv) {
-    gst_buffer_set_caps (buffer, padpriv->out_caps);
-    if (padpriv->segment.format == GST_FORMAT_TIME)
-      GST_BUFFER_TIMESTAMP (buffer) =
+    if (padpriv->segment.format == GST_FORMAT_TIME) {
+      GST_BUFFER_PTS (rtpbuffer->buffer) =
           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
-          GST_BUFFER_TIMESTAMP (buffer));
+          GST_BUFFER_PTS (rtpbuffer->buffer));
+      GST_BUFFER_DTS (rtpbuffer->buffer) =
+          gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
+          GST_BUFFER_DTS (rtpbuffer->buffer));
+    }
   }
 
   return TRUE;
 }
 
-static GstFlowReturn
-gst_rtp_mux_chain_list (GstPad * pad, GstBufferList * bufferlist)
+struct BufferListData
 {
   GstRTPMux *rtp_mux;
-  GstFlowReturn ret;
-  GstBufferListIterator *it;
   GstRTPMuxPadPrivate *padpriv;
-  GstEvent *newseg_event = NULL;
-  gboolean drop = TRUE;
+  gboolean drop;
+};
 
-  rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
+static gboolean
+process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+  struct BufferListData *bd = user_data;
+  GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
 
-  if (!gst_rtp_buffer_list_validate (bufferlist)) {
-    GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
-    gst_object_unref (rtp_mux);
-    return GST_FLOW_ERROR;
+  *buffer = gst_buffer_make_writable (*buffer);
+
+  gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
+
+  bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
+
+  gst_rtp_buffer_unmap (&rtpbuffer);
+
+  if (bd->drop)
+    return FALSE;
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
+      GST_BUFFER_PTS_IS_VALID (*buffer))
+    bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
+        GST_BUFFER_DURATION (*buffer);
+  else
+    bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
+
+  return TRUE;
+}
+
+static gboolean resend_events (GstPad * pad, GstEvent ** event,
+    gpointer user_data);
+
+static GstFlowReturn
+gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * bufferlist)
+{
+  GstRTPMux *rtp_mux;
+  GstFlowReturn ret;
+  GstRTPMuxPadPrivate *padpriv;
+  gboolean changed = FALSE;
+  struct BufferListData bd;
+
+  rtp_mux = GST_RTP_MUX (parent);
+
+  if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
+    GstCaps *current_caps = gst_pad_get_current_caps (pad);
+
+    if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
+      gst_pad_mark_reconfigure (rtp_mux->srcpad);
+      if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
+        ret = GST_FLOW_FLUSHING;
+      else
+        ret = GST_FLOW_NOT_NEGOTIATED;
+      gst_buffer_list_unref (bufferlist);
+      goto out;
+    }
+    gst_caps_unref (current_caps);
   }
 
   GST_OBJECT_LOCK (rtp_mux);
@@ -383,39 +465,25 @@ gst_rtp_mux_chain_list (GstPad * pad, GstBufferList * bufferlist)
     goto out;
   }
 
-  bufferlist = gst_buffer_list_make_writable (bufferlist);
-  it = gst_buffer_list_iterate (bufferlist);
-  while (gst_buffer_list_iterator_next_group (it)) {
-    GstBuffer *rtpbuf;
-
-    rtpbuf = gst_buffer_list_iterator_next (it);
-    rtpbuf = gst_buffer_make_writable (rtpbuf);
-
-    drop = !process_buffer_locked (rtp_mux, padpriv, rtpbuf);
-
-    if (drop)
-      break;
-
-    gst_buffer_list_iterator_take (it, rtpbuf);
-  }
-  gst_buffer_list_iterator_free (it);
+  bd.rtp_mux = rtp_mux;
+  bd.padpriv = padpriv;
+  bd.drop = FALSE;
 
-  if (!drop && rtp_mux->segment_pending) {
-    /*
-     * We set the start at 0, because we re-timestamps to the running time
-     */
-    newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
-        GST_FORMAT_TIME, 0, -1, 0);
+  bufferlist = gst_buffer_list_make_writable (bufferlist);
+  gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
 
-    rtp_mux->segment_pending = FALSE;
+  if (!bd.drop && pad != rtp_mux->last_pad) {
+    changed = TRUE;
+    g_clear_object (&rtp_mux->last_pad);
+    rtp_mux->last_pad = g_object_ref (pad);
   }
 
   GST_OBJECT_UNLOCK (rtp_mux);
 
-  if (newseg_event)
-    gst_pad_push_event (rtp_mux->srcpad, newseg_event);
+  if (changed)
+    gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
 
-  if (drop) {
+  if (bd.drop) {
     gst_buffer_list_unref (bufferlist);
     ret = GST_FLOW_OK;
   } else {
@@ -424,26 +492,55 @@ gst_rtp_mux_chain_list (GstPad * pad, GstBufferList * bufferlist)
 
 out:
 
-  gst_object_unref (rtp_mux);
-
   return ret;
 }
 
+static gboolean
+resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+  GstRTPMux *rtp_mux = user_data;
+
+  if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
+    GstCaps *caps;
+
+    gst_event_parse_caps (*event, &caps);
+    gst_rtp_mux_setcaps (pad, rtp_mux, caps);
+  } else if (GST_EVENT_TYPE (*event) == GST_EVENT_SEGMENT) {
+    GstSegment new_segment;
+    gst_segment_init (&new_segment, GST_FORMAT_TIME);
+    gst_pad_push_event (rtp_mux->srcpad, gst_event_new_segment (&new_segment));
+  } else {
+    gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
+  }
+
+  return TRUE;
+}
+
 static GstFlowReturn
-gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
+gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstRTPMux *rtp_mux;
   GstFlowReturn ret;
   GstRTPMuxPadPrivate *padpriv;
-  GstEvent *newseg_event = NULL;
   gboolean drop;
-
-  rtp_mux = GST_RTP_MUX (GST_OBJECT_PARENT (pad));
-
-  if (!gst_rtp_buffer_validate (buffer)) {
-    gst_buffer_unref (buffer);
-    GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
-    return GST_FLOW_ERROR;
+  gboolean changed = FALSE;
+  GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
+
+  rtp_mux = GST_RTP_MUX (parent);
+
+  if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
+    GstCaps *current_caps = gst_pad_get_current_caps (pad);
+
+    if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
+      gst_pad_mark_reconfigure (rtp_mux->srcpad);
+      if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
+        ret = GST_FLOW_FLUSHING;
+      else
+        ret = GST_FLOW_NOT_NEGOTIATED;
+      gst_buffer_unref (buffer);
+      goto out;
+    }
+    gst_caps_unref (current_caps);
   }
 
   GST_OBJECT_LOCK (rtp_mux);
@@ -457,21 +554,36 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
 
   buffer = gst_buffer_make_writable (buffer);
 
-  drop = !process_buffer_locked (rtp_mux, padpriv, buffer);
+  if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
+    GST_OBJECT_UNLOCK (rtp_mux);
+    gst_buffer_unref (buffer);
+    GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
+    return GST_FLOW_ERROR;
+  }
+
+  drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
 
-  if (!drop && rtp_mux->segment_pending) {
-    /*
-     * We set the start at 0, because we re-timestamps to the running time
-     */
-    newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
-        GST_FORMAT_TIME, 0, -1, 0);
+  gst_rtp_buffer_unmap (&rtpbuffer);
 
-    rtp_mux->segment_pending = FALSE;
+  if (!drop) {
+    if (pad != rtp_mux->last_pad) {
+      changed = TRUE;
+      g_clear_object (&rtp_mux->last_pad);
+      rtp_mux->last_pad = g_object_ref (pad);
+    }
+
+    if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
+        GST_BUFFER_PTS_IS_VALID (buffer))
+      rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
+          GST_BUFFER_DURATION (buffer);
+    else
+      rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
   }
+
   GST_OBJECT_UNLOCK (rtp_mux);
 
-  if (newseg_event)
-    gst_pad_push_event (rtp_mux->srcpad, newseg_event);
+  if (changed)
+    gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
 
   if (drop) {
     gst_buffer_unref (buffer);
@@ -480,61 +592,105 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
     ret = gst_pad_push (rtp_mux->srcpad, buffer);
   }
 
+out:
   return ret;
 }
 
 static gboolean
-gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps)
+gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
 {
-  GstRTPMux *rtp_mux;
   GstStructure *structure;
   gboolean ret = FALSE;
   GstRTPMuxPadPrivate *padpriv;
+  GstCaps *peercaps;
+
+  if (caps == NULL)
+    return FALSE;
 
-  rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
+  if (!gst_caps_is_fixed (caps))
+    return FALSE;
+
+  peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
+  if (peercaps) {
+    GstCaps *tcaps, *othercaps;;
+    tcaps = gst_pad_get_pad_template_caps (pad);
+    othercaps = gst_caps_intersect_full (peercaps, tcaps,
+        GST_CAPS_INTERSECT_FIRST);
+
+    if (gst_caps_get_size (othercaps) > 0) {
+      structure = gst_caps_get_structure (othercaps, 0);
+      GST_OBJECT_LOCK (rtp_mux);
+      if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
+        GST_INFO_OBJECT (pad, "Use downstream ssrc: %x", rtp_mux->current_ssrc);
+        rtp_mux->have_ssrc = TRUE;
+      }
+      if (gst_structure_get_uint (structure,
+              "timestamp-offset", &rtp_mux->ts_base)) {
+        GST_INFO_OBJECT (pad, "Use downstream timestamp-offset: %u",
+            rtp_mux->ts_base);
+      }
+      GST_OBJECT_UNLOCK (rtp_mux);
+    }
+
+    gst_caps_unref (othercaps);
+
+    gst_caps_unref (peercaps);
+    gst_caps_unref (tcaps);
+  }
 
   structure = gst_caps_get_structure (caps, 0);
 
   if (!structure)
-    goto out;
+    return FALSE;
 
   GST_OBJECT_LOCK (rtp_mux);
   padpriv = gst_pad_get_element_private (pad);
   if (padpriv &&
-      gst_structure_get_uint (structure, "clock-base", &padpriv->clock_base)) {
-    padpriv->have_clock_base = TRUE;
+      gst_structure_get_uint (structure, "timestamp-offset",
+          &padpriv->timestamp_offset)) {
+    padpriv->have_timestamp_offset = TRUE;
   }
-  GST_OBJECT_UNLOCK (rtp_mux);
 
   caps = gst_caps_copy (caps);
 
+  /* if we don't have a specified ssrc, first try to take one from the caps,
+     and if that fails, generate one */
+  if (rtp_mux->ssrc == DEFAULT_SSRC) {
+    if (rtp_mux->current_ssrc == DEFAULT_SSRC) {
+      if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
+        rtp_mux->current_ssrc = g_random_int ();
+        GST_INFO_OBJECT (rtp_mux, "Set random ssrc %x", rtp_mux->current_ssrc);
+      }
+    }
+  } else {
+    rtp_mux->current_ssrc = rtp_mux->ssrc;
+    GST_INFO_OBJECT (rtp_mux, "Set ssrc %x", rtp_mux->current_ssrc);
+  }
+
   gst_caps_set_simple (caps,
-      "clock-base", G_TYPE_UINT, rtp_mux->ts_base,
-      "seqnum-base", G_TYPE_UINT, rtp_mux->seqnum_base, NULL);
+      "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
+      "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
+      "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
+
+  GST_OBJECT_UNLOCK (rtp_mux);
+
+  if (rtp_mux->send_stream_start) {
+    gchar s_id[32];
+
+    /* stream-start (FIXME: create id based on input ids) */
+    g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
+    gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
+
+    rtp_mux->send_stream_start = FALSE;
+  }
 
   GST_DEBUG_OBJECT (rtp_mux,
       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
 
-  if (rtp_mux->ssrc == -1) {
-    if (gst_structure_has_field_typed (structure, "ssrc", G_TYPE_UINT)) {
-      rtp_mux->current_ssrc = g_value_get_uint
-          (gst_structure_get_value (structure, "ssrc"));
-    }
-  }
 
-  if (ret) {
-    GST_OBJECT_LOCK (rtp_mux);
-    padpriv = gst_pad_get_element_private (pad);
-    if (padpriv)
-      gst_caps_replace (&padpriv->out_caps, caps);
-    GST_OBJECT_UNLOCK (rtp_mux);
-  }
   gst_caps_unref (caps);
 
-out:
-  gst_object_unref (rtp_mux);
-
   return ret;
 }
 
@@ -560,64 +716,69 @@ clear_caps (GstCaps * caps, gboolean only_clock_rate)
 }
 
 static gboolean
-same_clock_rate_fold (gpointer item, GValue * ret, gpointer user_data)
+same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
 {
   GstPad *mypad = user_data;
-  GstPad *pad = item;
+  GstPad *pad = g_value_get_object (item);
   GstCaps *peercaps;
-  GstCaps *othercaps;
-  const GstCaps *accumcaps;
-  GstCaps *intersect;
+  GstCaps *accumcaps;
 
-  if (pad == mypad) {
-    gst_object_unref (pad);
+  if (pad == mypad)
     return TRUE;
-  }
 
-  peercaps = gst_pad_peer_get_caps (pad);
+  accumcaps = g_value_get_boxed (ret);
+  peercaps = gst_pad_peer_query_caps (pad, accumcaps);
   if (!peercaps) {
-    gst_object_unref (pad);
+    g_warning ("no peercaps");
     return TRUE;
   }
+  peercaps = gst_caps_make_writable (peercaps);
+  clear_caps (peercaps, TRUE);
 
-  othercaps = gst_caps_intersect (peercaps,
-      gst_pad_get_pad_template_caps (pad));
-  gst_caps_unref (peercaps);
-
-  accumcaps = gst_value_get_caps (ret);
+  g_value_take_boxed (ret, peercaps);
 
-  clear_caps (othercaps, TRUE);
-
-  intersect = gst_caps_intersect (accumcaps, othercaps);
-
-  g_value_take_boxed (ret, intersect);
-
-  gst_caps_unref (othercaps);
-  gst_object_unref (pad);
-
-  return !gst_caps_is_empty (intersect);
+  return !gst_caps_is_empty (peercaps);
 }
 
 static GstCaps *
-gst_rtp_mux_getcaps (GstPad * pad)
+gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
 {
-  GstRTPMux *mux = GST_RTP_MUX (gst_pad_get_parent (pad));
   GstCaps *caps = NULL;
   GstIterator *iter = NULL;
   GValue v = { 0 };
   GstIteratorResult res;
-  GstCaps *peercaps = gst_pad_peer_get_caps (mux->srcpad);
-  GstCaps *othercaps = NULL;
+  GstCaps *peercaps;
+  GstCaps *othercaps;
+  GstCaps *tcaps;
+  const GstStructure *structure;
+
+  peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
 
   if (peercaps) {
-    othercaps = gst_caps_intersect (peercaps,
-        gst_pad_get_pad_template_caps (pad));
+    tcaps = gst_pad_get_pad_template_caps (pad);
+    othercaps = gst_caps_intersect_full (peercaps, tcaps,
+        GST_CAPS_INTERSECT_FIRST);
     gst_caps_unref (peercaps);
   } else {
-    othercaps = gst_caps_copy (gst_pad_get_pad_template_caps (mux->srcpad));
+    tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
+    if (filter)
+      othercaps = gst_caps_intersect_full (filter, tcaps,
+          GST_CAPS_INTERSECT_FIRST);
+    else
+      othercaps = gst_caps_copy (tcaps);
   }
+  gst_caps_unref (tcaps);
+
+  GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
+      GST_PTR_FORMAT, othercaps);
 
-  clear_caps (othercaps, FALSE);
+  structure = gst_caps_get_structure (othercaps, 0);
+  if (mux->ssrc == DEFAULT_SSRC) {
+    if (gst_structure_get_uint (structure, "ssrc", &mux->current_ssrc))
+      GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x", mux->current_ssrc);
+  }
+
+  clear_caps (othercaps, TRUE);
 
   g_value_init (&v, GST_TYPE_CAPS);
 
@@ -625,23 +786,53 @@ gst_rtp_mux_getcaps (GstPad * pad)
   do {
     gst_value_set_caps (&v, othercaps);
     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
+    gst_iterator_resync (iter);
   } while (res == GST_ITERATOR_RESYNC);
   gst_iterator_free (iter);
 
-  caps = (GstCaps *) gst_value_get_caps (&v);
+  caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
+
+  g_value_unset (&v);
+  gst_caps_unref (othercaps);
 
   if (res == GST_ITERATOR_ERROR) {
     gst_caps_unref (caps);
     caps = gst_caps_new_empty ();
   }
 
-  if (othercaps)
-    gst_caps_unref (othercaps);
-  gst_object_unref (mux);
 
   return caps;
 }
 
+static gboolean
+gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+  GstRTPMux *mux = GST_RTP_MUX (parent);
+  gboolean res = FALSE;
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_CAPS:
+    {
+      GstCaps *filter, *caps;
+
+      gst_query_parse_caps (query, &filter);
+      GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
+          GST_PTR_FORMAT, filter);
+      caps = gst_rtp_mux_getcaps (pad, mux, filter);
+      gst_query_set_caps_result (query, caps);
+      GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
+          GST_PTR_FORMAT, caps);
+      gst_caps_unref (caps);
+      res = TRUE;
+      break;
+    }
+    default:
+      res = gst_pad_query_default (pad, parent, query);
+      break;
+  }
+
+  return res;
+}
 
 static void
 gst_rtp_mux_get_property (GObject * object,
@@ -651,6 +842,7 @@ gst_rtp_mux_get_property (GObject * object,
 
   rtp_mux = GST_RTP_MUX (object);
 
+  GST_OBJECT_LOCK (rtp_mux);
   switch (prop_id) {
     case PROP_TIMESTAMP_OFFSET:
       g_value_set_int (value, rtp_mux->ts_offset);
@@ -659,9 +851,7 @@ gst_rtp_mux_get_property (GObject * object,
       g_value_set_int (value, rtp_mux->seqnum_offset);
       break;
     case PROP_SEQNUM:
-      GST_OBJECT_LOCK (rtp_mux);
       g_value_set_uint (value, rtp_mux->seqnum);
-      GST_OBJECT_UNLOCK (rtp_mux);
       break;
     case PROP_SSRC:
       g_value_set_uint (value, rtp_mux->ssrc);
@@ -670,6 +860,7 @@ gst_rtp_mux_get_property (GObject * object,
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
+  GST_OBJECT_UNLOCK (rtp_mux);
 }
 
 static void
@@ -688,7 +879,12 @@ gst_rtp_mux_set_property (GObject * object,
       rtp_mux->seqnum_offset = g_value_get_int (value);
       break;
     case PROP_SSRC:
+      GST_OBJECT_LOCK (rtp_mux);
       rtp_mux->ssrc = g_value_get_uint (value);
+      rtp_mux->current_ssrc = rtp_mux->ssrc;
+      rtp_mux->have_ssrc = TRUE;
+      GST_DEBUG_OBJECT (rtp_mux, "ssrc prop set to %x", rtp_mux->ssrc);
+      GST_OBJECT_UNLOCK (rtp_mux);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -697,101 +893,75 @@ gst_rtp_mux_set_property (GObject * object,
 }
 
 static gboolean
-gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  GstRTPMux *mux = GST_RTP_MUX (parent);
+  gboolean is_pad;
+  gboolean ret;
 
-  GstRTPMux *mux;
-  gboolean ret = FALSE;
-  gboolean forward = TRUE;
-
-  mux = GST_RTP_MUX (gst_pad_get_parent (pad));
+  GST_OBJECT_LOCK (mux);
+  is_pad = (pad == mux->last_pad);
+  GST_OBJECT_UNLOCK (mux);
 
   switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_FLUSH_STOP:
+    case GST_EVENT_CAPS:
     {
-      GstRTPMuxPadPrivate *padpriv;
+      GstCaps *caps;
 
+      gst_event_parse_caps (event, &caps);
+      GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
+          GST_PTR_FORMAT, caps);
+      ret = gst_rtp_mux_setcaps (pad, mux, caps);
+      gst_event_unref (event);
+      return ret;
+    }
+    case GST_EVENT_FLUSH_STOP:
+    {
       GST_OBJECT_LOCK (mux);
-      mux->segment_pending = TRUE;
-      padpriv = gst_pad_get_element_private (pad);
-      if (padpriv)
-        gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
+      mux->last_stop = GST_CLOCK_TIME_NONE;
       GST_OBJECT_UNLOCK (mux);
-    }
       break;
-    case GST_EVENT_NEWSEGMENT:
+    }
+    case GST_EVENT_SEGMENT:
     {
-      gboolean update;
-      gdouble rate, applied_rate;
-      GstFormat format;
-      gint64 start, stop, position;
       GstRTPMuxPadPrivate *padpriv;
 
-      gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
-          &format, &start, &stop, &position);
-
       GST_OBJECT_LOCK (mux);
       padpriv = gst_pad_get_element_private (pad);
 
       if (padpriv) {
-        if (format == GST_FORMAT_TIME)
-          gst_segment_set_newsegment_full (&padpriv->segment, update,
-              rate, applied_rate, format, start, stop, position);
-        else
-          gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
+        gst_event_copy_segment (event, &padpriv->segment);
       }
       GST_OBJECT_UNLOCK (mux);
-      gst_event_unref (event);
-      forward = FALSE;
-      ret = TRUE;
+
+      if (is_pad) {
+        GstSegment new_segment;
+        gst_segment_init (&new_segment, GST_FORMAT_TIME);
+        gst_event_unref (event);
+        event = gst_event_new_segment (&new_segment);
+      }
       break;
     }
     default:
       break;
   }
 
-  if (forward)
-    ret = gst_pad_push_event (mux->srcpad, event);
-
-  gst_object_unref (mux);
-  return ret;
-}
-
-
-static void
-clear_segment (gpointer data, gpointer user_data)
-{
-  GstPad *pad = data;
-  GstRTPMux *mux = user_data;
-  GstRTPMuxPadPrivate *padpriv;
-
-  GST_OBJECT_LOCK (mux);
-  padpriv = gst_pad_get_element_private (pad);
-  if (padpriv)
-    gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
-  GST_OBJECT_UNLOCK (mux);
-
-  gst_object_unref (pad);
+  if (is_pad) {
+    return gst_pad_push_event (mux->srcpad, event);
+  } else {
+    gst_event_unref (event);
+    return TRUE;
+  }
 }
 
-
 static void
 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
 {
-  GstIterator *iter;
-
-  iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
-  while (gst_iterator_foreach (iter, clear_segment, rtp_mux) ==
-      GST_ITERATOR_RESYNC);
-  gst_iterator_free (iter);
 
   GST_OBJECT_LOCK (rtp_mux);
-  rtp_mux->segment_pending = TRUE;
 
-  if (rtp_mux->ssrc == -1)
-    rtp_mux->current_ssrc = g_random_int ();
-  else
-    rtp_mux->current_ssrc = rtp_mux->ssrc;
+  g_clear_object (&rtp_mux->last_pad);
+  rtp_mux->send_stream_start = TRUE;
 
   if (rtp_mux->seqnum_offset == -1)
     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
@@ -804,7 +974,12 @@ gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
   else
     rtp_mux->ts_base = rtp_mux->ts_offset;
 
-  GST_DEBUG_OBJECT (rtp_mux, "set clock-base to %u", rtp_mux->ts_base);
+  rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
+
+  if (rtp_mux->have_ssrc)
+    rtp_mux->current_ssrc = rtp_mux->ssrc;
+
+  GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
 
   GST_OBJECT_UNLOCK (rtp_mux);
 }
@@ -813,22 +988,30 @@ static GstStateChangeReturn
 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
 {
   GstRTPMux *rtp_mux;
+  GstStateChangeReturn ret;
 
   rtp_mux = GST_RTP_MUX (element);
 
   switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gst_rtp_mux_ready_to_paused (rtp_mux);
       break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
+      transition);
+
+  switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      g_clear_object (&rtp_mux->last_pad);
       break;
     default:
       break;
   }
 
-  return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  return ret;
 }
 
 gboolean