Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
index 32973c0..77c5594 100644 (file)
@@ -44,6 +44,7 @@ enum
   SIGNAL_ON_SENDER_TIMEOUT,
   SIGNAL_ON_SENDING_RTCP,
   SIGNAL_ON_FEEDBACK_RTCP,
+  SIGNAL_SEND_RTCP,
   LAST_SIGNAL
 };
 
@@ -104,6 +105,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id,
 
 static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
     GstBuffer * buffer, gboolean early);
+static void rtp_session_send_rtcp (RTPSession * sess,
+    GstClockTimeDiff max_delay);
 
 
 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
@@ -128,6 +131,75 @@ accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
 }
 
 static void
+gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+    const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+    gpointer marshal_data)
+{
+  typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1,
+      gpointer arg_1, gboolean arg_2, gpointer data2);
+  register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+  gboolean v_return;
+
+  g_return_if_fail (return_value != NULL);
+  g_return_if_fail (n_param_values == 3);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data :
+      cc->callback);
+
+  v_return = callback (data1,
+      gst_value_get_mini_object (param_values + 1),
+      g_value_get_boolean (param_values + 2), data2);
+
+  g_value_set_boolean (return_value, v_return);
+}
+
+static void
+gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+    const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+    gpointer marshal_data)
+{
+  typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer
+      data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5,
+      gpointer data2);
+  register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+
+  g_return_if_fail (n_param_values == 6);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ?
+      marshal_data : cc->callback);
+
+  callback (data1,
+      g_value_get_uint (param_values + 1),
+      g_value_get_uint (param_values + 2),
+      g_value_get_uint (param_values + 3),
+      g_value_get_uint (param_values + 4),
+      gst_value_get_mini_object (param_values + 5), data2);
+}
+
+
+static void
 rtp_session_class_init (RTPSessionClass * klass)
 {
   GObjectClass *gobject_class;
@@ -275,8 +347,8 @@ rtp_session_class_init (RTPSessionClass * klass)
   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
-      accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__POINTER_BOOLEAN,
-      G_TYPE_BOOLEAN, 2, G_TYPE_POINTER, G_TYPE_BOOLEAN);
+      accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN,
+      G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN);
 
   /**
    * RTPSession::on-feedback-rtcp:
@@ -295,9 +367,25 @@ rtp_session_class_init (RTPSessionClass * klass)
   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_POINTER,
-      G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
-      G_TYPE_POINTER);
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT,
+      G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
+      GST_TYPE_BUFFER);
+
+  /**
+   * RTPSession::send-rtcp:
+   * @session: the object which received the signal
+   * @max_delay: The maximum delay after which the feedback will not be useful
+   *  anymore
+   *
+   * Requests that the #RTPSession initiate a new RTCP packet as soon as
+   * possible within the requested delay.
+   */
+
+  rtp_session_signals[SIGNAL_SEND_RTCP] =
+      g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+      gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
 
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
@@ -408,6 +496,7 @@ rtp_session_class_init (RTPSessionClass * klass)
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
   klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
+  klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
 
   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
 }
@@ -769,6 +858,14 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.reconsider = callbacks->reconsider;
     sess->reconsider_user_data = user_data;
   }
+  if (callbacks->request_key_unit) {
+    sess->callbacks.request_key_unit = callbacks->request_key_unit;
+    sess->request_key_unit_user_data = user_data;
+  }
+  if (callbacks->request_time) {
+    sess->callbacks.request_time = callbacks->request_time;
+    sess->request_time_user_data = user_data;
+  }
 }
 
 /**
@@ -880,6 +977,24 @@ rtp_session_set_reconsider_callback (RTPSession * sess,
 }
 
 /**
+ * rtp_session_set_request_time_callback:
+ * @sess: an #RTPSession
+ * @callback: callback to set
+ * @user_data: user data passed in the callback
+ *
+ * Configure only the request_time callback
+ */
+void
+rtp_session_set_request_time_callback (RTPSession * sess,
+    RTPSessionRequestTime callback, gpointer user_data)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  sess->callbacks.request_time = callback;
+  sess->request_time_user_data = user_data;
+}
+
+/**
  * rtp_session_set_bandwidth:
  * @sess: an #RTPSession
  * @bandwidth: the bandwidth allocated
@@ -1552,11 +1667,14 @@ rtp_session_create_source (RTPSession * sess)
 static void
 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
-    GstClockTime running_time)
+    GstClockTime running_time, guint64 ntpnstime)
 {
+  GstMetaNetAddress *meta;
+
   /* get time of arrival */
   arrival->current_time = current_time;
   arrival->running_time = running_time;
+  arrival->ntpnstime = ntpnstime;
 
   /* get packet size including header overhead */
   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
@@ -1568,11 +1686,12 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
   }
 
   /* for netbuffer we can store the IP address to check for collisions */
-  arrival->have_address = GST_IS_NETBUFFER (buffer);
-  if (arrival->have_address) {
-    GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
-
-    memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
+  meta = gst_buffer_get_meta_net_address (buffer);
+  if (meta) {
+    arrival->have_address = TRUE;
+    memcpy (&arrival->address, &meta->naddr, sizeof (GstNetAddress));
+  } else {
+    arrival->have_address = FALSE;
   }
 }
 
@@ -1611,7 +1730,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   RTP_SESSION_LOCK (sess);
   /* update arrival stats */
   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
-      running_time);
+      running_time, -1);
 
   /* ignore more RTP packets when we left the session */
   if (sess->source->received_bye)
@@ -1732,7 +1851,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
       /* only deal with report blocks for our session, we update the stats of
        * the sender of the RTCP message. We could also compare our stats against
        * the other sender to see if we are better or worse. */
-      rtp_source_process_rb (source, arrival->current_time, fractionlost,
+      rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
           packetslost, exthighestseq, jitter, lsr, dlsr);
     }
   }
@@ -1892,10 +2011,16 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
     validated = !RTP_SOURCE_IS_ACTIVE (source);
     source->validated = TRUE;
 
+    /* source became active */
+    if (validated) {
+      sess->stats.active_sources++;
+      GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+          sess->stats.active_sources);
+      on_ssrc_validated (sess, source);
+    }
+
     if (created)
       on_new_ssrc (sess, source);
-    if (validated)
-      on_ssrc_validated (sess, source);
     if (changed)
       on_ssrc_sdes (sess, source);
 
@@ -1929,6 +2054,9 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
     GST_DEBUG ("SSRC: %08x", ssrc);
 
+    if (ssrc == sess->source->ssrc)
+      return;
+
     /* find src and mark bye, no probation when dealing with RTCP */
     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
     if (!source)
@@ -2003,36 +2131,78 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
   GST_DEBUG ("received APP");
 }
 
+static void
+rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
+    guint32 media_ssrc, GstClockTime current_time)
+{
+  RTPSource *src;
+  guint32 round_trip = 0;
+
+  if (!sess->callbacks.request_key_unit)
+    return;
+
+  src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+      GINT_TO_POINTER (sender_ssrc));
+
+  if (!src)
+    return;
+
+  if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+      rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL,
+          &round_trip)) {
+    GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
+        GST_SECOND, 65536);
+
+    if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+        current_time - sess->last_keyframe_request < round_trip_in_ns) {
+      GST_DEBUG ("Ignoring PLI because one was send without one RTT (%"
+          GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
+          GST_TIME_ARGS (current_time - sess->last_keyframe_request),
+          GST_TIME_ARGS (round_trip_in_ns));;
+      return;
+    }
+  }
+
+  sess->last_keyframe_request = current_time;
+
+  GST_LOG ("received PLI from %X %p(%p)", sender_ssrc,
+      sess->callbacks.process_rtp, sess->callbacks.request_key_unit);
+
+  sess->callbacks.request_key_unit (sess, FALSE,
+      sess->request_key_unit_user_data);
+}
 
 static void
 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPArrivalStats * arrival, GstClockTime current_time)
 {
   GstRTCPType type = gst_rtcp_packet_get_type (packet);
   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
-  guint length = 4 * (gst_rtcp_packet_get_length (packet) - 2);
+  guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
+  guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
 
-  GST_DEBUG ("received feedback %d:%d from %08X about %08X"
-      " with FCI of length %d", type, fbtype, sender_ssrc, media_ssrc, length);
+  GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
+      "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
 
   if (g_signal_has_handler_pending (sess,
           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
-    GstBuffer *fci = NULL;
+    GstBuffer *fci_buffer = NULL;
 
-    if (length) {
-      fci = gst_buffer_create_sub (packet->buffer, packet->offset + 72, length);
-      GST_BUFFER_TIMESTAMP (fci) = arrival->running_time;
+    if (fci_length > 0) {
+      fci_buffer = gst_buffer_create_sub (packet->buffer,
+          fci_data - GST_BUFFER_DATA (packet->buffer), fci_length);
+      GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
     }
 
     RTP_SESSION_UNLOCK (sess);
     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
-        type, fbtype, sender_ssrc, media_ssrc, fci);
+        type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
     RTP_SESSION_LOCK (sess);
 
-    if (fci)
-      gst_buffer_unref (fci);
+    if (fci_buffer)
+      gst_buffer_unref (fci_buffer);
   }
 
   if (sess->rtcp_feedback_retention_window) {
@@ -2042,6 +2212,24 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
     if (src)
       rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
   }
+
+  if (rtp_source_get_ssrc (sess->source) == media_ssrc) {
+    switch (type) {
+      case GST_RTCP_TYPE_PSFB:
+        switch (fbtype) {
+          case GST_RTCP_PSFB_TYPE_PLI:
+            rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
+                current_time);
+            break;
+          default:
+            break;
+        }
+        break;
+      case GST_RTCP_TYPE_RTPFB:
+      default:
+        break;
+    }
+  }
 }
 
 /**
@@ -2049,6 +2237,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
  * @sess: and #RTPSession
  * @buffer: an RTCP buffer
  * @current_time: the current system time
+ * @ntpnstime: the current NTP time in nanoseconds
  *
  * Process an RTCP buffer in the session manager. This function takes ownership
  * of @buffer.
@@ -2057,7 +2246,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
  */
 GstFlowReturn
 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time)
+    GstClockTime current_time, guint64 ntpnstime)
 {
   GstRTCPPacket packet;
   gboolean more, is_bye = FALSE, do_sync = FALSE;
@@ -2074,7 +2263,8 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   RTP_SESSION_LOCK (sess);
   /* update arrival stats */
-  update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
+  update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
+      ntpnstime);
 
   if (sess->sent_bye)
     goto ignore;
@@ -2113,7 +2303,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
         break;
       case GST_RTCP_TYPE_RTPFB:
       case GST_RTCP_TYPE_PSFB:
-        rtp_session_process_feedback (sess, &packet, &arrival);
+        rtp_session_process_feedback (sess, &packet, &arrival, current_time);
         break;
       default:
         GST_WARNING ("got unknown RTCP packet");
@@ -2984,13 +3174,10 @@ dont_send:
 }
 
 void
-rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, gboolean fir)
+rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc)
 {
   guint i;
 
-  if (fir)
-    return;
-
   for (i = 0; i < sess->rtcp_pli_requests->len; i++)
     if (ssrc == g_array_index (sess->rtcp_pli_requests, guint32, i))
       return;
@@ -3050,3 +3237,16 @@ rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   return ret;
 }
+
+static void
+rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
+{
+  GstClockTime now;
+
+  if (!sess->callbacks.send_rtcp)
+    return;
+
+  now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+  rtp_session_request_early_rtcp (sess, now, max_delay);
+}