rtpsession: add support for buffer lists on the recv path
authorAntonio Ospite <antonio.ospite@collabora.com>
Wed, 27 Feb 2019 16:03:44 +0000 (17:03 +0100)
committerOlivier CrĂȘte <olivier.crete@collabora.com>
Wed, 7 Aug 2019 19:32:30 +0000 (15:32 -0400)
The send path in rtpsession processes the buffer list along the way,
sharing info and stats between packets in the same list, because it
assumes that all packets in a buffer list are from the same frame.

However, in the receiving path packets can arrive in all sorts of
arrangements:

  - different sources,
  - different frames (different timestamps),
  - different types (multiplexed RTP and RTCP, invalid RTP packets).

so a more general approach should be used to correctly support buffer
lists in the receive path.

It turns out that it's simpler and more robust to process buffers
individually inside the rtpsession element even if they come in a buffer
list, and then reassemble a new buffer list when pushing the buffers
downstream.

This avoids complicating the existing code to make all functions
buffer-list-aware with the risk of introducing regressions,

To support buffer lists in the receive path and reduce the "push
overhead" in the pipeline, a new private field named processed_list is
added to GstRtpSessionPrivate, it is set in the chain_list handler and
used in the process_rtp callback; this is to achieve the following:

  - iterate over the incoming buffer list;
  - process the packets one by one;
  - add the valid ones to a new buffer list;
  - push the new buffer list downstream.

The processed_list field is reset before pushing a buffer list to be on
the safe side in case a single buffer was to be pushed by upstream
at some later point.

NOTE:

The proposed modifications do not change the behavior of the send path.

The process_rtp callback is called in rtpsource.c by the push_rtp
callback (via source_push_rtp) only when the source is not internal.

So even though push_rtp is also called in the send path, it won't end up
using process_rtp in this case because the source would be internal in
the send path.

The reasoning from above may suggest a future refactoring: push_rtp
might be split to better differentiate the send and receive path.

gst/rtpmanager/gstrtpsession.c

index 3440a6a..8f6c3cf 100644 (file)
@@ -276,6 +276,12 @@ struct _GstRtpSessionPrivate
 
   guint recv_rtx_req_count;
   guint sent_rtx_req_count;
+
+  /*
+   * This is the list of processed packets in the receive path when upstream
+   * pushed a buffer list.
+   */
+  GstBufferList *processed_list;
 };
 
 /* callbacks to handle actions from the session manager */
@@ -301,6 +307,8 @@ static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
     gpointer user_data);
 static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
     GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp_list (GstPad * pad,
+    GstObject * parent, GstBufferList * list);
 static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
     GstObject * parent, GstBuffer * buffer);
 static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
@@ -804,6 +812,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
       "rtpsession", 0, "RTP Session");
 
   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp_list);
   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
@@ -1316,8 +1325,7 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
 }
 
-/* called when the session manager has an RTP packet or a list of packets
- * ready for further processing */
+/* called when the session manager has an RTP packet ready to be pushed */
 static GstFlowReturn
 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
     GstBuffer * buffer, gpointer user_data)
@@ -1334,8 +1342,14 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
   if (rtp_src) {
-    GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
-    result = gst_pad_push (rtp_src, buffer);
+    if (rtpsession->priv->processed_list) {
+      GST_LOG_OBJECT (rtpsession, "queueing received RTP packet");
+      gst_buffer_list_add (rtpsession->priv->processed_list, buffer);
+      result = GST_FLOW_OK;
+    } else {
+      GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
+      result = gst_pad_push (rtp_src, buffer);
+    }
     gst_object_unref (rtp_src);
   } else {
     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
@@ -1972,6 +1986,60 @@ push_error:
 }
 
 static gboolean
+process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data)
+{
+  gint ret;
+
+  ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer);
+  if (ret != GST_FLOW_OK)
+    GST_ERROR ("Processing individual buffer in a list failed");
+
+  /*
+   * The buffer has been processed, remove it from the original list, if it was
+   * a valid RTP buffer it has been added to the "processed" list in
+   * gst_rtp_session_process_rtp().
+   */
+  *buffer = NULL;
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent,
+    GstBufferList * list)
+{
+  GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
+  GstBufferList *processed_list;
+
+  processed_list = gst_buffer_list_new ();
+
+  /* Set some private data to detect that a buffer list is being pushed. */
+  rtpsession->priv->processed_list = processed_list;
+
+  /*
+   * Individually process the buffers from the incoming buffer list as the
+   * incoming RTP packets in the list can be mixed in all sorts of ways:
+   *    - different frames,
+   *    - different sources,
+   *    - different types (RTP or RTCP)
+   */
+  gst_buffer_list_foreach (list,
+      (GstBufferListFunc) process_received_buffer_in_list, parent);
+
+  gst_buffer_list_unref (list);
+
+  /* Clean up private data in case the next push does not use a buffer list. */
+  rtpsession->priv->processed_list = NULL;
+
+  if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) {
+    gst_buffer_list_unref (processed_list);
+    return GST_FLOW_OK;
+  }
+
+  GST_LOG_OBJECT (rtpsession, "pushing received RTP list");
+  return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list);
+}
+
+static gboolean
 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
     GstEvent * event)
 {
@@ -2372,6 +2440,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
       "recv_rtp_sink");
   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
       gst_rtp_session_chain_recv_rtp);
+  gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink,
+      gst_rtp_session_chain_recv_rtp_list);
   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
       gst_rtp_session_event_recv_rtp_sink);
   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,