* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
* return {Boolean} Whether or not more pushes should be performed
+Note: **This function should usually be called by Readable consumers,
+NOT by implementors of Readable subclasses.** It does not indicate
+the end of a `_read()` transaction in the way that
+`readable.push(chunk)` does. If you find that you have to call
+`this.unshift(chunk)` in your Readable class, then there's a good
+chance you ought to be using the
+[stream.Transform](#stream_class_stream_transform) class instead.
+
This is the corollary of `readable.push(chunk)`. Rather than putting
the data at the *end* of the read queue, it puts it at the *front* of
the read queue.
-This is useful in certain use-cases where a stream is being consumed
-by a parser, which needs to "un-consume" some data that it has
-optimistically pulled out of the source.
+This is useful in certain cases where a stream is being consumed by a
+parser, which needs to "un-consume" some data that it has
+optimistically pulled out of the source, so that the stream can be
+passed on to some other party.
```javascript
// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
-// Note: This can be done more simply as a Transform stream. See below.
+// NOTE: This can be done more simply as a Transform stream!
+// Using Readable directly for this is sub-optimal. See the
+// alternative example below under the Transform section.
function SimpleProtocol(source, options) {
if (!(this instanceof SimpleProtocol))
Readable.prototype.unshift = function(chunk) {
var state = this._readableState;
- if (typeof chunk === 'string' && !state.objectMode)
- chunk = new Buffer(chunk, arguments[1]);
return readableAddChunk(this, state, chunk, true);
};
function readableAddChunk(stream, state, chunk, addToFront) {
- state.reading = false;
-
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (chunk === null || chunk === undefined) {
- onEofChunk(stream, state);
+ state.reading = false;
+ if (!state.ended)
+ onEofChunk(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
- if (state.decoder)
- chunk = state.decoder.write(chunk);
+ if (state.ended && !addToFront) {
+ var e = new Error('stream.push() after EOF');
+ stream.emit('error', e);
+ } else if (state.endEmitted && addToFront) {
+ var e = new Error('stream.unshift() after end event');
+ stream.emit('error', e);
+ } else {
+ if (state.decoder && !addToFront)
+ chunk = state.decoder.write(chunk);
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront)
- state.buffer.unshift(chunk);
- else
- state.buffer.push(chunk);
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront) {
+ state.buffer.unshift(chunk);
+ } else {
+ state.reading = false;
+ state.buffer.push(chunk);
+ }
- if (state.needReadable)
- emitReadable(stream);
+ if (state.needReadable)
+ emitReadable(stream);
- maybeReadMore(stream, state);
+ maybeReadMore(stream, state);
+ }
+ } else {
+ state.reading = false;
}
return needMoreData(state);
if (!state.endEmitted && state.calledRead) {
state.ended = true;
- state.endEmitted = true;
process.nextTick(function() {
- stream.readable = false;
- stream.emit('end');
+ // Check that we didn't get one last unshift.
+ if (!state.endEmitted && state.length === 0) {
+ state.endEmitted = true;
+ stream.readable = false;
+ stream.emit('end');
+ }
});
}
}
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+// This test verifies that:
+// 1. unshift() does not cause colliding _read() calls.
+// 2. unshift() after the 'end' event is an error, but after the EOF
+// signalling null, it is ok, and just creates a new readable chunk.
+// 3. push() after the EOF signaling null is an error.
+// 4. _read() is not called after pushing the EOF null chunk.
+
+var stream = require('stream');
+var hwm = 10;
+var r = stream.Readable({ highWaterMark: hwm });
+var chunks = 10;
+var t = (chunks * 5);
+
+var data = new Buffer(chunks * hwm + Math.ceil(hwm / 2));
+for (var i = 0; i < data.length; i++) {
+ var c = 'asdf'.charCodeAt(i % 4);
+ data[i] = c;
+}
+
+var pos = 0;
+var pushedNull = false;
+r._read = function(n) {
+ assert(!pushedNull, '_read after null push');
+
+ // every third chunk is fast
+ push(!(chunks % 3));
+
+ function push(fast) {
+ assert(!pushedNull, 'push() after null push');
+ var c = pos >= data.length ? null : data.slice(pos, pos + n);
+ pushedNull = c === null;
+ if (fast) {
+ pos += n;
+ r.push(c);
+ if (c === null) pushError();
+ } else {
+ setTimeout(function() {
+ pos += n;
+ r.push(c);
+ if (c === null) pushError();
+ });
+ }
+ }
+};
+
+function pushError() {
+ assert.throws(function() {
+ r.push(new Buffer(1));
+ });
+}
+
+
+var w = stream.Writable();
+var written = [];
+w._write = function(chunk, encoding, cb) {
+ written.push(chunk.toString());
+ cb();
+};
+
+var ended = false;
+r.on('end', function() {
+ assert(!ended, 'end emitted more than once');
+ assert.throws(function() {
+ r.unshift(new Buffer(1));
+ });
+ ended = true;
+ w.end();
+});
+
+r.on('readable', function() {
+ var chunk;
+ while (null !== (chunk = r.read(10))) {
+ w.write(chunk);
+ if (chunk.length > 4)
+ r.unshift(new Buffer('1234'));
+ }
+});
+
+var finished = false;
+w.on('finish', function() {
+ finished = true;
+ // each chunk should start with 1234, and then be asfdasdfasdf...
+ // The first got pulled out before the first unshift('1234'), so it's
+ // lacking that piece.
+ assert.equal(written[0], 'asdfasdfas');
+ var asdf = 'd';
+ console.error('0: %s', written[0]);
+ for (var i = 1; i < written.length; i++) {
+ console.error('%s: %s', i.toString(32), written[i]);
+ assert.equal(written[i].slice(0, 4), '1234');
+ for (var j = 4; j < written[i].length; j++) {
+ var c = written[i].charAt(j);
+ assert.equal(c, asdf);
+ switch (asdf) {
+ case 'a': asdf = 's'; break;
+ case 's': asdf = 'd'; break;
+ case 'd': asdf = 'f'; break;
+ case 'f': asdf = 'a'; break;
+ }
+ }
+ }
+});
+
+process.on('exit', function() {
+ assert.equal(written.length, 18);
+ assert(ended, 'stream ended');
+ assert(finished, 'stream finished');
+ console.log('ok');
+});