#include "device/serial/data_sender.h"
+#include <algorithm>
+
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
-#include "device/serial/async_waiter.h"
namespace device {
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
- // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
- // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
- // or the error if one is encountered writing to |handle|.
- MojoResult SendData(mojo::DataPipeProducerHandle handle);
+ // Attempts to send any data not yet sent to |sink|.
+ bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
private:
// Invoked to update |bytes_acked_| and |num_bytes|.
// The error value to report when DispatchFatalError() is called.
const int32_t fatal_error_value_;
- // The number of bytes sent to the data pipe.
+ // The number of bytes sent to the DataSink.
uint32_t bytes_sent_;
// The number of bytes acked.
int32_t fatal_error_value)
: sink_(sink.Pass()),
fatal_error_value_(fatal_error_value),
+ available_buffer_capacity_(buffer_size),
shut_down_(false) {
sink_.set_error_handler(this);
- MojoCreateDataPipeOptions options = {
- sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
- };
- options.struct_size = sizeof(options);
- mojo::ScopedDataPipeConsumerHandle remote_handle;
- MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
- DCHECK_EQ(MOJO_RESULT_OK, result);
- sink_->Init(remote_handle.Pass());
sink_.set_client(this);
+ sink_->Init(buffer_size);
}
DataSender::~DataSender() {
if (shut_down_)
return;
+ available_buffer_capacity_ += bytes_sent;
while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
sends_awaiting_ack_.pop();
}
if (pending_sends_.empty() && sends_awaiting_ack_.empty())
RunCancelCallback();
+ SendInternal();
}
void DataSender::ReportBytesSentAndError(
uint32_t bytes_sent,
int32_t error,
- const mojo::Callback<void(uint32_t)>& callback) {
+ const mojo::Callback<void()>& callback) {
if (shut_down_)
return;
- uint32_t bytes_to_flush = 0;
+ available_buffer_capacity_ += bytes_sent;
while (!sends_awaiting_ack_.empty()) {
- bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
- &bytes_sent, error);
+ available_buffer_capacity_ +=
+ sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
+ error);
sends_awaiting_ack_.pop();
}
while (!pending_sends_.empty()) {
- bytes_to_flush +=
+ available_buffer_capacity_ +=
pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
pending_sends_.pop();
}
- callback.Run(bytes_to_flush);
+ callback.Run();
RunCancelCallback();
}
}
void DataSender::SendInternal() {
- while (!pending_sends_.empty()) {
- MojoResult result = pending_sends_.front()->SendData(handle_.get());
- if (result == MOJO_RESULT_OK) {
+ while (!pending_sends_.empty() && available_buffer_capacity_) {
+ if (pending_sends_.front()->SendData(sink_.get(),
+ &available_buffer_capacity_)) {
sends_awaiting_ack_.push(pending_sends_.front());
pending_sends_.pop();
- } else if (result == MOJO_RESULT_SHOULD_WAIT) {
- waiter_.reset(new AsyncWaiter(
- handle_.get(),
- MOJO_HANDLE_SIGNAL_WRITABLE,
- base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
- return;
- } else {
- ShutDown();
- return;
}
}
}
-void DataSender::OnDoneWaiting(MojoResult result) {
- waiter_.reset();
- if (result != MOJO_RESULT_OK) {
- ShutDown();
- return;
- }
- SendInternal();
-}
-
void DataSender::RunCancelCallback() {
DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
if (pending_cancel_.is_null())
}
void DataSender::ShutDown() {
- waiter_.reset();
shut_down_ = true;
while (!pending_sends_.empty()) {
pending_sends_.front()->DispatchFatalError();
FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
}
-MojoResult DataSender::PendingSend::SendData(
- mojo::DataPipeProducerHandle handle) {
- uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
- MojoResult result = mojo::WriteDataRaw(handle,
- data_.data() + bytes_sent_,
- &bytes_to_send,
- MOJO_WRITE_DATA_FLAG_NONE);
- if (result != MOJO_RESULT_OK)
- return result;
-
- bytes_sent_ += bytes_to_send;
- return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
+bool DataSender::PendingSend::SendData(serial::DataSink* sink,
+ uint32_t* available_buffer_size) {
+ uint32_t num_bytes_to_send =
+ std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
+ *available_buffer_size);
+ mojo::Array<uint8_t> bytes(num_bytes_to_send);
+ memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
+ bytes_sent_ += num_bytes_to_send;
+ *available_buffer_size -= num_bytes_to_send;
+ sink->OnData(bytes.Pass());
+ return bytes_sent_ == data_.size();
}
void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {