rtpsession: add locking for clear-pt-map
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
index 6c1bba0..04fca37 100644 (file)
@@ -281,7 +281,8 @@ struct _GstRtpSessionPrivate
   GstRtpNtpTimeSource ntp_time_source;
   gboolean rtcp_sync_send_time;
 
-  guint rtx_count;
+  guint recv_rtx_req_count;
+  guint sent_rtx_req_count;
 };
 
 /* callbacks to handle actions from the session manager */
@@ -305,6 +306,14 @@ static void gst_rtp_session_notify_nack (RTPSession * sess,
 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
 static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
     gpointer user_data);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad,
+    GstObject * parent, GstBufferList * list);
 
 static RTPSessionCallbacks callbacks = {
   gst_rtp_session_process_rtp,
@@ -728,13 +737,17 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
    * Various session statistics. This property returns a GstStructure
    * with name application/x-rtp-session-stats with the following fields:
    *
-   *  "rtx-count"       G_TYPE_UINT   The number of retransmission events
-   *      received from downstream (in receiver mode)
-   *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
+   *  "recv-rtx-req-count  G_TYPE_UINT   The number of retransmission event
+   *      received from downstream (in receiver mode) (Since 1.16)
+   *  "sent-rtx-req-count" G_TYPE_UINT   The number of retransmission event
+   *      sent downstream (in sender mode) (Since 1.16)
+   *  "rtx-count"          G_TYPE_UINT   DEPRECATED Since 1.16, same as
+   *      "recv-rtx-req-count".
+   *  "rtx-drop-count"     G_TYPE_UINT   The number of retransmission events
    *      dropped (due to bandwidth constraints)
-   *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
-   *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
-   *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
+   *  "sent-nack-count"    G_TYPE_UINT   Number of NACKs sent
+   *  "recv-nack-count"    G_TYPE_UINT   Number of NACKs received
+   *  "source-stats"       G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
    *      RTP sources (Since 1.8)
    *
    * Since: 1.4
@@ -795,6 +808,12 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
 
   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
       "rtpsession", 0, "RTP Session");
+
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
+
 }
 
 static void
@@ -838,12 +857,15 @@ gst_rtp_session_init (GstRtpSession * rtpsession)
   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
       (GDestroyNotify) gst_caps_unref);
 
+  rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
+
   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
 
   rtpsession->priv->thread_stopped = TRUE;
 
-  rtpsession->priv->rtx_count = 0;
+  rtpsession->priv->recv_rtx_req_count = 0;
+  rtpsession->priv->sent_rtx_req_count = 0;
 
   rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
 }
@@ -1005,8 +1027,10 @@ gst_rtp_session_create_stats (GstRtpSession * rtpsession)
   GstStructure *s;
 
   g_object_get (rtpsession->priv->session, "stats", &s, NULL);
-  gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
-      NULL);
+  gst_structure_set (s, "rtx-count", G_TYPE_UINT,
+      rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
+      rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
+      rtpsession->priv->sent_rtx_req_count, NULL);
 
   return s;
 }
@@ -1270,6 +1294,7 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       /* downstream is now releasing the dataflow and we can join. */
       join_rtcp_thread (rtpsession);
+      rtp_session_reset (rtpsession->priv->session);
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -1294,7 +1319,9 @@ return_true (gpointer key, gpointer value, gpointer user_data)
 static void
 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
 {
+  GST_RTP_SESSION_LOCK (rtpsession);
   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
 }
 
 /* called when the session manager has an RTP packet or a list of packets
@@ -1396,6 +1423,8 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
   event = gst_event_new_stream_start (stream_id);
+  rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
+  gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
   if (have_group_id)
     gst_event_set_group_id (event, group_id);
   gst_pad_push_event (srcpad, event);
@@ -1407,6 +1436,7 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
 
   gst_segment_init (&seg, GST_FORMAT_TIME);
   event = gst_event_new_segment (&seg);
+  gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
   gst_pad_push_event (srcpad, event);
 }
 
@@ -1415,7 +1445,7 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
  * well. */
 static GstFlowReturn
 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
-    GstBuffer * buffer, gboolean eos, gpointer user_data)
+    GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
 {
   GstFlowReturn result;
   GstRtpSession *rtpsession;
@@ -1438,10 +1468,18 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
     GST_LOG_OBJECT (rtpsession, "sending RTCP");
     result = gst_pad_push (rtcp_src, buffer);
 
-    /* we have to send EOS after this packet */
-    if (eos) {
+    /* Forward send an EOS on the RTCP sink if we received an EOS on the
+     * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
+     * case the EOS event would already have been sent */
+    if (all_sources_bye && rtpsession->send_rtp_sink &&
+        GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
+      GstEvent *event;
+
       GST_LOG_OBJECT (rtpsession, "sending EOS");
-      gst_pad_push_event (rtcp_src, gst_event_new_eos ());
+
+      event = gst_event_new_eos ();
+      gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
+      gst_pad_push_event (rtcp_src, event);
     }
     gst_object_unref (rtcp_src);
   } else {
@@ -1665,6 +1703,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
     }
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+      rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
     case GST_EVENT_SEGMENT:
@@ -1699,11 +1738,15 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
         gst_object_ref (rtcp_src);
       GST_RTP_SESSION_UNLOCK (rtpsession);
 
+      gst_event_unref (event);
+
       if (rtcp_src) {
+        event = gst_event_new_eos ();
+        if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
+          gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
         ret = gst_pad_push_event (rtcp_src, event);
         gst_object_unref (rtcp_src);
       } else {
-        gst_event_unref (event);
         ret = TRUE;
       }
       break;
@@ -1778,15 +1821,12 @@ gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
                 all_headers, count))
           forward = FALSE;
       } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
-        GstClockTime running_time;
         guint seqnum, delay, deadline, max_delay, avg_rtt;
 
         GST_RTP_SESSION_LOCK (rtpsession);
-        rtpsession->priv->rtx_count++;
+        rtpsession->priv->recv_rtx_req_count++;
         GST_RTP_SESSION_UNLOCK (rtpsession);
 
-        if (!gst_structure_get_clock_time (s, "running-time", &running_time))
-          running_time = -1;
         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
           ssrc = -1;
         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
@@ -1982,6 +2022,7 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
   GstClockTime current_time;
+  GstClockTime running_time;
   guint64 ntpnstime;
 
   rtpsession = GST_RTP_SESSION (parent);
@@ -1994,9 +2035,10 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
   current_time = gst_clock_get_time (priv->sysclock);
-  get_current_times (rtpsession, NULL, &ntpnstime);
+  get_current_times (rtpsession, &running_time, &ntpnstime);
 
-  rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
+  rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
+      ntpnstime);
 
   return GST_FLOW_OK;           /* always return OK */
 }
@@ -2245,7 +2287,7 @@ gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
   return TRUE;
 }
 
-/* Recieve an RTP packet or a list of packets to be send to the receivers,
+/* Receive an RTP packet or a list of packets to be sent to the receivers,
  * send to RTP session manager and forward to send_rtp_src.
  */
 static GstFlowReturn
@@ -2265,8 +2307,8 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
   if (is_list) {
     GstBuffer *buffer = NULL;
 
-    /* All groups in an list have the same timestamp.
-     * So, just take it from the first group. */
+    /* All buffers in a list have the same timestamp.
+     * So, just take it from the first buffer. */
     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
     if (buffer)
       timestamp = GST_BUFFER_PTS (buffer);
@@ -2685,6 +2727,10 @@ gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
               "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
       gst_pad_push_event (send_rtp_sink, event);
 
+      GST_RTP_SESSION_LOCK (rtpsession);
+      rtpsession->priv->sent_rtx_req_count++;
+      GST_RTP_SESSION_UNLOCK (rtpsession);
+
       if (blp == 0)
         break;