stream: make Readable.wrap support objectMode
authorDaniel Moore <polaris@northhorizon.net>
Mon, 29 Apr 2013 13:25:08 +0000 (09:25 -0400)
committerisaacs <i@izs.me>
Wed, 8 May 2013 18:59:28 +0000 (11:59 -0700)
Added a check to see if the stream is in objectMode before deciding
whether to include or exclude data from an old-style wrapped stream.

lib/_stream_readable.js
test/simple/test-stream2-readable-wrap.js

index 070628288eef2f6ac2769349b1eecad8794b427f..63babe42655ececb4ba17b6c6bc2a33ded2b0b4a 100644 (file)
@@ -767,7 +767,7 @@ Readable.prototype.wrap = function(stream) {
   stream.on('data', function(chunk) {
     if (state.decoder)
       chunk = state.decoder.write(chunk);
-    if (!chunk || !chunk.length)
+    if (!chunk || !state.objectMode && !chunk.length)
       return;
 
     var ret = self.push(chunk);
index 5fa5d185b55217fbf089c309dd25191219b47fe5..6b272be46bf06fae68cba9defc1b98bebbbdc91c 100644 (file)
@@ -26,69 +26,83 @@ var Readable = require('_stream_readable');
 var Writable = require('_stream_writable');
 var EE = require('events').EventEmitter;
 
-var old = new EE;
-var r = new Readable({ highWaterMark: 10 });
-assert.equal(r, r.wrap(old));
+var testRuns = 0, completedRuns = 0;
+function runTest(highWaterMark, objectMode, produce) {
+  testRuns++;
 
-var ended = false;
-r.on('end', function() {
-  ended = true;
-});
+  var old = new EE;
+  var r = new Readable({ highWaterMark: highWaterMark, objectMode: objectMode });
+  assert.equal(r, r.wrap(old));
 
-var pauses = 0;
-var resumes = 0;
+  var ended = false;
+  r.on('end', function() {
+    ended = true;
+  });
 
-old.pause = function() {
-  pauses++;
-  old.emit('pause');
-  flowing = false;
-};
+  var pauses = 0;
+  var resumes = 0;
 
-old.resume = function() {
-  resumes++;
-  old.emit('resume');
-  flow();
-};
-
-var flowing;
-var chunks = 10;
-var oldEnded = false;
-function flow() {
-  flowing = true;
-  while (flowing && chunks-- > 0) {
-    old.emit('data', new Buffer('xxxxxxxxxx'));
-  }
-  if (chunks <= 0) {
-    oldEnded = true;
-    old.emit('end');
+  old.pause = function() {
+    pauses++;
+    old.emit('pause');
+    flowing = false;
+  };
+
+  old.resume = function() {
+    resumes++;
+    old.emit('resume');
+    flow();
+  };
+
+  var flowing;
+  var chunks = 10;
+  var oldEnded = false;
+  var expected = [];
+  function flow() {
+    flowing = true;
+    while (flowing && chunks-- > 0) {
+      var item = produce();
+      expected.push(item);
+      console.log('emit', chunks);
+      old.emit('data', item);
+    }
+    if (chunks <= 0) {
+      oldEnded = true;
+      console.log('old end', chunks, flowing);
+      old.emit('end');
+    }
   }
-}
 
-var w = new Writable({ highWaterMark: 20 });
-var written = [];
-w._write = function(chunk, encoding, cb) {
-  written.push(chunk.toString());
-  setTimeout(cb);
-};
+  var w = new Writable({ highWaterMark: highWaterMark * 2, objectMode: objectMode });
+  var written = [];
+  w._write = function(chunk, encoding, cb) {
+    console.log(chunk);
+    written.push(chunk);
+    setTimeout(cb);
+  };
 
-var finished = false;
-w.on('finish', function() {
-  finished = true;
-});
+  w.on('finish', function() {
+    completedRuns++;
+    performAsserts();
+  });
 
+  r.pipe(w);
 
-var expect = new Array(11).join('xxxxxxxxxx');
+  flow();
 
-r.pipe(w);
+  function performAsserts() { 
+    assert(ended);
+    assert(oldEnded);
+    assert.deepEqual(written, expected);
+    assert.equal(pauses, 10);
+    assert.equal(resumes, 9);
+  }
+}
 
-flow();
+runTest(10, false, function(){ return new Buffer('xxxxxxxxxx'); });
+runTest(1, true, function(){ return { foo: 'bar' }; });
 
 process.on('exit', function() {
-  assert.equal(pauses, 10);
-  assert.equal(resumes, 9);
-  assert(ended);
-  assert(finished);
-  assert(oldEnded);
-  assert.equal(written.join(''), expect);
+  assert.equal(testRuns, completedRuns);
   console.log('ok');
 });