rtp: Fix sending of small packets
authorSanchayan Maity <sanchayan@asymptotic.io>
Mon, 7 Sep 2020 13:05:40 +0000 (18:35 +0530)
committerSanchayan Maity <sanchayan@asymptotic.io>
Mon, 21 Sep 2020 04:36:48 +0000 (10:06 +0530)
The current implementation for RTP send isn't optimised for sending MTU
bytes of data like rtp-native. For eg. if MTU is 1280 bytes and we have
to send 1276 bytes, two packets are send out one of 1268 bytes and other
of 8 bytes. Sending out a packet of 8 bytes has a significant overhead
and we should be sending MTU bytes of data.

Fix this by accumulating MTU bytes of data and sending data only on
accumulation of MTU worth of data.

src/modules/rtp/rtp-gstreamer.c

index 0db33095811097ee3ef2592cdd09ccce08feaa65..2fe0b02e15379abface1bae8e3cb67f3fcfda819 100644 (file)
@@ -43,6 +43,7 @@
     }
 
 #define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+#define RTP_HEADER_SIZE    12
 
 struct pa_rtp_context {
     pa_fdsem *fdsem;
@@ -53,6 +54,9 @@ struct pa_rtp_context {
     GstElement *appsink;
 
     uint32_t last_timestamp;
+
+    uint8_t *send_buf;
+    size_t mtu;
 };
 
 static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -171,6 +175,8 @@ pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, con
     c = pa_xnew0(pa_rtp_context, 1);
 
     c->ss = *ss;
+    c->mtu = mtu - RTP_HEADER_SIZE;
+    c->send_buf = pa_xmalloc(c->mtu);
 
     if (!gst_init_check(NULL, NULL, &error)) {
         pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -216,18 +222,10 @@ static bool process_bus_messages(pa_rtp_context *c) {
     return ret;
 }
 
-static void free_buffer(pa_memblock *memblock) {
-    pa_memblock_release(memblock);
-    pa_memblock_unref(memblock);
-}
-
 /* Called from I/O thread context */
 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
-    pa_memchunk chunk = { 0, };
     GstBuffer *buf;
-    void *data;
-    bool stop = false;
-    int ret = 0;
+    size_t n = 0;
 
     pa_assert(c);
     pa_assert(q);
@@ -235,40 +233,81 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
     if (!process_bus_messages(c))
         return -1;
 
-    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
-        GstClock *clock;
-        GstClockTime timestamp, clock_time;
+    /*
+     * While we check here for atleast MTU worth of data being available in
+     * memblockq, we might not have exact equivalent to MTU. Hence, we walk
+     * over the memchunks in memblockq and accumulate MTU bytes next.
+     */
+    if (pa_memblockq_get_length(q) < c->mtu)
+        return 0;
+
+    for (;;) {
+        pa_memchunk chunk;
+        int r;
+
+        pa_memchunk_reset(&chunk);
+
+        if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+            /*
+             * Accumulate MTU bytes of data before sending. If the current
+             * chunk length + accumulated bytes exceeds MTU, we drop bytes
+             * considered for transfer in this iteration from memblockq.
+             *
+             * The remaining bytes will be available in the next iteration,
+             * as these will be tracked and maintained by memblockq.
+             */
+            size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+            pa_assert(chunk.memblock);
+
+            memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k);
+            pa_memblock_release(chunk.memblock);
+            pa_memblock_unref(chunk.memblock);
+
+            n += k;
+            pa_memblockq_drop(q, k);
+        }
 
-        clock = gst_element_get_clock(c->pipeline);
-        clock_time = gst_clock_get_time(clock);
-        gst_object_unref(clock);
+        if (r < 0 || n >= c->mtu) {
+            GstClock *clock;
+            GstClockTime timestamp, clock_time;
+            GstMapInfo info;
 
-        timestamp = gst_element_get_base_time(c->pipeline);
-        if (timestamp > clock_time)
-          timestamp -= clock_time;
-        else
-          timestamp = 0;
+            if (n > 0) {
+                clock = gst_element_get_clock(c->pipeline);
+                clock_time = gst_clock_get_time(clock);
+                gst_object_unref(clock);
 
-        pa_assert(chunk.memblock);
+                timestamp = gst_element_get_base_time(c->pipeline);
+                if (timestamp > clock_time)
+                  timestamp -= clock_time;
+                else
+                  timestamp = 0;
 
-        data = pa_memblock_acquire(chunk.memblock);
+                buf = gst_buffer_new_allocate(NULL, n, NULL);
+                pa_assert(buf);
 
-        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
-                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
-                                          (GDestroyNotify) free_buffer);
+                GST_BUFFER_PTS(buf) = timestamp;
 
-        GST_BUFFER_PTS(buf) = timestamp;
+                pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE));
 
-        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
-            pa_log_error("Could not push buffer");
-            stop = true;
-            ret = -1;
-        }
+                memcpy(info.data, c->send_buf, n);
+                gst_buffer_unmap(buf, &info);
 
-        pa_memblockq_drop(q, chunk.length);
+                if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+                    pa_log_error("Could not push buffer");
+                    return -1;
+                }
+            }
+
+            if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+                break;
+
+            n = 0;
+        }
     }
 
-    return ret;
+    return 0;
 }
 
 static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -415,6 +454,7 @@ pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample
 
     c->fdsem = pa_fdsem_new();
     c->ss = *ss;
+    c->send_buf = NULL;
 
     if (!gst_init_check(NULL, NULL, &error)) {
         pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -537,6 +577,7 @@ void pa_rtp_context_free(pa_rtp_context *c) {
     if (c->appsrc) {
         gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
         gst_object_unref(c->appsrc);
+        pa_xfree(c->send_buf);
     }
 
     if (c->appsink)