streams2: setEncoding and abstract out endReadable
authorisaacs <i@izs.me>
Wed, 3 Oct 2012 23:52:14 +0000 (16:52 -0700)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 01:00:24 +0000 (17:00 -0800)
lib/_stream_readable.js

index 7b2d76c..db76ab7 100644 (file)
@@ -24,6 +24,7 @@ module.exports = Readable;
 var Stream = require('stream');
 var util = require('util');
 var assert = require('assert');
+var StringDecoder;
 
 util.inherits(Readable, Stream);
 
@@ -41,12 +42,20 @@ function ReadableState(options, stream) {
   this.pipes = [];
   this.flowing = false;
   this.ended = false;
+  this.endEmitted = false;
   this.stream = stream;
   this.reading = false;
 
   // whenever we return null, then we set a flag to say
   // that we're awaiting a 'readable' event emission.
   this.needReadable = false;
+
+  this.decoder = null;
+  if (options.encoding) {
+    if (!StringDecoder)
+      StringDecoder = require('string_decoder').StringDecoder;
+    this.decoder = new StringDecoder(options.encoding);
+  }
 }
 
 function Readable(options) {
@@ -54,12 +63,19 @@ function Readable(options) {
   Stream.apply(this);
 }
 
+// backwards compatibility.
+Readable.prototype.setEncoding = function(enc) {
+  if (!StringDecoder)
+    StringDecoder = require('string_decoder').StringDecoder;
+  this._readableState.decoder = new StringDecoder(enc);
+};
+
 // you can override either this method, or _read(n, cb) below.
 Readable.prototype.read = function(n) {
   var state = this._readableState;
 
   if (state.length === 0 && state.ended) {
-    process.nextTick(this.emit.bind(this, 'end'));
+    endReadable(this);
     return null;
   }
 
@@ -85,7 +101,12 @@ Readable.prototype.read = function(n) {
       n = state.length;
   }
 
-  var ret = n > 0 ? fromList(n, state.buffer, state.length) : null;
+
+  var ret;
+  if (n > 0)
+    ret = fromList(n, state.buffer, state.length, !!state.decoder);
+  else
+    ret = null;
 
   if (ret === null || ret.length === 0)
     state.needReadable = true;
@@ -108,13 +129,26 @@ Readable.prototype.read = function(n) {
         // 'readable' now to make sure it gets picked up.
         if (state.length > 0)
           this.emit('readable');
+        else
+          endReadable(this);
         return;
       }
 
+      if (state.decoder)
+        chunk = state.decoder.write(chunk);
+
       state.length += chunk.length;
       state.buffer.push(chunk);
-      if (state.length < state.lowWaterMark)
+
+      // if we haven't gotten enough to pass the lowWaterMark,
+      // and we haven't ended, then don't bother telling the user
+      // that it's time to read more data.  Otherwise, that'll
+      // probably kick off another stream.read(), which can trigger
+      // another _read(n,cb) before this one returns!
+      if (state.length < state.lowWaterMark) {
         this._read(state.bufferSize, onread.bind(this));
+        return;
+      }
 
       // now we have something to call this.read() to get.
       if (state.needReadable) {
@@ -309,7 +343,7 @@ Readable.prototype.wrap = function(stream) {
   stream.on('end', function() {
     state.ended = true;
     if (state.length === 0)
-      this.emit('end');
+      endReadable(this);
   }.bind(this));
 
   stream.on('data', function(chunk) {
@@ -360,7 +394,7 @@ Readable.prototype.wrap = function(stream) {
         n = state.length;
     }
 
-    var ret = fromList(n, state.buffer, state.length);
+    var ret = fromList(n, state.buffer, state.length, !!state.decoder);
     state.length -= n;
 
     if (state.length < state.lowWaterMark && paused) {
@@ -369,7 +403,7 @@ Readable.prototype.wrap = function(stream) {
     }
 
     if (state.length === 0 && state.ended)
-      process.nextTick(this.emit.bind(this, 'end'));
+      endReadable(this);
 
     return ret;
   };
@@ -382,8 +416,7 @@ Readable._fromList = fromList;
 
 // Pluck off n bytes from an array of buffers.
 // Length is the combined lengths of all the buffers in the list.
-// If there's no data, then 
-function fromList(n, list, length) {
+function fromList(n, list, length, stringMode) {
   var ret;
 
   // nothing in the list, definitely empty.
@@ -391,16 +424,20 @@ function fromList(n, list, length) {
     return null;
   }
 
-  if (length === 0) {
+  if (length === 0)
     ret = null;
-  else if (!n || n >= length) {
+  else if (!n || n >= length) {
     // read it all, truncate the array.
-    ret = Buffer.concat(list, length);
+    if (stringMode)
+      ret = list.join('');
+    else
+      ret = Buffer.concat(list, length);
     list.length = 0;
   } else {
     // read just some of it.
     if (n < list[0].length) {
       // just take a part of the first list item.
+      // slice is the same for buffers and strings.
       var buf = list[0];
       ret = buf.slice(0, n);
       list[0] = buf.slice(n);
@@ -410,17 +447,26 @@ function fromList(n, list, length) {
     } else {
       // complex case.
       // we have enough to cover it, but it spans past the first buffer.
-      ret = new Buffer(n);
+      if (stringMode)
+        ret = '';
+      else
+        ret = new Buffer(n);
+
       var c = 0;
       for (var i = 0, l = list.length; i < l && c < n; i++) {
         var buf = list[0];
         var cpy = Math.min(n - c, buf.length);
-        buf.copy(ret, c, 0, cpy);
-        if (cpy < buf.length) {
+
+        if (stringMode)
+          ret += buf.slice(0, cpy);
+        else
+          buf.copy(ret, c, 0, cpy);
+
+        if (cpy < buf.length)
           list[0] = buf.slice(cpy);
-        } else {
+        else
           list.shift();
-        }
+
         c += cpy;
       }
     }
@@ -428,3 +474,12 @@ function fromList(n, list, length) {
 
   return ret;
 }
+
+function endReadable(stream) {
+  var state = stream._readableState;
+  if (state.endEmitted)
+    return;
+  state.ended = true;
+  state.endEmitted = true;
+  process.nextTick(stream.emit.bind(stream, 'end'));
+}