http: fix stalled pipeline bug
authorFedor Indutny <fedor@indutny.com>
Tue, 13 Oct 2015 06:16:39 +0000 (02:16 -0400)
committerJames M Snell <jasnell@gmail.com>
Thu, 29 Oct 2015 15:38:40 +0000 (08:38 -0700)
This is a two-part fix:

- Fix pending data notification in `OutgoingMessage` to notify server
  about flushed data too
- Fix pause/resume behavior for the consumed socket. `resume` event is
  emitted on a next tick, and `socket._paused` can already be `true` at
  this time. Pause the socket again to avoid PAUSED error on parser.

Fix: https://github.com/nodejs/node/issues/3332
PR-URL: https://github.com/nodejs/node/pull/3342
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
lib/_http_common.js
lib/_http_outgoing.js
lib/_http_server.js
src/node_http_parser.cc
test/parallel/test-http-pipeline-regr-3332.js [new file with mode: 0644]

index 7570329..4b57460 100644 (file)
@@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {
 
   parser._headers = [];
   parser._url = '';
+  parser._consumed = false;
 
   // Only called in the slow case where slow means
   // that the request headers were either fragmented
@@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
   if (parser) {
     parser._headers = [];
     parser.onIncoming = null;
+    if (parser._consumed)
+      parser.unconsume();
+    parser._consumed = false;
     if (parser.socket)
       parser.socket.parser = null;
     parser.socket = null;
index dd78ddf..d130246 100644 (file)
@@ -131,7 +131,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
       this.outputEncodings.unshift('binary');
       this.outputCallbacks.unshift(null);
       this.outputSize += this._header.length;
-      if (this._onPendingData !== null)
+      if (typeof this._onPendingData === 'function')
         this._onPendingData(this._header.length);
     }
     this._headerSent = true;
@@ -154,22 +154,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
     // There might be pending data in the this.output buffer.
     var outputLength = this.output.length;
     if (outputLength > 0) {
-      var output = this.output;
-      var outputEncodings = this.outputEncodings;
-      var outputCallbacks = this.outputCallbacks;
-      connection.cork();
-      for (var i = 0; i < outputLength; i++) {
-        connection.write(output[i], outputEncodings[i],
-                         outputCallbacks[i]);
-      }
-      connection.uncork();
-
-      this.output = [];
-      this.outputEncodings = [];
-      this.outputCallbacks = [];
-      if (this._onPendingData !== null)
-        this._onPendingData(-this.outputSize);
-      this.outputSize = 0;
+      this._flushOutput(connection);
     } else if (data.length === 0) {
       if (typeof callback === 'function')
         process.nextTick(callback);
@@ -194,7 +179,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
   this.outputEncodings.push(encoding);
   this.outputCallbacks.push(callback);
   this.outputSize += data.length;
-  if (this._onPendingData !== null)
+  if (typeof this._onPendingData === 'function')
     this._onPendingData(data.length);
   return false;
 };
@@ -630,26 +615,11 @@ OutgoingMessage.prototype._finish = function() {
 // to attempt to flush any pending messages out to the socket.
 OutgoingMessage.prototype._flush = function() {
   var socket = this.socket;
-  var outputLength, ret;
+  var ret;
 
   if (socket && socket.writable) {
     // There might be remaining data in this.output; write it out
-    outputLength = this.output.length;
-    if (outputLength > 0) {
-      var output = this.output;
-      var outputEncodings = this.outputEncodings;
-      var outputCallbacks = this.outputCallbacks;
-      socket.cork();
-      for (var i = 0; i < outputLength; i++) {
-        ret = socket.write(output[i], outputEncodings[i],
-                           outputCallbacks[i]);
-      }
-      socket.uncork();
-
-      this.output = [];
-      this.outputEncodings = [];
-      this.outputCallbacks = [];
-    }
+    ret = this._flushOutput(socket);
 
     if (this.finished) {
       // This is a queue to the server or client to bring in the next this.
@@ -661,6 +631,32 @@ OutgoingMessage.prototype._flush = function() {
   }
 };
 
+OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
+  var ret;
+  var outputLength = this.output.length;
+  if (outputLength <= 0)
+    return ret;
+
+  var output = this.output;
+  var outputEncodings = this.outputEncodings;
+  var outputCallbacks = this.outputCallbacks;
+  socket.cork();
+  for (var i = 0; i < outputLength; i++) {
+    ret = socket.write(output[i], outputEncodings[i],
+                       outputCallbacks[i]);
+  }
+  socket.uncork();
+
+  this.output = [];
+  this.outputEncodings = [];
+  this.outputCallbacks = [];
+  if (typeof this._onPendingData === 'function')
+    this._onPendingData(-this.outputSize);
+  this.outputSize = 0;
+
+  return ret;
+};
+
 
 OutgoingMessage.prototype.flushHeaders = function() {
   if (!this._header) {
index c11d369..dc7276d 100644 (file)
@@ -343,8 +343,10 @@ function connectionListener(socket) {
   socket.on = socketOnWrap;
 
   var external = socket._handle._externalStream;
-  if (external)
+  if (external) {
+    parser._consumed = true;
     parser.consume(external);
+  }
   external = null;
   parser[kOnExecute] = onParserExecute;
 
@@ -382,7 +384,7 @@ function connectionListener(socket) {
       socket.removeListener('data', socketOnData);
       socket.removeListener('end', socketOnEnd);
       socket.removeListener('close', serverSocketCloseListener);
-      parser.unconsume(socket._handle._externalStream);
+      unconsume(parser, socket);
       parser.finish();
       freeParser(parser, req, null);
       parser = null;
@@ -530,13 +532,38 @@ function connectionListener(socket) {
 exports._connectionListener = connectionListener;
 
 function onSocketResume() {
-  if (this._handle)
+  // It may seem that the socket is resumed, but this is an enemy's trick to
+  // deceive us! `resume` is emitted asynchronously, and may be called from
+  // `incoming.readStart()`. Stop the socket again here, just to preserve the
+  // state.
+  //
+  // We don't care about stream semantics for the consumed socket anyway.
+  if (this._paused) {
+    this.pause();
+    return;
+  }
+
+  if (this._handle && !this._handle.reading) {
+    this._handle.reading = true;
     this._handle.readStart();
+  }
 }
 
 function onSocketPause() {
-  if (this._handle)
+  if (this._handle && this._handle.reading) {
+    this._handle.reading = false;
     this._handle.readStop();
+  }
+}
+
+function unconsume(parser, socket) {
+  if (socket._handle) {
+    if (parser._consumed)
+      parser.unconsume(socket._handle._externalStream);
+    parser._consumed = false;
+    socket.removeListener('pause', onSocketPause);
+    socket.removeListener('resume', onSocketResume);
+  }
 }
 
 function socketOnWrap(ev, fn) {
@@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
     return res;
   }
 
-  if (this._handle && (ev === 'data' || ev === 'readable'))
-    this.parser.unconsume(this._handle._externalStream);
+  if (ev === 'data' || ev === 'readable')
+    unconsume(this.parser, this);
 
   return res;
 }
index 5f831ec..ff3dfb2 100644 (file)
@@ -484,13 +484,18 @@ class Parser : public BaseObject {
     if (parser->prev_alloc_cb_.is_empty())
       return;
 
-    CHECK(args[0]->IsExternal());
-    Local<External> stream_obj = args[0].As<External>();
-    StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
-    CHECK_NE(stream, nullptr);
+    // Restore stream's callbacks
+    if (args.Length() == 1 && args[0]->IsExternal()) {
+      Local<External> stream_obj = args[0].As<External>();
+      StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
+      CHECK_NE(stream, nullptr);
+
+      stream->set_alloc_cb(parser->prev_alloc_cb_);
+      stream->set_read_cb(parser->prev_read_cb_);
+    }
 
-    stream->set_alloc_cb(parser->prev_alloc_cb_);
-    stream->set_read_cb(parser->prev_read_cb_);
+    parser->prev_alloc_cb_.clear();
+    parser->prev_read_cb_.clear();
   }
 
 
diff --git a/test/parallel/test-http-pipeline-regr-3332.js b/test/parallel/test-http-pipeline-regr-3332.js
new file mode 100644 (file)
index 0000000..061e202
--- /dev/null
@@ -0,0 +1,41 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const http = require('http');
+const net = require('net');
+
+const big = new Buffer(16 * 1024);
+big.fill('A');
+
+const COUNT = 1e4;
+
+var received = 0;
+
+var client;
+const server = http.createServer(function(req, res) {
+  res.end(big, function() {
+    if (++received === COUNT) {
+      server.close();
+      client.end();
+    }
+  });
+}).listen(common.PORT, function() {
+  var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
+  client = net.connect(common.PORT, function() {
+    client.write(req);
+  });
+
+  // Just let the test terminate instead of hanging
+  client.on('close', function() {
+    if (received !== COUNT)
+      server.close();
+  });
+  client.resume();
+});
+
+process.on('exit', function() {
+  // The server should pause connection on pipeline flood, but it shoul still
+  // resume it and finish processing the requests, when its output queue will
+  // be empty again.
+  assert.equal(received, COUNT);
+});