// Override this method for sync streams
// override the _write(chunk, cb) method for async streams
-Writable.prototype.write = function(chunk, encoding) {
+Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
if (state.ended) {
this.emit('error', new Error('write after end'));
return;
}
+ if (typeof encoding === 'function') {
+ cb = encoding;
+ encoding = null;
+ }
+
var l = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding];
if (ret === false)
state.needDrain = true;
+ // if we're already writing something, then just put this
+ // in the queue, and wait our turn.
if (state.writing) {
- state.buffer.push(chunk);
+ state.buffer.push([chunk, cb]);
return ret;
}
state.writing = true;
+ var sync = true;
this._write(chunk, writecb.bind(this));
+ sync = false;
return ret;
function writecb(er) {
state.writing = false;
if (er) {
- this.emit('error', er);
+ if (cb) {
+ if (sync)
+ process.nextTick(cb.bind(null, er));
+ else
+ cb(er);
+ } else
+ this.emit('error', er);
return;
}
state.length -= l;
+ if (cb) {
+ // don't call the cb until the next tick if we're in sync mode.
+ // also, defer if we're about to write some more right now.
+ if (sync || state.buffer.length)
+ process.nextTick(cb);
+ else
+ cb();
+ }
+
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
this.emit('finish');
// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
- chunk = state.buffer.shift();
- l = chunk.length;
+ var chunkCb = state.buffer.shift();
+ chunk = chunkCb[0];
+ cb = chunkCb[1];
+
+ if (false === state.decodeStrings)
+ l = chunk[0].length;
+ else
+ l = chunk.length;
+
state.writing = true;
this._write(chunk, writecb.bind(this));
}