this.readable = this.writable = true;
this._paused = false;
+ this._needDrain = false;
this._pending = [];
this._pendingCallbacks = [];
this._pendingBytes = 0;
data = new Buffer(data, encoding);
}
- debug('clearIn data');
+ debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data');
this._pending.push(data);
this._pendingCallbacks.push(cb);
this.pair._writeCalled = true;
this.pair.cycle();
- return this._pendingBytes < 128 * 1024;
+ // In the following cases, write() should return a false,
+ // then this stream should eventually emit 'drain' event.
+ //
+ // 1. There are pending data more than 128k bytes.
+ // 2. A forward stream shown below is paused.
+ // A) EncryptedStream for CleartextStream.write().
+ // B) CleartextStream for EncryptedStream.write().
+ //
+ if (!this._needDrain) {
+ if (this._pendingBytes >= 128 * 1024) {
+ this._needDrain = true;
+ } else {
+ if (this === this.pair.cleartext) {
+ this._needDrain = this.pair.encrypted._paused;
+ } else {
+ this._needDrain = this.pair.cleartext._paused;
+ }
+ }
+ }
+ return !this._needDrain;
};
assert(rv === tmp.length);
}
- // If we've cleared all of incoming encrypted data, emit drain.
- if (havePending && this._pending.length === 0) {
- debug('drain');
- this.emit('drain');
- if (this.__destroyOnDrain) this.end();
+ // If pending data has cleared, 'drain' event should be emitted
+ // after write() returns a false.
+ // Except when a forward stream shown below is paused.
+ // A) EncryptedStream for CleartextStream._pull().
+ // B) CleartextStream for EncryptedStream._pull().
+ //
+ if (this._needDrain && this._pending.length === 0) {
+ var paused;
+ if (this === this.pair.cleartext) {
+ paused = this.pair.encrypted._paused;
+ } else {
+ paused = this.pair.cleartext._paused;
+ }
+ if (!paused) {
+ debug('drain');
+ process.nextTick(this.emit.bind(this, 'drain'));
+ this._needDrain = false;
+ if (this.__destroyOnDrain) this.end();
+ }
}
};