Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / media / cast / transport / pacing / paced_sender.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/transport/pacing/paced_sender.h"
6
7 #include "base/big_endian.h"
8 #include "base/bind.h"
9 #include "base/message_loop/message_loop.h"
10
11 namespace media {
12 namespace cast {
13 namespace transport {
14
15 namespace {
16
17 static const int64 kPacingIntervalMs = 10;
18 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
19 // bursts of packets.
20 static const size_t kPacingMaxBurstsPerFrame = 3;
21 static const size_t kTargetBurstSize = 10;
22 static const size_t kMaxBurstSize = 20;
23
24 using media::cast::CastLoggingEvent;
25
26 CastLoggingEvent GetLoggingEvent(bool is_audio, bool retransmit) {
27   if (retransmit) {
28     return is_audio ? media::cast::kAudioPacketRetransmitted
29                     : media::cast::kVideoPacketRetransmitted;
30   } else {
31     return is_audio ? media::cast::kAudioPacketSentToNetwork
32                     : media::cast::kVideoPacketSentToNetwork;
33   }
34 }
35
36 }  // namespace
37
38
39 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
40                                            uint32 ssrc,
41                                            uint16 packet_id) {
42   return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
43 }
44
45 PacedSender::PacedSender(
46     base::TickClock* clock,
47     LoggingImpl* logging,
48     PacketSender* transport,
49     const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
50     : clock_(clock),
51       logging_(logging),
52       transport_(transport),
53       transport_task_runner_(transport_task_runner),
54       audio_ssrc_(0),
55       video_ssrc_(0),
56       max_burst_size_(kTargetBurstSize),
57       next_max_burst_size_(kTargetBurstSize),
58       next_next_max_burst_size_(kTargetBurstSize),
59       current_burst_size_(0),
60       state_(State_Unblocked),
61       weak_factory_(this) {
62 }
63
64 PacedSender::~PacedSender() {}
65
66 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
67   audio_ssrc_ = audio_ssrc;
68 }
69
70 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
71   video_ssrc_ = video_ssrc;
72 }
73
74 bool PacedSender::SendPackets(const SendPacketVector& packets) {
75   if (packets.empty()) {
76     return true;
77   }
78   for (size_t i = 0; i < packets.size(); i++) {
79     packet_list_[packets[i].first] =
80         make_pair(PacketType_Normal, packets[i].second);
81   }
82   if (state_ == State_Unblocked) {
83     SendStoredPackets();
84   }
85   return true;
86 }
87
88 bool PacedSender::ResendPackets(const SendPacketVector& packets) {
89   if (packets.empty()) {
90     return true;
91   }
92   for (size_t i = 0; i < packets.size(); i++) {
93     packet_list_[packets[i].first] =
94         make_pair(PacketType_Resend, packets[i].second);
95   }
96   if (state_ == State_Unblocked) {
97     SendStoredPackets();
98   }
99   return true;
100 }
101
102 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
103   if (state_ == State_TransportBlocked) {
104     packet_list_[PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
105         make_pair(PacketType_RTCP, packet);
106   } else {
107     // We pass the RTCP packets straight through.
108     if (!transport_->SendPacket(
109             packet,
110             base::Bind(&PacedSender::SendStoredPackets,
111                        weak_factory_.GetWeakPtr()))) {
112       state_ = State_TransportBlocked;
113     }
114
115   }
116   return true;
117 }
118
119 PacketRef PacedSender::GetNextPacket(PacketType* packet_type) {
120   std::map<PacketKey, std::pair<PacketType, PacketRef> >::iterator i;
121   i = packet_list_.begin();
122   DCHECK(i != packet_list_.end());
123   *packet_type = i->second.first;
124   PacketRef ret = i->second.second;
125   packet_list_.erase(i);
126   return ret;
127 }
128
129 bool PacedSender::empty() const {
130   return packet_list_.empty();
131 }
132
133 size_t PacedSender::size() const {
134   return packet_list_.size();
135 }
136
137 // This function can be called from three places:
138 // 1. User called one of the Send* functions and we were in an unblocked state.
139 // 2. state_ == State_TransportBlocked and the transport is calling us to
140 //    let us know that it's ok to send again.
141 // 3. state_ == State_BurstFull and there are still packets to send. In this
142 //    case we called PostDelayedTask on this function to start a new burst.
143 void PacedSender::SendStoredPackets() {
144   State previous_state = state_;
145   state_ = State_Unblocked;
146   if (empty()) {
147     return;
148   }
149
150   base::TimeTicks now = clock_->NowTicks();
151   // I don't actually trust that PostDelayTask(x - now) will mean that
152   // now >= x when the call happens, so check if the previous state was
153   // State_BurstFull too.
154   if (now >= burst_end_ || previous_state == State_BurstFull) {
155     // Start a new burst.
156     current_burst_size_ = 0;
157     burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
158
159     // The goal here is to try to send out the queued packets over the next
160     // three bursts, while trying to keep the burst size below 10 if possible.
161     // We have some evidence that sending more than 12 packets in a row doesn't
162     // work very well, but we don't actually know why yet. Sending out packets
163     // sooner is better than sending out packets later as that gives us more
164     // time to re-send them if needed. So if we have less than 30 packets, just
165     // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
166     // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
167     // which is more bandwidth than the cast library should need, and sending
168     // out more data per second is unlikely to be helpful.
169     size_t max_burst_size = std::min(
170         kMaxBurstSize,
171         std::max(kTargetBurstSize, size() / kPacingMaxBurstsPerFrame));
172
173     // If the queue is long, issue a warning. Try to limit the number of
174     // warnings issued by only issuing the warning when the burst size
175     // grows. Otherwise we might get 100 warnings per second.
176     if (max_burst_size > next_next_max_burst_size_ && size() > 100) {
177       LOG(WARNING) << "Packet queue is very long:" << size();
178     }
179
180     max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
181     next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
182     next_next_max_burst_size_ = max_burst_size;
183   }
184
185   base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
186                                 weak_factory_.GetWeakPtr());
187   while (!empty()) {
188     if (current_burst_size_ >= max_burst_size_) {
189       transport_task_runner_->PostDelayedTask(FROM_HERE,
190                                               cb,
191                                               burst_end_ - now);
192       state_ = State_BurstFull;
193       return;
194     }
195     PacketType packet_type;
196     PacketRef packet = GetNextPacket(&packet_type);
197
198     switch (packet_type) {
199       case PacketType_Resend:
200         LogPacketEvent(packet->data, true);
201         break;
202       case PacketType_Normal:
203         LogPacketEvent(packet->data, false);
204         break;
205       case PacketType_RTCP:
206         break;
207     }
208     if (!transport_->SendPacket(packet, cb)) {
209       state_ = State_TransportBlocked;
210       return;
211     }
212     current_burst_size_++;
213   }
214   state_ = State_Unblocked;
215 }
216
217 void PacedSender::LogPacketEvent(const Packet& packet, bool retransmit) {
218   // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
219   // if the packet is audio or video.
220   DCHECK_GE(packet.size(), 12u);
221   base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
222   uint32 ssrc;
223   bool success = reader.ReadU32(&ssrc);
224   DCHECK(success);
225   bool is_audio;
226   if (ssrc == audio_ssrc_) {
227     is_audio = true;
228   } else if (ssrc == video_ssrc_) {
229     is_audio = false;
230   } else {
231     DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
232     return;
233   }
234
235   CastLoggingEvent event = GetLoggingEvent(is_audio, retransmit);
236
237   logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, packet);
238 }
239
240 }  // namespace transport
241 }  // namespace cast
242 }  // namespace media