http_parser: consume StreamBase instance
authorFedor Indutny <fedor@indutny.com>
Wed, 12 Aug 2015 03:02:22 +0000 (20:02 -0700)
committerFedor Indutny <fedor@indutny.com>
Wed, 26 Aug 2015 19:45:22 +0000 (12:45 -0700)
Consume StreamBase instance and operate on incoming data directly
without allocating Buffer instances. Improves performance.

PR-URL: https://github.com/nodejs/node/pull/2355
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
lib/_http_server.js
src/env-inl.h
src/env.h
src/node_http_parser.cc
test/parallel/test-http-server-unconsume.js [new file with mode: 0644]

index 6769f4a..dd787fa 100644 (file)
@@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = {
   511 : 'Network Authentication Required' // RFC 6585
 };
 
+const kOnExecute = HTTPParser.kOnExecute | 0;
+
 
 function ServerResponse(req) {
   OutgoingMessage.call(this);
@@ -317,6 +319,21 @@ function connectionListener(socket) {
   socket.on('end', socketOnEnd);
   socket.on('data', socketOnData);
 
+  // We are consuming socket, so it won't get any actual data
+  socket.on('resume', onSocketResume);
+  socket.on('pause', onSocketPause);
+
+  socket.on('drain', socketOnDrain);
+
+  // Override on to unconsume on `data`, `readable` listeners
+  socket.on = socketOnWrap;
+
+  var external = socket._handle._externalStream;
+  if (external)
+    parser.consume(external);
+  external = null;
+  parser[kOnExecute] = onParserExecute;
+
   // TODO(isaacs): Move all these functions out of here
   function socketOnError(e) {
     self.emit('clientError', e, this);
@@ -326,6 +343,16 @@ function connectionListener(socket) {
     assert(!socket._paused);
     debug('SERVER socketOnData %d', d.length);
     var ret = parser.execute(d);
+
+    onParserExecuteCommon(ret, d);
+  }
+
+  function onParserExecute(ret, d) {
+    debug('SERVER socketOnParserExecute %d', ret);
+    onParserExecuteCommon(ret, undefined);
+  }
+
+  function onParserExecuteCommon(ret, d) {
     if (ret instanceof Error) {
       debug('parse error');
       socket.destroy(ret);
@@ -335,9 +362,13 @@ function connectionListener(socket) {
       var req = parser.incoming;
       debug('SERVER upgrade or connect', req.method);
 
+      if (!d)
+        d = parser.getCurrentBuffer();
+
       socket.removeListener('data', socketOnData);
       socket.removeListener('end', socketOnEnd);
       socket.removeListener('close', serverSocketCloseListener);
+      parser.unconsume(socket._handle._externalStream);
       parser.finish();
       freeParser(parser, req, null);
       parser = null;
@@ -400,7 +431,6 @@ function connectionListener(socket) {
       socket.resume();
     }
   }
-  socket.on('drain', socketOnDrain);
 
   function parserOnIncoming(req, shouldKeepAlive) {
     incoming.push(req);
@@ -480,3 +510,24 @@ function connectionListener(socket) {
   }
 }
 exports._connectionListener = connectionListener;
+
+function onSocketResume() {
+  this._handle.readStart();
+}
+
+function onSocketPause() {
+  this._handle.readStop();
+}
+
+function socketOnWrap(ev, fn) {
+  var res = net.Socket.prototype.on.call(this, ev, fn);
+  if (!this.parser) {
+    this.on = net.Socket.prototype.on;
+    return res;
+  }
+
+  if (ev === 'data' || ev === 'readable')
+    this.parser.unconsume(this._handle._externalStream);
+
+  return res;
+}
index 369bc2f..cbc8c4f 100644 (file)
@@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local<v8::Context> context,
       printed_error_(false),
       trace_sync_io_(false),
       debugger_agent_(this),
+      http_parser_buffer_(nullptr),
       context_(context->GetIsolate(), context) {
   // We'll be creating new objects so make sure we've entered the context.
   v8::HandleScope handle_scope(isolate());
@@ -200,6 +201,7 @@ inline Environment::~Environment() {
   isolate_data()->Put();
 
   delete[] heap_statistics_buffer_;
+  delete[] http_parser_buffer_;
 }
 
 inline void Environment::CleanupHandles() {
@@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) {
   heap_statistics_buffer_ = pointer;
 }
 
+inline char* Environment::http_parser_buffer() const {
+  return http_parser_buffer_;
+}
+
+inline void Environment::set_http_parser_buffer(char* buffer) {
+  CHECK_EQ(http_parser_buffer_, nullptr);  // Should be set only once.
+  http_parser_buffer_ = buffer;
+}
+
 inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
   return ContainerOf(&Environment::cares_timer_handle_, handle);
 }
index 5830b02..ce972d5 100644 (file)
--- a/src/env.h
+++ b/src/env.h
@@ -429,6 +429,9 @@ class Environment {
   inline uint32_t* heap_statistics_buffer() const;
   inline void set_heap_statistics_buffer(uint32_t* pointer);
 
+  inline char* http_parser_buffer() const;
+  inline void set_http_parser_buffer(char* buffer);
+
   inline void ThrowError(const char* errmsg);
   inline void ThrowTypeError(const char* errmsg);
   inline void ThrowRangeError(const char* errmsg);
@@ -526,6 +529,8 @@ class Environment {
 
   uint32_t* heap_statistics_buffer_ = nullptr;
 
+  char* http_parser_buffer_;
+
 #define V(PropertyName, TypeName)                                             \
   v8::Persistent<TypeName> PropertyName ## _;
   ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)
index ef87f26..665ce28 100644 (file)
@@ -6,6 +6,8 @@
 #include "base-object-inl.h"
 #include "env.h"
 #include "env-inl.h"
+#include "stream_base.h"
+#include "stream_base-inl.h"
 #include "util.h"
 #include "util-inl.h"
 #include "v8.h"
@@ -36,6 +38,7 @@ namespace node {
 using v8::Array;
 using v8::Boolean;
 using v8::Context;
+using v8::EscapableHandleScope;
 using v8::Exception;
 using v8::Function;
 using v8::FunctionCallbackInfo;
@@ -54,6 +57,7 @@ const uint32_t kOnHeaders = 0;
 const uint32_t kOnHeadersComplete = 1;
 const uint32_t kOnBody = 2;
 const uint32_t kOnMessageComplete = 3;
+const uint32_t kOnExecute = 4;
 
 
 #define HTTP_CB(name)                                                         \
@@ -295,7 +299,7 @@ class Parser : public BaseObject {
 
 
   HTTP_DATA_CB(on_body) {
-    HandleScope scope(env()->isolate());
+    EscapableHandleScope scope(env()->isolate());
 
     Local<Object> obj = object();
     Local<Value> cb = obj->Get(kOnBody);
@@ -303,6 +307,15 @@ class Parser : public BaseObject {
     if (!cb->IsFunction())
       return 0;
 
+    // We came from consumed stream
+    if (current_buffer_.IsEmpty()) {
+      // Make sure Buffer will be in parent HandleScope
+      current_buffer_ = scope.Escape(Buffer::Copy(
+          env()->isolate(),
+          current_buffer_data_,
+          current_buffer_len_).ToLocalChecked());
+    }
+
     Local<Value> argv[3] = {
       current_buffer_,
       Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_),
@@ -374,8 +387,6 @@ class Parser : public BaseObject {
 
   // var bytesParsed = parser->execute(buffer);
   static void Execute(const FunctionCallbackInfo<Value>& args) {
-    Environment* env = Environment::GetCurrent(args);
-
     Parser* parser = Unwrap<Parser>(args.Holder());
     CHECK(parser->current_buffer_.IsEmpty());
     CHECK_EQ(parser->current_buffer_len_, 0);
@@ -390,40 +401,11 @@ class Parser : public BaseObject {
     // amount of overhead. Nothing else will run while http_parser_execute()
     // runs, therefore this pointer can be set and used for the execution.
     parser->current_buffer_ = buffer_obj;
-    parser->current_buffer_len_ = buffer_len;
-    parser->current_buffer_data_ = buffer_data;
-    parser->got_exception_ = false;
 
-    size_t nparsed =
-      http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len);
-
-    parser->Save();
-
-    // Unassign the 'buffer_' variable
-    parser->current_buffer_.Clear();
-    parser->current_buffer_len_ = 0;
-    parser->current_buffer_data_ = nullptr;
-
-    // If there was an exception in one of the callbacks
-    if (parser->got_exception_)
-      return;
-
-    Local<Integer> nparsed_obj = Integer::New(env->isolate(), nparsed);
-    // If there was a parse error in one of the callbacks
-    // TODO(bnoordhuis) What if there is an error on EOF?
-    if (!parser->parser_.upgrade && nparsed != buffer_len) {
-      enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_);
-
-      Local<Value> e = Exception::Error(env->parse_error_string());
-      Local<Object> obj = e->ToObject(env->isolate());
-      obj->Set(env->bytes_parsed_string(), nparsed_obj);
-      obj->Set(env->code_string(),
-               OneByteString(env->isolate(), http_errno_name(err)));
+    Local<Value> ret = parser->Execute(buffer_data, buffer_len);
 
-      args.GetReturnValue().Set(e);
-    } else {
-      args.GetReturnValue().Set(nparsed_obj);
-    }
+    if (!ret.IsEmpty())
+      args.GetReturnValue().Set(ret);
   }
 
 
@@ -478,7 +460,150 @@ class Parser : public BaseObject {
   }
 
 
- private:
+  static void Consume(const FunctionCallbackInfo<Value>& args) {
+    Parser* parser = Unwrap<Parser>(args.Holder());
+    Local<External> stream_obj = args[0].As<External>();
+    StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
+    CHECK_NE(stream, nullptr);
+
+    stream->Consume();
+
+    parser->prev_alloc_cb_ = stream->alloc_cb();
+    parser->prev_read_cb_ = stream->read_cb();
+
+    stream->set_alloc_cb({ OnAllocImpl, parser });
+    stream->set_read_cb({ OnReadImpl, parser });
+  }
+
+
+  static void Unconsume(const FunctionCallbackInfo<Value>& args) {
+    Parser* parser = Unwrap<Parser>(args.Holder());
+
+    // Already unconsumed
+    if (parser->prev_alloc_cb_.is_empty())
+      return;
+
+    CHECK(args[0]->IsExternal());
+    Local<External> stream_obj = args[0].As<External>();
+    StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
+    CHECK_NE(stream, nullptr);
+
+    stream->set_alloc_cb(parser->prev_alloc_cb_);
+    stream->set_read_cb(parser->prev_read_cb_);
+  }
+
+
+  static void GetCurrentBuffer(const FunctionCallbackInfo<Value>& args) {
+    Parser* parser = Unwrap<Parser>(args.Holder());
+
+    Local<Object> ret = Buffer::Copy(
+        parser->env(),
+        parser->current_buffer_data_,
+        parser->current_buffer_len_).ToLocalChecked();
+
+    args.GetReturnValue().Set(ret);
+  }
+
+ protected:
+  static const size_t kAllocBufferSize = 64 * 1024;
+
+  static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
+    Parser* parser = static_cast<Parser*>(ctx);
+    Environment* env = parser->env();
+
+    if (env->http_parser_buffer() == nullptr)
+      env->set_http_parser_buffer(new char[kAllocBufferSize]);
+
+    buf->base = env->http_parser_buffer();
+    buf->len = kAllocBufferSize;
+  }
+
+
+  static void OnReadImpl(ssize_t nread,
+                         const uv_buf_t* buf,
+                         uv_handle_type pending,
+                         void* ctx) {
+    Parser* parser = static_cast<Parser*>(ctx);
+    HandleScope scope(parser->env()->isolate());
+
+    if (nread < 0) {
+      uv_buf_t tmp_buf;
+      tmp_buf.base = nullptr;
+      tmp_buf.len = 0;
+      parser->prev_read_cb_.fn(nread,
+                               &tmp_buf,
+                               pending,
+                               parser->prev_read_cb_.ctx);
+      return;
+    }
+
+    // Ignore, empty reads have special meaning in http parser
+    if (nread == 0)
+      return;
+
+    parser->current_buffer_.Clear();
+    Local<Value> ret = parser->Execute(buf->base, nread);
+
+    // Exception
+    if (ret.IsEmpty())
+      return;
+
+    Local<Object> obj = parser->object();
+    Local<Value> cb = obj->Get(kOnExecute);
+
+    if (!cb->IsFunction())
+      return;
+
+    // Hooks for GetCurrentBuffer
+    parser->current_buffer_len_ = nread;
+    parser->current_buffer_data_ = buf->base;
+
+    cb.As<Function>()->Call(obj, 1, &ret);
+
+    parser->current_buffer_len_ = 0;
+    parser->current_buffer_data_ = nullptr;
+
+    parser->env()->KickNextTick();
+  }
+
+
+  Local<Value> Execute(char* data, size_t len) {
+    EscapableHandleScope scope(env()->isolate());
+
+    current_buffer_len_ = len;
+    current_buffer_data_ = data;
+    got_exception_ = false;
+
+    size_t nparsed =
+      http_parser_execute(&parser_, &settings, data, len);
+
+    Save();
+
+    // Unassign the 'buffer_' variable
+    current_buffer_.Clear();
+    current_buffer_len_ = 0;
+    current_buffer_data_ = nullptr;
+
+    // If there was an exception in one of the callbacks
+    if (got_exception_)
+      return scope.Escape(Local<Value>());
+
+    Local<Integer> nparsed_obj = Integer::New(env()->isolate(), nparsed);
+    // If there was a parse error in one of the callbacks
+    // TODO(bnoordhuis) What if there is an error on EOF?
+    if (!parser_.upgrade && nparsed != len) {
+      enum http_errno err = HTTP_PARSER_ERRNO(&parser_);
+
+      Local<Value> e = Exception::Error(env()->parse_error_string());
+      Local<Object> obj = e->ToObject(env()->isolate());
+      obj->Set(env()->bytes_parsed_string(), nparsed_obj);
+      obj->Set(env()->code_string(),
+               OneByteString(env()->isolate(), http_errno_name(err)));
+
+      return scope.Escape(e);
+    }
+    return scope.Escape(nparsed_obj);
+  }
 
   Local<Array> CreateHeaders() {
     // num_values_ is either -1 or the entry # of the last header
@@ -542,6 +667,8 @@ class Parser : public BaseObject {
   Local<Object> current_buffer_;
   size_t current_buffer_len_;
   char* current_buffer_data_;
+  StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
+  StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
   static const struct http_parser_settings settings;
 };
 
@@ -581,6 +708,8 @@ void InitHttpParser(Handle<Object> target,
          Integer::NewFromUnsigned(env->isolate(), kOnBody));
   t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"),
          Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete));
+  t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"),
+         Integer::NewFromUnsigned(env->isolate(), kOnExecute));
 
   Local<Array> methods = Array::New(env->isolate());
 #define V(num, name, string)                                                  \
@@ -595,6 +724,9 @@ void InitHttpParser(Handle<Object> target,
   env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
   env->SetProtoMethod(t, "pause", Parser::Pause<true>);
   env->SetProtoMethod(t, "resume", Parser::Pause<false>);
+  env->SetProtoMethod(t, "consume", Parser::Consume);
+  env->SetProtoMethod(t, "unconsume", Parser::Unconsume);
+  env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer);
 
   target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"),
               t->GetFunction());
diff --git a/test/parallel/test-http-server-unconsume.js b/test/parallel/test-http-server-unconsume.js
new file mode 100644 (file)
index 0000000..0d33263
--- /dev/null
@@ -0,0 +1,30 @@
+'use strict';
+var common = require('../common');
+var assert = require('assert');
+var http = require('http');
+var net = require('net');
+
+var received = '';
+
+var server = http.createServer(function(req, res) {
+  res.writeHead(200);
+  res.end();
+
+  req.socket.on('data', function(data) {
+    received += data;
+  });
+
+  server.close();
+}).listen(common.PORT, function() {
+  var socket = net.connect(common.PORT, function() {
+    socket.write('PUT / HTTP/1.1\r\n\r\n');
+
+    socket.once('data', function() {
+      socket.end('hello world');
+    });
+  });
+});
+
+process.on('exit', function() {
+  assert.equal(received, 'hello world');
+});