var Stream = require('stream');
var util = require('util');
var assert = require('assert');
+var StringDecoder;
util.inherits(Readable, Stream);
this.pipes = [];
this.flowing = false;
this.ended = false;
+ this.endEmitted = false;
this.stream = stream;
this.reading = false;
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
+
+ this.decoder = null;
+ if (options.encoding) {
+ if (!StringDecoder)
+ StringDecoder = require('string_decoder').StringDecoder;
+ this.decoder = new StringDecoder(options.encoding);
+ }
}
function Readable(options) {
Stream.apply(this);
}
+// backwards compatibility.
+Readable.prototype.setEncoding = function(enc) {
+ if (!StringDecoder)
+ StringDecoder = require('string_decoder').StringDecoder;
+ this._readableState.decoder = new StringDecoder(enc);
+};
+
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
var state = this._readableState;
if (state.length === 0 && state.ended) {
- process.nextTick(this.emit.bind(this, 'end'));
+ endReadable(this);
return null;
}
n = state.length;
}
- var ret = n > 0 ? fromList(n, state.buffer, state.length) : null;
+
+ var ret;
+ if (n > 0)
+ ret = fromList(n, state.buffer, state.length, !!state.decoder);
+ else
+ ret = null;
if (ret === null || ret.length === 0)
state.needReadable = true;
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
this.emit('readable');
+ else
+ endReadable(this);
return;
}
+ if (state.decoder)
+ chunk = state.decoder.write(chunk);
+
state.length += chunk.length;
state.buffer.push(chunk);
- if (state.length < state.lowWaterMark)
+
+ // if we haven't gotten enough to pass the lowWaterMark,
+ // and we haven't ended, then don't bother telling the user
+ // that it's time to read more data. Otherwise, that'll
+ // probably kick off another stream.read(), which can trigger
+ // another _read(n,cb) before this one returns!
+ if (state.length < state.lowWaterMark) {
this._read(state.bufferSize, onread.bind(this));
+ return;
+ }
// now we have something to call this.read() to get.
if (state.needReadable) {
stream.on('end', function() {
state.ended = true;
if (state.length === 0)
- this.emit('end');
+ endReadable(this);
}.bind(this));
stream.on('data', function(chunk) {
n = state.length;
}
- var ret = fromList(n, state.buffer, state.length);
+ var ret = fromList(n, state.buffer, state.length, !!state.decoder);
state.length -= n;
if (state.length < state.lowWaterMark && paused) {
}
if (state.length === 0 && state.ended)
- process.nextTick(this.emit.bind(this, 'end'));
+ endReadable(this);
return ret;
};
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
-// If there's no data, then
-function fromList(n, list, length) {
+function fromList(n, list, length, stringMode) {
var ret;
// nothing in the list, definitely empty.
return null;
}
- if (length === 0) {
+ if (length === 0)
ret = null;
- } else if (!n || n >= length) {
+ else if (!n || n >= length) {
// read it all, truncate the array.
- ret = Buffer.concat(list, length);
+ if (stringMode)
+ ret = list.join('');
+ else
+ ret = Buffer.concat(list, length);
list.length = 0;
} else {
// read just some of it.
if (n < list[0].length) {
// just take a part of the first list item.
+ // slice is the same for buffers and strings.
var buf = list[0];
ret = buf.slice(0, n);
list[0] = buf.slice(n);
} else {
// complex case.
// we have enough to cover it, but it spans past the first buffer.
- ret = new Buffer(n);
+ if (stringMode)
+ ret = '';
+ else
+ ret = new Buffer(n);
+
var c = 0;
for (var i = 0, l = list.length; i < l && c < n; i++) {
var buf = list[0];
var cpy = Math.min(n - c, buf.length);
- buf.copy(ret, c, 0, cpy);
- if (cpy < buf.length) {
+
+ if (stringMode)
+ ret += buf.slice(0, cpy);
+ else
+ buf.copy(ret, c, 0, cpy);
+
+ if (cpy < buf.length)
list[0] = buf.slice(cpy);
- } else {
+ else
list.shift();
- }
+
c += cpy;
}
}
return ret;
}
+
+function endReadable(stream) {
+ var state = stream._readableState;
+ if (state.endEmitted)
+ return;
+ state.ended = true;
+ state.endEmitted = true;
+ process.nextTick(stream.emit.bind(stream, 'end'));
+}