stream: Do not call endReadable on a non-empty stream
authorisaacs <i@izs.me>
Mon, 14 Jan 2013 19:25:39 +0000 (11:25 -0800)
committerisaacs <i@izs.me>
Mon, 14 Jan 2013 23:22:42 +0000 (15:22 -0800)
Say that a stream's current read queue has 101 bytes in it, and the
underlying resource has ended (ie, reached EOF).

If you do something like this:

    stream.read(100); // leave a byte behind
    stream.read(0); // read(0) for some reason

then the read(0) will get 0 from the howMuchToRead function.  Since the
stream was ended, this was incorrectly treating the 0 as a "there is no
more in the buffer", and emitting 'end' before that last byte was read.

Why have the read(0) in the first place?  We do this in some cases to
trigger the last few bytes of a net socket (such as a child process's
stdio pipes).  This was causing issues when piping a `git archive` job
to a file: the resulting tarball was incomplete, because it occasionally
was not getting the last chunk.

lib/_stream_readable.js
test/simple/test-stream2-readable-non-empty-end.js [new file with mode: 0644]

index 6dac946..7ad3974 100644 (file)
@@ -151,7 +151,8 @@ Readable.prototype.read = function(n) {
 
   // if we've ended, and we're now clear, then finish it up.
   if (n === 0 && state.ended) {
-    endReadable(this);
+    if (state.length === 0)
+      endReadable(this);
     return null;
   }
 
@@ -726,6 +727,12 @@ function fromList(n, list, length, stringMode) {
 
 function endReadable(stream) {
   var state = stream._readableState;
+
+  // If we get here before consuming all the bytes, then that is a
+  // bug in node.  Should never happen.
+  if (state.length > 0)
+    throw new Error('endReadable called on non-empty stream');
+
   if (state.endEmitted)
     return;
   state.ended = true;
diff --git a/test/simple/test-stream2-readable-non-empty-end.js b/test/simple/test-stream2-readable-non-empty-end.js
new file mode 100644 (file)
index 0000000..351bef6
--- /dev/null
@@ -0,0 +1,78 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common.js');
+var Readable = require('_stream_readable');
+
+var len = 0;
+var chunks = new Array(10);
+for (var i = 1; i <= 10; i++) {
+  chunks[i-1] = new Buffer(i);
+  len += i;
+}
+
+var test = new Readable();
+var n = 0;
+test._read = function(size, cb) {
+  var chunk = chunks[n++];
+  setTimeout(function() {
+    cb(null, chunk);
+  });
+};
+
+test.on('end', thrower);
+function thrower() {
+  throw new Error('this should not happen!');
+}
+
+var bytesread = 0;
+test.on('readable', function() {
+  var b = len - bytesread - 1;
+  var res = test.read(b);
+  if (res) {
+    bytesread += res.length;
+    console.error('br=%d len=%d', bytesread, len);
+    setTimeout(next);
+  }
+  test.read(0);
+});
+test.read(0);
+
+function next() {
+  // now let's make 'end' happen
+  test.removeListener('end', thrower);
+
+  var endEmitted = false;
+  process.on('exit', function() {
+    assert(endEmitted, 'end should be emitted by now');
+  });
+  test.on('end', function() {
+    endEmitted = true;
+  });
+
+  // one to get the last byte
+  var r = test.read();
+  assert(r);
+  assert.equal(r.length, 1);
+  r = test.read();
+  assert.equal(r, null);
+}