streams2: Handle sync read callbacks nicely
authorisaacs <i@izs.me>
Thu, 24 Jan 2013 01:52:45 +0000 (17:52 -0800)
committerisaacs <i@izs.me>
Thu, 24 Jan 2013 15:49:27 +0000 (07:49 -0800)
lib/_stream_readable.js
test/simple/test-stream2-read-sync-stack.js [new file with mode: 0644]

index a592180..f57be90 100644 (file)
@@ -286,16 +286,9 @@ function onread(stream, er, chunk) {
 
     // if we've ended and we have some data left, then emit
     // 'readable' now to make sure it gets picked up.
-    if (!sync) {
-      if (state.length > 0) {
-        state.needReadable = false;
-        if (!state.emittedReadable) {
-          state.emittedReadable = true;
-          stream.emit('readable');
-        }
-      } else
-        endReadable(stream);
-    } else
+    if (state.length > 0)
+      emitReadable(stream);
+    else
       endReadable(stream);
     return;
   }
@@ -320,15 +313,29 @@ function onread(stream, er, chunk) {
     return;
   }
 
-  if (state.needReadable && !sync) {
-    state.needReadable = false;
-    if (!state.emittedReadable) {
-      state.emittedReadable = true;
-      stream.emit('readable');
-    }
+  // Don't emit readable right away in sync mode, because this can trigger
+  // another read() call => stack overflow.  This way, it might trigger
+  // a nextTick recursion warning, but that's not so bad.
+  if (state.needReadable) {
+    if (!sync)
+      emitReadable(stream);
+    else
+      process.nextTick(function() {
+        emitReadable(stream);
+      });
   }
 }
 
+function emitReadable(stream) {
+  var state = stream._readableState;
+  state.needReadable = false;
+  if (state.emittedReadable)
+    return;
+
+  state.emittedReadable = true;
+  stream.emit('readable');
+}
+
 // abstract method.  to be overridden in specific implementation classes.
 // call cb(er, data) where data is <= n in length.
 // for virtual (non-string, non-buffer) streams, "length" is somewhat
diff --git a/test/simple/test-stream2-read-sync-stack.js b/test/simple/test-stream2-read-sync-stack.js
new file mode 100644 (file)
index 0000000..4e5ab17
--- /dev/null
@@ -0,0 +1,54 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var Readable = require('stream').Readable;
+var r = new Readable();
+var N = 256 * 1024;
+
+// Go ahead and allow the pathological case for this test.
+// Yes, it's an infinite loop, that's the point.
+process.maxTickDepth = N + 2;
+
+var reads = 0;
+r._read = function(n, cb) {
+  var chunk = reads++ === N ? null : new Buffer(1);
+  cb(null, chunk);
+};
+
+r.on('readable', function onReadable() {
+  if (!(r._readableState.length % 256))
+    console.error('readable', r._readableState.length);
+  r.read(N * 2);
+});
+
+var ended = false;
+r.on('end', function onEnd() {
+  ended = true;
+});
+
+r.read(0);
+
+process.on('exit', function() {
+  assert(ended);
+  console.log('ok');
+});