stream: Don't require read(0) to emit 'readable' event
authorisaacs <i@izs.me>
Sun, 3 Mar 2013 00:03:22 +0000 (16:03 -0800)
committerisaacs <i@izs.me>
Mon, 4 Mar 2013 15:38:32 +0000 (07:38 -0800)
When a readable listener is added, call read(0) so that data will flow in, up to
the high water mark.

Otherwise, it's somewhat confusing that you have to listen for readable,
and ALSO call read() (when it will certainly return null) just to get some
data out of the stream.

See: #4720

lib/_stream_readable.js
test/simple/test-stream2-basic.js
test/simple/test-stream2-unpipe-leak.js

index d22795a..17d186f 100644 (file)
@@ -613,17 +613,17 @@ Readable.prototype.unpipe = function(dest) {
   return this;
 };
 
-// kludge for on('data', fn) consumers.  Sad.
-// This is *not* part of the new readable stream interface.
-// It is an ugly unfortunate mess of history.
+// set up data events if they are asked for
+// Ensure readable listeners eventually get something
 Readable.prototype.on = function(ev, fn) {
   var res = Stream.prototype.on.call(this, ev, fn);
 
-  // https://github.com/isaacs/readable-stream/issues/16
-  // if we're already flowing, then no need to set up data events.
   if (ev === 'data' && !this._readableState.flowing)
     emitDataEvents(this);
 
+  if (ev === 'readable')
+    this.read(0);
+
   return res;
 };
 Readable.prototype.addListener = Readable.prototype.on;
index d3b53fd..512231b 100644 (file)
@@ -38,6 +38,7 @@ function TestReader(n) {
 util.inherits(TestReader, R);
 
 TestReader.prototype.read = function(n) {
+  if (n === 0) return null;
   var max = this._buffer.length - this._pos;
   n = n || max;
   n = Math.max(n, 0);
@@ -80,11 +81,6 @@ TestWriter.prototype.write = function(c) {
   this.received.push(c.toString());
   this.emit('write', c);
   return true;
-
-  // flip back and forth between immediate acceptance and not.
-  this.flush = !this.flush;
-  if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10);
-  return this.flush;
 };
 
 TestWriter.prototype.end = function(c) {
@@ -113,6 +109,7 @@ function run() {
   console.log('# %s', name);
   fn({
     same: assert.deepEqual,
+    ok: assert,
     equal: assert.equal,
     end: function () {
       count--;
@@ -187,6 +184,7 @@ test('pipe', function(t) {
 
   var w = new TestWriter;
   var flush = true;
+
   w.on('end', function(received) {
     t.same(received, expect);
     t.end();
@@ -450,3 +448,28 @@ test('sync _read ending', function (t) {
     t.end();
   })
 });
+
+test('adding readable triggers data flow', function(t) {
+  var r = new R({ highWaterMark: 5 });
+  var onReadable = false;
+  var readCalled = 0;
+
+  r._read = function(n) {
+    if (readCalled++ === 2)
+      r.push(null);
+    else
+      r.push(new Buffer('asdf'));
+  };
+
+  var called = false;
+  r.on('readable', function() {
+    onReadable = true;
+    r.read();
+  });
+
+  r.on('end', function() {
+    t.equal(readCalled, 3);
+    t.ok(onReadable);
+    t.end();
+  });
+});
index 993dd16..3ab02e9 100644 (file)
@@ -43,7 +43,7 @@ function TestReader() {
 util.inherits(TestReader, stream.Readable);
 
 TestReader.prototype._read = function(size) {
-  stream.push(new Buffer('hallo'));
+  this.push(new Buffer('hallo'));
 };
 
 var src = new TestReader();