Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / device / serial / data_sender.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_sender.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11
12 namespace device {
13
14 // Represents a send that is not yet fulfilled.
15 class DataSender::PendingSend {
16  public:
17   PendingSend(const base::StringPiece& data,
18               const DataSentCallback& callback,
19               const SendErrorCallback& error_callback,
20               int32_t fatal_error_value);
21
22   // Invoked to report that |num_bytes| of data have been sent. Subtracts the
23   // number of bytes that were part of this send from |num_bytes|. Returns
24   // whether this send has been completed. If this send has been completed, this
25   // calls |callback_|.
26   bool ReportBytesSent(uint32_t* num_bytes);
27
28   // Invoked to report that |num_bytes| of data have been sent and then an
29   // error, |error| was encountered. Subtracts the number of bytes that were
30   // part of this send from |num_bytes|. If this send was not completed before
31   // the error, this calls |error_callback_| to report the error. Otherwise,
32   // this calls |callback_|. Returns the number of bytes sent but not acked.
33   uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
34
35   // Reports |fatal_error_value_| to |receive_error_callback_|.
36   void DispatchFatalError();
37
38   // Attempts to send any data not yet sent to |sink|.
39   bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
40
41  private:
42   // Invoked to update |bytes_acked_| and |num_bytes|.
43   void ReportBytesSentInternal(uint32_t* num_bytes);
44
45   // The data to send.
46   const base::StringPiece data_;
47
48   // The callback to report success.
49   const DataSentCallback callback_;
50
51   // The callback to report errors.
52   const SendErrorCallback error_callback_;
53
54   // The error value to report when DispatchFatalError() is called.
55   const int32_t fatal_error_value_;
56
57   // The number of bytes sent to the DataSink.
58   uint32_t bytes_sent_;
59
60   // The number of bytes acked.
61   uint32_t bytes_acked_;
62 };
63
64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
65                        uint32_t buffer_size,
66                        int32_t fatal_error_value)
67     : sink_(sink.Pass()),
68       fatal_error_value_(fatal_error_value),
69       available_buffer_capacity_(buffer_size),
70       shut_down_(false) {
71   sink_.set_error_handler(this);
72   sink_.set_client(this);
73   sink_->Init(buffer_size);
74 }
75
76 DataSender::~DataSender() {
77   ShutDown();
78 }
79
80 bool DataSender::Send(const base::StringPiece& data,
81                       const DataSentCallback& callback,
82                       const SendErrorCallback& error_callback) {
83   DCHECK(!callback.is_null() && !error_callback.is_null());
84   if (!pending_cancel_.is_null() || shut_down_)
85     return false;
86
87   pending_sends_.push(linked_ptr<PendingSend>(
88       new PendingSend(data, callback, error_callback, fatal_error_value_)));
89   SendInternal();
90   return true;
91 }
92
93 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
94   DCHECK(!callback.is_null());
95   if (!pending_cancel_.is_null() || shut_down_)
96     return false;
97   if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
98     base::MessageLoop::current()->PostTask(FROM_HERE, callback);
99     return true;
100   }
101
102   pending_cancel_ = callback;
103   sink_->Cancel(error);
104   return true;
105 }
106
107 void DataSender::ReportBytesSent(uint32_t bytes_sent) {
108   if (shut_down_)
109     return;
110
111   available_buffer_capacity_ += bytes_sent;
112   while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
113          sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
114     sends_awaiting_ack_.pop();
115   }
116   if (bytes_sent > 0 && !pending_sends_.empty()) {
117     bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
118     DCHECK(!finished);
119     if (finished) {
120       ShutDown();
121       return;
122     }
123   }
124   if (bytes_sent != 0) {
125     ShutDown();
126     return;
127   }
128   if (pending_sends_.empty() && sends_awaiting_ack_.empty())
129     RunCancelCallback();
130   SendInternal();
131 }
132
133 void DataSender::ReportBytesSentAndError(
134     uint32_t bytes_sent,
135     int32_t error,
136     const mojo::Callback<void()>& callback) {
137   if (shut_down_)
138     return;
139
140   available_buffer_capacity_ += bytes_sent;
141   while (!sends_awaiting_ack_.empty()) {
142     available_buffer_capacity_ +=
143         sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
144                                                              error);
145     sends_awaiting_ack_.pop();
146   }
147   while (!pending_sends_.empty()) {
148     available_buffer_capacity_ +=
149         pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
150     pending_sends_.pop();
151   }
152   callback.Run();
153   RunCancelCallback();
154 }
155
156 void DataSender::OnConnectionError() {
157   ShutDown();
158 }
159
160 void DataSender::SendInternal() {
161   while (!pending_sends_.empty() && available_buffer_capacity_) {
162     if (pending_sends_.front()->SendData(sink_.get(),
163                                          &available_buffer_capacity_)) {
164       sends_awaiting_ack_.push(pending_sends_.front());
165       pending_sends_.pop();
166     }
167   }
168 }
169
170 void DataSender::RunCancelCallback() {
171   DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
172   if (pending_cancel_.is_null())
173     return;
174
175   base::MessageLoop::current()->PostTask(FROM_HERE,
176                                          base::Bind(pending_cancel_));
177   pending_cancel_.Reset();
178 }
179
180 void DataSender::ShutDown() {
181   shut_down_ = true;
182   while (!pending_sends_.empty()) {
183     pending_sends_.front()->DispatchFatalError();
184     pending_sends_.pop();
185   }
186   while (!sends_awaiting_ack_.empty()) {
187     sends_awaiting_ack_.front()->DispatchFatalError();
188     sends_awaiting_ack_.pop();
189   }
190   RunCancelCallback();
191 }
192
193 DataSender::PendingSend::PendingSend(const base::StringPiece& data,
194                                      const DataSentCallback& callback,
195                                      const SendErrorCallback& error_callback,
196                                      int32_t fatal_error_value)
197     : data_(data),
198       callback_(callback),
199       error_callback_(error_callback),
200       fatal_error_value_(fatal_error_value),
201       bytes_sent_(0),
202       bytes_acked_(0) {
203 }
204
205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
206   ReportBytesSentInternal(num_bytes);
207   if (bytes_acked_ < data_.size())
208     return false;
209
210   base::MessageLoop::current()->PostTask(FROM_HERE,
211                                          base::Bind(callback_, bytes_acked_));
212   return true;
213 }
214
215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
216                                                           int32_t error) {
217   ReportBytesSentInternal(num_bytes);
218   if (*num_bytes > 0) {
219     base::MessageLoop::current()->PostTask(FROM_HERE,
220                                            base::Bind(callback_, bytes_acked_));
221     return 0;
222   }
223   base::MessageLoop::current()->PostTask(
224       FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
225   return bytes_sent_ - bytes_acked_;
226 }
227
228 void DataSender::PendingSend::DispatchFatalError() {
229   base::MessageLoop::current()->PostTask(
230       FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
231 }
232
233 bool DataSender::PendingSend::SendData(serial::DataSink* sink,
234                                        uint32_t* available_buffer_size) {
235   uint32_t num_bytes_to_send =
236       std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
237                *available_buffer_size);
238   mojo::Array<uint8_t> bytes(num_bytes_to_send);
239   memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
240   bytes_sent_ += num_bytes_to_send;
241   *available_buffer_size -= num_bytes_to_send;
242   sink->OnData(bytes.Pass());
243   return bytes_sent_ == data_.size();
244 }
245
246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
247   bytes_acked_ += *num_bytes;
248   if (bytes_acked_ > bytes_sent_) {
249     *num_bytes = bytes_acked_ - bytes_sent_;
250     bytes_acked_ = bytes_sent_;
251   } else {
252     *num_bytes = 0;
253   }
254 }
255
256 }  // namespace device