streams2: Writable organization, add 'finishing' flag
authorisaacs <i@izs.me>
Tue, 13 Nov 2012 07:33:06 +0000 (23:33 -0800)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:31 +0000 (17:00 -0800)
lib/_stream_writable.js

index 744ec41..cfcd2e2 100644 (file)
@@ -27,20 +27,41 @@ module.exports = Writable;
 Writable.WritableState = WritableState;
 
 var util = require('util');
+var assert = require('assert');
 var Stream = require('stream');
 
 util.inherits(Writable, Stream);
 
-function WritableState(options) {
+function WritableState(options, stream) {
   options = options || {};
-  this.highWaterMark = options.highWaterMark || 16 * 1024;
+
+  // the point at which write() starts returning false
   this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
       options.highWaterMark : 16 * 1024;
+
+  // the point that it has to get to before we call _write(chunk,cb)
+  // default to pushing everything out as fast as possible.
   this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
-      options.lowWaterMark : 1024;
+      options.lowWaterMark : 0;
+
+  // cast to ints.
+  assert(typeof this.lowWaterMark === 'number');
+  assert(typeof this.highWaterMark === 'number');
+  this.lowWaterMark = ~~this.lowWaterMark;
+  this.highWaterMark = ~~this.highWaterMark;
+  assert(this.lowWaterMark >= 0);
+  assert(this.highWaterMark >= this.lowWaterMark,
+         this.highWaterMark + '>=' + this.lowWaterMark);
+
   this.needDrain = false;
-  this.ended = false;
+  // at the start of calling end()
   this.ending = false;
+  // when end() has been called, and returned
+  this.ended = false;
+  // when 'finish' has emitted
+  this.finished = false;
+  // when 'finish' is being emitted
+  this.finishing = false;
 
   // should we decode strings into buffers before passing to _write?
   // this is here so that some node-core streams can optimize string
@@ -53,7 +74,22 @@ function WritableState(options) {
   // socket or file.
   this.length = 0;
 
+  // a flag to see when we're in the middle of a write.
   this.writing = false;
+
+  // a flag to be able to tell if the onwrite cb is called immediately,
+  // or on a later tick.
+  this.sync = false;
+
+  // the callback that's passed to _write(chunk,cb)
+  this.onwrite = onwrite.bind(stream);
+
+  // the callback that the user supplies to write(chunk,encoding,cb)
+  this.writecb = null;
+
+  // the amount that is being written when _write is called.
+  this.writelen = 0;
+
   this.buffer = [];
 }
 
@@ -63,7 +99,7 @@ function Writable(options) {
   if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
     return new Writable(options);
 
-  this._writableState = new WritableState(options);
+  this._writableState = new WritableState(options, this);
 
   // legacy.
   this.writable = true;
@@ -71,23 +107,26 @@ function Writable(options) {
   Stream.call(this);
 }
 
-// Override this method for sync streams
-// override the _write(chunk, cb) method for async streams
+// Override this method or _write(chunk, cb)
 Writable.prototype.write = function(chunk, encoding, cb) {
   var state = this._writableState;
-  if (state.ended) {
-    this.emit('error', new Error('write after end'));
-    return;
-  }
 
   if (typeof encoding === 'function') {
     cb = encoding;
     encoding = null;
   }
 
+  if (state.ended) {
+    var er = new Error('write after end');
+    if (typeof cb === 'function')
+      cb(er);
+    this.emit('error', er);
+    return;
+  }
+
   var l = chunk.length;
   if (false === state.decodeStrings)
-    chunk = [chunk, encoding];
+    chunk = [chunk, encoding || 'utf8'];
   else if (typeof chunk === 'string' || encoding) {
     chunk = new Buffer(chunk + '', encoding);
     l = chunk.length;
@@ -107,70 +146,84 @@ Writable.prototype.write = function(chunk, encoding, cb) {
   }
 
   state.writing = true;
-  var sync = true;
-  this._write(chunk, writecb.bind(this));
-  sync = false;
+  state.sync = true;
+  state.writelen = l;
+  state.writecb = cb;
+  this._write(chunk, state.onwrite);
+  state.sync = false;
 
   return ret;
+};
 
-  function writecb(er) {
-    state.writing = false;
-    if (er) {
-      if (cb) {
-        if (sync)
-          process.nextTick(cb.bind(null, er));
-        else
-          cb(er);
-      } else
-        this.emit('error', er);
-      return;
-    }
-    state.length -= l;
+function onwrite(er) {
+  var state = this._writableState;
+  var sync = state.sync;
+  var cb = state.writecb;
+  var l = state.writelen;
+
+  state.writing = false;
+  state.writelen = null;
+  state.writecb = null;
 
+  if (er) {
     if (cb) {
-      // don't call the cb until the next tick if we're in sync mode.
-      // also, defer if we're about to write some more right now.
-      if (sync || state.buffer.length)
-        process.nextTick(cb);
-      else
-        cb();
-    }
-
-    if (state.length === 0 && (state.ended || state.ending)) {
-      // emit 'finish' at the very end.
-      this.emit('finish');
-      return;
-    }
-
-    // if there's something in the buffer waiting, then do that, too.
-    if (state.buffer.length) {
-      var chunkCb = state.buffer.shift();
-      chunk = chunkCb[0];
-      cb = chunkCb[1];
-
-      if (false === state.decodeStrings)
-        l = chunk[0].length;
+      if (sync)
+        process.nextTick(cb.bind(null, er));
       else
-        l = chunk.length;
-
-      state.writing = true;
-      this._write(chunk, writecb.bind(this));
-    }
-
-    if (state.length <= state.lowWaterMark && state.needDrain) {
-      // Must force callback to be called on nextTick, so that we don't
-      // emit 'drain' before the write() consumer gets the 'false' return
-      // value, and has a chance to attach a 'drain' listener.
-      process.nextTick(function() {
-        if (!state.needDrain)
-          return;
-        state.needDrain = false;
-        this.emit('drain');
-      }.bind(this));
-    }
+        cb(er);
+    } else
+      this.emit('error', er);
+    return;
+  }
+  state.length -= l;
+
+  if (cb) {
+    // don't call the cb until the next tick if we're in sync mode.
+    // also, defer if we're about to write some more right now.
+    if (sync || state.buffer.length)
+      process.nextTick(cb);
+    else
+      cb();
   }
 
-};
+  if (state.length === 0 && (state.ended || state.ending)) {
+    // emit 'finish' at the very end.
+    state.finishing = true;
+    this.emit('finish');
+    state.finished = true;
+    return;
+  }
+
+  // if there's something in the buffer waiting, then do that, too.
+  if (state.buffer.length) {
+    var chunkCb = state.buffer.shift();
+    var chunk = chunkCb[0];
+    cb = chunkCb[1];
+
+    if (false === state.decodeStrings)
+      l = chunk[0].length;
+    else
+      l = chunk.length;
+
+    state.writelen = l;
+    state.writecb = cb;
+    state.writechunk = chunk;
+    state.writing = true;
+    this._write(chunk, state.onwrite);
+  }
+
+  if (state.length <= state.lowWaterMark && state.needDrain) {
+    // Must force callback to be called on nextTick, so that we don't
+    // emit 'drain' before the write() consumer gets the 'false' return
+    // value, and has a chance to attach a 'drain' listener.
+    process.nextTick(function() {
+      if (!state.needDrain)
+        return;
+      state.needDrain = false;
+      this.emit('drain');
+    }.bind(this));
+  }
+}
 
 Writable.prototype._write = function(chunk, cb) {
   process.nextTick(cb.bind(this, new Error('not implemented')));
@@ -178,10 +231,18 @@ Writable.prototype._write = function(chunk, cb) {
 
 Writable.prototype.end = function(chunk, encoding) {
   var state = this._writableState;
+
+  // ignore unnecessary end() calls.
+  if (state.ending || state.ended || state.finished)
+    return;
+
   state.ending = true;
   if (chunk)
     this.write(chunk, encoding);
-  else if (state.length === 0)
+  else if (state.length === 0) {
+    state.finishing = true;
     this.emit('finish');
+    state.finished = true;
+  }
   state.ended = true;
 };