rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxsend.c
index 922fad3..bfa5db8 100644 (file)
 
 /**
  * SECTION:element-rtprtxsend
+ * @title: rtprtxsend
  *
  * See #GstRtpRtxReceive for examples
- * 
+ *
  * The purpose of the sender RTX object is to keep a history of RTP packets up
  * to a configurable limit (max-size-time or max-size-packets). It will listen
  * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
@@ -62,7 +63,7 @@ enum
   PROP_MAX_SIZE_PACKETS,
   PROP_NUM_RTX_REQUESTS,
   PROP_NUM_RTX_PACKETS,
-  PROP_LAST
+  PROP_CLOCK_RATE_MAP,
 };
 
 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
@@ -74,7 +75,7 @@ static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
+    GST_STATIC_CAPS ("application/x-rtp")
     );
 
 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
@@ -86,6 +87,8 @@ static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
     GstEvent * event);
 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
+    GstObject * parent, GstBufferList * list);
 
 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
@@ -119,7 +122,7 @@ buffer_queue_item_free (BufferQueueItem * item)
 typedef struct
 {
   guint32 rtx_ssrc;
-  guint16 next_seqnum;
+  guint16 seqnum_base, next_seqnum;
   gint clock_rate;
 
   /* history of rtp packets */
@@ -132,7 +135,7 @@ ssrc_rtx_data_new (guint32 rtx_ssrc)
   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
 
   data->rtx_ssrc = rtx_ssrc;
-  data->next_seqnum = g_random_int_range (0, G_MAXUINT16);
+  data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
 
   return data;
@@ -190,10 +193,13 @@ gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
           " Number of retransmission packets sent", 0, G_MAXUINT,
           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&src_factory));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sink_factory));
+  g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
+      g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
+          "Map of payload types to their clock rates",
+          GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+  gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
 
   gst_element_class_set_static_metadata (gstelement_class,
       "RTP Retransmission Sender", "Codec",
@@ -228,6 +234,9 @@ gst_rtp_rtx_send_finalize (GObject * object)
   g_hash_table_unref (rtx->rtx_pt_map);
   if (rtx->rtx_pt_map_structure)
     gst_structure_free (rtx->rtx_pt_map_structure);
+  g_hash_table_unref (rtx->clock_rate_map);
+  if (rtx->clock_rate_map_structure)
+    gst_structure_free (rtx->clock_rate_map_structure);
   g_object_unref (rtx->queue);
 
   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
@@ -258,6 +267,8 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
   gst_pad_set_chain_function (rtx->sinkpad,
       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
+  gst_pad_set_chain_list_function (rtx->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
 
   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
@@ -266,6 +277,7 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
       NULL, (GDestroyNotify) ssrc_rtx_data_free);
   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+  rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
 
   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
@@ -386,8 +398,8 @@ gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
 
-  GST_DEBUG_OBJECT (rtx,
-      "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+  GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
+      "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
       seqnum, ssrc);
 
   /* gst_rtp_buffer_map does not map the payload so do it now */
@@ -399,7 +411,10 @@ gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
 
   /* copy extension if any */
   if (rtp.size[1]) {
-    mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
+    mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
+    gst_memory_map (mem, &map, GST_MAP_WRITE);
+    memcpy (map.data, rtp.data[1], rtp.size[1]);
+    gst_memory_unmap (mem, &map);
     gst_buffer_append_memory (new_buffer, mem);
   }
 
@@ -426,6 +441,9 @@ gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
   gst_rtp_buffer_unmap (&new_rtp);
 
+  /* Copy over timestamps */
+  gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
+
   return new_buffer;
 }
 
@@ -456,16 +474,15 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         guint ssrc = 0;
         GstBuffer *rtx_buf = NULL;
 
-        /* retrieve seqnum of the packet that need to be restransmisted */
+        /* retrieve seqnum of the packet that need to be retransmitted */
         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
           seqnum = -1;
 
-        /* retrieve ssrc of the packet that need to be restransmisted */
+        /* retrieve ssrc of the packet that need to be retransmitted */
         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
           ssrc = -1;
 
-        GST_DEBUG_OBJECT (rtx,
-            "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+        GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
             seqnum, ssrc);
 
         GST_OBJECT_LOCK (rtx);
@@ -485,9 +502,29 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
           if (iter) {
             BufferQueueItem *item = g_sequence_get (iter);
-            GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
+            GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
           }
+#ifndef GST_DISABLE_DEBUG
+          else {
+            BufferQueueItem *item = NULL;
+
+            iter = g_sequence_get_begin_iter (data->queue);
+            if (!g_sequence_iter_is_end (iter))
+              item = g_sequence_get (iter);
+
+            if (item && seqnum < item->seqnum) {
+              GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
+                  "removed from the rtx queue; the first available is %u",
+                  seqnum, item->seqnum);
+            } else {
+              GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
+                  "transmitted yet in the original stream; either the remote end "
+                  "is not configured correctly, or the source is too slow",
+                  seqnum);
+            }
+          }
+#endif
         }
         GST_OBJECT_UNLOCK (rtx);
 
@@ -504,11 +541,11 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
           ssrc = -1;
 
-        GST_DEBUG_OBJECT (rtx, "collision ssrc: %" G_GUINT32_FORMAT, ssrc);
+        GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
 
         GST_OBJECT_LOCK (rtx);
 
-        /* choose another ssrc for our retransmited stream */
+        /* choose another ssrc for our retransmitted stream */
         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
           guint master_ssrc;
           SSRCRtxData *data;
@@ -586,22 +623,55 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       GstCaps *caps;
       GstStructure *s;
       guint ssrc;
+      gint payload;
+      gpointer rtx_payload;
       SSRCRtxData *data;
 
       gst_event_parse_caps (event, &caps);
-      g_assert (gst_caps_is_fixed (caps));
 
       s = gst_caps_get_structure (caps, 0);
       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
         ssrc = -1;
+      if (!gst_structure_get_int (s, "payload", &payload))
+        payload = -1;
+
+      if (payload == -1 || ssrc == G_MAXUINT)
+        break;
+
+      if (payload == -1)
+        GST_WARNING_OBJECT (rtx, "No payload in caps");
 
       GST_OBJECT_LOCK (rtx);
       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+      if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
+              GUINT_TO_POINTER (payload), NULL, &rtx_payload))
+        rtx_payload = GINT_TO_POINTER (-1);
+
+      if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
+        GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
+
+      GST_DEBUG_OBJECT (rtx,
+          "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
+          payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
+
       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
 
+      /* The session might need to know the RTX ssrc */
+      caps = gst_caps_copy (caps);
+      gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
+          "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
+
+      if (GPOINTER_TO_INT (rtx_payload) != -1)
+        gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
+            GPOINTER_TO_INT (rtx_payload), NULL);
+
       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
           data->clock_rate, ssrc);
       GST_OBJECT_UNLOCK (rtx);
+
+      gst_event_unref (event);
+      event = gst_event_new_caps (caps);
+      gst_caps_unref (caps);
       break;
     }
     default:
@@ -640,11 +710,10 @@ gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
 }
 
-static GstFlowReturn
-gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+/* Must be called with lock */
+static void
+process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
 {
-  GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
-  GstFlowReturn ret = GST_FLOW_ERROR;
   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
   BufferQueueItem *item;
   SSRCRtxData *data;
@@ -660,12 +729,19 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
   gst_rtp_buffer_unmap (&rtp);
 
-  GST_OBJECT_LOCK (rtx);
+  GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
+      ssrc);
 
   /* do not store the buffer if it's payload type is unknown */
   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
 
+    if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
+      data->clock_rate =
+          GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
+              GUINT_TO_POINTER (payload_type)));
+    }
+
     /* add current rtp buffer to queue history */
     item = g_slice_new0 (BufferQueueItem);
     item->seqnum = seqnum;
@@ -683,14 +759,41 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
     }
   }
+}
+
+static GstFlowReturn
+gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+  GstFlowReturn ret;
 
+  GST_OBJECT_LOCK (rtx);
+  process_buffer (rtx, buffer);
   GST_OBJECT_UNLOCK (rtx);
+  ret = gst_pad_push (rtx->srcpad, buffer);
 
-  GST_LOG_OBJECT (rtx,
-      "push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum,
-      ssrc);
+  return ret;
+}
 
-  ret = gst_pad_push (rtx->srcpad, buffer);
+static gboolean
+process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+  process_buffer (user_data, *buffer);
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * list)
+{
+  GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+  GstFlowReturn ret;
+
+  GST_OBJECT_LOCK (rtx);
+  gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
+  GST_OBJECT_UNLOCK (rtx);
+
+  ret = gst_pad_push_list (rtx->srcpad, list);
 
   return ret;
 }
@@ -704,11 +807,12 @@ gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
 
     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
-      gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
-
       GST_OBJECT_LOCK (rtx);
+      /* Update statistics just before pushing. */
       rtx->num_rtx_packets++;
       GST_OBJECT_UNLOCK (rtx);
+
+      gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
     } else if (GST_IS_EVENT (data->object)) {
       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
 
@@ -786,6 +890,11 @@ gst_rtp_rtx_send_get_property (GObject * object,
       g_value_set_uint (value, rtx->num_rtx_packets);
       GST_OBJECT_UNLOCK (rtx);
       break;
+    case PROP_CLOCK_RATE_MAP:
+      GST_OBJECT_LOCK (rtx);
+      g_value_set_boxed (value, rtx->clock_rate_map_structure);
+      GST_OBJECT_UNLOCK (rtx);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -842,6 +951,16 @@ gst_rtp_rtx_send_set_property (GObject * object,
       rtx->max_size_packets = g_value_get_uint (value);
       GST_OBJECT_UNLOCK (rtx);
       break;
+    case PROP_CLOCK_RATE_MAP:
+      GST_OBJECT_LOCK (rtx);
+      if (rtx->clock_rate_map_structure)
+        gst_structure_free (rtx->clock_rate_map_structure);
+      rtx->clock_rate_map_structure = g_value_dup_boxed (value);
+      g_hash_table_remove_all (rtx->clock_rate_map);
+      gst_structure_foreach (rtx->clock_rate_map_structure,
+          structure_to_hash_table, rtx->clock_rate_map);
+      GST_OBJECT_UNLOCK (rtx);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;