stream: Use push() for readable.wrap()
authorisaacs <i@izs.me>
Tue, 8 Jan 2013 03:40:08 +0000 (19:40 -0800)
committerisaacs <i@izs.me>
Thu, 10 Jan 2013 21:49:53 +0000 (13:49 -0800)
lib/_stream_readable.js

index bd9126b..6dac946 100644 (file)
@@ -611,16 +611,11 @@ Readable.prototype.wrap = function(stream) {
     state.ended = true;
     if (state.decoder) {
       var chunk = state.decoder.end();
-      if (chunk && chunk.length) {
-        state.buffer.push(chunk);
-        state.length += chunk.length;
-      }
+      if (chunk && chunk.length)
+        self.push(chunk);
     }
 
-    if (state.length > 0)
-      self.emit('readable');
-    else
-      endReadable(self);
+    self.push(null);
   });
 
   stream.on('data', function(chunk) {
@@ -629,12 +624,8 @@ Readable.prototype.wrap = function(stream) {
     if (!chunk || !chunk.length)
       return;
 
-    state.buffer.push(chunk);
-    state.length += chunk.length;
-    self.emit('readable');
-
-    // if not consumed, then pause the stream.
-    if (state.length > state.lowWaterMark && !paused) {
+    var ret = self.push(chunk);
+    if (!ret) {
       paused = true;
       stream.pause();
     }
@@ -657,40 +648,13 @@ Readable.prototype.wrap = function(stream) {
     stream.on(ev, self.emit.bind(self, ev));
   });
 
-  // consume some bytes.  if not all is consumed, then
-  // pause the underlying stream.
-  this.read = function(n) {
-    if (state.length === 0) {
-      state.needReadable = true;
-      return null;
-    }
-
-    if (isNaN(n) || n <= 0)
-      n = state.length;
-
-    if (n > state.length) {
-      if (!state.ended) {
-        state.needReadable = true;
-        return null;
-      } else
-        n = state.length;
-    }
-
-    var ret = fromList(n, state.buffer, state.length, !!state.decoder);
-    state.length -= n;
-
-    if (state.length === 0 && !state.ended)
-      state.needReadable = true;
-
-    if (state.length <= state.lowWaterMark && paused) {
+  // when we try to consume some more bytes, simply unpause the
+  // underlying stream.
+  self._read = function(n, cb) {
+    if (paused) {
       stream.resume();
       paused = false;
     }
-
-    if (state.length === 0 && state.ended)
-      endReadable(this);
-
-    return ret;
   };
 };