1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_sender.h"
8 #include "base/message_loop/message_loop.h"
9 #include "device/serial/async_waiter.h"
// NOTE(review): several lines of this class are not visible in this view
// (access specifiers, the |bytes_sent_| declaration, the closing brace), so
// only comments are touched here.
13 // Represents a send that is not yet fulfilled.
14 class DataSender::PendingSend {
16 PendingSend(const base::StringPiece& data,
17 const DataSentCallback& callback,
18 const SendErrorCallback& error_callback,
19 int32_t fatal_error_value);
21 // Invoked to report that |num_bytes| of data have been sent. Subtracts the
22 // number of bytes that were part of this send from |num_bytes|. Returns
23 // whether this send has been completed. If this send has been completed, this
// posts |callback_| with the number of bytes acked to the current message
// loop.
25 bool ReportBytesSent(uint32_t* num_bytes);
27 // Invoked to report that |num_bytes| of data have been sent and then an
28 // error, |error| was encountered. Subtracts the number of bytes that were
29 // part of this send from |num_bytes|. If this send was not completed before
30 // the error, this calls |error_callback_| to report the error. Otherwise,
31 // this calls |callback_|. Returns the number of bytes sent but not acked.
32 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
34 // Reports |fatal_error_value_| to |error_callback_|.
35 void DispatchFatalError();
37 // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
38 // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
39 // or the error if one is encountered writing to |handle|.
40 MojoResult SendData(mojo::DataPipeProducerHandle handle);
43 // Invoked to update |bytes_acked_| and |num_bytes|.
44 void ReportBytesSentInternal(uint32_t* num_bytes);
// The data to send.
// NOTE(review): base::StringPiece does not own its bytes; presumably the
// caller must keep the buffer alive until the send completes - confirm.
47 const base::StringPiece data_;
49 // The callback to report success.
50 const DataSentCallback callback_;
52 // The callback to report errors.
53 const SendErrorCallback error_callback_;
55 // The error value to report when DispatchFatalError() is called.
56 const int32_t fatal_error_value_;
58 // The number of bytes sent to the data pipe.
61 // The number of bytes acked.
62 uint32_t bytes_acked_;
// Constructs a DataSender around |sink|: registers this object as the sink's
// error handler and client, creates a data pipe, keeps the producer end in
// |handle_| and hands the consumer end to the sink through Init().
// NOTE(review): part of the parameter list and initializer list is not
// visible in this view.
65 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
67 int32_t fatal_error_value)
69 fatal_error_value_(fatal_error_value),
71 sink_.set_error_handler(this);
// NOTE(review): presumably the initializer fields are, in order, struct size,
// flags, element size (1 byte) and capacity (|buffer_size|) - confirm against
// the MojoCreateDataPipeOptions declaration.
72 MojoCreateDataPipeOptions options = {
73 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
// NOTE(review): if the first initializer above is struct_size, this
// assignment just repeats it.
75 options.struct_size = sizeof(options);
76 mojo::ScopedDataPipeConsumerHandle remote_handle;
77 MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
// Pipe creation failure is only checked via DCHECK.
78 DCHECK_EQ(MOJO_RESULT_OK, result);
79 sink_->Init(remote_handle.Pass());
80 sink_.set_client(this);
// Destructor. NOTE(review): the body is not visible in this view.
83 DataSender::~DataSender() {
// Queues |data| for sending. |callback| and |error_callback| must both be
// non-null. A new PendingSend carrying this sender's |fatal_error_value_| is
// pushed onto |pending_sends_|.
// NOTE(review): the return statements and any follow-up call after the push
// are not visible in this view.
87 bool DataSender::Send(const base::StringPiece& data,
88 const DataSentCallback& callback,
89 const SendErrorCallback& error_callback) {
90 DCHECK(!callback.is_null() && !error_callback.is_null());
// A cancel in progress or a shut-down sender is special-cased here; the
// statement guarded by this condition is not visible (presumably the send is
// rejected - confirm).
91 if (!pending_cancel_.is_null() || shut_down_)
94 pending_sends_.push(linked_ptr<PendingSend>(
95 new PendingSend(data, callback, error_callback, fatal_error_value_)));
// Requests cancellation of outstanding sends with |error|. |callback| must be
// non-null. With no sends queued or awaiting ack, |callback| is posted to the
// current message loop; the visible code also stores |callback| in
// |pending_cancel_| and forwards |error| to the sink.
// NOTE(review): the return statements are not visible in this view, so which
// paths return early cannot be confirmed here.
100 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
101 DCHECK(!callback.is_null());
// A cancel already in progress or a shut-down sender is special-cased; the
// guarded statement is not visible.
102 if (!pending_cancel_.is_null() || shut_down_)
104 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
105 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
109 pending_cancel_ = callback;
110 sink_->Cancel(error);
// Handles an acknowledgement that |bytes_sent| bytes were sent, crediting
// them to queued sends in order.
// NOTE(review): several lines of this function are not visible in this view.
114 void DataSender::ReportBytesSent(uint32_t bytes_sent) {
// First credit sends that were fully written to the pipe and are awaiting
// ack. Each PendingSend::ReportBytesSent() call takes its share out of
// |bytes_sent|; a send that reports completion is popped.
118 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
119 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
120 sends_awaiting_ack_.pop();
// Any remainder is credited to the send at the front of |pending_sends_|.
122 if (bytes_sent > 0 && !pending_sends_.empty()) {
123 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
// Bytes are still unaccounted for after both queues were consulted; the
// handling of this case is not visible here.
130 if (bytes_sent != 0) {
// Nothing is outstanding any more; the guarded statement is not visible
// (presumably a pending cancel is completed - confirm).
134 if (pending_sends_.empty() && sends_awaiting_ack_.empty())
// Handles a report that some bytes were sent and then an error occurred.
// Every send in both queues is notified through
// PendingSend::ReportBytesSentAndError() and removed. The un-acked byte
// counts returned by the sends awaiting ack are summed and passed to
// |callback|.
// NOTE(review): part of the parameter list and some statements are not
// visible in this view.
138 void DataSender::ReportBytesSentAndError(
141 const mojo::Callback<void(uint32_t)>& callback) {
145 uint32_t bytes_to_flush = 0;
146 while (!sends_awaiting_ack_.empty()) {
147 bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
149 sends_awaiting_ack_.pop();
151 while (!pending_sends_.empty()) {
// NOTE(review): whether this return value is also accumulated depends on the
// line above this call, which is not visible.
153 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
154 pending_sends_.pop();
156 callback.Run(bytes_to_flush);
// Invoked on a connection error; this object is registered as the sink's
// error handler in the constructor.
// NOTE(review): the body is not visible in this view.
160 void DataSender::OnConnectionError() {
// Writes queued sends to the data pipe in order.
// NOTE(review): the tail of this function (including handling of other
// results) is not visible in this view.
164 void DataSender::SendInternal() {
165 while (!pending_sends_.empty()) {
166 MojoResult result = pending_sends_.front()->SendData(handle_.get());
// The whole send was written: move it to the queue of sends awaiting ack.
167 if (result == MOJO_RESULT_OK) {
168 sends_awaiting_ack_.push(pending_sends_.front());
169 pending_sends_.pop();
// Only part of the send was written: wait for the writable signal, after
// which OnDoneWaiting() is invoked.
// NOTE(review): base::Unretained(this) assumes this object outlives the
// waiter; |waiter_| is a member, which presumably guarantees that - confirm.
170 } else if (result == MOJO_RESULT_SHOULD_WAIT) {
171 waiter_.reset(new AsyncWaiter(
173 MOJO_HANDLE_SIGNAL_WRITABLE,
174 base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
// Callback bound to the AsyncWaiter created in SendInternal(); |result| is
// the outcome of the wait.
// NOTE(review): both the failure branch body and the success path are not
// visible in this view.
183 void DataSender::OnDoneWaiting(MojoResult result) {
185 if (result != MOJO_RESULT_OK) {
// Posts the stored cancel callback to the current message loop and clears
// |pending_cancel_|. Must only be called once both send queues are empty.
192 void DataSender::RunCancelCallback() {
193 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
// No cancel is in progress; the guarded statement is not visible (presumably
// an early return - confirm).
194 if (pending_cancel_.is_null())
197 base::MessageLoop::current()->PostTask(FROM_HERE,
198 base::Bind(pending_cancel_));
// Send() and Cancel() both test |pending_cancel_| for null, so reset it.
199 pending_cancel_.Reset();
// Fails every outstanding send with the fatal error: first the sends not yet
// fully written, then the sends awaiting ack. Each is removed from its queue.
// NOTE(review): the lines before the first loop and after the second are not
// visible (presumably |shut_down_| is set there - confirm).
202 void DataSender::ShutDown() {
205 while (!pending_sends_.empty()) {
206 pending_sends_.front()->DispatchFatalError();
207 pending_sends_.pop();
209 while (!sends_awaiting_ack_.empty()) {
210 sends_awaiting_ack_.front()->DispatchFatalError();
211 sends_awaiting_ack_.pop();
// Constructs a send for |data|. |error_callback| and |fatal_error_value| are
// stored in the corresponding members.
// NOTE(review): the remaining member initializers and the body are not
// visible in this view.
216 DataSender::PendingSend::PendingSend(const base::StringPiece& data,
217 const DataSentCallback& callback,
218 const SendErrorCallback& error_callback,
219 int32_t fatal_error_value)
222 error_callback_(error_callback),
223 fatal_error_value_(fatal_error_value),
// Credits acknowledged bytes from |*num_bytes| to this send via
// ReportBytesSentInternal(). Once every byte of |data_| has been acked,
// |callback_| is posted to the current message loop with |bytes_acked_|.
// NOTE(review): the return statements are not visible in this view.
228 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
229 ReportBytesSentInternal(num_bytes);
// Not all of this send's data has been acked yet; the guarded statement is
// not visible.
230 if (bytes_acked_ < data_.size())
233 base::MessageLoop::current()->PostTask(FROM_HERE,
234 base::Bind(callback_, bytes_acked_));
// Credits acknowledged bytes from |*num_bytes| to this send and then reports
// either success or the error.
// NOTE(review): the second parameter line and the end of the first branch
// are not visible in this view.
238 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
240 ReportBytesSentInternal(num_bytes);
// Bytes are left over after crediting this send, i.e. everything written for
// it so far was acked before the error: post the success callback.
241 if (*num_bytes > 0) {
242 base::MessageLoop::current()->PostTask(FROM_HERE,
243 base::Bind(callback_, bytes_acked_));
// Report the error together with the number of bytes acked, and return the
// count of bytes written to the pipe but not acked.
246 base::MessageLoop::current()->PostTask(
247 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
248 return bytes_sent_ - bytes_acked_;
// Posts |error_callback_| to the current message loop, reporting 0 bytes
// sent and |fatal_error_value_| as the error.
251 void DataSender::PendingSend::DispatchFatalError() {
252 base::MessageLoop::current()->PostTask(
253 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
// Writes the not-yet-written tail of |data_| to |handle|. Returns
// MOJO_RESULT_OK once all of |data_| has been written and
// MOJO_RESULT_SHOULD_WAIT if some of it remains.
// NOTE(review): one WriteDataRaw argument line and the statement guarded by
// the failure check are not visible in this view.
256 MojoResult DataSender::PendingSend::SendData(
257 mojo::DataPipeProducerHandle handle) {
// Number of bytes of |data_| that have not been written to the pipe yet.
258 uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
259 MojoResult result = mojo::WriteDataRaw(handle,
260 data_.data() + bytes_sent_,
262 MOJO_WRITE_DATA_FLAG_NONE);
263 if (result != MOJO_RESULT_OK)
// NOTE(review): presumably WriteDataRaw updated |bytes_to_send| to the
// number of bytes actually written - confirm against the hidden argument.
266 bytes_sent_ += bytes_to_send;
267 return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
// Adds |*num_bytes| to |bytes_acked_|. If that exceeds |bytes_sent_|, the
// excess is handed back through |*num_bytes| and |bytes_acked_| is clamped
// to |bytes_sent_|.
// NOTE(review): the branch for the non-excess case is not visible in this
// view; callers treat |*num_bytes| as the leftover afterwards, so presumably
// it is set to 0 there - confirm.
270 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
271 bytes_acked_ += *num_bytes;
272 if (bytes_acked_ > bytes_sent_) {
273 *num_bytes = bytes_acked_ - bytes_sent_;
274 bytes_acked_ = bytes_sent_;
280 } // namespace device