1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_source_sender.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
15 // Represents a send that is not yet fulfilled.
// NOTE(review): several lines of this declaration (access specifiers, the
// flag described by the final comment, the closing brace) are not visible in
// this excerpt.
16 class DataSourceSender::PendingSend {
// Binds this pending send to its owning |sender| and to the |callback| that
// is run to obtain data.
18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
21 // one of Done() and DoneWithError() will be called with the result.
22 void GetData(void* data, uint32_t num_bytes);
26 // Reports a successful write of |bytes_written|.
27 void Done(uint32_t bytes_written);
29 // Reports a partially successful or unsuccessful write of |bytes_written|
30 // with an error of |error|.
31 void DoneWithError(uint32_t bytes_written, int32_t error);
33 // The DataSourceSender that owns this.
34 DataSourceSender* sender_;
36 // The callback to call to get data.
37 ReadyCallback callback_;
39 // Whether the buffer specified by GetData() has been passed to |callback_|,
40 // but has not yet called Done() or DoneWithError().
44 // A WritableBuffer implementation that provides a view of a data pipe owned by a
// DataSourceSender.
// NOTE(review): several lines of this declaration (some constructor
// parameters, the destructor declaration, some members, the closing brace)
// are not visible in this excerpt.
46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
// At the visible call site (PendingSend::GetData) the arguments passed are
// the sender, the PendingSend itself, the data pointer and the size.
48 Buffer(scoped_refptr<DataSourceSender> sender,
51 uint32_t buffer_size);
54 // WritableBuffer overrides.
55 virtual char* GetData() OVERRIDE;
56 virtual uint32_t GetSize() OVERRIDE;
57 virtual void Done(uint32_t bytes_written) OVERRIDE;
58 virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
61 // The DataSourceSender whose data pipe this buffer provides a view of.
62 scoped_refptr<DataSourceSender> sender_;
64 // The PendingSend in response to which this buffer was created. Set to NULL
// once Done() or DoneWithError() has forwarded its result.
65 PendingSend* pending_send_;
// The size value supplied to the constructor as |buffer_size|.
68 uint32_t buffer_size_;
// Stores copies of |ready_callback| and |error_callback|. Both are asserted
// to be non-null via DCHECK.
// NOTE(review): the remaining member initializers are not visible in this
// excerpt.
71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
72 const ErrorCallback& error_callback)
73 : ready_callback_(ready_callback),
74 error_callback_(error_callback),
77 DCHECK(!ready_callback.is_null() && !error_callback.is_null());
// Releases both stored callbacks by resetting them.
// NOTE(review): lines appear to be missing from this excerpt before the
// resets; any other shutdown work is not visible here.
80 void DataSourceSender::ShutDown() {
83 ready_callback_.Reset();
84 error_callback_.Reset();
// Destructor. NOTE(review): the body is not visible in this excerpt.
87 DataSourceSender::~DataSourceSender() {
// Takes ownership of |handle| as |handle_| and creates the first PendingSend,
// bound to |ready_callback_|.
91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
92 // This should never occur. |handle_| is only valid and |pending_send_| is
93 // only set after Init is called. Receiving an invalid |handle| from the
94 // client is also unrecoverable.
95 if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) {
// NOTE(review): the body of this branch is not visible in this excerpt.
99 handle_ = handle.Pass();
100 pending_send_.reset(new PendingSend(this, ready_callback_));
// Starts a new PendingSend bound to |ready_callback_|. Having a send already
// pending, or no valid |handle_|, is reported through DispatchFatalError().
// NOTE(review): whether the error path returns early is not visible in this
// excerpt.
104 void DataSourceSender::Resume() {
105 if (pending_send_ || !handle_.is_valid()) {
106 DispatchFatalError();
110 pending_send_.reset(new PendingSend(this, ready_callback_));
// Treats a connection error as fatal by forwarding to DispatchFatalError().
114 void DataSourceSender::OnConnectionError() {
115 DispatchFatalError();
// Creates an AsyncWaiter that watches |handle_| for
// MOJO_HANDLE_SIGNAL_WRITABLE, with OnDoneWaiting() bound as its callback.
// Requires a pending send and no existing |waiter_| (DCHECK).
// NOTE(review): the statement that receives the new AsyncWaiter is not
// visible in this excerpt; presumably it is stored in |waiter_| - confirm.
118 void DataSourceSender::StartWaiting() {
119 DCHECK(pending_send_ && !waiter_);
121 new AsyncWaiter(handle_.get(),
122 MOJO_HANDLE_SIGNAL_WRITABLE,
123 base::Bind(&DataSourceSender::OnDoneWaiting, this)));
// Callback bound into the AsyncWaiter by StartWaiting(); |result| is the
// outcome of the wait. On success, begins a write on |handle_| and passes the
// resulting data pointer and byte count to the pending send. A failed wait or
// a failed BeginWriteDataRaw() call is reported through DispatchFatalError().
// NOTE(review): the declaration of |data| and any early returns on the error
// paths are not visible in this excerpt.
126 void DataSourceSender::OnDoneWaiting(MojoResult result) {
127 DCHECK(pending_send_ && !shut_down_ && waiter_);
129 if (result != MOJO_RESULT_OK) {
130 DispatchFatalError();
// |num_bytes| starts at the maximum uint32_t value and is passed by address;
// presumably BeginWriteDataRaw() updates it to the size actually available -
// confirm against the Mojo data pipe documentation.
134 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
135 result = mojo::BeginWriteDataRaw(
136 handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
137 if (result != MOJO_RESULT_OK) {
138 DispatchFatalError();
141 pending_send_->GetData(static_cast<char*>(data), num_bytes);
// Records a successful write of |bytes_written| bytes via DoneInternal().
// NOTE(review): the rest of this function is not visible in this excerpt.
144 void DataSourceSender::Done(uint32_t bytes_written) {
145 DoneInternal(bytes_written);
// Completes the current write with |bytes_written| bytes via DoneInternal(),
// drops the pending send, and reports |error| to the client together with the
// running |bytes_sent_| total.
// NOTE(review): a line appears to be missing from this excerpt just before
// the OnError() call, so that call may be conditional - confirm.
150 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
151 DoneInternal(bytes_written);
152 pending_send_.reset();
154 client()->OnError(bytes_sent_, error);
155 // We don't call StartWaiting here so we don't send any additional data until
156 // Resume() is called.
// Shared completion step for Done() and DoneWithError(): adds |bytes_written|
// to |bytes_sent_| and ends the write on |handle_| with that byte count. A
// failing EndWriteDataRaw() is reported through DispatchFatalError().
// Requires a pending send (DCHECK).
// NOTE(review): some lines of this function are not visible in this excerpt.
159 void DataSourceSender::DoneInternal(uint32_t bytes_written) {
160 DCHECK(pending_send_);
164 bytes_sent_ += bytes_written;
165 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
166 if (result != MOJO_RESULT_OK) {
167 DispatchFatalError();
// Reports an unrecoverable error by running |error_callback_|.
// NOTE(review): lines before and after the Run() call are not visible in this
// excerpt.
172 void DataSourceSender::DispatchFatalError() {
176 error_callback_.Run();
// Stores |sender| and a copy of |callback|; the buffer starts out not in use.
180 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
181 const ReadyCallback& callback)
182 : sender_(sender), callback_(callback), buffer_in_use_(false) {
// Marks the buffer as in use, then runs |callback_| with a newly allocated
// Buffer built from |sender_|, this PendingSend, |data| and |num_bytes|. The
// Buffer is handed over inside a scoped_ptr<WritableBuffer>. Must not be
// called while a buffer is already in use (DCHECK).
185 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
186 DCHECK(!buffer_in_use_);
187 buffer_in_use_ = true;
188 callback_.Run(scoped_ptr<WritableBuffer>(
189 new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
// Marks the buffer as no longer in use and forwards the successful write of
// |bytes_written| bytes to the owning sender. A buffer must be in use
// (DCHECK).
192 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
193 DCHECK(buffer_in_use_);
194 buffer_in_use_ = false;
195 sender_->Done(bytes_written);
// Marks the buffer as no longer in use and forwards |bytes_written| and
// |error| to the owning sender. A buffer must be in use (DCHECK).
// NOTE(review): the line declaring the |error| parameter is not visible in
// this excerpt.
198 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
200 DCHECK(buffer_in_use_);
201 buffer_in_use_ = false;
202 sender_->DoneWithError(bytes_written, error);
// Constructs a buffer and records |buffer_size| in |buffer_size_|.
// NOTE(review): the other parameters and member initializers are not visible
// in this excerpt.
205 DataSourceSender::PendingSend::Buffer::Buffer(
206 scoped_refptr<DataSourceSender> sender,
209 uint32_t buffer_size)
213 buffer_size_(buffer_size) {
// Reports a zero-byte successful write to the pending send on destruction.
// NOTE(review): Done() and DoneWithError() set |pending_send_| to NULL, so
// this call presumably sits under a null check on a line that is not visible
// in this excerpt - confirm.
216 DataSourceSender::PendingSend::Buffer::~Buffer() {
218 pending_send_->Done(0);
// WritableBuffer override. NOTE(review): the body is not visible in this
// excerpt.
221 char* DataSourceSender::PendingSend::Buffer::GetData() {
// WritableBuffer override. NOTE(review): the body is not visible in this
// excerpt.
225 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
// Forwards a successful write of |bytes_written| bytes to the pending send,
// then clears |pending_send_| so this buffer holds no further reference to it.
// NOTE(review): some lines of this function are not visible in this excerpt.
229 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
231 pending_send_->Done(bytes_written);
233 pending_send_ = NULL;
// Forwards |bytes_written| and |error| to the pending send, then clears
// |pending_send_| so this buffer holds no further reference to it.
// NOTE(review): the line declaring the |error| parameter and some other lines
// of this function are not visible in this excerpt.
238 void DataSourceSender::PendingSend::Buffer::DoneWithError(
239 uint32_t bytes_written,
242 pending_send_->DoneWithError(bytes_written, error);
244 pending_send_ = NULL;
249 } // namespace device