rtprtxsend: retransmit packets in the same order as the rtx requests
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Mon, 4 Nov 2013 16:38:24 +0000 (18:38 +0200)
committerWim Taymans <wtaymans@redhat.com>
Fri, 3 Jan 2014 19:48:28 +0000 (20:48 +0100)
gst/rtpmanager/gstrtprtxsend.c
gst/rtpmanager/gstrtprtxsend.h
tests/check/elements/rtprtx.c

index d9bca1bc83507cca364325fa2a7c4e57855f3663..8d08d7438a507f5930727fbc43ffa579d4824915 100644 (file)
@@ -167,9 +167,8 @@ gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full)
   g_mutex_lock (&rtx->lock);
   g_queue_foreach (rtx->queue, (GFunc) buffer_queue_item_free, NULL);
   g_queue_clear (rtx->queue);
-  g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
-  g_list_free (rtx->pending);
-  rtx->pending = NULL;
+  g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (rtx->pending);
   rtx->master_ssrc = 0;
   rtx->next_seqnum = g_random_int_range (0, G_MAXUINT16);
   rtx->rtx_ssrc = g_random_int ();
@@ -185,6 +184,7 @@ gst_rtp_rtx_send_finalize (GObject * object)
 
   gst_rtp_rtx_send_reset (rtx, TRUE);
   g_queue_free (rtx->queue);
+  g_queue_free (rtx->pending);
   g_mutex_clear (&rtx->lock);
 
   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
@@ -216,7 +216,7 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
 
   rtx->queue = g_queue_new ();
-  rtx->pending = NULL;
+  rtx->pending = g_queue_new ();
   g_mutex_init (&rtx->lock);
 
   rtx->next_seqnum = g_random_int_range (0, G_MAXUINT16);
@@ -262,7 +262,7 @@ push_seqnum (BufferQueueItem * item, RTXData * data)
   if (item->seqnum == data->seqnum) {
     data->found = TRUE;
     GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
-    rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (item->buffer));
+    g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
   }
 }
 
@@ -330,9 +330,8 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           g_queue_clear (rtx->queue);
 
           /* clear buffers that are about to be retransmited */
-          g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
-          g_list_free (rtx->pending);
-          rtx->pending = NULL;
+          g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
+          g_queue_clear (rtx->pending);
 
           g_mutex_unlock (&rtx->lock);
 
@@ -475,7 +474,7 @@ _gst_rtp_rtx_buffer_new (GstBuffer * buffer, guint32 ssrc, guint16 seqnum,
   return new_buffer;
 }
 
-/* psuh pending retransmission packet.
+/* push pending retransmission packet.
  * it constructs rtx packet from original paclets */
 static void
 do_push (GstBuffer * buffer, GstRtpRtxSend * rtx)
@@ -494,7 +493,7 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
   GstFlowReturn ret = GST_FLOW_ERROR;
-  GList *pending = NULL;
+  GQueue *pending = NULL;
   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
   BufferQueueItem *item;
   guint16 seqnum;
@@ -535,11 +534,13 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   }
 
   /* within lock, get packets that have to be retransmited */
-  pending = rtx->pending;
-  rtx->pending = NULL;
+  if (g_queue_get_length (rtx->pending) > 0) {
+    pending = rtx->pending;
+    rtx->pending = g_queue_new ();
 
-  /* update statistics - assume we will succeed to retransmit those packets */
-  rtx->num_rtx_packets += g_list_length (pending);
+    /* update statistics - assume we will succeed to retransmit those packets */
+    rtx->num_rtx_packets += g_queue_get_length (pending);
+  }
 
   /* transfer payload type while holding the lock */
   rtx->rtx_payload_type = rtx->rtx_payload_type_pending;
@@ -548,9 +549,10 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   g_mutex_unlock (&rtx->lock);
 
   /* retransmit requested packets */
-  g_list_foreach (pending, (GFunc) do_push, rtx);
-  g_list_foreach (pending, (GFunc) gst_buffer_unref, NULL);
-  g_list_free (pending);
+  if (pending) {
+    g_queue_foreach (pending, (GFunc) do_push, rtx);
+    g_queue_free_full (pending, (GDestroyNotify) gst_buffer_unref);
+  }
 
   GST_LOG_OBJECT (rtx,
       "push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum,
index 808dd96ad017463b8aa6cee339136c3e4e23cbef..f49ef00e4d96ae1bdf3daecba3f833dcc7075870 100644 (file)
@@ -50,7 +50,7 @@ struct _GstRtpRtxSend
   /* history of rtp packets */
   GQueue *queue;
   /* rtp packets that will be pushed upon next buffer */
-  GList *pending;
+  GQueue *pending;
 
   guint32 master_ssrc;
   guint32 rtx_ssrc;
index 479b3ad986b99898c09245d9cf77645c2f4c8367..5190c338bf6ea7321694fc30bace789cf9cb8d12 100644 (file)
@@ -1235,7 +1235,7 @@ test_rtxsender_packet_retention (gboolean test_with_time)
   }
 
   /* verify the result. buffers should be in this order (numbers are seqnums):
-   * 1, 1rtx, 2, 2rtx, 1rtx, 3, ... , 9, 9rtx, 8rtx, 7rtx, 6rtx, 5rtx, 10 */
+   * 1, 1rtx, 2, 1rtx, 2rtx, 3, ... , 9, 5rtx, 6rtx, 7rtx, 8rtx, 9rtx, 10 */
   {
     GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT;
     gint expected_rtx_requests, expected_rtx_packets;
@@ -1258,22 +1258,8 @@ test_rtxsender_packet_retention (gboolean test_with_time)
 
     node = buffers;
     for (i = 1; i <= num_buffers; i++) {
-      /* verify the normal rtp flow packet */
-      res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
-      fail_unless_equals_int (res, TRUE);
-      fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
-      fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
-          payload_type);
-      fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
-      gst_rtp_buffer_unmap (&rtp);
-      node = g_list_next (node);
-
-      /* there are no rtx packets after the last normal one */
-      if (i == num_buffers)
-        break;
-
-      /* now verify the retransmission packets */
-      for (j = i; j > MAX (i - half_buffers, 0); j--) {
+      /* verify the retransmission packets */
+      for (j = MAX (i - half_buffers, 1); j < i; j++) {
         GST_INFO ("checking %d, %d", i, j);
 
         res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
@@ -1295,6 +1281,16 @@ test_rtxsender_packet_retention (gboolean test_with_time)
         gst_rtp_buffer_unmap (&rtp);
         node = g_list_next (node);
       }
+
+      /* verify the normal rtp flow packet */
+      res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
+      fail_unless_equals_int (res, TRUE);
+      fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
+      fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
+          payload_type);
+      fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
+      gst_rtp_buffer_unmap (&rtp);
+      node = g_list_next (node);
     }
   }