return this;
};
-// kludge for on('data', fn) consumers. Sad.
-// This is *not* part of the new readable stream interface.
-// It is an ugly unfortunate mess of history.
+// set up data events if they are asked for
+// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
- // https://github.com/isaacs/readable-stream/issues/16
- // if we're already flowing, then no need to set up data events.
if (ev === 'data' && !this._readableState.flowing)
emitDataEvents(this);
+ if (ev === 'readable')
+ this.read(0);
+
return res;
};
Readable.prototype.addListener = Readable.prototype.on;
util.inherits(TestReader, R);
TestReader.prototype.read = function(n) {
+ if (n === 0) return null;
var max = this._buffer.length - this._pos;
n = n || max;
n = Math.max(n, 0);
this.received.push(c.toString());
this.emit('write', c);
return true;
-
- // flip back and forth between immediate acceptance and not.
- this.flush = !this.flush;
- if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10);
- return this.flush;
};
TestWriter.prototype.end = function(c) {
console.log('# %s', name);
fn({
same: assert.deepEqual,
+ ok: assert,
equal: assert.equal,
end: function () {
count--;
var w = new TestWriter;
var flush = true;
+
w.on('end', function(received) {
t.same(received, expect);
t.end();
t.end();
})
});
+
+test('adding readable triggers data flow', function(t) {
+ var r = new R({ highWaterMark: 5 });
+ var onReadable = false;
+ var readCalled = 0;
+
+ r._read = function(n) {
+ if (readCalled++ === 2)
+ r.push(null);
+ else
+ r.push(new Buffer('asdf'));
+ };
+
+ var called = false;
+ r.on('readable', function() {
+ onReadable = true;
+ r.read();
+ });
+
+ r.on('end', function() {
+ t.equal(readCalled, 3);
+ t.ok(onReadable);
+ t.end();
+ });
+});
util.inherits(TestReader, stream.Readable);
TestReader.prototype._read = function(size) {
- stream.push(new Buffer('hallo'));
+ this.push(new Buffer('hallo'));
};
var src = new TestReader();