Writable.WritableState = WritableState;
var util = require('util');
+var assert = require('assert');
var Stream = require('stream');
util.inherits(Writable, Stream);
-function WritableState(options) {
+function WritableState(options, stream) {
options = options || {};
- this.highWaterMark = options.highWaterMark || 16 * 1024;
+
+ // the point at which write() starts returning false
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
options.highWaterMark : 16 * 1024;
+
+ // the point that it has to get to before we call _write(chunk,cb)
+ // default to pushing everything out as fast as possible.
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
- options.lowWaterMark : 1024;
+ options.lowWaterMark : 0;
+
+ // cast to ints.
+ assert(typeof this.lowWaterMark === 'number');
+ assert(typeof this.highWaterMark === 'number');
+ this.lowWaterMark = ~~this.lowWaterMark;
+ this.highWaterMark = ~~this.highWaterMark;
+ assert(this.lowWaterMark >= 0);
+ assert(this.highWaterMark >= this.lowWaterMark,
+ this.highWaterMark + '>=' + this.lowWaterMark);
+
this.needDrain = false;
- this.ended = false;
+ // at the start of calling end()
this.ending = false;
+ // when end() has been called, and returned
+ this.ended = false;
+ // when 'finish' has emitted
+ this.finished = false;
+ // when 'finish' is being emitted
+ this.finishing = false;
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// socket or file.
this.length = 0;
+ // a flag to see when we're in the middle of a write.
this.writing = false;
+
+ // a flag to be able to tell if the onwrite cb is called immediately,
+ // or on a later tick.
+ this.sync = false;
+
+ // the callback that's passed to _write(chunk,cb)
+ this.onwrite = onwrite.bind(stream);
+
+ // the callback that the user supplies to write(chunk,encoding,cb)
+ this.writecb = null;
+
+ // the amount that is being written when _write is called.
+ this.writelen = 0;
+
this.buffer = [];
}
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
return new Writable(options);
- this._writableState = new WritableState(options);
+ this._writableState = new WritableState(options, this);
// legacy.
this.writable = true;
Stream.call(this);
}
-// Override this method for sync streams
-// override the _write(chunk, cb) method for async streams
+// Override this method or _write(chunk, cb)
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
- if (state.ended) {
- this.emit('error', new Error('write after end'));
- return;
- }
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
+ if (state.ended) {
+ var er = new Error('write after end');
+ if (typeof cb === 'function')
+ cb(er);
+ this.emit('error', er);
+ return;
+ }
+
var l = chunk.length;
if (false === state.decodeStrings)
- chunk = [chunk, encoding];
+ chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
l = chunk.length;
}
state.writing = true;
- var sync = true;
- this._write(chunk, writecb.bind(this));
- sync = false;
+ state.sync = true;
+ state.writelen = l;
+ state.writecb = cb;
+ this._write(chunk, state.onwrite);
+ state.sync = false;
return ret;
+};
- function writecb(er) {
- state.writing = false;
- if (er) {
- if (cb) {
- if (sync)
- process.nextTick(cb.bind(null, er));
- else
- cb(er);
- } else
- this.emit('error', er);
- return;
- }
- state.length -= l;
+function onwrite(er) {
+ var state = this._writableState;
+ var sync = state.sync;
+ var cb = state.writecb;
+ var l = state.writelen;
+
+ state.writing = false;
+ state.writelen = null;
+ state.writecb = null;
+ if (er) {
if (cb) {
- // don't call the cb until the next tick if we're in sync mode.
- // also, defer if we're about to write some more right now.
- if (sync || state.buffer.length)
- process.nextTick(cb);
- else
- cb();
- }
-
- if (state.length === 0 && (state.ended || state.ending)) {
- // emit 'finish' at the very end.
- this.emit('finish');
- return;
- }
-
- // if there's something in the buffer waiting, then do that, too.
- if (state.buffer.length) {
- var chunkCb = state.buffer.shift();
- chunk = chunkCb[0];
- cb = chunkCb[1];
-
- if (false === state.decodeStrings)
- l = chunk[0].length;
+ if (sync)
+ process.nextTick(cb.bind(null, er));
else
- l = chunk.length;
-
- state.writing = true;
- this._write(chunk, writecb.bind(this));
- }
-
- if (state.length <= state.lowWaterMark && state.needDrain) {
- // Must force callback to be called on nextTick, so that we don't
- // emit 'drain' before the write() consumer gets the 'false' return
- // value, and has a chance to attach a 'drain' listener.
- process.nextTick(function() {
- if (!state.needDrain)
- return;
- state.needDrain = false;
- this.emit('drain');
- }.bind(this));
- }
+ cb(er);
+ } else
+ this.emit('error', er);
+ return;
+ }
+ state.length -= l;
+
+ if (cb) {
+ // don't call the cb until the next tick if we're in sync mode.
+ // also, defer if we're about to write some more right now.
+ if (sync || state.buffer.length)
+ process.nextTick(cb);
+ else
+ cb();
}
-};
+ if (state.length === 0 && (state.ended || state.ending)) {
+ // emit 'finish' at the very end.
+ state.finishing = true;
+ this.emit('finish');
+ state.finished = true;
+ return;
+ }
+
+ // if there's something in the buffer waiting, then do that, too.
+ if (state.buffer.length) {
+ var chunkCb = state.buffer.shift();
+ var chunk = chunkCb[0];
+ cb = chunkCb[1];
+
+ if (false === state.decodeStrings)
+ l = chunk[0].length;
+ else
+ l = chunk.length;
+
+ state.writelen = l;
+ state.writecb = cb;
+ state.writechunk = chunk;
+ state.writing = true;
+ this._write(chunk, state.onwrite);
+ }
+
+ if (state.length <= state.lowWaterMark && state.needDrain) {
+ // Must force callback to be called on nextTick, so that we don't
+ // emit 'drain' before the write() consumer gets the 'false' return
+ // value, and has a chance to attach a 'drain' listener.
+ process.nextTick(function() {
+ if (!state.needDrain)
+ return;
+ state.needDrain = false;
+ this.emit('drain');
+ }.bind(this));
+ }
+}
Writable.prototype._write = function(chunk, cb) {
process.nextTick(cb.bind(this, new Error('not implemented')));
Writable.prototype.end = function(chunk, encoding) {
var state = this._writableState;
+
+ // ignore unnecessary end() calls.
+ if (state.ending || state.ended || state.finished)
+ return;
+
state.ending = true;
if (chunk)
this.write(chunk, encoding);
- else if (state.length === 0)
+ else if (state.length === 0) {
+ state.finishing = true;
this.emit('finish');
+ state.finished = true;
+ }
state.ended = true;
};