// found in the LICENSE file.
define('data_sender', [
- 'async_waiter',
'device/serial/data_stream.mojom',
'device/serial/data_stream_serialization.mojom',
- 'mojo/public/js/bindings/core',
- 'mojo/public/js/bindings/router',
-], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) {
+ 'mojo/public/js/core',
+ 'mojo/public/js/router',
+], function(dataStreamMojom, serialization, core, routerModule) {
/**
* @module data_sender
*/
/**
* Writes pending data into the data pipe.
- * @param {!MojoHandle} handle The handle to the data pipe.
- * @return {number} The Mojo result corresponding to the outcome:
- * <ul>
- * <li>RESULT_OK if the write completes successfully;
- * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or
- * <li>the data pipe error if the write failed.
- * </ul>
+ * @param {!DataSink} sink The DataSink to receive the data.
+ * @param {number} availableBufferCapacity The maximum number of bytes to
+ * send.
+ * @return {!Object} result The send result.
+ * @return {boolean} result.completed Whether all of the pending data was
+ * sent.
+ * @return {number} result.remainingBufferCapacity The remaining send buffer
+ * capacity.
*/
- PendingSend.prototype.sendData = function(handle) {
- var result = core.writeData(
- handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE);
- if (result.result != core.RESULT_OK)
- return result.result;
- this.data_ = this.data_.slice(result.numBytes);
- return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK;
+ PendingSend.prototype.sendData = function(sink, availableBufferCapacity) {
+ var numBytesToSend =
+ Math.min(availableBufferCapacity, this.data_.byteLength);
+ sink.onData(new Uint8Array(this.data_, 0, numBytesToSend));
+ this.data_ = this.data_.slice(numBytesToSend);
+ return {
+ completed: this.data_.byteLength == 0,
+ remainingBufferCapacity: availableBufferCapacity - numBytesToSend,
+ };
};
/**
* A DataSender that sends data to a DataSink.
* @param {!MojoHandle} handle The handle to the DataSink.
- * @param {number} bufferSize How large a buffer the data pipe should use.
+ * @param {number} bufferSize How large a buffer to use for data.
* @param {number} fatalErrorValue The send error value to report in the
* event of a fatal error.
* @constructor
* @alias module:data_sender.DataSender
*/
function DataSender(handle, bufferSize, fatalErrorValue) {
- var dataPipeOptions = {
- flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
- elementNumBytes: 1,
- capacityNumBytes: bufferSize,
- };
- var sendPipe = core.createDataPipe(dataPipeOptions);
- this.init_(handle, sendPipe.producerHandle, fatalErrorValue);
- this.sink_.init(sendPipe.consumerHandle);
+ this.init_(handle, fatalErrorValue, bufferSize);
+ this.sink_.init(bufferSize);
}
DataSender.prototype =
- $Object.create(dataStreamMojom.DataSinkClientStub.prototype);
+ $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype);
/**
* Closes this DataSender.
if (this.shutDown_)
return;
this.shutDown_ = true;
- this.waiter_.stop();
this.router_.close();
- core.close(this.sendPipe_);
while (this.pendingSends_.length) {
this.pendingSends_.pop().reportBytesSentAndError(
0, this.fatalErrorValue_);
/**
* Initialize this DataSender.
* @param {!MojoHandle} sink A handle to the DataSink
- * @param {!MojoHandle} dataPipe A handle to use for sending data to the
- * DataSink.
* @param {number} fatalErrorValue The error to dispatch in the event of a
* fatal error.
+ * @param {number} bufferSize The size of the send buffer.
* @private
*/
- DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) {
- /**
- * The handle to the data pipe to use for sending data.
- * @private
- */
- this.sendPipe_ = dataPipe;
+ DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) {
/**
* The error to be dispatched in the event of a fatal error.
* @const {number}
*/
this.shutDown_ = false;
/**
- * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
+ * The [Router]{@link module:mojo/public/js/router.Router} for the
* connection to the DataSink.
* @private
*/
* The connection to the DataSink.
* @private
*/
- this.sink_ = new dataStreamMojom.DataSinkProxy(this.router_);
+ this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_);
this.router_.setIncomingReceiver(this);
/**
- * The async waiter used to wait for
- * {@link module:data_sender.DataSender#sendPipe_} to be writable.
- * @type {!module:async_waiter.AsyncWaiter}
- * @private
- */
- this.waiter_ = new asyncWaiter.AsyncWaiter(
- this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE,
- this.onHandleReady_.bind(this));
- /**
- * A queue of sends that have not fully written their data to the data pipe.
+ * A queue of sends that have not fully sent their data to the DataSink.
* @type {!module:data_sender~PendingSend[]}
* @private
*/
this.pendingSends_ = [];
/**
- * A queue of sends that have written their data to the data pipe, but have
- * not been received by the DataSink.
+ * A queue of sends that have sent their data to the DataSink, but have not
+ * been received by the DataSink.
* @type {!module:data_sender~PendingSend[]}
* @private
*/
* @private
*/
this.cancelPromise_ = null;
+ /**
+ * The available send buffer capacity.
+ * @type {number}
+ * @private
+ */
+ this.availableBufferCapacity_ = bufferSize;
};
/**
return Promise.resolve(null);
var readyToSerialize = Promise.resolve();
- if (this.pendingSends_.length) {
+ if (this.pendingSends_.length || this.sendsAwaitingAck_.length) {
if (this.pendingCancel_)
readyToSerialize = this.cancelPromise_;
else
readyToSerialize = this.cancel(this.fatalErrorValue_);
}
return readyToSerialize.then(function() {
- this.waiter_.stop();
var serialized = new serialization.SerializedDataSender();
- serialized.sink = this.router_.connector_.handle_,
- serialized.data_pipe = this.sendPipe_,
- serialized.fatal_error_value = this.fatalErrorValue_,
+ serialized.sink = this.router_.connector_.handle_;
+ serialized.fatal_error_value = this.fatalErrorValue_;
+ serialized.buffer_size = this.availableBufferCapacity_;
this.router_.connector_.handle_ = null;
this.router_.close();
this.shutDown_ = true;
return;
}
this.init_(
- serialized.sink, serialized.data_pipe, serialized.fatal_error_value);
+ serialized.sink, serialized.fatal_error_value, serialized.buffer_size);
};
/**
throw new Error('Cancel in progress');
var send = new PendingSend(data);
this.pendingSends_.push(send);
- if (!this.waiter_.isWaiting())
- this.waiter_.start();
+ this.sendInternal_();
return send.getPromise();
};
+ DataSender.prototype.sendInternal_ = function() {
+ while (this.pendingSends_.length && this.availableBufferCapacity_) {
+ var result = this.pendingSends_[0].sendData(
+ this.sink_, this.availableBufferCapacity_);
+ this.availableBufferCapacity_ = result.remainingBufferCapacity;
+ if (result.completed) {
+ this.sendsAwaitingAck_.push(this.pendingSends_.shift());
+ }
+ }
+ };
+
/**
* Requests the cancellation of any in-progress sends. Calls to
* [send()]{@link module:data_sender.DataSender#send} will fail until the
};
/**
- * Invoked when
- * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to
- * write. Writes to the data pipe if the wait is successful.
- * @param {number} waitResult The result of the asynchronous wait.
- * @private
- */
- DataSender.prototype.onHandleReady_ = function(result) {
- if (result != core.RESULT_OK) {
- this.close();
- return;
- }
- while (this.pendingSends_.length) {
- var result = this.pendingSends_[0].sendData(this.sendPipe_);
- if (result == core.RESULT_OK) {
- this.sendsAwaitingAck_.push(this.pendingSends_.shift());
- } else if (result == core.RESULT_SHOULD_WAIT) {
- this.waiter_.start();
- return;
- } else {
- this.close();
- return;
- }
- }
- };
-
- /**
* Calls and clears the pending cancel callback if one is pending.
* @private
*/
* @private
*/
DataSender.prototype.reportBytesSent = function(numBytes) {
+ this.availableBufferCapacity_ += numBytes;
while (numBytes > 0 && this.sendsAwaitingAck_.length) {
var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes);
numBytes = result.bytesUnreported;
// successfully.
if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
this.callCancelCallback_();
+
+ this.sendInternal_();
};
/**
* @private
*/
DataSender.prototype.reportBytesSentAndError = function(numBytes, error) {
- var bytesToFlush = 0;
+ this.availableBufferCapacity_ += numBytes;
while (this.sendsAwaitingAck_.length) {
var result = this.sendsAwaitingAck_[0].reportBytesSentAndError(
numBytes, error);
numBytes = result.bytesUnreported;
this.sendsAwaitingAck_.shift();
- bytesToFlush += result.bytesToFlush;
+ this.availableBufferCapacity_ += result.bytesToFlush;
}
while (this.pendingSends_.length) {
var result = this.pendingSends_[0].reportBytesSentAndError(
numBytes = result.bytesUnreported;
this.pendingSends_.shift();
// Note: Only the first PendingSend in |pendingSends_| will have data to
- // flush as only the first can have written data to the data pipe.
- bytesToFlush += result.bytesToFlush;
+ // flush as only the first can have sent data to the DataSink.
+ this.availableBufferCapacity_ += result.bytesToFlush;
}
this.callCancelCallback_();
- return Promise.resolve({bytes_to_flush: bytesToFlush});
+ return Promise.resolve();
};
return {DataSender: DataSender};