Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / device / serial / data_source_sender.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_source_sender.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12
13 namespace device {
14
15 // Represents a send that is not yet fulfilled.
16 class DataSourceSender::PendingSend {
17  public:
18   PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
19
20   // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
21   // one of Done() and DoneWithError() will be called with the result.
22   void GetData(void* data, uint32_t num_bytes);
23
24  private:
25   class Buffer;
26   // Reports a successful write of |bytes_written|.
27   void Done(uint32_t bytes_written);
28
29   // Reports a partially successful or unsuccessful write of |bytes_written|
30   // with an error of |error|.
31   void DoneWithError(uint32_t bytes_written, int32_t error);
32
33   // The DataSourceSender that owns this.
34   DataSourceSender* sender_;
35
36   // The callback to call to get data.
37   ReadyCallback callback_;
38
39   // Whether the buffer specified by GetData() has been passed to |callback_|,
40   // but has not yet called Done() or DoneWithError().
41   bool buffer_in_use_;
42 };
43
44 // A Writable implementation that provides a view of a data pipe owned by a
45 // DataSourceSender.
46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
47  public:
48   Buffer(scoped_refptr<DataSourceSender> sender,
49          PendingSend* send,
50          char* buffer,
51          uint32_t buffer_size);
52   virtual ~Buffer();
53
54   // WritableBuffer overrides.
55   virtual char* GetData() OVERRIDE;
56   virtual uint32_t GetSize() OVERRIDE;
57   virtual void Done(uint32_t bytes_written) OVERRIDE;
58   virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
59
60  private:
61   // The DataSourceSender whose data pipe we are providing a view.
62   scoped_refptr<DataSourceSender> sender_;
63
64   // The PendingSend to which this buffer has been created in response.
65   PendingSend* pending_send_;
66
67   char* buffer_;
68   uint32_t buffer_size_;
69 };
70
71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
72                                    const ErrorCallback& error_callback)
73     : ready_callback_(ready_callback),
74       error_callback_(error_callback),
75       bytes_sent_(0),
76       shut_down_(false) {
77   DCHECK(!ready_callback.is_null() && !error_callback.is_null());
78 }
79
80 void DataSourceSender::ShutDown() {
81   shut_down_ = true;
82   waiter_.reset();
83   ready_callback_.Reset();
84   error_callback_.Reset();
85 }
86
87 DataSourceSender::~DataSourceSender() {
88   DCHECK(shut_down_);
89 }
90
91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
92   // This should never occur. |handle_| is only valid and |pending_send_| is
93   // only set after Init is called. Receiving an invalid |handle| from the
94   // client is also unrecoverable.
95   if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) {
96     DispatchFatalError();
97     return;
98   }
99   handle_ = handle.Pass();
100   pending_send_.reset(new PendingSend(this, ready_callback_));
101   StartWaiting();
102 }
103
104 void DataSourceSender::Resume() {
105   if (pending_send_ || !handle_.is_valid()) {
106     DispatchFatalError();
107     return;
108   }
109
110   pending_send_.reset(new PendingSend(this, ready_callback_));
111   StartWaiting();
112 }
113
114 void DataSourceSender::OnConnectionError() {
115   DispatchFatalError();
116 }
117
118 void DataSourceSender::StartWaiting() {
119   DCHECK(pending_send_ && !waiter_);
120   waiter_.reset(
121       new AsyncWaiter(handle_.get(),
122                       MOJO_HANDLE_SIGNAL_WRITABLE,
123                       base::Bind(&DataSourceSender::OnDoneWaiting, this)));
124 }
125
126 void DataSourceSender::OnDoneWaiting(MojoResult result) {
127   DCHECK(pending_send_ && !shut_down_ && waiter_);
128   waiter_.reset();
129   if (result != MOJO_RESULT_OK) {
130     DispatchFatalError();
131     return;
132   }
133   void* data = NULL;
134   uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
135   result = mojo::BeginWriteDataRaw(
136       handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
137   if (result != MOJO_RESULT_OK) {
138     DispatchFatalError();
139     return;
140   }
141   pending_send_->GetData(static_cast<char*>(data), num_bytes);
142 }
143
144 void DataSourceSender::Done(uint32_t bytes_written) {
145   DoneInternal(bytes_written);
146   if (!shut_down_)
147     StartWaiting();
148 }
149
150 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
151   DoneInternal(bytes_written);
152   pending_send_.reset();
153   if (!shut_down_)
154     client()->OnError(bytes_sent_, error);
155   // We don't call StartWaiting here so we don't send any additional data until
156   // Resume() is called.
157 }
158
159 void DataSourceSender::DoneInternal(uint32_t bytes_written) {
160   DCHECK(pending_send_);
161   if (shut_down_)
162     return;
163
164   bytes_sent_ += bytes_written;
165   MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
166   if (result != MOJO_RESULT_OK) {
167     DispatchFatalError();
168     return;
169   }
170 }
171
172 void DataSourceSender::DispatchFatalError() {
173   if (shut_down_)
174     return;
175
176   error_callback_.Run();
177   ShutDown();
178 }
179
180 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
181                                            const ReadyCallback& callback)
182     : sender_(sender), callback_(callback), buffer_in_use_(false) {
183 }
184
185 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
186   DCHECK(!buffer_in_use_);
187   buffer_in_use_ = true;
188   callback_.Run(scoped_ptr<WritableBuffer>(
189       new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
190 }
191
192 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
193   DCHECK(buffer_in_use_);
194   buffer_in_use_ = false;
195   sender_->Done(bytes_written);
196 }
197
198 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
199                                                   int32_t error) {
200   DCHECK(buffer_in_use_);
201   buffer_in_use_ = false;
202   sender_->DoneWithError(bytes_written, error);
203 }
204
205 DataSourceSender::PendingSend::Buffer::Buffer(
206     scoped_refptr<DataSourceSender> sender,
207     PendingSend* send,
208     char* buffer,
209     uint32_t buffer_size)
210     : sender_(sender),
211       pending_send_(send),
212       buffer_(buffer),
213       buffer_size_(buffer_size) {
214 }
215
216 DataSourceSender::PendingSend::Buffer::~Buffer() {
217   if (sender_)
218     pending_send_->Done(0);
219 }
220
221 char* DataSourceSender::PendingSend::Buffer::GetData() {
222   return buffer_;
223 }
224
225 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
226   return buffer_size_;
227 }
228
229 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
230   DCHECK(sender_);
231   pending_send_->Done(bytes_written);
232   sender_ = NULL;
233   pending_send_ = NULL;
234   buffer_ = NULL;
235   buffer_size_ = 0;
236 }
237
238 void DataSourceSender::PendingSend::Buffer::DoneWithError(
239     uint32_t bytes_written,
240     int32_t error) {
241   DCHECK(sender_);
242   pending_send_->DoneWithError(bytes_written, error);
243   sender_ = NULL;
244   pending_send_ = NULL;
245   buffer_ = NULL;
246   buffer_size_ = 0;
247 }
248
249 }  // namespace device