rtpmux: Add support for GstBufferList
authorOlivier Crête <olivier.crete@collabora.co.uk>
Thu, 1 Jul 2010 19:19:12 +0000 (15:19 -0400)
committerTim-Philipp Müller <tim@centricular.net>
Sun, 16 Dec 2012 16:35:15 +0000 (16:35 +0000)
Factor out most of the buffer handling and implement a chain_list
function. Also, the DTMF muxer has been modified to just have a
function to accept or reject a buffer instead of having to subclass
both chain and chain_list.

gst/rtpmanager/gstrtpdtmfmux.c
gst/rtpmanager/gstrtpmux.c
gst/rtpmanager/gstrtpmux.h

index f504874..9fd77c3 100644 (file)
@@ -66,7 +66,8 @@ static GstPad *gst_rtp_dtmf_mux_request_new_pad (GstElement * element,
 static GstStateChangeReturn gst_rtp_dtmf_mux_change_state (GstElement * element,
     GstStateChange transition);
 
-static GstFlowReturn gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer);
+static gboolean gst_rtp_dtmf_mux_accept_buffer_locked (GstRTPMux * rtp_mux,
+    GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer);
 
 GST_BOILERPLATE (GstRTPDTMFMux, gst_rtp_dtmf_mux, GstRTPMux, GST_TYPE_RTP_MUX);
 
@@ -104,24 +105,19 @@ gst_rtp_dtmf_mux_class_init (GstRTPDTMFMuxClass * klass)
       GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_request_new_pad);
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_change_state);
-  gstrtpmux_class->chain_func = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_chain);
+  gstrtpmux_class->accept_buffer_locked = gst_rtp_dtmf_mux_accept_buffer_locked;
 }
 
-static GstFlowReturn
-gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer)
+static gboolean
+gst_rtp_dtmf_mux_accept_buffer_locked (GstRTPMux * rtp_mux,
+    GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer)
 {
-  GstRTPDTMFMux *mux;
-  GstFlowReturn ret = GST_FLOW_ERROR;
-  GstRTPMuxPadPrivate *padpriv = NULL;
+  GstRTPDTMFMux *mux = GST_RTP_DTMF_MUX (rtp_mux);
   GstClockTime running_ts;
 
-  mux = GST_RTP_DTMF_MUX (gst_pad_get_parent (pad));
-
   running_ts = GST_BUFFER_TIMESTAMP (buffer);
 
-  GST_OBJECT_LOCK (mux);
   if (GST_CLOCK_TIME_IS_VALID (running_ts)) {
-    padpriv = gst_pad_get_element_private (pad);
     if (padpriv && padpriv->segment.format == GST_FORMAT_TIME)
       running_ts = gst_segment_to_running_time (&padpriv->segment,
           GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buffer));
@@ -134,12 +130,12 @@ gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer)
               mux->last_priority_end);
         else
           mux->last_priority_end = running_ts + GST_BUFFER_DURATION (buffer);
-        GST_LOG_OBJECT (mux, "Got buffer %p on priority pad %s,"
+        GST_LOG_OBJECT (mux, "Got buffer %p on priority pad"
             " blocking regular pads until %" GST_TIME_FORMAT, buffer,
-            GST_PAD_NAME (pad), GST_TIME_ARGS (mux->last_priority_end));
+            GST_TIME_ARGS (mux->last_priority_end));
       } else {
-        GST_WARNING_OBJECT (mux, "Buffer %p on pad %s has an invalid duration,"
-            " not blocking other pad", buffer, GST_PAD_NAME (pad));
+        GST_WARNING_OBJECT (mux, "Buffer %p has an invalid duration,"
+            " not blocking other pad", buffer);
       }
     } else {
       if (GST_CLOCK_TIME_IS_VALID (mux->last_priority_end) &&
@@ -147,30 +143,15 @@ gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer)
         GST_LOG_OBJECT (mux, "Dropping buffer %p because running time"
             " %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, buffer,
             GST_TIME_ARGS (running_ts), GST_TIME_ARGS (mux->last_priority_end));
-        goto drop_buffer;
+        return FALSE;
       }
     }
   } else {
-    GST_LOG_OBJECT (pad, "Buffer %p has an invalid timestamp,"
+    GST_LOG_OBJECT (mux, "Buffer %p has an invalid timestamp,"
         " letting through", buffer);
   }
-  GST_OBJECT_UNLOCK (mux);
-
-  if (parent_class->chain_func)
-    ret = parent_class->chain_func (pad, buffer);
-  else
-    gst_buffer_unref (buffer);
-
-out:
-
-  gst_object_unref (mux);
-  return ret;
 
-drop_buffer:
-  gst_buffer_unref (buffer);
-  ret = GST_FLOW_OK;
-  GST_OBJECT_UNLOCK (mux);
-  goto out;
+  return TRUE;
 }
 
 
index 27da522..babf36d 100644 (file)
@@ -72,6 +72,8 @@ static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name);
 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad,
+    GstBufferList * bufferlist);
 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps);
 static GstCaps *gst_rtp_mux_getcaps (GstPad * pad);
 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event);
@@ -137,8 +139,6 @@ gst_rtp_mux_class_init (GstRTPMuxClass * klass)
       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
-
-  klass->chain_func = gst_rtp_mux_chain;
 }
 
 static void
@@ -227,8 +227,9 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
   /* setup some pad functions */
   gst_pad_set_setcaps_function (sinkpad, gst_rtp_mux_setcaps);
   gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps);
-  if (klass->chain_func)
-    gst_pad_set_chain_function (sinkpad, klass->chain_func);
+  gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
+  gst_pad_set_chain_list_function (sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
   gst_pad_set_event_function (sinkpad,
       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
 
@@ -287,19 +288,14 @@ gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
 
 /* Put our own clock-base on the buffer */
 static void
-gst_rtp_mux_readjust_rtp_timestamp (GstRTPMux * rtp_mux, GstPad * pad,
-    GstBuffer * buffer)
+gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
+    GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer)
 {
   guint32 ts;
   guint32 sink_ts_base = 0;
-  GstRTPMuxPadPrivate *padpriv;
-
 
-  GST_OBJECT_LOCK (rtp_mux);
-  padpriv = gst_pad_get_element_private (pad);
   if (padpriv && padpriv->have_clock_base)
     sink_ts_base = padpriv->clock_base;
-  GST_OBJECT_UNLOCK (rtp_mux);
 
   ts = gst_rtp_buffer_get_timestamp (buffer) - sink_ts_base + rtp_mux->ts_base;
   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
@@ -307,38 +303,82 @@ gst_rtp_mux_readjust_rtp_timestamp (GstRTPMux * rtp_mux, GstPad * pad,
   gst_rtp_buffer_set_timestamp (buffer, ts);
 }
 
+static gboolean
+process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
+    GstBuffer * buffer)
+{
+  GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
+
+  if (klass->accept_buffer_locked)
+    if (!klass->accept_buffer_locked (rtp_mux, padpriv, buffer))
+      return FALSE;
+
+  rtp_mux->seqnum++;
+  gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
+
+  gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc);
+  gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, buffer);
+  GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
+      GST_BUFFER_SIZE (buffer), rtp_mux->seqnum,
+      gst_rtp_buffer_get_timestamp (buffer));
+
+  if (padpriv) {
+    gst_buffer_set_caps (buffer, padpriv->out_caps);
+    if (padpriv->segment.format == GST_FORMAT_TIME)
+      GST_BUFFER_TIMESTAMP (buffer) =
+          gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
+          GST_BUFFER_TIMESTAMP (buffer));
+  }
+
+  return TRUE;
+}
+
 static GstFlowReturn
-gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
+gst_rtp_mux_chain_list (GstPad * pad, GstBufferList * bufferlist)
 {
   GstRTPMux *rtp_mux;
   GstFlowReturn ret;
+  GstBufferListIterator *it;
   GstRTPMuxPadPrivate *padpriv;
   GstEvent *newseg_event = NULL;
+  gboolean drop = TRUE;
 
   rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
 
-  if (!gst_rtp_buffer_validate (buffer)) {
-    gst_buffer_unref (buffer);
+  if (!gst_rtp_buffer_list_validate (bufferlist)) {
     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
     gst_object_unref (rtp_mux);
     return GST_FLOW_ERROR;
   }
 
-  buffer = gst_buffer_make_writable (buffer);
-
   GST_OBJECT_LOCK (rtp_mux);
-  rtp_mux->seqnum++;
-  gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
+
   padpriv = gst_pad_get_element_private (pad);
-  if (padpriv) {
-    gst_buffer_set_caps (buffer, padpriv->out_caps);
-    if (padpriv->segment.format == GST_FORMAT_TIME)
-      GST_BUFFER_TIMESTAMP (buffer) =
-          gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
-          GST_BUFFER_TIMESTAMP (buffer));
+  if (!padpriv) {
+    GST_OBJECT_UNLOCK (rtp_mux);
+    ret = GST_FLOW_NOT_LINKED;
+    gst_buffer_list_unref (bufferlist);
+    goto out;
+  }
+
+  bufferlist = gst_buffer_list_make_writable (bufferlist);
+  it = gst_buffer_list_iterate (bufferlist);
+  while (gst_buffer_list_iterator_next_group (it)) {
+    GstBuffer *rtpbuf;
+
+    rtpbuf = gst_buffer_list_iterator_next (it);
+    rtpbuf = gst_buffer_make_writable (rtpbuf);
+
+    drop = !process_buffer_locked (rtp_mux, padpriv, rtpbuf);
+
+    if (drop)
+      break;
+
+    gst_buffer_list_iterator_take (it, rtpbuf);
   }
+  gst_buffer_list_iterator_free (it);
 
-  if (rtp_mux->segment_pending) {
+  if (!drop && rtp_mux->segment_pending) {
     /*
      * We set the start at 0, because we re-timestamps to the running time
      */
@@ -347,24 +387,78 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
 
     rtp_mux->segment_pending = FALSE;
   }
-  GST_OBJECT_UNLOCK (rtp_mux);
 
-  gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc);
-  gst_rtp_mux_readjust_rtp_timestamp (rtp_mux, pad, buffer);
-  GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
-      GST_BUFFER_SIZE (buffer), rtp_mux->seqnum,
-      gst_rtp_buffer_get_timestamp (buffer));
+  GST_OBJECT_UNLOCK (rtp_mux);
 
   if (newseg_event)
     gst_pad_push_event (rtp_mux->srcpad, newseg_event);
 
+  if (drop) {
+    gst_buffer_list_unref (bufferlist);
+    ret = GST_FLOW_OK;
+  } else {
+    ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
+  }
+
+out:
+
+  gst_object_unref (rtp_mux);
+
+  return ret;
+}
+
+static GstFlowReturn
+gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstRTPMux *rtp_mux;
+  GstFlowReturn ret;
+  GstRTPMuxPadPrivate *padpriv;
+  GstEvent *newseg_event = NULL;
+  gboolean drop;
+
+  rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
+
+  if (!gst_rtp_buffer_validate (buffer)) {
+    gst_buffer_unref (buffer);
+    GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
+    gst_object_unref (rtp_mux);
+    return GST_FLOW_ERROR;
+  }
+
+  GST_OBJECT_LOCK (rtp_mux);
+  padpriv = gst_pad_get_element_private (pad);
+
   if (!padpriv) {
+    GST_OBJECT_UNLOCK (rtp_mux);
     ret = GST_FLOW_NOT_LINKED;
     gst_buffer_unref (buffer);
     goto out;
   }
 
-  ret = gst_pad_push (rtp_mux->srcpad, buffer);
+  buffer = gst_buffer_make_writable (buffer);
+
+  drop = !process_buffer_locked (rtp_mux, padpriv, buffer);
+
+  if (!drop && rtp_mux->segment_pending) {
+    /*
+     * We set the start at 0, because we re-timestamps to the running time
+     */
+    newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
+        GST_FORMAT_TIME, 0, -1, 0);
+
+    rtp_mux->segment_pending = FALSE;
+  }
+  GST_OBJECT_UNLOCK (rtp_mux);
+
+  if (newseg_event)
+    gst_pad_push_event (rtp_mux->srcpad, newseg_event);
+
+  if (drop) {
+    gst_buffer_unref (buffer);
+    ret = GST_FLOW_OK;
+  } else {
+    ret = gst_pad_push (rtp_mux->srcpad, buffer);
+  }
 
 out:
 
index 8a6a5dd..9651383 100644 (file)
@@ -38,6 +38,20 @@ G_BEGIN_DECLS
 typedef struct _GstRTPMux GstRTPMux;
 typedef struct _GstRTPMuxClass GstRTPMuxClass;
 
+
+typedef struct
+{
+  gboolean have_clock_base;
+  guint clock_base;
+
+  GstCaps *out_caps;
+
+  GstSegment segment;
+
+  gboolean priority;
+} GstRTPMuxPadPrivate;
+
+
 /**
  * GstRTPMux:
  *
@@ -66,23 +80,11 @@ struct _GstRTPMuxClass
 {
   GstElementClass parent_class;
 
-  GstFlowReturn (*chain_func) (GstPad * pad, GstBuffer * buffer);
+  gboolean (*accept_buffer_locked) (GstRTPMux *rtp_mux,
+      GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer);
 };
 
 
-typedef struct
-{
-  gboolean have_clock_base;
-  guint clock_base;
-
-  GstCaps *out_caps;
-
-  GstSegment segment;
-
-  gboolean priority;
-} GstRTPMuxPadPrivate;
-
-
 GType gst_rtp_mux_get_type (void);
 gboolean gst_rtp_mux_plugin_init (GstPlugin * plugin);