rtprtxsend: fix data locking when creating rtx packets
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Tue, 14 Jan 2014 12:01:41 +0000 (13:01 +0100)
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Wed, 15 Jan 2014 09:13:11 +0000 (10:13 +0100)
This patch moves the creation of rtx packets to be done early,
in the src_event() function, when they are requested. The purpose
is to run gst_rtp_rtx_buffer_new() with the object locked to
protect internal data, because if it is done at the pushing stage,
we would have to lock and unlock multiple times in a row while we
are pushing the rtx buffers.

Previously there was no locking at all, which was terribly wrong.

gst/rtpmanager/gstrtprtxsend.c

index 508a180..2eaa84d 100644 (file)
@@ -304,6 +304,81 @@ gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
   return data;
 }
 
+/* Copy fixed header and extension. Add OSN before to copy payload
+ * Copy memory to avoid to manually copy each rtp buffer field.
+ */
+static GstBuffer *
+gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
+{
+  GstMemory *mem = NULL;
+  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+  GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
+  GstBuffer *new_buffer = gst_buffer_new ();
+  GstMapInfo map;
+  guint payload_len = 0;
+  SSRCRtxData *data;
+  guint32 ssrc;
+  guint16 seqnum;
+  guint8 fmtp;
+
+  gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
+
+  /* get needed data from GstRtpRtxSend */
+  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+  data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+  ssrc = data->rtx_ssrc;
+  seqnum = data->next_seqnum++;
+  fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
+          GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
+
+  GST_DEBUG_OBJECT (rtx,
+      "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+      seqnum, ssrc);
+
+  /* gst_rtp_buffer_map does not map the payload so do it now */
+  gst_rtp_buffer_get_payload (&rtp);
+
+  /* If payload type is not set through SDP/property then
+   * just bump the value */
+  if (fmtp < 96)
+    fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1;
+
+  /* copy fixed header */
+  mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
+  gst_buffer_append_memory (new_buffer, mem);
+
+  /* copy extension if any */
+  if (rtp.size[1]) {
+    mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
+    gst_buffer_append_memory (new_buffer, mem);
+  }
+
+  /* copy payload and add OSN just before */
+  payload_len = 2 + rtp.size[2];
+  mem = gst_allocator_alloc (NULL, payload_len, NULL);
+
+  gst_memory_map (mem, &map, GST_MAP_WRITE);
+  GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
+  if (rtp.size[2])
+    memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
+  gst_memory_unmap (mem, &map);
+  gst_buffer_append_memory (new_buffer, mem);
+
+  /* everything needed is copied */
+  gst_rtp_buffer_unmap (&rtp);
+
+  /* set ssrc, seqnum and fmtp */
+  gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
+  gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
+  gst_rtp_buffer_set_seq (&new_rtp, seqnum);
+  gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
+  /* RFC 4588: let other elements do the padding, as normal */
+  gst_rtp_buffer_set_padding (&new_rtp, FALSE);
+  gst_rtp_buffer_unmap (&new_rtp);
+
+  return new_buffer;
+}
+
 static gint
 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
     gpointer user_data)
@@ -360,7 +435,8 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           if (iter) {
             BufferQueueItem *item = g_sequence_get (iter);
             GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
-            g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
+            g_queue_push_tail (rtx->pending,
+                gst_rtp_rtx_buffer_new (rtx, item->buffer));
           }
         }
         GST_OBJECT_UNLOCK (rtx);
@@ -495,87 +571,12 @@ gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
 }
 
-/* Copy fixed header and extension. Add OSN before to copy payload
- * Copy memory to avoid to manually copy each rtp buffer field.
- */
-static GstBuffer *
-_gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
-{
-  GstMemory *mem = NULL;
-  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
-  GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
-  GstBuffer *new_buffer = gst_buffer_new ();
-  GstMapInfo map;
-  guint payload_len = 0;
-  SSRCRtxData *data;
-  guint32 ssrc;
-  guint16 seqnum;
-  guint8 fmtp;
-
-  gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
-
-  /* get needed data from GstRtpRtxSend */
-  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
-  data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
-  ssrc = data->rtx_ssrc;
-  seqnum = data->next_seqnum++;
-  fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
-          GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
-
-  GST_DEBUG_OBJECT (rtx,
-      "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
-      seqnum, ssrc);
-
-  /* gst_rtp_buffer_map does not map the payload so do it now */
-  gst_rtp_buffer_get_payload (&rtp);
-
-  /* If payload type is not set through SDP/property then
-   * just bump the value */
-  if (fmtp < 96)
-    fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1;
-
-  /* copy fixed header */
-  mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
-  gst_buffer_append_memory (new_buffer, mem);
-
-  /* copy extension if any */
-  if (rtp.size[1]) {
-    mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
-    gst_buffer_append_memory (new_buffer, mem);
-  }
-
-  /* copy payload and add OSN just before */
-  payload_len = 2 + rtp.size[2];
-  mem = gst_allocator_alloc (NULL, payload_len, NULL);
-
-  gst_memory_map (mem, &map, GST_MAP_WRITE);
-  GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
-  if (rtp.size[2])
-    memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
-  gst_memory_unmap (mem, &map);
-  gst_buffer_append_memory (new_buffer, mem);
-
-  /* everything needed is copied */
-  gst_rtp_buffer_unmap (&rtp);
-
-  /* set ssrc, seqnum and fmtp */
-  gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
-  gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
-  gst_rtp_buffer_set_seq (&new_rtp, seqnum);
-  gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
-  /* RFC 4588: let other elements do the padding, as normal */
-  gst_rtp_buffer_set_padding (&new_rtp, FALSE);
-  gst_rtp_buffer_unmap (&new_rtp);
-
-  return new_buffer;
-}
-
 /* push pending retransmission packet.
  * it constructs rtx packet from original packets */
 static void
 do_push (GstBuffer * buffer, GstRtpRtxSend * rtx)
 {
-  gst_pad_push (rtx->srcpad, _gst_rtp_rtx_buffer_new (rtx, buffer));
+  gst_pad_push (rtx->srcpad, buffer);
 }
 
 static GstFlowReturn
@@ -638,7 +639,7 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   /* retransmit requested packets */
   if (pending) {
     g_queue_foreach (pending, (GFunc) do_push, rtx);
-    g_queue_free_full (pending, (GDestroyNotify) gst_buffer_unref);
+    g_queue_free (pending);
   }
 
   GST_LOG_OBJECT (rtx,