rtprtxqueue: Implement support for buffer lists
authorSebastian Dröge <sebastian@centricular.com>
Thu, 19 Mar 2015 10:39:38 +0000 (11:39 +0100)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 19 Mar 2015 10:54:37 +0000 (11:54 +0100)
gst/rtpmanager/gstrtprtxqueue.c

index 209aa76..c5b88dd 100644 (file)
@@ -64,6 +64,8 @@ static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
     GstEvent * event);
 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
+    GstObject * parent, GstBufferList * list);
 
 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
     element, GstStateChange transition);
@@ -159,6 +161,8 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
   gst_pad_set_chain_function (rtx->sinkpad,
       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
+  gst_pad_set_chain_list_function (rtx->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
 
   rtx->queue = g_queue_new ();
@@ -246,6 +250,16 @@ do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
   gst_pad_push (rtx->srcpad, buffer);
 }
 
+/* Must be called with rtx->lock */
+static void
+shrink_queue (GstRTPRtxQueue * rtx)
+{
+  if (rtx->max_size_packets) {
+    while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
+      gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+  }
+}
+
 static GstFlowReturn
 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
@@ -257,11 +271,7 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 
   g_mutex_lock (&rtx->lock);
   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
-
-  if (rtx->max_size_packets) {
-    while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
-      gst_buffer_unref (g_queue_pop_tail (rtx->queue));
-  }
+  shrink_queue (rtx);
 
   pending = rtx->pending;
   rtx->pending = NULL;
@@ -275,6 +285,42 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   return ret;
 }
 
+static gboolean
+push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+  GQueue *queue = user_data;
+
+  g_queue_push_head (queue, gst_buffer_ref (*buffer));
+
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * list)
+{
+  GstRTPRtxQueue *rtx;
+  GstFlowReturn ret;
+  GList *pending;
+
+  rtx = GST_RTP_RTX_QUEUE (parent);
+
+  g_mutex_lock (&rtx->lock);
+  gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
+  shrink_queue (rtx);
+
+  pending = rtx->pending;
+  rtx->pending = NULL;
+  g_mutex_unlock (&rtx->lock);
+
+  g_list_foreach (pending, (GFunc) do_push, rtx);
+  g_list_free (pending);
+
+  ret = gst_pad_push_list (rtx->srcpad, list);
+
+  return ret;
+}
+
 static void
 gst_rtp_rtx_queue_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec)