#include <assert.h>
#include <map>
+#include <queue>
#include <set>
#include "webrtc/modules/interface/module_common_types.h"
+#include "webrtc/modules/pacing/bitrate_prober.h"
#include "webrtc/system_wrappers/interface/clock.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
+#include "webrtc/system_wrappers/interface/field_trial.h"
+#include "webrtc/system_wrappers/interface/logging.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
namespace {
// time.
const int kMaxIntervalTimeMs = 30;
-// Max time that the first packet in the queue can sit in the queue if no
-// packets are sent, regardless of buffer state. In practice only in effect at
-// low bitrates (less than 320 kbits/s).
-const int kMaxQueueTimeWithoutSendingUs = 30000;
-
} // namespace
namespace webrtc {
namespace paced_sender {
struct Packet {
- Packet(uint32_t ssrc,
+ Packet(PacedSender::Priority priority,
+ uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
int length_in_bytes,
- bool retransmission)
- : ssrc(ssrc),
+ bool retransmission,
+ uint64_t enqueue_order)
+ : priority(priority),
+ ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
bytes(length_in_bytes),
- retransmission(retransmission) {}
+ retransmission(retransmission),
+ enqueue_order(enqueue_order) {}
+
+ PacedSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
int64_t enqueue_time_ms;
int bytes;
bool retransmission;
+ uint64_t enqueue_order;
+ std::list<Packet>::iterator this_it;
};
-// STL list style class which prevents duplicates in the list.
-class PacketList {
+// Used by priority queue to sort packets.
+struct Comparator {
+ bool operator()(const Packet* first, const Packet* second) {
+ // Highest prio = 0.
+ if (first->priority != second->priority)
+ return first->priority > second->priority;
+
+ // Retransmissions go first.
+ if (second->retransmission && !first->retransmission)
+ return true;
+
+ // Older frames have higher prio.
+ if (first->capture_time_ms != second->capture_time_ms)
+ return first->capture_time_ms > second->capture_time_ms;
+
+ return first->enqueue_order > second->enqueue_order;
+ }
+};
+
+// Class encapsulating a priority queue with some extensions.
+class PacketQueue {
public:
- PacketList() {};
+ PacketQueue() : bytes_(0) {}
+ virtual ~PacketQueue() {}
+
+ void Push(const Packet& packet) {
+ if (!AddToDupeSet(packet))
+ return;
+
+ // Store packet in list, use pointers in priority queue for cheaper moves.
+ // Packets have a handle to its own iterator in the list, for easy removal
+ // when popping from queue.
+ packet_list_.push_front(packet);
+ std::list<Packet>::iterator it = packet_list_.begin();
+ it->this_it = it; // Handle for direct removal from list.
+ prio_queue_.push(&(*it)); // Pointer into list.
+ bytes_ += packet.bytes;
+ }
- bool empty() const {
- return packet_list_.empty();
+ const Packet& BeginPop() {
+ const Packet& packet = *prio_queue_.top();
+ prio_queue_.pop();
+ return packet;
}
- Packet front() const {
- return packet_list_.front();
+ void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
+
+ void FinalizePop(const Packet& packet) {
+ RemoveFromDupeSet(packet);
+ bytes_ -= packet.bytes;
+ packet_list_.erase(packet.this_it);
}
- void pop_front() {
- Packet& packet = packet_list_.front();
- uint16_t sequence_number = packet.sequence_number;
- uint32_t ssrc = packet.ssrc;
- packet_list_.pop_front();
- sequence_number_set_[ssrc].erase(sequence_number);
+ bool Empty() const { return prio_queue_.empty(); }
+
+ size_t SizeInPackets() const { return prio_queue_.size(); }
+
+ uint32_t SizeInBytes() const { return bytes_; }
+
+ int64_t OldestEnqueueTime() const {
+ std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
+ if (it == packet_list_.rend())
+ return 0;
+ return it->enqueue_time_ms;
}
- void push_back(const Packet& packet) {
- if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
- sequence_number_set_[packet.ssrc].end()) {
- // Don't insert duplicates.
- packet_list_.push_back(packet);
- sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
+ private:
+ // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+ // queue. Return true if inserted, false if this is a duplicate.
+ bool AddToDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ if (it == dupe_map_.end()) {
+ // First for this ssrc, just insert.
+ dupe_map_[packet.ssrc].insert(packet.sequence_number);
+ return true;
}
+
+ // Insert returns a pair, where second is a bool set to true if new element.
+ return it->second.insert(packet.sequence_number).second;
}
- private:
+ void RemoveFromDupeSet(const Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ assert(it != dupe_map_.end());
+ it->second.erase(packet.sequence_number);
+ if (it->second.empty()) {
+ dupe_map_.erase(it);
+ }
+ }
+
+ // List of packets, in the order the were enqueued. Since dequeueing may
+ // occur out of order, use list instead of vector.
std::list<Packet> packet_list_;
- std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
+ // Priority queue of the packets, sorted according to Comparator.
+ // Use pointers into list, to avoid moving whole struct within heap.
+ std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+ // Total number of bytes in the queue.
+ uint64_t bytes_;
+ // Map<ssrc, set<seq_no> >, for checking duplicates.
+ typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+ SsrcSeqNoMap dupe_map_;
};
class IntervalBudget {
int bytes_remaining() const { return bytes_remaining_; }
+ int target_rate_kbps() const { return target_rate_kbps_; }
+
private:
int target_rate_kbps_;
int bytes_remaining_;
PacedSender::PacedSender(Clock* clock,
Callback* callback,
+ int bitrate_kbps,
int max_bitrate_kbps,
int min_bitrate_kbps)
: clock_(clock),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
enabled_(true),
paused_(false),
- max_queue_length_ms_(kDefaultMaxQueueLengthMs),
media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
+ prober_(new BitrateProber()),
+ bitrate_bps_(1000 * bitrate_kbps),
time_last_update_us_(clock->TimeInMicroseconds()),
- capture_time_ms_last_queued_(0),
- capture_time_ms_last_sent_(0),
- high_priority_packets_(new paced_sender::PacketList),
- normal_priority_packets_(new paced_sender::PacketList),
- low_priority_packets_(new paced_sender::PacketList) {
+ packets_(new paced_sender::PacketQueue()),
+ packet_counter_(0) {
UpdateBytesPerInterval(kMinPacketLimitMs);
}
return enabled_;
}
-void PacedSender::UpdateBitrate(int max_bitrate_kbps,
+void PacedSender::UpdateBitrate(int bitrate_kbps,
+ int max_bitrate_kbps,
int min_bitrate_kbps) {
CriticalSectionScoped cs(critsect_.get());
media_budget_->set_target_rate_kbps(max_bitrate_kbps);
padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
+ bitrate_bps_ = 1000 * bitrate_kbps;
}
bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
if (!enabled_) {
return true; // We can send now.
}
+ // Enable probing if the probing experiment is enabled.
+ if (!prober_->IsProbing() && ProbingExperimentIsEnabled()) {
+ prober_->SetEnabled(true);
+ }
+ prober_->MaybeInitializeProbe(bitrate_bps_);
+
if (capture_time_ms < 0) {
capture_time_ms = clock_->TimeInMilliseconds();
}
- if (priority != kHighPriority &&
- capture_time_ms > capture_time_ms_last_queued_) {
- capture_time_ms_last_queued_ = capture_time_ms;
- TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
- "capture_time_ms", capture_time_ms);
- }
- paced_sender::PacketList* packet_list = NULL;
- switch (priority) {
- case kHighPriority:
- packet_list = high_priority_packets_.get();
- break;
- case kNormalPriority:
- packet_list = normal_priority_packets_.get();
- break;
- case kLowPriority:
- packet_list = low_priority_packets_.get();
- break;
- }
- packet_list->push_back(paced_sender::Packet(ssrc,
- sequence_number,
- capture_time_ms,
- clock_->TimeInMilliseconds(),
- bytes,
- retransmission));
+
+ packets_->Push(paced_sender::Packet(
+ priority, ssrc, sequence_number, capture_time_ms,
+ clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++));
return false;
}
-void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
+int PacedSender::ExpectedQueueTimeMs() const {
CriticalSectionScoped cs(critsect_.get());
- max_queue_length_ms_ = max_queue_length_ms;
+ int target_rate = media_budget_->target_rate_kbps();
+ assert(target_rate > 0);
+ return packets_->SizeInBytes() * 8 / target_rate;
+}
+
+size_t PacedSender::QueueSizePackets() const {
+ CriticalSectionScoped cs(critsect_.get());
+ return packets_->SizeInPackets();
}
int PacedSender::QueueInMs() const {
CriticalSectionScoped cs(critsect_.get());
- int64_t now_ms = clock_->TimeInMilliseconds();
- int64_t oldest_packet_enqueue_time = now_ms;
- if (!high_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- high_priority_packets_->front().enqueue_time_ms);
- }
- if (!normal_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- normal_priority_packets_->front().enqueue_time_ms);
- }
- if (!low_priority_packets_->empty()) {
- oldest_packet_enqueue_time =
- std::min(oldest_packet_enqueue_time,
- low_priority_packets_->front().enqueue_time_ms);
- }
- return now_ms - oldest_packet_enqueue_time;
+
+ int64_t oldest_packet = packets_->OldestEnqueueTime();
+ if (oldest_packet == 0)
+ return 0;
+
+ return clock_->TimeInMilliseconds() - oldest_packet;
}
int32_t PacedSender::TimeUntilNextProcess() {
CriticalSectionScoped cs(critsect_.get());
- int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
- time_last_update_us_ + 500) / 1000;
- if (elapsed_time_ms <= 0) {
- return kMinPacketLimitMs;
+ int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
+ int elapsed_time_ms = static_cast<int>((elapsed_time_us + 500) / 1000);
+ if (prober_->IsProbing()) {
+ int next_probe = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
+ return next_probe;
}
if (elapsed_time_ms >= kMinPacketLimitMs) {
return 0;
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
}
- paced_sender::PacketList* packet_list;
- while (ShouldSendNextPacket(&packet_list)) {
- if (!SendPacketFromList(packet_list))
+
+ while (!packets_->Empty()) {
+ if (media_budget_->bytes_remaining() <= 0 && !prober_->IsProbing())
+ return 0;
+
+ // Since we need to release the lock in order to send, we first pop the
+ // element from the priority queue but keep it in storage, so that we can
+ // reinsert it if send fails.
+ const paced_sender::Packet& packet = packets_->BeginPop();
+ if (SendPacket(packet)) {
+ // Send succeeded, remove it from the queue.
+ packets_->FinalizePop(packet);
+ if (prober_->IsProbing())
+ return 0;
+ } else {
+ // Send failed, put it back into the queue.
+ packets_->CancelPop(packet);
return 0;
+ }
}
- if (high_priority_packets_->empty() &&
- normal_priority_packets_->empty() &&
- low_priority_packets_->empty() &&
- padding_budget_->bytes_remaining() > 0) {
- int padding_needed = padding_budget_->bytes_remaining();
- critsect_->Leave();
- int bytes_sent = callback_->TimeToSendPadding(padding_needed);
- critsect_->Enter();
- media_budget_->UseBudget(bytes_sent);
- padding_budget_->UseBudget(bytes_sent);
+
+ int padding_needed = padding_budget_->bytes_remaining();
+ if (padding_needed > 0) {
+ SendPadding(padding_needed);
}
}
return 0;
}
-bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
- EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
- paced_sender::Packet packet = GetNextPacketFromList(packet_list);
+bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
critsect_->Leave();
-
const bool success = callback_->TimeToSendPacket(packet.ssrc,
packet.sequence_number,
packet.capture_time_ms,
packet.retransmission);
critsect_->Enter();
- // If packet cannot be sent then keep it in packet list and exit early.
- // There's no need to send more packets.
- if (!success) {
- return false;
- }
- packet_list->pop_front();
- const bool last_packet =
- packet_list->empty() ||
- packet_list->front().capture_time_ms > packet.capture_time_ms;
- if (packet_list != high_priority_packets_.get()) {
- if (packet.capture_time_ms > capture_time_ms_last_sent_) {
- capture_time_ms_last_sent_ = packet.capture_time_ms;
- } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
- last_packet) {
- TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
- }
+
+ if (success) {
+ // Update media bytes sent.
+ prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
+ media_budget_->UseBudget(packet.bytes);
+ padding_budget_->UseBudget(packet.bytes);
}
- return true;
-}
-void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
- media_budget_->IncreaseBudget(delta_time_ms);
- padding_budget_->IncreaseBudget(delta_time_ms);
+ return success;
}
-bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
- *packet_list = NULL;
- if (media_budget_->bytes_remaining() <= 0) {
- // All bytes consumed for this interval.
- // Check if we have not sent in a too long time.
- if (clock_->TimeInMicroseconds() - time_last_send_us_ >
- kMaxQueueTimeWithoutSendingUs) {
- if (!high_priority_packets_->empty()) {
- *packet_list = high_priority_packets_.get();
- return true;
- }
- if (!normal_priority_packets_->empty()) {
- *packet_list = normal_priority_packets_.get();
- return true;
- }
- }
- // Send any old packets to avoid queuing for too long.
- if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
- int64_t high_priority_capture_time = -1;
- if (!high_priority_packets_->empty()) {
- high_priority_capture_time =
- high_priority_packets_->front().capture_time_ms;
- *packet_list = high_priority_packets_.get();
- }
- if (!normal_priority_packets_->empty() &&
- (high_priority_capture_time == -1 ||
- high_priority_capture_time >
- normal_priority_packets_->front().capture_time_ms)) {
- *packet_list = normal_priority_packets_.get();
- }
- if (*packet_list)
- return true;
- }
- return false;
- }
- if (!high_priority_packets_->empty()) {
- *packet_list = high_priority_packets_.get();
- return true;
- }
- if (!normal_priority_packets_->empty()) {
- *packet_list = normal_priority_packets_.get();
- return true;
- }
- if (!low_priority_packets_->empty()) {
- *packet_list = low_priority_packets_.get();
- return true;
- }
- return false;
-}
+void PacedSender::SendPadding(int padding_needed) {
+ critsect_->Leave();
+ int bytes_sent = callback_->TimeToSendPadding(padding_needed);
+ critsect_->Enter();
-paced_sender::Packet PacedSender::GetNextPacketFromList(
- paced_sender::PacketList* packets) {
- paced_sender::Packet packet = packets->front();
- UpdateMediaBytesSent(packet.bytes);
- return packet;
+ // Update padding bytes sent.
+ media_budget_->UseBudget(bytes_sent);
+ padding_budget_->UseBudget(bytes_sent);
}
-void PacedSender::UpdateMediaBytesSent(int num_bytes) {
- time_last_send_us_ = clock_->TimeInMicroseconds();
- media_budget_->UseBudget(num_bytes);
- padding_budget_->UseBudget(num_bytes);
+void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
+ media_budget_->IncreaseBudget(delta_time_ms);
+ padding_budget_->IncreaseBudget(delta_time_ms);
}
+bool PacedSender::ProbingExperimentIsEnabled() const {
+ return webrtc::field_trial::FindFullName("WebRTC-BitrateProbing") ==
+ "Enabled";
+}
} // namespace webrtc