streams2: Support write(chunk,[encoding],[callback])
authorisaacs <i@izs.me>
Wed, 10 Oct 2012 00:31:29 +0000 (17:31 -0700)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:27 +0000 (17:00 -0800)
lib/_stream_writable.js

index 4eb2a5f..a84cbc3 100644 (file)
@@ -73,13 +73,18 @@ function Writable(options) {
 
 // Override this method for sync streams
 // override the _write(chunk, cb) method for async streams
-Writable.prototype.write = function(chunk, encoding) {
+Writable.prototype.write = function(chunk, encoding, cb) {
   var state = this._writableState;
   if (state.ended) {
     this.emit('error', new Error('write after end'));
     return;
   }
 
+  if (typeof encoding === 'function') {
+    cb = encoding;
+    encoding = null;
+  }
+
   var l = chunk.length;
   if (false === state.decodeStrings)
     chunk = [chunk, encoding];
@@ -94,24 +99,43 @@ Writable.prototype.write = function(chunk, encoding) {
   if (ret === false)
     state.needDrain = true;
 
+  // if we're already writing something, then just put this
+  // in the queue, and wait our turn.
   if (state.writing) {
-    state.buffer.push(chunk);
+    state.buffer.push([chunk, cb]);
     return ret;
   }
 
   state.writing = true;
+  var sync = true;
   this._write(chunk, writecb.bind(this));
+  sync = false;
 
   return ret;
 
   function writecb(er) {
     state.writing = false;
     if (er) {
-      this.emit('error', er);
+      if (cb) {
+        if (sync)
+          process.nextTick(cb.bind(null, er));
+        else
+          cb(er);
+      } else
+        this.emit('error', er);
       return;
     }
     state.length -= l;
 
+    if (cb) {
+      // don't call the cb until the next tick if we're in sync mode.
+      // also, defer if we're about to write some more right now.
+      if (sync || state.buffer.length)
+        process.nextTick(cb);
+      else
+        cb();
+    }
+
     if (state.length === 0 && (state.ended || state.ending)) {
       // emit 'finish' at the very end.
       this.emit('finish');
@@ -120,8 +144,15 @@ Writable.prototype.write = function(chunk, encoding) {
 
     // if there's something in the buffer waiting, then do that, too.
     if (state.buffer.length) {
-      chunk = state.buffer.shift();
-      l = chunk.length;
+      var chunkCb = state.buffer.shift();
+      chunk = chunkCb[0];
+      cb = chunkCb[1];
+
+      if (false === state.decodeStrings)
+        l = chunk[0].length;
+      else
+        l = chunk.length;
+
       state.writing = true;
       this._write(chunk, writecb.bind(this));
     }