rtpbin: store more in the PacketInfo
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Sep 2013 10:22:36 +0000 (12:22 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Sep 2013 12:34:28 +0000 (14:34 +0200)
Store all info in the PacketInfo so that we can avoid mapping the packet
multiple times.

gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h
gst/rtpmanager/rtpstats.h

index ba1318b..6a0194e 100644 (file)
@@ -1559,11 +1559,26 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
 
   if (pinfo->rtp) {
-    GstRTPBuffer rtpb = { NULL };
-
-    gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtpb);
-    pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtpb);
-    gst_rtp_buffer_unmap (&rtpb);
+    GstRTPBuffer rtp = { NULL };
+
+    if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
+      goto invalid_packet;
+
+    pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
+    if (idx == 0) {
+      gint i;
+
+      /* only keep info for first buffer */
+      pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+      pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
+      pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
+      pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+      /* copy available csrc */
+      pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
+      for (i = 0; i < pinfo->csrc_count; i++)
+        pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+    }
+    gst_rtp_buffer_unmap (&rtp);
   }
 
   if (idx == 0) {
@@ -1578,6 +1593,13 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
     }
   }
   return TRUE;
+
+  /* ERRORS */
+invalid_packet:
+  {
+    GST_DEBUG ("invalid RTP packet received");
+    return FALSE;
+  }
 }
 
 /* update the RTPPacketInfo structure with the current time and other bits
@@ -1585,11 +1607,13 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
  * This function is typically called when a validated packet is received.
  * This function should be called with the SESSION_LOCK
  */
-static void
+static gboolean
 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
 {
+  gboolean res;
+
   pinfo->send = send;
   pinfo->rtp = rtp;
   pinfo->is_list = is_list;
@@ -1603,11 +1627,14 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
 
   if (is_list) {
     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
-    gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet, pinfo);
+    res =
+        gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
+        pinfo);
   } else {
     GstBuffer *buffer = GST_BUFFER_CAST (data);
-    update_packet (&buffer, 0, pinfo);
+    res = update_packet (&buffer, 0, pinfo);
   }
+  return res;
 }
 
 static void
@@ -1615,6 +1642,10 @@ clean_packet_info (RTPPacketInfo * pinfo)
 {
   if (pinfo->address)
     g_object_unref (pinfo->address);
+  if (pinfo->data) {
+    gst_mini_object_unref (pinfo->data);
+    pinfo->data = NULL;
+  }
 }
 
 static gboolean
@@ -1687,35 +1718,19 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   gboolean created;
   gboolean prevsender, prevactive;
   RTPPacketInfo pinfo = { 0, };
-  guint32 csrcs[16];
-  guint8 i, count;
   guint64 oldrate;
-  GstRTPBuffer rtp = { NULL };
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
-  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
-    goto invalid_packet;
-
-  /* get SSRC to look up in session database */
-  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
-  /* copy available csrc for later */
-  count = gst_rtp_buffer_get_csrc_count (&rtp);
-  /* make sure to not overflow our array. An RTP buffer can maximally contain
-   * 16 CSRCs */
-  count = MIN (count, 16);
-
-  for (i = 0; i < count; i++)
-    csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
-
-  gst_rtp_buffer_unmap (&rtp);
-
   RTP_SESSION_LOCK (sess);
 
   /* update pinfo stats */
-  update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer, current_time,
-      running_time, -1);
+  if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
+          current_time, running_time, -1))
+    goto invalid_packet;
+
+  ssrc = pinfo.ssrc;
 
   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
   if (!source)
@@ -1726,7 +1741,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   oldrate = source->bitrate;
 
   /* let source process the packet */
-  result = rtp_source_process_rtp (source, buffer, &pinfo);
+  result = rtp_source_process_rtp (source, &pinfo);
 
   /* source became active */
   if (source_update_active (sess, source, prevactive))
@@ -1742,13 +1757,14 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
 
   if (source->validated) {
     gboolean created;
+    gint i;
 
     /* for validated sources, we add the CSRCs as well */
-    for (i = 0; i < count; i++) {
+    for (i = 0; i < pinfo.csrc_count; i++) {
       guint32 csrc;
       RTPSource *csrc_src;
 
-      csrc = csrcs[i];
+      csrc = pinfo.csrcs[i];
 
       /* get source */
       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
@@ -1776,6 +1792,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
 invalid_packet:
   {
     gst_buffer_unref (buffer);
+    RTP_SESSION_UNLOCK (sess);
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
@@ -2391,6 +2408,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
   RTP_SESSION_UNLOCK (sess);
 
+  pinfo.data = NULL;
   clean_packet_info (&pinfo);
 
   /* notify caller of sr packets in the callback */
index 5add2ac..4382676 100644 (file)
@@ -863,33 +863,27 @@ get_clock_rate (RTPSource * src, guint8 payload)
  * 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10
  * milliseconds. */
 static void
-calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPPacketInfo * pinfo)
+calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
 {
   GstClockTime running_time;
   guint32 rtparrival, transit, rtptime;
   gint32 diff;
   gint clock_rate;
   guint8 pt;
-  GstRTPBuffer rtp = { NULL };
 
   /* get arrival time */
   if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE)
     goto no_time;
 
-  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
-    goto invalid_packet;
-
-  pt = gst_rtp_buffer_get_payload_type (&rtp);
+  pt = pinfo->pt;
 
   GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt);
 
   /* get clockrate */
-  if ((clock_rate = get_clock_rate (src, pt)) == -1) {
-    gst_rtp_buffer_unmap (&rtp);
+  if ((clock_rate = get_clock_rate (src, pt)) == -1)
     goto no_clock_rate;
-  }
 
-  rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+  rtptime = pinfo->rtptime;
 
   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
    * care about the absolute value, just the difference. */
@@ -918,7 +912,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPPacketInfo * pinfo)
   GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f",
       rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0);
 
-  gst_rtp_buffer_unmap (&rtp);
   return;
 
   /* ERRORS */
@@ -927,11 +920,6 @@ no_time:
     GST_WARNING ("cannot get current running_time");
     return;
   }
-invalid_packet:
-  {
-    GST_WARNING ("invalid RTP packet");
-    return;
-  }
 no_clock_rate:
   {
     GST_WARNING ("cannot get clock-rate for pt %d", pt);
@@ -992,35 +980,29 @@ do_bitrate_estimation (RTPSource * src, GstClockTime running_time,
 /**
  * rtp_source_process_rtp:
  * @src: an #RTPSource
- * @buffer: an RTP buffer
+ * @pinfo: an #RTPPacketInfo
  *
- * Let @src handle the incomming RTP @buffer.
+ * Let @src handle the incomming RTP packet described in @pinfo.
  *
  * Returns: a #GstFlowReturn.
  */
 GstFlowReturn
-rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
-    RTPPacketInfo * pinfo)
+rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo)
 {
   GstFlowReturn result = GST_FLOW_OK;
   guint16 seqnr, udelta;
   RTPSourceStats *stats;
   guint16 expected;
-  GstRTPBuffer rtp = { NULL };
 
   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
-  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+  g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
 
   stats = &src->stats;
 
-  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
-    goto invalid_packet;
-
-  seqnr = gst_rtp_buffer_get_seq (&rtp);
-  gst_rtp_buffer_unmap (&rtp);
+  seqnr = pinfo->seqnum;
 
   if (stats->cycles == -1) {
-    GST_DEBUG ("received first buffer");
+    GST_DEBUG ("received first packet");
     /* first time we heard of this source */
     init_seq (src, seqnr);
     src->stats.max_seq = seqnr - 1;
@@ -1045,9 +1027,10 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
       } else {
         GstBuffer *q;
 
-        GST_DEBUG ("probation %d: queue buffer", src->curr_probation);
+        GST_DEBUG ("probation %d: queue packet", src->curr_probation);
         /* when still in probation, keep packets in a list. */
-        g_queue_push_tail (src->packets, buffer);
+        g_queue_push_tail (src->packets, pinfo->data);
+        pinfo->data = NULL;
         /* remove packets from queue if there are too many */
         while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
           q = g_queue_pop_head (src->packets);
@@ -1098,25 +1081,19 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
       seqnr, src->stats.packets_received, src->stats.octets_received);
 
   /* calculate jitter for the stats */
-  calculate_jitter (src, buffer, pinfo);
+  calculate_jitter (src, pinfo);
 
   /* we're ready to push the RTP packet now */
-  result = push_packet (src, buffer);
+  result = push_packet (src, pinfo->data);
+  pinfo->data = NULL;
 
 done:
   return result;
 
   /* ERRORS */
-invalid_packet:
-  {
-    GST_WARNING ("invalid packet received");
-    gst_buffer_unref (buffer);
-    return GST_FLOW_OK;
-  }
 bad_sequence:
   {
     GST_WARNING ("unacceptable seqnum received");
-    gst_buffer_unref (buffer);
     return GST_FLOW_OK;
   }
 probation_seqnum:
@@ -1124,7 +1101,6 @@ probation_seqnum:
     GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
     src->curr_probation = src->probation;
     src->stats.max_seq = seqnr;
-    gst_buffer_unref (buffer);
     return GST_FLOW_OK;
   }
 }
index d8b2473..9cd1f2d 100644 (file)
@@ -229,7 +229,7 @@ void            rtp_source_set_rtp_from        (RTPSource *src, GSocketAddress *
 void            rtp_source_set_rtcp_from       (RTPSource *src, GSocketAddress *address);
 
 /* handling RTP */
-GstFlowReturn   rtp_source_process_rtp         (RTPSource *src, GstBuffer *buffer, RTPPacketInfo *pinfo);
+GstFlowReturn   rtp_source_process_rtp         (RTPSource *src, RTPPacketInfo *pinfo);
 
 GstFlowReturn   rtp_source_send_rtp            (RTPSource *src, gpointer data, gboolean is_list,
                                                 GstClockTime running_time);
index cddb5af..c868217 100644 (file)
@@ -68,6 +68,9 @@ typedef struct {
  * @header_len: number of overhead bytes per packet
  * @bytes: bytes of the packet including lowlevel overhead
  * @payload_len: bytes of the RTP payload
+ * @seqnum: the seqnum of the packet
+ * @pt: the payload type of the packet
+ * @rtptime: the RTP time of the packet
  *
  * Structure holding information about the packet.
  */
@@ -83,6 +86,12 @@ typedef struct {
   guint         header_len;
   guint         bytes;
   guint         payload_len;
+  guint32       ssrc;
+  guint16       seqnum;
+  guint8        pt;
+  guint32       rtptime;
+  guint32       csrc_count;
+  guint32       csrcs[16];
 } RTPPacketInfo;
 
 /**