http_server: pause socket properly
authorFedor Indutny <fedor@indutny.com>
Sat, 3 Oct 2015 19:27:19 +0000 (15:27 -0400)
committerRod Vagg <rod@vagg.org>
Mon, 5 Oct 2015 11:33:41 +0000 (22:33 +1100)
Account pending response data to decide whether pause the socket or
not. Writable stream state is a not reliable measure, because it just
says how much data is pending on a **current** request, thus not helping
much with problem we are trying to solve here.

PR-URL: https://github.com/nodejs/node/pull/3128

lib/_http_outgoing.js
lib/_http_server.js

index 0b725d5..4c87a36 100644 (file)
@@ -49,6 +49,7 @@ function OutgoingMessage() {
   this.output = [];
   this.outputEncodings = [];
   this.outputCallbacks = [];
+  this.outputSize = 0;
 
   this.writable = true;
 
@@ -71,6 +72,8 @@ function OutgoingMessage() {
   this._header = null;
   this._headers = null;
   this._headerNames = {};
+
+  this._onPendingData = null;
 }
 util.inherits(OutgoingMessage, Stream);
 
@@ -120,6 +123,9 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
       this.output.unshift(this._header);
       this.outputEncodings.unshift('binary');
       this.outputCallbacks.unshift(null);
+      this.outputSize += this._header.length;
+      if (this._onPendingData !== null)
+        this._onPendingData(this._header.length);
     }
     this._headerSent = true;
   }
@@ -152,6 +158,9 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
       this.output = [];
       this.outputEncodings = [];
       this.outputCallbacks = [];
+      if (this._onPendingData !== null)
+        this._onPendingData(-this.outputSize);
+      this.outputSize = 0;
     } else if (data.length === 0) {
       if (typeof callback === 'function')
         process.nextTick(callback);
@@ -175,6 +184,9 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
   this.output.push(data);
   this.outputEncodings.push(encoding);
   this.outputCallbacks.push(callback);
+  this.outputSize += data.length;
+  if (this._onPendingData !== null)
+    this._onPendingData(data.length);
   return false;
 };
 
index f2e2dcf..2d6cb53 100644 (file)
@@ -241,6 +241,8 @@ function Server(requestListener) {
   });
 
   this.timeout = 2 * 60 * 1000;
+
+  this._pendingResponseData = 0;
 }
 util.inherits(Server, net.Server);
 
@@ -260,6 +262,13 @@ function connectionListener(socket) {
   var self = this;
   var outgoing = [];
   var incoming = [];
+  var outgoingData = 0;
+
+  function updateOutgoingData(delta) {
+    outgoingData += delta;
+    if (socket._paused && outgoingData < socket._writableState.highWaterMark)
+      return socketOnDrain();
+  }
 
   function abortIncoming() {
     while (incoming.length) {
@@ -425,8 +434,10 @@ function connectionListener(socket) {
 
   socket._paused = false;
   function socketOnDrain() {
+    var needPause = outgoingData > socket._writableState.highWaterMark;
+
     // If we previously paused, then start reading again.
-    if (socket._paused) {
+    if (socket._paused && !needPause) {
       socket._paused = false;
       socket.parser.resume();
       socket.resume();
@@ -440,7 +451,8 @@ function connectionListener(socket) {
     // so that we don't become overwhelmed by a flood of
     // pipelined requests that may never be resolved.
     if (!socket._paused) {
-      var needPause = socket._writableState.needDrain;
+      var needPause = socket._writableState.needDrain ||
+          outgoingData >= socket._writableState.highWaterMark;
       if (needPause) {
         socket._paused = true;
         // We also need to pause the parser, but don't do that until after
@@ -451,6 +463,7 @@ function connectionListener(socket) {
     }
 
     var res = new ServerResponse(req);
+    res._onPendingData = updateOutgoingData;
 
     res.shouldKeepAlive = shouldKeepAlive;
     DTRACE_HTTP_SERVER_REQUEST(req, socket);