From 51edc07127b2ef7ca260fdc73f136e503a90f35c Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Mon, 4 Nov 2013 20:05:03 +0200 Subject: [PATCH] rtprtxsend: use a GSequence to implement the buffer queue This has the advantage that searching the queue to find the buffer with the requested seqnum is done with binary search. --- gst/rtpmanager/gstrtprtxsend.c | 74 +++++++++++++++++++----------------------- gst/rtpmanager/gstrtprtxsend.h | 2 +- 2 files changed, 35 insertions(+), 41 deletions(-) diff --git a/gst/rtpmanager/gstrtprtxsend.c b/gst/rtpmanager/gstrtprtxsend.c index 8d08d74..ce02d31 100644 --- a/gst/rtpmanager/gstrtprtxsend.c +++ b/gst/rtpmanager/gstrtprtxsend.c @@ -165,8 +165,8 @@ static void gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full) { g_mutex_lock (&rtx->lock); - g_queue_foreach (rtx->queue, (GFunc) buffer_queue_item_free, NULL); - g_queue_clear (rtx->queue); + g_sequence_remove_range (g_sequence_get_begin_iter (rtx->queue), + g_sequence_get_end_iter (rtx->queue)); g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); g_queue_clear (rtx->pending); rtx->master_ssrc = 0; @@ -183,7 +183,7 @@ gst_rtp_rtx_send_finalize (GObject * object) GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object); gst_rtp_rtx_send_reset (rtx, TRUE); - g_queue_free (rtx->queue); + g_sequence_free (rtx->queue); g_queue_free (rtx->pending); g_mutex_clear (&rtx->lock); @@ -215,7 +215,7 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain)); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); - rtx->queue = g_queue_new (); + rtx->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free); rtx->pending = g_queue_new (); g_mutex_init (&rtx->lock); @@ -241,29 +241,14 @@ choose_ssrc (GstRtpRtxSend * rtx) return ssrc; } -typedef struct -{ - GstRtpRtxSend *rtx; - guint seqnum; - gboolean found; -} RTXData; - -/* traverse queue history and try to find the buffer that the - * requested seqnum */ -static void -push_seqnum (BufferQueueItem * item, RTXData * data) +static gint +buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b, + gpointer user_data) { - GstRtpRtxSend *rtx = data->rtx; - - if (data->found) - return; - - /* data->seqnum comes from the request */ - if (item->seqnum == data->seqnum) { - data->found = TRUE; - GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum); - g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer)); - } + /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want, + * it returns negative when seqnum1 > seqnum2 and we want negative + * when b > a, i.e. a is smaller, so it comes first in the sequence */ + return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum); } static gboolean @@ -281,7 +266,6 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { guint32 seqnum = 0; guint ssrc = 0; - RTXData data; /* retrieve seqnum of the packet that need to be restransmisted */ if (!gst_structure_get_uint (s, "seqnum", &seqnum)) @@ -298,12 +282,20 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) g_mutex_lock (&rtx->lock); /* check if request is for us */ if (rtx->master_ssrc == ssrc) { + GSequenceIter *iter; + BufferQueueItem search_item; + + /* update statistics */ ++rtx->num_rtx_requests; - data.rtx = rtx; - data.seqnum = seqnum; - data.found = FALSE; - /* TODO do a binary search because rtx->queue is sorted by seq num */ - g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data); + + search_item.seqnum = seqnum; + iter = g_sequence_lookup (rtx->queue, &search_item, + (GCompareDataFunc) buffer_queue_items_cmp, NULL); + if (iter) { + BufferQueueItem *item = g_sequence_get (iter); + GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum); + g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer)); + } } g_mutex_unlock (&rtx->lock); @@ -326,8 +318,8 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) rtx->rtx_ssrc = choose_ssrc (rtx); /* clear buffers we already saved */ - g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL); - g_queue_clear (rtx->queue); + g_sequence_remove_range (g_sequence_get_begin_iter (rtx->queue), + g_sequence_get_end_iter (rtx->queue)); /* clear buffers that are about to be retransmited */ g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); @@ -394,8 +386,10 @@ gst_rtp_rtx_send_get_ts_diff (GstRtpRtxSend * self) BufferQueueItem *high_buf, *low_buf; guint32 result; - high_buf = g_queue_peek_head (self->queue); - low_buf = g_queue_peek_tail (self->queue); + high_buf = + g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter + (self->queue))); + low_buf = g_sequence_get (g_sequence_get_begin_iter (self->queue)); if (!high_buf || !low_buf || high_buf == low_buf) return 0; @@ -521,16 +515,16 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) item->seqnum = seqnum; item->timestamp = rtptime; item->buffer = gst_buffer_ref (buffer); - g_queue_push_head (rtx->queue, item); + g_sequence_append (rtx->queue, item); /* remove oldest packets from history if they are too many */ if (rtx->max_size_packets) { - while (g_queue_get_length (rtx->queue) > rtx->max_size_packets) - buffer_queue_item_free (g_queue_pop_tail (rtx->queue)); + while (g_sequence_get_length (rtx->queue) > rtx->max_size_packets) + g_sequence_remove (g_sequence_get_begin_iter (rtx->queue)); } if (rtx->max_size_time) { while (gst_rtp_rtx_send_get_ts_diff (rtx) > rtx->max_size_time) - buffer_queue_item_free (g_queue_pop_tail (rtx->queue)); + g_sequence_remove (g_sequence_get_begin_iter (rtx->queue)); } /* within lock, get packets that have to be retransmited */ diff --git a/gst/rtpmanager/gstrtprtxsend.h b/gst/rtpmanager/gstrtprtxsend.h index f49ef00..a48c374 100644 --- a/gst/rtpmanager/gstrtprtxsend.h +++ b/gst/rtpmanager/gstrtprtxsend.h @@ -48,7 +48,7 @@ struct _GstRtpRtxSend GMutex lock; /* history of rtp packets */ - GQueue *queue; + GSequence *queue; /* rtp packets that will be pushed upon next buffer */ GQueue *pending; -- 2.7.4