rtx: various improvements
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 21 Aug 2013 14:53:59 +0000 (16:53 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 21 Aug 2013 15:02:27 +0000 (17:02 +0200)
Use locking
Don't push from the event handler, collected packets in a queue and push from
the chain function.
Clear queues on shutdown.

gst/rtpmanager/gstrtprtxqueue.c
gst/rtpmanager/gstrtprtxqueue.h

index ca299c2..23fcc2d 100644 (file)
@@ -99,11 +99,25 @@ gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
 }
 
 static void
+gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
+{
+  g_mutex_lock (&rtx->lock);
+  g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (rtx->queue);
+  g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
+  g_list_free (rtx->pending);
+  rtx->pending = NULL;
+  g_mutex_unlock (&rtx->lock);
+}
+
+static void
 gst_rtp_rtx_queue_finalize (GObject * object)
 {
   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
 
+  gst_rtp_rtx_queue_reset (rtx, TRUE);
   g_queue_free (rtx->queue);
+  g_mutex_clear (&rtx->lock);
 
   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
 }
@@ -131,14 +145,15 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
 
-
   rtx->queue = g_queue_new ();
+  g_mutex_init (&rtx->lock);
 }
 
 typedef struct
 {
   GstRTPRtxQueue *rtx;
   guint seqnum;
+  gboolean found;
 } RTXData;
 
 static void
@@ -148,6 +163,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data)
   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
   guint16 seqnum;
 
+  if (data->found)
+    return;
+
   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
     return;
 
@@ -155,7 +173,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data)
   gst_rtp_buffer_unmap (&rtpbuffer);
 
   if (seqnum == data->seqnum) {
-    gst_pad_push (rtx->srcpad, gst_buffer_ref (buffer));
+    data->found = TRUE;
+    GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
+    rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
   }
 }
 
@@ -178,9 +198,15 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
           seqnum = -1;
 
+        GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
+
+        g_mutex_lock (&rtx->lock);
         data.rtx = rtx;
         data.seqnum = seqnum;
+        data.found = FALSE;
         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
+        g_mutex_unlock (&rtx->lock);
+
         gst_event_unref (event);
         res = TRUE;
       } else {
@@ -195,18 +221,32 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
   return res;
 }
 
+static void
+do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
+{
+  gst_pad_push (rtx->srcpad, buffer);
+}
+
 static GstFlowReturn
 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstRTPRtxQueue *rtx;
   GstFlowReturn ret;
+  GList *pending;
 
   rtx = GST_RTP_RTX_QUEUE (parent);
 
+  g_mutex_lock (&rtx->lock);
   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
   while (g_queue_get_length (rtx->queue) > 100) {
     gst_buffer_unref (g_queue_pop_tail (rtx->queue));
   }
+  pending = rtx->pending;
+  rtx->pending = NULL;
+  g_mutex_unlock (&rtx->lock);
+
+  g_list_foreach (pending, (GFunc) do_push, rtx);
+  g_list_free (pending);
 
   ret = gst_pad_push (rtx->srcpad, buffer);
 
@@ -239,6 +279,9 @@ static GstStateChangeReturn
 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
 {
   GstStateChangeReturn ret;
+  GstRTPRtxQueue *rtx;
+
+  rtx = GST_RTP_RTX_QUEUE (element);
 
   switch (transition) {
     default:
@@ -250,6 +293,9 @@ gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
       transition);
 
   switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_rtp_rtx_queue_reset (rtx, TRUE);
+      break;
     default:
       break;
   }
index 258efb3..671a959 100644 (file)
@@ -52,7 +52,9 @@ struct _GstRTPRtxQueue
   GstPad *sinkpad;
   GstPad *srcpad;
 
+  GMutex lock;
   GQueue *queue;
+  GList *pending;
 };
 
 struct _GstRTPRtxQueueClass