Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / third_party / webrtc / modules / pacing / paced_sender.cc
index 6204a9a..a071ffc 100644 (file)
 #include <assert.h>
 
 #include <map>
+#include <queue>
 #include <set>
 
 #include "webrtc/modules/interface/module_common_types.h"
+#include "webrtc/modules/pacing/bitrate_prober.h"
 #include "webrtc/system_wrappers/interface/clock.h"
 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
+#include "webrtc/system_wrappers/interface/field_trial.h"
+#include "webrtc/system_wrappers/interface/logging.h"
 #include "webrtc/system_wrappers/interface/trace_event.h"
 
 namespace {
@@ -28,69 +32,140 @@ const int kMinPacketLimitMs = 5;
 // time.
 const int kMaxIntervalTimeMs = 30;
 
-// Max time that the first packet in the queue can sit in the queue if no
-// packets are sent, regardless of buffer state. In practice only in effect at
-// low bitrates (less than 320 kbits/s).
-const int kMaxQueueTimeWithoutSendingUs = 30000;
-
 }  // namespace
 
 namespace webrtc {
 namespace paced_sender {
 struct Packet {
-  Packet(uint32_t ssrc,
+  Packet(PacedSender::Priority priority,
+         uint32_t ssrc,
          uint16_t seq_number,
          int64_t capture_time_ms,
          int64_t enqueue_time_ms,
          int length_in_bytes,
-         bool retransmission)
-      : ssrc(ssrc),
+         bool retransmission,
+         uint64_t enqueue_order)
+      : priority(priority),
+        ssrc(ssrc),
         sequence_number(seq_number),
         capture_time_ms(capture_time_ms),
         enqueue_time_ms(enqueue_time_ms),
         bytes(length_in_bytes),
-        retransmission(retransmission) {}
+        retransmission(retransmission),
+        enqueue_order(enqueue_order) {}
+
+  PacedSender::Priority priority;
   uint32_t ssrc;
   uint16_t sequence_number;
   int64_t capture_time_ms;
   int64_t enqueue_time_ms;
   int bytes;
   bool retransmission;
+  uint64_t enqueue_order;
+  std::list<Packet>::iterator this_it;
 };
 
-// STL list style class which prevents duplicates in the list.
-class PacketList {
+// Used by priority queue to sort packets.
+struct Comparator {
+  bool operator()(const Packet* first, const Packet* second) {
+    // Highest prio = 0.
+    if (first->priority != second->priority)
+      return first->priority > second->priority;
+
+    // Retransmissions go first.
+    if (second->retransmission && !first->retransmission)
+      return true;
+
+    // Older frames have higher prio.
+    if (first->capture_time_ms != second->capture_time_ms)
+      return first->capture_time_ms > second->capture_time_ms;
+
+    return first->enqueue_order > second->enqueue_order;
+  }
+};
+
+// Class encapsulating a priority queue with some extensions.
+class PacketQueue {
  public:
-  PacketList() {};
+  PacketQueue() : bytes_(0) {}
+  virtual ~PacketQueue() {}
+
+  void Push(const Packet& packet) {
+    if (!AddToDupeSet(packet))
+      return;
+
+    // Store packet in list, use pointers in priority queue for cheaper moves.
+    // Packets have a handle to its own iterator in the list, for easy removal
+    // when popping from queue.
+    packet_list_.push_front(packet);
+    std::list<Packet>::iterator it = packet_list_.begin();
+    it->this_it = it;          // Handle for direct removal from list.
+    prio_queue_.push(&(*it));  // Pointer into list.
+    bytes_ += packet.bytes;
+  }
 
-  bool empty() const {
-    return packet_list_.empty();
+  const Packet& BeginPop() {
+    const Packet& packet = *prio_queue_.top();
+    prio_queue_.pop();
+    return packet;
   }
 
-  Packet front() const {
-    return packet_list_.front();
+  void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
+
+  void FinalizePop(const Packet& packet) {
+    RemoveFromDupeSet(packet);
+    bytes_ -= packet.bytes;
+    packet_list_.erase(packet.this_it);
   }
 
-  void pop_front() {
-    Packet& packet = packet_list_.front();
-    uint16_t sequence_number = packet.sequence_number;
-    uint32_t ssrc = packet.ssrc;
-    packet_list_.pop_front();
-    sequence_number_set_[ssrc].erase(sequence_number);
+  bool Empty() const { return prio_queue_.empty(); }
+
+  size_t SizeInPackets() const { return prio_queue_.size(); }
+
+  uint32_t SizeInBytes() const { return bytes_; }
+
+  int64_t OldestEnqueueTime() const {
+    std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+    if (it == packet_list_.rend())
+      return 0;
+    return it->enqueue_time_ms;
   }
 
-  void push_back(const Packet& packet) {
-    if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
-        sequence_number_set_[packet.ssrc].end()) {
-      // Don't insert duplicates.
-      packet_list_.push_back(packet);
-      sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
+ private:
+  // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+  // queue. Return true if inserted, false if this is a duplicate.
+  bool AddToDupeSet(const Packet& packet) {
+    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+    if (it == dupe_map_.end()) {
+      // First for this ssrc, just insert.
+      dupe_map_[packet.ssrc].insert(packet.sequence_number);
+      return true;
     }
+
+    // Insert returns a pair, where second is a bool set to true if new element.
+    return it->second.insert(packet.sequence_number).second;
   }
 
- private:
+  void RemoveFromDupeSet(const Packet& packet) {
+    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+    assert(it != dupe_map_.end());
+    it->second.erase(packet.sequence_number);
+    if (it->second.empty()) {
+      dupe_map_.erase(it);
+    }
+  }
+
+  // List of packets, in the order the were enqueued. Since dequeueing may
+  // occur out of order, use list instead of vector.
   std::list<Packet> packet_list_;
-  std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
+  // Priority queue of the packets, sorted according to Comparator.
+  // Use pointers into list, to avoid moving whole struct within heap.
+  std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+  // Total number of bytes in the queue.
+  uint64_t bytes_;
+  // Map<ssrc, set<seq_no> >, for checking duplicates.
+  typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+  SsrcSeqNoMap dupe_map_;
 };
 
 class IntervalBudget {
@@ -121,6 +196,8 @@ class IntervalBudget {
 
   int bytes_remaining() const { return bytes_remaining_; }
 
+  int target_rate_kbps() const { return target_rate_kbps_; }
+
  private:
   int target_rate_kbps_;
   int bytes_remaining_;
@@ -131,6 +208,7 @@ const float PacedSender::kDefaultPaceMultiplier = 2.5f;
 
 PacedSender::PacedSender(Clock* clock,
                          Callback* callback,
+                         int bitrate_kbps,
                          int max_bitrate_kbps,
                          int min_bitrate_kbps)
     : clock_(clock),
@@ -138,15 +216,13 @@ PacedSender::PacedSender(Clock* clock,
       critsect_(CriticalSectionWrapper::CreateCriticalSection()),
       enabled_(true),
       paused_(false),
-      max_queue_length_ms_(kDefaultMaxQueueLengthMs),
       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
+      prober_(new BitrateProber()),
+      bitrate_bps_(1000 * bitrate_kbps),
       time_last_update_us_(clock->TimeInMicroseconds()),
-      capture_time_ms_last_queued_(0),
-      capture_time_ms_last_sent_(0),
-      high_priority_packets_(new paced_sender::PacketList),
-      normal_priority_packets_(new paced_sender::PacketList),
-      low_priority_packets_(new paced_sender::PacketList) {
+      packets_(new paced_sender::PacketQueue()),
+      packet_counter_(0) {
   UpdateBytesPerInterval(kMinPacketLimitMs);
 }
 
@@ -172,11 +248,13 @@ bool PacedSender::Enabled() const {
   return enabled_;
 }
 
-void PacedSender::UpdateBitrate(int max_bitrate_kbps,
+void PacedSender::UpdateBitrate(int bitrate_kbps,
+                                int max_bitrate_kbps,
                                 int min_bitrate_kbps) {
   CriticalSectionScoped cs(critsect_.get());
   media_budget_->set_target_rate_kbps(max_bitrate_kbps);
   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
+  bitrate_bps_ = 1000 * bitrate_kbps;
 }
 
 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
@@ -187,69 +265,51 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
   if (!enabled_) {
     return true;  // We can send now.
   }
+  // Enable probing if the probing experiment is enabled.
+  if (!prober_->IsProbing() && ProbingExperimentIsEnabled()) {
+    prober_->SetEnabled(true);
+  }
+  prober_->MaybeInitializeProbe(bitrate_bps_);
+
   if (capture_time_ms < 0) {
     capture_time_ms = clock_->TimeInMilliseconds();
   }
-  if (priority != kHighPriority &&
-      capture_time_ms > capture_time_ms_last_queued_) {
-    capture_time_ms_last_queued_ = capture_time_ms;
-    TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
-                             "capture_time_ms", capture_time_ms);
-  }
-  paced_sender::PacketList* packet_list = NULL;
-  switch (priority) {
-    case kHighPriority:
-      packet_list = high_priority_packets_.get();
-      break;
-    case kNormalPriority:
-      packet_list = normal_priority_packets_.get();
-      break;
-    case kLowPriority:
-      packet_list = low_priority_packets_.get();
-      break;
-  }
-  packet_list->push_back(paced_sender::Packet(ssrc,
-                                              sequence_number,
-                                              capture_time_ms,
-                                              clock_->TimeInMilliseconds(),
-                                              bytes,
-                                              retransmission));
+
+  packets_->Push(paced_sender::Packet(
+      priority, ssrc, sequence_number, capture_time_ms,
+      clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++));
   return false;
 }
 
-void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
+int PacedSender::ExpectedQueueTimeMs() const {
   CriticalSectionScoped cs(critsect_.get());
-  max_queue_length_ms_ = max_queue_length_ms;
+  int target_rate = media_budget_->target_rate_kbps();
+  assert(target_rate > 0);
+  return packets_->SizeInBytes() * 8 / target_rate;
+}
+
+size_t PacedSender::QueueSizePackets() const {
+  CriticalSectionScoped cs(critsect_.get());
+  return packets_->SizeInPackets();
 }
 
 int PacedSender::QueueInMs() const {
   CriticalSectionScoped cs(critsect_.get());
-  int64_t now_ms = clock_->TimeInMilliseconds();
-  int64_t oldest_packet_enqueue_time = now_ms;
-  if (!high_priority_packets_->empty()) {
-    oldest_packet_enqueue_time =
-        std::min(oldest_packet_enqueue_time,
-                 high_priority_packets_->front().enqueue_time_ms);
-  }
-  if (!normal_priority_packets_->empty()) {
-    oldest_packet_enqueue_time =
-        std::min(oldest_packet_enqueue_time,
-                 normal_priority_packets_->front().enqueue_time_ms);
-  }
-  if (!low_priority_packets_->empty()) {
-    oldest_packet_enqueue_time =
-        std::min(oldest_packet_enqueue_time,
-                 low_priority_packets_->front().enqueue_time_ms);
-  }
-  return now_ms - oldest_packet_enqueue_time;
+
+  int64_t oldest_packet = packets_->OldestEnqueueTime();
+  if (oldest_packet == 0)
+    return 0;
+
+  return clock_->TimeInMilliseconds() - oldest_packet;
 }
 
 int32_t PacedSender::TimeUntilNextProcess() {
   CriticalSectionScoped cs(critsect_.get());
-  int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
-      time_last_update_us_ + 500) / 1000;
-  if (elapsed_time_ms <= 0) {
-    return kMinPacketLimitMs;
+  int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
+  int elapsed_time_ms = static_cast<int>((elapsed_time_us + 500) / 1000);
+  if (prober_->IsProbing()) {
+    int next_probe = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
+    return next_probe;
   }
   if (elapsed_time_ms >= kMinPacketLimitMs) {
     return 0;
@@ -270,122 +330,70 @@ int32_t PacedSender::Process() {
       uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
       UpdateBytesPerInterval(delta_time_ms);
     }
-    paced_sender::PacketList* packet_list;
-    while (ShouldSendNextPacket(&packet_list)) {
-      if (!SendPacketFromList(packet_list))
+
+    while (!packets_->Empty()) {
+      if (media_budget_->bytes_remaining() <= 0 && !prober_->IsProbing())
+        return 0;
+
+      // Since we need to release the lock in order to send, we first pop the
+      // element from the priority queue but keep it in storage, so that we can
+      // reinsert it if send fails.
+      const paced_sender::Packet& packet = packets_->BeginPop();
+      if (SendPacket(packet)) {
+        // Send succeeded, remove it from the queue.
+        packets_->FinalizePop(packet);
+        if (prober_->IsProbing())
+          return 0;
+      } else {
+        // Send failed, put it back into the queue.
+        packets_->CancelPop(packet);
         return 0;
+      }
     }
-    if (high_priority_packets_->empty() &&
-        normal_priority_packets_->empty() &&
-        low_priority_packets_->empty() &&
-        padding_budget_->bytes_remaining() > 0) {
-      int padding_needed = padding_budget_->bytes_remaining();
-      critsect_->Leave();
-      int bytes_sent = callback_->TimeToSendPadding(padding_needed);
-      critsect_->Enter();
-      media_budget_->UseBudget(bytes_sent);
-      padding_budget_->UseBudget(bytes_sent);
+
+    int padding_needed = padding_budget_->bytes_remaining();
+    if (padding_needed > 0) {
+      SendPadding(padding_needed);
     }
   }
   return 0;
 }
 
-bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
-    EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
-  paced_sender::Packet packet = GetNextPacketFromList(packet_list);
+bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
   critsect_->Leave();
-
   const bool success = callback_->TimeToSendPacket(packet.ssrc,
                                                    packet.sequence_number,
                                                    packet.capture_time_ms,
                                                    packet.retransmission);
   critsect_->Enter();
-  // If packet cannot be sent then keep it in packet list and exit early.
-  // There's no need to send more packets.
-  if (!success) {
-    return false;
-  }
-  packet_list->pop_front();
-  const bool last_packet =
-      packet_list->empty() ||
-      packet_list->front().capture_time_ms > packet.capture_time_ms;
-  if (packet_list != high_priority_packets_.get()) {
-    if (packet.capture_time_ms > capture_time_ms_last_sent_) {
-      capture_time_ms_last_sent_ = packet.capture_time_ms;
-    } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
-               last_packet) {
-      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
-    }
+
+  if (success) {
+    // Update media bytes sent.
+    prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
+    media_budget_->UseBudget(packet.bytes);
+    padding_budget_->UseBudget(packet.bytes);
   }
-  return true;
-}
 
-void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
-  media_budget_->IncreaseBudget(delta_time_ms);
-  padding_budget_->IncreaseBudget(delta_time_ms);
+  return success;
 }
 
-bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
-  *packet_list = NULL;
-  if (media_budget_->bytes_remaining() <= 0) {
-    // All bytes consumed for this interval.
-    // Check if we have not sent in a too long time.
-    if (clock_->TimeInMicroseconds() - time_last_send_us_ >
-        kMaxQueueTimeWithoutSendingUs) {
-      if (!high_priority_packets_->empty()) {
-        *packet_list = high_priority_packets_.get();
-        return true;
-      }
-      if (!normal_priority_packets_->empty()) {
-        *packet_list = normal_priority_packets_.get();
-        return true;
-      }
-    }
-    // Send any old packets to avoid queuing for too long.
-    if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
-      int64_t high_priority_capture_time = -1;
-      if (!high_priority_packets_->empty()) {
-        high_priority_capture_time =
-            high_priority_packets_->front().capture_time_ms;
-        *packet_list = high_priority_packets_.get();
-      }
-      if (!normal_priority_packets_->empty() &&
-          (high_priority_capture_time == -1 ||
-           high_priority_capture_time >
-               normal_priority_packets_->front().capture_time_ms)) {
-        *packet_list = normal_priority_packets_.get();
-      }
-      if (*packet_list)
-        return true;
-    }
-    return false;
-  }
-  if (!high_priority_packets_->empty()) {
-    *packet_list = high_priority_packets_.get();
-    return true;
-  }
-  if (!normal_priority_packets_->empty()) {
-    *packet_list = normal_priority_packets_.get();
-    return true;
-  }
-  if (!low_priority_packets_->empty()) {
-    *packet_list = low_priority_packets_.get();
-    return true;
-  }
-  return false;
-}
+void PacedSender::SendPadding(int padding_needed) {
+  critsect_->Leave();
+  int bytes_sent = callback_->TimeToSendPadding(padding_needed);
+  critsect_->Enter();
 
-paced_sender::Packet PacedSender::GetNextPacketFromList(
-    paced_sender::PacketList* packets) {
-  paced_sender::Packet packet = packets->front();
-  UpdateMediaBytesSent(packet.bytes);
-  return packet;
+  // Update padding bytes sent.
+  media_budget_->UseBudget(bytes_sent);
+  padding_budget_->UseBudget(bytes_sent);
 }
 
-void PacedSender::UpdateMediaBytesSent(int num_bytes) {
-  time_last_send_us_ = clock_->TimeInMicroseconds();
-  media_budget_->UseBudget(num_bytes);
-  padding_budget_->UseBudget(num_bytes);
+void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
+  media_budget_->IncreaseBudget(delta_time_ms);
+  padding_budget_->IncreaseBudget(delta_time_ms);
 }
 
+bool PacedSender::ProbingExperimentIsEnabled() const {
+  return webrtc::field_trial::FindFullName("WebRTC-BitrateProbing") ==
+         "Enabled";
+}
 }  // namespace webrtc