It's too slow, unfortunately.
this.endEmitted = false;
this.reading = false;
this.sync = false;
- this.onread = onread.bind(stream);
+ this.onread = function(er, data) {
+ onread(stream, er, data);
+ };
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
return ret;
};
-function onread(er, chunk) {
- var state = this._readableState;
+function onread(stream, er, chunk) {
+ var state = stream._readableState;
var sync = state.sync;
state.reading = false;
if (er)
- return this.emit('error', er);
+ return stream.emit('error', er);
if (!chunk || !chunk.length) {
// eof
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
- this.emit('readable');
+ stream.emit('readable');
}
} else
- endReadable(this);
+ endReadable(stream);
}
return;
}
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
state.reading = true;
- this._read(state.bufferSize, state.onread);
+ stream._read(state.bufferSize, state.onread);
return;
}
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
- this.emit('readable');
+ stream.emit('readable');
}
}
}
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n, cb) {
- process.nextTick(cb.bind(this, new Error('not implemented')));
+ process.nextTick(function() {
+ cb(new Error('not implemented'));
+ });
};
Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow.
if (!state.flowing) {
state.flowing = true;
- process.nextTick(flow.bind(null, src, pipeOpts));
+ process.nextTick(function() {
+ flow(src, pipeOpts);
+ });
}
return dest;
// at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again.
- src.once('readable', flow.bind(null, src, pipeOpts));
+ src.once('readable', function() {
+ flow(src, pipeOpts);
+ });
}
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var paused = false;
+ var self = this;
stream.on('end', function() {
state.ended = true;
if (state.decoder) {
}
if (state.length > 0)
- this.emit('readable');
+ self.emit('readable');
else
- endReadable(this);
- }.bind(this));
+ endReadable(self);
+ });
stream.on('data', function(chunk) {
if (state.decoder)
state.buffer.push(chunk);
state.length += chunk.length;
- this.emit('readable');
+ self.emit('readable');
// if not consumed, then pause the stream.
if (state.length > state.lowWaterMark && !paused) {
paused = true;
stream.pause();
}
- }.bind(this));
+ });
// proxy all the other methods.
// important when wrapping filters and duplexes.
// proxy certain important events.
var events = ['error', 'close', 'destroy', 'pause', 'resume'];
events.forEach(function(ev) {
- stream.on(ev, this.emit.bind(this, ev));
- }.bind(this));
+ stream.on(ev, self.emit.bind(self, ev));
+ });
// consume some bytes. if not all is consumed, then
// pause the underlying stream.
return;
state.ended = true;
state.endEmitted = true;
- process.nextTick(stream.emit.bind(stream, 'end'));
+ process.nextTick(function() {
+ stream.emit('end');
+ });
}
var util = require('util');
util.inherits(Transform, Duplex);
-function TransformState() {
+function TransformState(stream) {
this.buffer = [];
this.transforming = false;
this.pendingReadCb = null;
+ this.output = function(chunk) {
+ stream._output(chunk);
+ };
}
function Transform(options) {
Duplex.call(this, options);
// bind output so that it can be passed around as a regular function.
- this._output = this._output.bind(this);
+ var stream = this;
// the queue of _write chunks that are pending being transformed
- this._transformState = new TransformState();
+ var ts = this._transformState = new TransformState(stream);
// when the writable side finishes, then flush out anything remaining.
this.once('finish', function() {
if ('function' === typeof this._flush)
- this._flush(this._output, done.bind(this));
+ this._flush(ts.output, function(er) {
+ done(stream, er);
+ });
else
- done.call(this);
+ done(stream);
});
}
var req = ts.buffer.shift();
var chunk = req[0];
var writecb = req[1];
- var output = this._output;
ts.transforming = true;
- this._transform(chunk, output, function(er, data) {
+ this._transform(chunk, ts.output, function(er, data) {
ts.transforming = false;
if (data)
- output(data);
+ ts.output(data);
writecb(er);
- }.bind(this));
+ });
};
Transform.prototype._output = function(chunk) {
}
// otherwise, it's up to us to fill the rs buffer.
- var state = this._readableState;
- var len = state.length;
- state.buffer.push(chunk);
- state.length += chunk.length;
- if (state.needReadable) {
- state.needReadable = false;
+ var rs = this._readableState;
+ var len = rs.length;
+ rs.buffer.push(chunk);
+ rs.length += chunk.length;
+ if (rs.needReadable) {
+ rs.needReadable = false;
this.emit('readable');
}
};
-function done(er) {
+function done(stream, er) {
if (er)
- return this.emit('error', er);
+ return stream.emit('error', er);
// if there's nothing in the write buffer, then that means
// that nothing more will ever be provided
- var ws = this._writableState;
- var rs = this._readableState;
- var ts = this._transformState;
+ var ws = stream._writableState;
+ var rs = stream._readableState;
+ var ts = stream._transformState;
if (ws.length)
throw new Error('calling transform done when ws.length != 0');
// no more data coming from the writable side, we need to emit
// now so that the consumer knows to pick up the tail bits.
if (rs.length && rs.needReadable)
- this.emit('readable');
+ stream.emit('readable');
else if (rs.length === 0)
- this.emit('end');
+ stream.emit('end');
}
this.sync = false;
// the callback that's passed to _write(chunk,cb)
- this.onwrite = onwrite.bind(stream);
+ this.onwrite = function(er) {
+ onwrite(stream, er);
+ };
// the callback that the user supplies to write(chunk,encoding,cb)
this.writecb = null;
return ret;
};
-function onwrite(er) {
- var state = this._writableState;
+function onwrite(stream, er) {
+ var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
var l = state.writelen;
if (er) {
if (cb) {
if (sync)
- process.nextTick(cb.bind(null, er));
+ process.nextTick(function() {
+ cb(er);
+ });
else
cb(er);
} else
- this.emit('error', er);
+ stream.emit('error', er);
return;
}
state.length -= l;
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
state.finishing = true;
- this.emit('finish');
+ stream.emit('finish');
state.finished = true;
return;
}
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;
- this._write(chunk, state.onwrite);
+ stream._write(chunk, state.onwrite);
}
if (state.length <= state.lowWaterMark && state.needDrain) {
if (!state.needDrain)
return;
state.needDrain = false;
- this.emit('drain');
- }.bind(this));
+ stream.emit('drain');
+ });
}
}
Writable.prototype._write = function(chunk, cb) {
- process.nextTick(cb.bind(this, new Error('not implemented')));
+ process.nextTick(function() {
+ cb(new Error('not implemented'));
+ });
};
Writable.prototype.end = function(chunk, encoding) {