stream: Fix unshift() race conditions
authorisaacs <i@izs.me>
Thu, 11 Apr 2013 22:01:26 +0000 (15:01 -0700)
committerisaacs <i@izs.me>
Thu, 11 Apr 2013 23:12:48 +0000 (16:12 -0700)
Fix #5272

The consumption of a readable stream is a dance with 3 partners.

1. The specific stream Author (A)
2. The Stream Base class (B), and
3. The Consumer of the stream (C)

When B calls the _read() method that A implements, it sets a 'reading'
flag, so that parallel calls to _read() can be avoided.  When A calls
stream.push(), B knows that it's safe to start calling _read() again.

If the consumer C is some kind of parser that wants in some cases to
pass the source stream off to some other party, but not before "putting
back" some bit of previously consumed data (as in the case of Node's
websocket http upgrade implementation).  So, stream.unshift() will
generally *never* be called by A, but *only* called by C.

Prior to this patch, stream.unshift() *also* unset the state.reading
flag, meaning that C could indicate the end of a read, and B would
dutifully fire off another _read() call to A.  This is inappropriate.
In the case of fs streams, and other variably-laggy streams that don't
tolerate overlapped _read() calls, this causes big problems.

Also, calling stream.shift() after the 'end' event did not raise any
kind of error, but would cause very strange behavior indeed.  Calling it
after the EOF chunk was seen, but before the 'end' event was fired would
also cause weird behavior, and could lead to data being lost, since it
would not emit another 'readable' event.

This change makes it so that:

1. stream.unshift() does *not* set state.reading = false
2. stream.unshift() is allowed up until the 'end' event.
3. unshifting onto a EOF-encountered and zero-length (but not yet
end-emitted) stream will defer the 'end' event until the new data is
consumed.
4. pushing onto a EOF-encountered stream is now an error.

So, if you read(), you have that single tick to safely unshift() data
back into the stream, even if the null chunk was pushed, and the length
was 0.

doc/api/stream.markdown
lib/_stream_readable.js
test/simple/test-stream-unshift-read-race.js [new file with mode: 0644]

index 2c9fc1c6c4c60b9a0a34adcc0ee6b21cc6b194d1..ce7656b8d7001df60085958f7df18c28abc72d69 100644 (file)
@@ -184,20 +184,31 @@ stream._read = function(n) {
 * `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
 * return {Boolean} Whether or not more pushes should be performed
 
+Note: **This function should usually be called by Readable consumers,
+NOT by implementors of Readable subclasses.**  It does not indicate
+the end of a `_read()` transaction in the way that
+`readable.push(chunk)` does.  If you find that you have to call
+`this.unshift(chunk)` in your Readable class, then there's a good
+chance you ought to be using the
+[stream.Transform](#stream_class_stream_transform) class instead.
+
 This is the corollary of `readable.push(chunk)`.  Rather than putting
 the data at the *end* of the read queue, it puts it at the *front* of
 the read queue.
 
-This is useful in certain use-cases where a stream is being consumed
-by a parser, which needs to "un-consume" some data that it has
-optimistically pulled out of the source.
+This is useful in certain cases where a stream is being consumed by a
+parser, which needs to "un-consume" some data that it has
+optimistically pulled out of the source, so that the stream can be
+passed on to some other party.
 
 ```javascript
 // A parser for a simple data protocol.
 // The "header" is a JSON object, followed by 2 \n characters, and
 // then a message body.
 //
-// Note: This can be done more simply as a Transform stream.  See below.
+// NOTE: This can be done more simply as a Transform stream!
+// Using Readable directly for this is sub-optimal.  See the
+// alternative example below under the Transform section.
 
 function SimpleProtocol(source, options) {
   if (!(this instanceof SimpleProtocol))
index e30a327f9dc71fca5022a5a59de193acbc76cb5e..070628288eef2f6ac2769349b1eecad8794b427f 100644 (file)
@@ -115,34 +115,44 @@ Readable.prototype.push = function(chunk) {
 
 Readable.prototype.unshift = function(chunk) {
   var state = this._readableState;
-  if (typeof chunk === 'string' && !state.objectMode)
-    chunk = new Buffer(chunk, arguments[1]);
   return readableAddChunk(this, state, chunk, true);
 };
 
 function readableAddChunk(stream, state, chunk, addToFront) {
-  state.reading = false;
-
   var er = chunkInvalid(state, chunk);
   if (er) {
     stream.emit('error', er);
   } else if (chunk === null || chunk === undefined) {
-    onEofChunk(stream, state);
+    state.reading = false;
+    if (!state.ended)
+      onEofChunk(stream, state);
   } else if (state.objectMode || chunk && chunk.length > 0) {
-    if (state.decoder)
-      chunk = state.decoder.write(chunk);
+    if (state.ended && !addToFront) {
+      var e = new Error('stream.push() after EOF');
+      stream.emit('error', e);
+    } else if (state.endEmitted && addToFront) {
+      var e = new Error('stream.unshift() after end event');
+      stream.emit('error', e);
+    } else {
+      if (state.decoder && !addToFront)
+        chunk = state.decoder.write(chunk);
 
-    // update the buffer info.
-    state.length += state.objectMode ? 1 : chunk.length;
-    if (addToFront)
-      state.buffer.unshift(chunk);
-    else
-      state.buffer.push(chunk);
+      // update the buffer info.
+      state.length += state.objectMode ? 1 : chunk.length;
+      if (addToFront) {
+        state.buffer.unshift(chunk);
+      } else {
+        state.reading = false;
+        state.buffer.push(chunk);
+      }
 
-    if (state.needReadable)
-      emitReadable(stream);
+      if (state.needReadable)
+        emitReadable(stream);
 
-    maybeReadMore(stream, state);
+      maybeReadMore(stream, state);
+    }
+  } else {
+    state.reading = false;
   }
 
   return needMoreData(state);
@@ -877,10 +887,13 @@ function endReadable(stream) {
 
   if (!state.endEmitted && state.calledRead) {
     state.ended = true;
-    state.endEmitted = true;
     process.nextTick(function() {
-      stream.readable = false;
-      stream.emit('end');
+      // Check that we didn't get one last unshift.
+      if (!state.endEmitted && state.length === 0) {
+        state.endEmitted = true;
+        stream.readable = false;
+        stream.emit('end');
+      }
     });
   }
 }
diff --git a/test/simple/test-stream-unshift-read-race.js b/test/simple/test-stream-unshift-read-race.js
new file mode 100644 (file)
index 0000000..83fd9fa
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+// This test verifies that:
+// 1. unshift() does not cause colliding _read() calls.
+// 2. unshift() after the 'end' event is an error, but after the EOF
+//    signalling null, it is ok, and just creates a new readable chunk.
+// 3. push() after the EOF signaling null is an error.
+// 4. _read() is not called after pushing the EOF null chunk.
+
+var stream = require('stream');
+var hwm = 10;
+var r = stream.Readable({ highWaterMark: hwm });
+var chunks = 10;
+var t = (chunks * 5);
+
+var data = new Buffer(chunks * hwm + Math.ceil(hwm / 2));
+for (var i = 0; i < data.length; i++) {
+  var c = 'asdf'.charCodeAt(i % 4);
+  data[i] = c;
+}
+
+var pos = 0;
+var pushedNull = false;
+r._read = function(n) {
+  assert(!pushedNull, '_read after null push');
+
+  // every third chunk is fast
+  push(!(chunks % 3));
+
+  function push(fast) {
+    assert(!pushedNull, 'push() after null push');
+    var c = pos >= data.length ? null : data.slice(pos, pos + n);
+    pushedNull = c === null;
+    if (fast) {
+      pos += n;
+      r.push(c);
+      if (c === null) pushError();
+    } else {
+      setTimeout(function() {
+        pos += n;
+        r.push(c);
+        if (c === null) pushError();
+      });
+    }
+  }
+};
+
+function pushError() {
+  assert.throws(function() {
+    r.push(new Buffer(1));
+  });
+}
+
+
+var w = stream.Writable();
+var written = [];
+w._write = function(chunk, encoding, cb) {
+  written.push(chunk.toString());
+  cb();
+};
+
+var ended = false;
+r.on('end', function() {
+  assert(!ended, 'end emitted more than once');
+  assert.throws(function() {
+    r.unshift(new Buffer(1));
+  });
+  ended = true;
+  w.end();
+});
+
+r.on('readable', function() {
+  var chunk;
+  while (null !== (chunk = r.read(10))) {
+    w.write(chunk);
+    if (chunk.length > 4)
+      r.unshift(new Buffer('1234'));
+  }
+});
+
+var finished = false;
+w.on('finish', function() {
+  finished = true;
+  // each chunk should start with 1234, and then be asfdasdfasdf...
+  // The first got pulled out before the first unshift('1234'), so it's
+  // lacking that piece.
+  assert.equal(written[0], 'asdfasdfas');
+  var asdf = 'd';
+  console.error('0: %s', written[0]);
+  for (var i = 1; i < written.length; i++) {
+    console.error('%s: %s', i.toString(32), written[i]);
+    assert.equal(written[i].slice(0, 4), '1234');
+    for (var j = 4; j < written[i].length; j++) {
+      var c = written[i].charAt(j);
+      assert.equal(c, asdf);
+      switch (asdf) {
+        case 'a': asdf = 's'; break;
+        case 's': asdf = 'd'; break;
+        case 'd': asdf = 'f'; break;
+        case 'f': asdf = 'a'; break;
+      }
+    }
+  }
+});
+
+process.on('exit', function() {
+  assert.equal(written.length, 18);
+  assert(ended, 'stream ended');
+  assert(finished, 'stream finished');
+  console.log('ok');
+});