rtpmux: Aggregate incoming segments
authorOlivier Crête <olivier.crete@collabora.co.uk>
Fri, 7 May 2010 20:42:22 +0000 (16:42 -0400)
committerTim-Philipp Müller <tim@centricular.net>
Sun, 16 Dec 2012 16:35:14 +0000 (16:35 +0000)
gst/rtpmanager/gstrtpmux.c
gst/rtpmanager/gstrtpmux.h
tests/check/elements/rtpmux.c

index 851ea06..3c72497 100644 (file)
@@ -62,6 +62,7 @@ typedef struct
   guint clock_base;
 
   GstCaps *out_caps;
+  GstSegment segment;
 } GstRTPMuxPadPrivate;
 
 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
@@ -82,6 +83,7 @@ static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer);
 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps);
 static GstCaps *gst_rtp_mux_getcaps (GstPad * pad);
+static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event);
 
 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
     element, GstStateChange transition);
@@ -219,6 +221,8 @@ gst_rtp_mux_init (GstRTPMux * object, GstRTPMuxClass * g_class)
   object->ssrc = DEFAULT_SSRC;
   object->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
   object->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
+
+  object->segment_pending = TRUE;
 }
 
 static void
@@ -234,15 +238,15 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
   gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps);
   if (klass->chain_func)
     gst_pad_set_chain_function (sinkpad, klass->chain_func);
-  if (klass->sink_event_func)
-    gst_pad_set_event_function (sinkpad, klass->sink_event_func);
+  gst_pad_set_event_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
 
-  /* This could break with gstreamer 0.10.9 */
-  gst_pad_set_active (sinkpad, TRUE);
+
+  gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
 
   gst_pad_set_element_private (sinkpad, padpriv);
 
-  /* dd the pad to the element */
+  gst_pad_set_active (sinkpad, TRUE);
   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
 }
 
@@ -318,6 +322,7 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
   GstRTPMux *rtp_mux;
   GstFlowReturn ret;
   GstRTPMuxPadPrivate *padpriv;
+  GstEvent *newseg_event = NULL;
 
   rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
 
@@ -333,15 +338,34 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
   rtp_mux->seqnum++;
   gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
   padpriv = gst_pad_get_element_private (pad);
-  if (padpriv)
+  if (padpriv) {
     gst_buffer_set_caps (buffer, padpriv->out_caps);
+    if (padpriv->segment.format == GST_FORMAT_TIME)
+      GST_BUFFER_TIMESTAMP (buffer) =
+          gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
+          GST_BUFFER_TIMESTAMP (buffer));
+  }
+
+  if (rtp_mux->segment_pending) {
+    /*
+     * We set the start at 0, because we re-timestamps to the running time
+     */
+    newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
+        GST_FORMAT_TIME, 0, -1, 0);
+
+    rtp_mux->segment_pending = FALSE;
+  }
   GST_OBJECT_UNLOCK (rtp_mux);
+
   gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc);
   gst_rtp_mux_readjust_rtp_timestamp (rtp_mux, pad, buffer);
   GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
       GST_BUFFER_SIZE (buffer), rtp_mux->seqnum,
       gst_rtp_buffer_get_timestamp (buffer));
 
+  if (newseg_event)
+    gst_pad_push_event (rtp_mux->srcpad, newseg_event);
+
   if (!padpriv) {
     ret = GST_FLOW_NOT_LINKED;
     gst_buffer_unref (buffer);
@@ -569,10 +593,103 @@ gst_rtp_mux_set_property (GObject * object,
   }
 }
 
+static gboolean
+gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event)
+{
+
+  GstRTPMux *mux;
+  gboolean ret = FALSE;
+  gboolean forward = TRUE;
+
+  mux = GST_RTP_MUX (gst_pad_get_parent (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+    {
+      GstRTPMuxPadPrivate *padpriv;
+
+      GST_OBJECT_LOCK (mux);
+      mux->segment_pending = TRUE;
+      padpriv = gst_pad_get_element_private (pad);
+      if (padpriv)
+        gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
+      GST_OBJECT_UNLOCK (pad);
+    }
+      break;
+    case GST_EVENT_NEWSEGMENT:
+    {
+      gboolean update;
+      gdouble rate, applied_rate;
+      GstFormat format;
+      gint64 start, stop, position;
+      GstRTPMuxPadPrivate *padpriv;
+
+      gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
+          &format, &start, &stop, &position);
+
+      GST_OBJECT_LOCK (mux);
+      padpriv = gst_pad_get_element_private (pad);
+
+      if (padpriv) {
+        if (format == GST_FORMAT_TIME)
+          gst_segment_set_newsegment_full (&padpriv->segment, update,
+              rate, applied_rate, format, start, stop, position);
+        else
+          gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
+      }
+      GST_OBJECT_UNLOCK (mux);
+      gst_event_unref (event);
+      forward = FALSE;
+      ret = TRUE;
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (forward) {
+    GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (mux);
+
+    if (klass->sink_event_func)
+      klass->sink_event_func (pad, event);
+    else
+      ret = gst_pad_push_event (mux->srcpad, event);
+  }
+
+  gst_object_unref (mux);
+  return ret;
+}
+
+
+static void
+clear_segment (gpointer data, gpointer user_data)
+{
+  GstPad *pad = data;
+  GstRTPMux *mux = user_data;
+  GstRTPMuxPadPrivate *padpriv;
+
+  GST_OBJECT_LOCK (mux);
+  padpriv = gst_pad_get_element_private (pad);
+  if (padpriv)
+    gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
+  GST_OBJECT_UNLOCK (mux);
+
+  gst_object_unref (pad);
+}
+
+
 static void
 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
 {
+  GstIterator *iter;
+
+  iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
+  while (gst_iterator_foreach (iter, clear_segment, rtp_mux) ==
+      GST_ITERATOR_RESYNC);
+  gst_iterator_free (iter);
+
   GST_OBJECT_LOCK (rtp_mux);
+  rtp_mux->segment_pending = TRUE;
 
   if (rtp_mux->ssrc == -1)
     rtp_mux->current_ssrc = g_random_int ();
@@ -589,6 +706,7 @@ gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
     rtp_mux->ts_base = g_random_int ();
   else
     rtp_mux->ts_base = rtp_mux->ts_offset;
+
   GST_DEBUG_OBJECT (rtp_mux, "set clock-base to %u", rtp_mux->ts_base);
 
   GST_OBJECT_UNLOCK (rtp_mux);
index 77a394b..8be74ad 100644 (file)
@@ -58,6 +58,8 @@ struct _GstRTPMux
   guint16 seqnum;               /* protected by object lock */
   guint ssrc;
   guint current_ssrc;
+
+  gboolean segment_pending;
 };
 
 struct _GstRTPMuxClass
index b37522c..5c55634 100644 (file)
@@ -60,6 +60,14 @@ setcaps_func (GstPad * pad, GstCaps * caps)
   return TRUE;
 }
 
+static gboolean
+event_func (GstPad * pad, GstEvent * event)
+{
+  gst_event_unref (event);
+
+  return TRUE;
+}
+
 static void
 test_basic (const gchar * elem_name, int count, check_cb cb)
 {
@@ -74,6 +82,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
   GstCaps *src2caps = NULL;
   GstCaps *sinkcaps = NULL;
   GstCaps *caps;
+  GstEvent *newsegment;
   int i;
 
   rtpmux = gst_check_setup_element (elem_name);
@@ -92,6 +101,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
   gst_pad_set_getcaps_function (src2, getcaps_func);
   gst_pad_set_getcaps_function (sink, getcaps_func);
   gst_pad_set_setcaps_function (sink, setcaps_func);
+  gst_pad_set_event_function (sink, event_func);
   g_object_set_data (G_OBJECT (src1), "caps", &src1caps);
   g_object_set_data (G_OBJECT (src2), "caps", &src2caps);
   g_object_set_data (G_OBJECT (sink), "caps", &sinkcaps);
@@ -130,8 +140,17 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
       "ssrc", G_TYPE_UINT, 66, NULL);
   fail_unless (gst_pad_set_caps (src1, caps));
 
+  newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME,
+      100000, -1, 0);
+  fail_unless (gst_pad_push_event (src1, newsegment));
+  newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME,
+      50000, -1, 0);
+  fail_unless (gst_pad_push_event (src2, newsegment));
+
   for (i = 0; i < count; i++) {
     inbuf = gst_rtp_buffer_new_allocate (10, 0, 0);
+    GST_BUFFER_TIMESTAMP (inbuf) = i * 1000 + 100000;
+    GST_BUFFER_DURATION (inbuf) = 1000;
     gst_buffer_set_caps (inbuf, caps);
     gst_rtp_buffer_set_version (inbuf, 2);
     gst_rtp_buffer_set_payload_type (inbuf, 98);
@@ -140,6 +159,10 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
     gst_rtp_buffer_set_seq (inbuf, 2000 + i);
     fail_unless (gst_pad_push (src1, inbuf) == GST_FLOW_OK);
 
+    if (buffers)
+      fail_unless (GST_BUFFER_TIMESTAMP (buffers->data) == i * 1000, "%lld",
+          GST_BUFFER_TIMESTAMP (buffers->data));
+
     cb (src2, i);
 
     g_list_foreach (buffers, (GFunc) gst_buffer_unref, NULL);