http: Avoid 'data'/'end' events after pause()
authorkoichik <koichik@improvement.jp>
Mon, 26 Dec 2011 15:14:47 +0000 (16:14 +0100)
committerkoichik <koichik@improvement.jp>
Mon, 26 Dec 2011 15:14:47 +0000 (16:14 +0100)
Fixes #1040.

lib/http.js
test/simple/test-http-pause.js [new file with mode: 0644]

index 70e16b4..f549ba1 100644 (file)
@@ -26,6 +26,7 @@ var EventEmitter = require('events').EventEmitter;
 var FreeList = require('freelist').FreeList;
 var HTTPParser = process.binding('http_parser').HTTPParser;
 var assert = require('assert').ok;
+var END_OF_FILE = {};
 
 
 var debug;
@@ -108,11 +109,10 @@ var parsers = new FreeList('parsers', 1000, function() {
   parser.onBody = function(b, start, len) {
     // TODO body encoding?
     var slice = b.slice(start, start + len);
-    if (parser.incoming._decoder) {
-      var string = parser.incoming._decoder.write(slice);
-      if (string.length) parser.incoming.emit('data', string);
+    if (parser.incoming._paused || parser.incoming._pendings.length) {
+      parser.incoming._pendings.push(slice);
     } else {
-      parser.incoming.emit('data', slice);
+      parser.incoming._emitData(slice);
     }
   };
 
@@ -133,8 +133,12 @@ var parsers = new FreeList('parsers', 1000, function() {
 
     if (!parser.incoming.upgrade) {
       // For upgraded connections, also emit this after parser.execute
-      parser.incoming.readable = false;
-      parser.incoming.emit('end');
+      if (parser.incoming._paused || parser.incoming._pendings.length) {
+        parser.incoming._pendings.push(END_OF_FILE);
+      } else {
+        parser.incoming.readable = false;
+        parser.incoming.emit('end');
+      }
     }
   };
 
@@ -224,6 +228,9 @@ function IncomingMessage(socket) {
 
   this.readable = true;
 
+  this._paused = false;
+  this._pendings = [];
+
   // request (server) only
   this.url = '';
 
@@ -251,12 +258,44 @@ IncomingMessage.prototype.setEncoding = function(encoding) {
 
 
 IncomingMessage.prototype.pause = function() {
+  this._paused = true;
   this.socket.pause();
 };
 
 
 IncomingMessage.prototype.resume = function() {
-  this.socket.resume();
+  this._paused = false;
+  if (this.socket) {
+    this.socket.resume();
+  }
+  if (this._pendings.length) {
+    var self = this;
+    process.nextTick(function() {
+      while (!self._paused && self._pendings.length) {
+        var chunk = self._pendings.shift();
+        if (chunk !== END_OF_FILE) {
+          assert(Buffer.isBuffer(chunk));
+          self._emitData(chunk);
+        } else {
+          assert(self._pendings.length === 0);
+          self.readable = false;
+          self.emit('end');
+        }
+      }
+    });
+  }
+};
+
+
+IncomingMessage.prototype._emitData = function(d) {
+  if (this._decoder) {
+    var string = this._decoder.write(d);
+    if (string.length) {
+      this.emit('data', string);
+    }
+  } else {
+    this.emit('data', d);
+  }
 };
 
 
diff --git a/test/simple/test-http-pause.js b/test/simple/test-http-pause.js
new file mode 100644 (file)
index 0000000..0edf6d6
--- /dev/null
@@ -0,0 +1,75 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var http = require('http');
+
+var expectedServer = 'Request Body from Client';
+var resultServer = '';
+var expectedClient = 'Response Body from Server';
+var resultClient = '';
+
+var server = http.createServer(function(req, res) {
+  common.debug('pause server request');
+  req.pause();
+  setTimeout(function() {
+    common.debug('resume server request');
+    req.resume();
+    req.setEncoding('utf8');
+    req.on('data', function(chunk) {
+      resultServer += chunk;
+    });
+    req.on('end', function() {
+      common.debug(resultServer);
+      res.writeHead(200);
+      res.end(expectedClient);
+    });
+  }, 100);
+});
+
+server.listen(common.PORT, function() {
+  var req = http.request({
+    port: common.PORT,
+    path: '/',
+    method: 'POST'
+  }, function(res) {
+    common.debug('pause client response');
+    res.pause();
+    setTimeout(function() {
+      common.debug('resume client response');
+      res.resume();
+      res.on('data', function(chunk) {
+        resultClient += chunk;
+      });
+      res.on('end', function() {
+        common.debug(resultClient);
+        server.close();
+      });
+    }, 100);
+  });
+  req.end(expectedServer);
+});
+
+process.on('exit', function() {
+  assert.equal(expectedServer, resultServer);
+  assert.equal(expectedClient, resultClient);
+});