Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / device / serial / data_sender.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_sender.h"
6
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop.h"
9 #include "device/serial/async_waiter.h"
10
11 namespace device {
12
13 // Represents a send that is not yet fulfilled.
14 class DataSender::PendingSend {
15  public:
16   PendingSend(const base::StringPiece& data,
17               const DataSentCallback& callback,
18               const SendErrorCallback& error_callback,
19               int32_t fatal_error_value);
20
21   // Invoked to report that |num_bytes| of data have been sent. Subtracts the
22   // number of bytes that were part of this send from |num_bytes|. Returns
23   // whether this send has been completed. If this send has been completed, this
24   // calls |callback_|.
25   bool ReportBytesSent(uint32_t* num_bytes);
26
27   // Invoked to report that |num_bytes| of data have been sent and then an
28   // error, |error| was encountered. Subtracts the number of bytes that were
29   // part of this send from |num_bytes|. If this send was not completed before
30   // the error, this calls |error_callback_| to report the error. Otherwise,
31   // this calls |callback_|. Returns the number of bytes sent but not acked.
32   uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
33
34   // Reports |fatal_error_value_| to |receive_error_callback_|.
35   void DispatchFatalError();
36
37   // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
38   // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
39   // or the error if one is encountered writing to |handle|.
40   MojoResult SendData(mojo::DataPipeProducerHandle handle);
41
42  private:
43   // Invoked to update |bytes_acked_| and |num_bytes|.
44   void ReportBytesSentInternal(uint32_t* num_bytes);
45
46   // The data to send.
47   const base::StringPiece data_;
48
49   // The callback to report success.
50   const DataSentCallback callback_;
51
52   // The callback to report errors.
53   const SendErrorCallback error_callback_;
54
55   // The error value to report when DispatchFatalError() is called.
56   const int32_t fatal_error_value_;
57
58   // The number of bytes sent to the data pipe.
59   uint32_t bytes_sent_;
60
61   // The number of bytes acked.
62   uint32_t bytes_acked_;
63 };
64
65 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
66                        uint32_t buffer_size,
67                        int32_t fatal_error_value)
68     : sink_(sink.Pass()),
69       fatal_error_value_(fatal_error_value),
70       shut_down_(false) {
71   sink_.set_error_handler(this);
72   MojoCreateDataPipeOptions options = {
73       sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
74   };
75   options.struct_size = sizeof(options);
76   mojo::ScopedDataPipeConsumerHandle remote_handle;
77   MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
78   DCHECK_EQ(MOJO_RESULT_OK, result);
79   sink_->Init(remote_handle.Pass());
80   sink_.set_client(this);
81 }
82
83 DataSender::~DataSender() {
84   ShutDown();
85 }
86
87 bool DataSender::Send(const base::StringPiece& data,
88                       const DataSentCallback& callback,
89                       const SendErrorCallback& error_callback) {
90   DCHECK(!callback.is_null() && !error_callback.is_null());
91   if (!pending_cancel_.is_null() || shut_down_)
92     return false;
93
94   pending_sends_.push(linked_ptr<PendingSend>(
95       new PendingSend(data, callback, error_callback, fatal_error_value_)));
96   SendInternal();
97   return true;
98 }
99
100 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
101   DCHECK(!callback.is_null());
102   if (!pending_cancel_.is_null() || shut_down_)
103     return false;
104   if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
105     base::MessageLoop::current()->PostTask(FROM_HERE, callback);
106     return true;
107   }
108
109   pending_cancel_ = callback;
110   sink_->Cancel(error);
111   return true;
112 }
113
114 void DataSender::ReportBytesSent(uint32_t bytes_sent) {
115   if (shut_down_)
116     return;
117
118   while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
119          sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
120     sends_awaiting_ack_.pop();
121   }
122   if (bytes_sent > 0 && !pending_sends_.empty()) {
123     bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
124     DCHECK(!finished);
125     if (finished) {
126       ShutDown();
127       return;
128     }
129   }
130   if (bytes_sent != 0) {
131     ShutDown();
132     return;
133   }
134   if (pending_sends_.empty() && sends_awaiting_ack_.empty())
135     RunCancelCallback();
136 }
137
138 void DataSender::ReportBytesSentAndError(
139     uint32_t bytes_sent,
140     int32_t error,
141     const mojo::Callback<void(uint32_t)>& callback) {
142   if (shut_down_)
143     return;
144
145   uint32_t bytes_to_flush = 0;
146   while (!sends_awaiting_ack_.empty()) {
147     bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
148         &bytes_sent, error);
149     sends_awaiting_ack_.pop();
150   }
151   while (!pending_sends_.empty()) {
152     bytes_to_flush +=
153         pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
154     pending_sends_.pop();
155   }
156   callback.Run(bytes_to_flush);
157   RunCancelCallback();
158 }
159
160 void DataSender::OnConnectionError() {
161   ShutDown();
162 }
163
164 void DataSender::SendInternal() {
165   while (!pending_sends_.empty()) {
166     MojoResult result = pending_sends_.front()->SendData(handle_.get());
167     if (result == MOJO_RESULT_OK) {
168       sends_awaiting_ack_.push(pending_sends_.front());
169       pending_sends_.pop();
170     } else if (result == MOJO_RESULT_SHOULD_WAIT) {
171       waiter_.reset(new AsyncWaiter(
172           handle_.get(),
173           MOJO_HANDLE_SIGNAL_WRITABLE,
174           base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
175       return;
176     } else {
177       ShutDown();
178       return;
179     }
180   }
181 }
182
183 void DataSender::OnDoneWaiting(MojoResult result) {
184   waiter_.reset();
185   if (result != MOJO_RESULT_OK) {
186     ShutDown();
187     return;
188   }
189   SendInternal();
190 }
191
192 void DataSender::RunCancelCallback() {
193   DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
194   if (pending_cancel_.is_null())
195     return;
196
197   base::MessageLoop::current()->PostTask(FROM_HERE,
198                                          base::Bind(pending_cancel_));
199   pending_cancel_.Reset();
200 }
201
202 void DataSender::ShutDown() {
203   waiter_.reset();
204   shut_down_ = true;
205   while (!pending_sends_.empty()) {
206     pending_sends_.front()->DispatchFatalError();
207     pending_sends_.pop();
208   }
209   while (!sends_awaiting_ack_.empty()) {
210     sends_awaiting_ack_.front()->DispatchFatalError();
211     sends_awaiting_ack_.pop();
212   }
213   RunCancelCallback();
214 }
215
216 DataSender::PendingSend::PendingSend(const base::StringPiece& data,
217                                      const DataSentCallback& callback,
218                                      const SendErrorCallback& error_callback,
219                                      int32_t fatal_error_value)
220     : data_(data),
221       callback_(callback),
222       error_callback_(error_callback),
223       fatal_error_value_(fatal_error_value),
224       bytes_sent_(0),
225       bytes_acked_(0) {
226 }
227
228 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
229   ReportBytesSentInternal(num_bytes);
230   if (bytes_acked_ < data_.size())
231     return false;
232
233   base::MessageLoop::current()->PostTask(FROM_HERE,
234                                          base::Bind(callback_, bytes_acked_));
235   return true;
236 }
237
238 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
239                                                           int32_t error) {
240   ReportBytesSentInternal(num_bytes);
241   if (*num_bytes > 0) {
242     base::MessageLoop::current()->PostTask(FROM_HERE,
243                                            base::Bind(callback_, bytes_acked_));
244     return 0;
245   }
246   base::MessageLoop::current()->PostTask(
247       FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
248   return bytes_sent_ - bytes_acked_;
249 }
250
251 void DataSender::PendingSend::DispatchFatalError() {
252   base::MessageLoop::current()->PostTask(
253       FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
254 }
255
256 MojoResult DataSender::PendingSend::SendData(
257     mojo::DataPipeProducerHandle handle) {
258   uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
259   MojoResult result = mojo::WriteDataRaw(handle,
260                                          data_.data() + bytes_sent_,
261                                          &bytes_to_send,
262                                          MOJO_WRITE_DATA_FLAG_NONE);
263   if (result != MOJO_RESULT_OK)
264     return result;
265
266   bytes_sent_ += bytes_to_send;
267   return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
268 }
269
270 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
271   bytes_acked_ += *num_bytes;
272   if (bytes_acked_ > bytes_sent_) {
273     *num_bytes = bytes_acked_ - bytes_sent_;
274     bytes_acked_ = bytes_sent_;
275   } else {
276     *num_bytes = 0;
277   }
278 }
279
280 }  // namespace device