1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 define('data_sender', [
6 'device/serial/data_stream.mojom',
7 'device/serial/data_stream_serialization.mojom',
9 'mojo/public/js/router',
10 ], function(dataStreamMojom, serialization, core, routerModule) {
16 * A pending send operation.
17 * @param {!ArrayBuffer} data The data to be sent.
19 * @alias module:data_sender~PendingSend
22 function PendingSend(data) {
24 * The remaining data to be sent.
25 * @type {!ArrayBuffer}
30 * The total length of data to be sent.
34 this.length_ = data.byteLength;
36 * The number of bytes that have been received by the DataSink.
40 this.bytesReceivedBySink_ = 0;
42 * The promise that will be resolved or rejected when this send completes
43 * or fails, respectively.
44 * @type {!Promise.<number>}
47 this.promise_ = new Promise(function(resolve, reject) {
49 * The callback to call on success.
53 this.successCallback_ = resolve;
55 * The callback to call with the error on failure.
59 this.errorCallback_ = reject;
64 * Returns the promise that will be resolved when this operation completes or
65 * rejected if an error occurs.
66 * @return {!Promise.<number>} A promise to the number of bytes sent.
68 PendingSend.prototype.getPromise = function() {
73 * @typedef module:data_sender~PendingSend.ReportBytesResult
74 * @property {number} bytesUnreported The number of bytes reported that were
75 * not part of the send.
76 * @property {boolean} done Whether this send has completed.
77 * @property {?number} bytesToFlush The number of bytes to flush in the event
82 * Invoked when the DataSink reports that bytes have been sent. Resolves the
84 * [getPromise()]{@link module:data_sender~PendingSend#getPromise} once all
85 * bytes have been reported as sent.
86 * @param {number} numBytes The number of bytes sent.
87 * @return {!module:data_sender~PendingSend.ReportBytesResult}
89 PendingSend.prototype.reportBytesSent = function(numBytes) {
90 var result = this.reportBytesSentInternal_(numBytes);
91 if (this.bytesReceivedBySink_ == this.length_) {
93 this.successCallback_(this.bytesReceivedBySink_);
99 * Invoked when the DataSink reports an error. Rejects the promise returned by
100 * [getPromise()]{@link module:data_sender~PendingSend#getPromise} unless the
101 * error occurred after this send, that is, unless numBytes is greater than
102 * the nubmer of outstanding bytes.
103 * @param {number} numBytes The number of bytes sent.
104 * @param {number} error The error reported by the DataSink.
105 * @return {!module:data_sender~PendingSend.ReportBytesResult}
107 PendingSend.prototype.reportBytesSentAndError = function(numBytes, error) {
108 var result = this.reportBytesSentInternal_(numBytes);
109 // If there are remaining bytes to report, the error occurred after this
110 // PendingSend so we should report success.
111 if (result.bytesUnreported > 0) {
112 this.successCallback_(this.bytesReceivedBySink_);
113 result.bytesToFlush = 0;
119 e.bytesSent = this.bytesReceivedBySink_;
120 this.errorCallback_(e);
122 result.bytesToFlush =
123 this.length_ - this.data_.byteLength - this.bytesReceivedBySink_;
128 * Updates the internal state in response to a report from the DataSink.
129 * @param {number} numBytes The number of bytes sent.
130 * @return {!module:data_sender~PendingSend.ReportBytesResult}
133 PendingSend.prototype.reportBytesSentInternal_ = function(numBytes) {
134 this.bytesReceivedBySink_ += numBytes;
135 var result = {bytesUnreported: 0};
136 if (this.bytesReceivedBySink_ > this.length_) {
137 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_;
138 this.bytesReceivedBySink_ = this.length_;
145 * Writes pending data into the data pipe.
146 * @param {!DataSink} sink The DataSink to receive the data.
147 * @param {number} availableBufferCapacity The maximum number of bytes to
149 * @return {!Object} result The send result.
150 * @return {boolean} result.completed Whether all of the pending data was
152 * @return {number} result.remainingBufferCapacity The remaining send buffer
155 PendingSend.prototype.sendData = function(sink, availableBufferCapacity) {
157 Math.min(availableBufferCapacity, this.data_.byteLength);
158 sink.onData(new Uint8Array(this.data_, 0, numBytesToSend));
159 this.data_ = this.data_.slice(numBytesToSend);
161 completed: this.data_.byteLength == 0,
162 remainingBufferCapacity: availableBufferCapacity - numBytesToSend,
167 * A DataSender that sends data to a DataSink.
168 * @param {!MojoHandle} handle The handle to the DataSink.
169 * @param {number} bufferSize How large a buffer to use for data.
170 * @param {number} fatalErrorValue The send error value to report in the
171 * event of a fatal error.
173 * @alias module:data_sender.DataSender
175 function DataSender(handle, bufferSize, fatalErrorValue) {
176 this.init_(handle, fatalErrorValue, bufferSize);
177 this.sink_.init(bufferSize);
180 DataSender.prototype =
181 $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype);
184 * Closes this DataSender.
186 DataSender.prototype.close = function() {
189 this.shutDown_ = true;
190 this.router_.close();
191 while (this.pendingSends_.length) {
192 this.pendingSends_.pop().reportBytesSentAndError(
193 0, this.fatalErrorValue_);
195 while (this.sendsAwaitingAck_.length) {
196 this.sendsAwaitingAck_.pop().reportBytesSentAndError(
197 0, this.fatalErrorValue_);
199 this.callCancelCallback_();
203 * Initialize this DataSender.
204 * @param {!MojoHandle} sink A handle to the DataSink
205 * @param {number} fatalErrorValue The error to dispatch in the event of a
207 * @param {number} bufferSize The size of the send buffer.
210 DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) {
212 * The error to be dispatched in the event of a fatal error.
216 this.fatalErrorValue_ = fatalErrorValue;
218 * Whether this DataSender has shut down.
222 this.shutDown_ = false;
224 * The [Router]{@link module:mojo/public/js/router.Router} for the
225 * connection to the DataSink.
228 this.router_ = new routerModule.Router(sink);
230 * The connection to the DataSink.
233 this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_);
234 this.router_.setIncomingReceiver(this);
236 * A queue of sends that have not fully sent their data to the DataSink.
237 * @type {!module:data_sender~PendingSend[]}
240 this.pendingSends_ = [];
242 * A queue of sends that have sent their data to the DataSink, but have not
243 * been received by the DataSink.
244 * @type {!module:data_sender~PendingSend[]}
247 this.sendsAwaitingAck_ = [];
250 * The callback that will resolve a pending cancel if one is in progress.
254 this.pendingCancel_ = null;
257 * The promise that will be resolved when a pending cancel completes if one
262 this.cancelPromise_ = null;
264 * The available send buffer capacity.
268 this.availableBufferCapacity_ = bufferSize;
272 * Serializes this DataSender.
273 * This will cancel any sends in progress before the returned promise
275 * @return {!Promise.<SerializedDataSender>} A promise that will resolve to
276 * the serialization of this DataSender. If this DataSender has shut down,
277 * the promise will resolve to null.
279 DataSender.prototype.serialize = function() {
281 return Promise.resolve(null);
283 var readyToSerialize = Promise.resolve();
284 if (this.pendingSends_.length || this.sendsAwaitingAck_.length) {
285 if (this.pendingCancel_)
286 readyToSerialize = this.cancelPromise_;
288 readyToSerialize = this.cancel(this.fatalErrorValue_);
290 return readyToSerialize.then(function() {
291 var serialized = new serialization.SerializedDataSender();
292 serialized.sink = this.router_.connector_.handle_;
293 serialized.fatal_error_value = this.fatalErrorValue_;
294 serialized.buffer_size = this.availableBufferCapacity_;
295 this.router_.connector_.handle_ = null;
296 this.router_.close();
297 this.shutDown_ = true;
303 * Deserializes a SerializedDataSender.
304 * @param {SerializedDataSender} serialized The serialized DataSender.
305 * @return {!DataSender} The deserialized DataSender.
307 DataSender.deserialize = function(serialized) {
308 var sender = $Object.create(DataSender.prototype);
309 sender.deserialize_(serialized);
314 * Deserializes a SerializedDataSender into this DataSender.
315 * @param {SerializedDataSender} serialized The serialized DataSender.
318 DataSender.prototype.deserialize_ = function(serialized) {
320 this.shutDown_ = true;
324 serialized.sink, serialized.fatal_error_value, serialized.buffer_size);
328 * Sends data to the DataSink.
329 * @return {!Promise.<number>} A promise to the number of bytes sent. If an
330 * error occurs, the promise will reject with an Error object with a
331 * property error containing the error code.
332 * @throws Will throw if this has encountered a fatal error or a cancel is in
335 DataSender.prototype.send = function(data) {
337 throw new Error('DataSender has been closed');
338 if (this.pendingCancel_)
339 throw new Error('Cancel in progress');
340 var send = new PendingSend(data);
341 this.pendingSends_.push(send);
342 this.sendInternal_();
343 return send.getPromise();
346 DataSender.prototype.sendInternal_ = function() {
347 while (this.pendingSends_.length && this.availableBufferCapacity_) {
348 var result = this.pendingSends_[0].sendData(
349 this.sink_, this.availableBufferCapacity_);
350 this.availableBufferCapacity_ = result.remainingBufferCapacity;
351 if (result.completed) {
352 this.sendsAwaitingAck_.push(this.pendingSends_.shift());
358 * Requests the cancellation of any in-progress sends. Calls to
359 * [send()]{@link module:data_sender.DataSender#send} will fail until the
360 * cancel has completed.
361 * @param {number} error The error to report for cancelled sends.
362 * @return {!Promise} A promise that will resolve when the cancel completes.
363 * @throws Will throw if this has encountered a fatal error or another cancel
366 DataSender.prototype.cancel = function(error) {
368 throw new Error('DataSender has been closed');
369 if (this.pendingCancel_)
370 throw new Error('Cancel already in progress');
371 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
372 return Promise.resolve();
374 this.sink_.cancel(error);
375 this.cancelPromise_ = new Promise(function(resolve) {
376 this.pendingCancel_ = resolve;
378 return this.cancelPromise_;
382 * Calls and clears the pending cancel callback if one is pending.
385 DataSender.prototype.callCancelCallback_ = function() {
386 if (this.pendingCancel_) {
387 this.cancelPromise_ = null;
388 this.pendingCancel_();
389 this.pendingCancel_ = null;
394 * Invoked by the DataSink to report that data has been successfully sent.
395 * @param {number} numBytes The number of bytes sent.
398 DataSender.prototype.reportBytesSent = function(numBytes) {
399 this.availableBufferCapacity_ += numBytes;
400 while (numBytes > 0 && this.sendsAwaitingAck_.length) {
401 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes);
402 numBytes = result.bytesUnreported;
404 this.sendsAwaitingAck_.shift();
406 if (numBytes > 0 && this.pendingSends_.length) {
407 var result = this.pendingSends_[0].reportBytesSent(numBytes);
408 numBytes = result.bytesUnreported;
410 // A cancel is completed when all of the sends that were in progress have
411 // completed or failed. This is the case where all sends complete
413 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
414 this.callCancelCallback_();
416 this.sendInternal_();
420 * Invoked by the DataSink to report an error in sending data.
421 * @param {number} numBytes The number of bytes sent.
422 * @param {number} error The error reported by the DataSink.
425 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) {
426 this.availableBufferCapacity_ += numBytes;
427 while (this.sendsAwaitingAck_.length) {
428 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError(
430 numBytes = result.bytesUnreported;
431 this.sendsAwaitingAck_.shift();
432 this.availableBufferCapacity_ += result.bytesToFlush;
434 while (this.pendingSends_.length) {
435 var result = this.pendingSends_[0].reportBytesSentAndError(
437 numBytes = result.bytesUnreported;
438 this.pendingSends_.shift();
439 // Note: Only the first PendingSend in |pendingSends_| will have data to
440 // flush as only the first can have sent data to the DataSink.
441 this.availableBufferCapacity_ += result.bytesToFlush;
443 this.callCancelCallback_();
444 return Promise.resolve();
447 return {DataSender: DataSender};