511 : 'Network Authentication Required' // RFC 6585
};
+const kOnExecute = HTTPParser.kOnExecute | 0;
+
function ServerResponse(req) {
OutgoingMessage.call(this);
socket.on('end', socketOnEnd);
socket.on('data', socketOnData);
+ // We are consuming socket, so it won't get any actual data
+ socket.on('resume', onSocketResume);
+ socket.on('pause', onSocketPause);
+
+ socket.on('drain', socketOnDrain);
+
+ // Override on to unconsume on `data`, `readable` listeners
+ socket.on = socketOnWrap;
+
+ var external = socket._handle._externalStream;
+ if (external)
+ parser.consume(external);
+ external = null;
+ parser[kOnExecute] = onParserExecute;
+
// TODO(isaacs): Move all these functions out of here
function socketOnError(e) {
self.emit('clientError', e, this);
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);
+
+ onParserExecuteCommon(ret, d);
+ }
+
+ function onParserExecute(ret, d) {
+ debug('SERVER socketOnParserExecute %d', ret);
+ onParserExecuteCommon(ret, undefined);
+ }
+
+ function onParserExecuteCommon(ret, d) {
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);
+ if (!d)
+ d = parser.getCurrentBuffer();
+
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
+ parser.unconsume(socket._handle._externalStream);
parser.finish();
freeParser(parser, req, null);
parser = null;
socket.resume();
}
}
- socket.on('drain', socketOnDrain);
function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
}
}
exports._connectionListener = connectionListener;
+
+function onSocketResume() {
+ this._handle.readStart();
+}
+
+function onSocketPause() {
+ this._handle.readStop();
+}
+
+function socketOnWrap(ev, fn) {
+ var res = net.Socket.prototype.on.call(this, ev, fn);
+ if (!this.parser) {
+ this.on = net.Socket.prototype.on;
+ return res;
+ }
+
+ if (ev === 'data' || ev === 'readable')
+ this.parser.unconsume(this._handle._externalStream);
+
+ return res;
+}
#include "base-object-inl.h"
#include "env.h"
#include "env-inl.h"
+#include "stream_base.h"
+#include "stream_base-inl.h"
#include "util.h"
#include "util-inl.h"
#include "v8.h"
using v8::Array;
using v8::Boolean;
using v8::Context;
+using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
const uint32_t kOnHeadersComplete = 1;
const uint32_t kOnBody = 2;
const uint32_t kOnMessageComplete = 3;
+const uint32_t kOnExecute = 4;
#define HTTP_CB(name) \
HTTP_DATA_CB(on_body) {
- HandleScope scope(env()->isolate());
+ EscapableHandleScope scope(env()->isolate());
Local<Object> obj = object();
Local<Value> cb = obj->Get(kOnBody);
if (!cb->IsFunction())
return 0;
+ // We came from consumed stream
+ if (current_buffer_.IsEmpty()) {
+ // Make sure Buffer will be in parent HandleScope
+ current_buffer_ = scope.Escape(Buffer::Copy(
+ env()->isolate(),
+ current_buffer_data_,
+ current_buffer_len_).ToLocalChecked());
+ }
+
Local<Value> argv[3] = {
current_buffer_,
Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_),
// var bytesParsed = parser->execute(buffer);
static void Execute(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
Parser* parser = Unwrap<Parser>(args.Holder());
CHECK(parser->current_buffer_.IsEmpty());
CHECK_EQ(parser->current_buffer_len_, 0);
// amount of overhead. Nothing else will run while http_parser_execute()
// runs, therefore this pointer can be set and used for the execution.
parser->current_buffer_ = buffer_obj;
- parser->current_buffer_len_ = buffer_len;
- parser->current_buffer_data_ = buffer_data;
- parser->got_exception_ = false;
- size_t nparsed =
- http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len);
-
- parser->Save();
-
- // Unassign the 'buffer_' variable
- parser->current_buffer_.Clear();
- parser->current_buffer_len_ = 0;
- parser->current_buffer_data_ = nullptr;
-
- // If there was an exception in one of the callbacks
- if (parser->got_exception_)
- return;
-
- Local<Integer> nparsed_obj = Integer::New(env->isolate(), nparsed);
- // If there was a parse error in one of the callbacks
- // TODO(bnoordhuis) What if there is an error on EOF?
- if (!parser->parser_.upgrade && nparsed != buffer_len) {
- enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_);
-
- Local<Value> e = Exception::Error(env->parse_error_string());
- Local<Object> obj = e->ToObject(env->isolate());
- obj->Set(env->bytes_parsed_string(), nparsed_obj);
- obj->Set(env->code_string(),
- OneByteString(env->isolate(), http_errno_name(err)));
+ Local<Value> ret = parser->Execute(buffer_data, buffer_len);
- args.GetReturnValue().Set(e);
- } else {
- args.GetReturnValue().Set(nparsed_obj);
- }
+ if (!ret.IsEmpty())
+ args.GetReturnValue().Set(ret);
}
}
- private:
+ static void Consume(const FunctionCallbackInfo<Value>& args) {
+ Parser* parser = Unwrap<Parser>(args.Holder());
+ Local<External> stream_obj = args[0].As<External>();
+ StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
+ CHECK_NE(stream, nullptr);
+
+ stream->Consume();
+
+ parser->prev_alloc_cb_ = stream->alloc_cb();
+ parser->prev_read_cb_ = stream->read_cb();
+
+ stream->set_alloc_cb({ OnAllocImpl, parser });
+ stream->set_read_cb({ OnReadImpl, parser });
+ }
+
+
+ static void Unconsume(const FunctionCallbackInfo<Value>& args) {
+ Parser* parser = Unwrap<Parser>(args.Holder());
+
+ // Already unconsumed
+ if (parser->prev_alloc_cb_.is_empty())
+ return;
+
+ CHECK(args[0]->IsExternal());
+ Local<External> stream_obj = args[0].As<External>();
+ StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
+ CHECK_NE(stream, nullptr);
+
+ stream->set_alloc_cb(parser->prev_alloc_cb_);
+ stream->set_read_cb(parser->prev_read_cb_);
+ }
+
+
+ static void GetCurrentBuffer(const FunctionCallbackInfo<Value>& args) {
+ Parser* parser = Unwrap<Parser>(args.Holder());
+
+ Local<Object> ret = Buffer::Copy(
+ parser->env(),
+ parser->current_buffer_data_,
+ parser->current_buffer_len_).ToLocalChecked();
+
+ args.GetReturnValue().Set(ret);
+ }
+
+ protected:
+ static const size_t kAllocBufferSize = 64 * 1024;
+
+ static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
+ Parser* parser = static_cast<Parser*>(ctx);
+ Environment* env = parser->env();
+
+ if (env->http_parser_buffer() == nullptr)
+ env->set_http_parser_buffer(new char[kAllocBufferSize]);
+
+ buf->base = env->http_parser_buffer();
+ buf->len = kAllocBufferSize;
+ }
+
+
+ static void OnReadImpl(ssize_t nread,
+ const uv_buf_t* buf,
+ uv_handle_type pending,
+ void* ctx) {
+ Parser* parser = static_cast<Parser*>(ctx);
+ HandleScope scope(parser->env()->isolate());
+
+ if (nread < 0) {
+ uv_buf_t tmp_buf;
+ tmp_buf.base = nullptr;
+ tmp_buf.len = 0;
+ parser->prev_read_cb_.fn(nread,
+ &tmp_buf,
+ pending,
+ parser->prev_read_cb_.ctx);
+ return;
+ }
+
+ // Ignore, empty reads have special meaning in http parser
+ if (nread == 0)
+ return;
+
+ parser->current_buffer_.Clear();
+ Local<Value> ret = parser->Execute(buf->base, nread);
+
+ // Exception
+ if (ret.IsEmpty())
+ return;
+
+ Local<Object> obj = parser->object();
+ Local<Value> cb = obj->Get(kOnExecute);
+
+ if (!cb->IsFunction())
+ return;
+
+ // Hooks for GetCurrentBuffer
+ parser->current_buffer_len_ = nread;
+ parser->current_buffer_data_ = buf->base;
+
+ cb.As<Function>()->Call(obj, 1, &ret);
+
+ parser->current_buffer_len_ = 0;
+ parser->current_buffer_data_ = nullptr;
+
+ parser->env()->KickNextTick();
+ }
+
+
+ Local<Value> Execute(char* data, size_t len) {
+ EscapableHandleScope scope(env()->isolate());
+
+ current_buffer_len_ = len;
+ current_buffer_data_ = data;
+ got_exception_ = false;
+
+ size_t nparsed =
+ http_parser_execute(&parser_, &settings, data, len);
+
+ Save();
+
+ // Unassign the 'buffer_' variable
+ current_buffer_.Clear();
+ current_buffer_len_ = 0;
+ current_buffer_data_ = nullptr;
+
+ // If there was an exception in one of the callbacks
+ if (got_exception_)
+ return scope.Escape(Local<Value>());
+
+ Local<Integer> nparsed_obj = Integer::New(env()->isolate(), nparsed);
+ // If there was a parse error in one of the callbacks
+ // TODO(bnoordhuis) What if there is an error on EOF?
+ if (!parser_.upgrade && nparsed != len) {
+ enum http_errno err = HTTP_PARSER_ERRNO(&parser_);
+
+ Local<Value> e = Exception::Error(env()->parse_error_string());
+ Local<Object> obj = e->ToObject(env()->isolate());
+ obj->Set(env()->bytes_parsed_string(), nparsed_obj);
+ obj->Set(env()->code_string(),
+ OneByteString(env()->isolate(), http_errno_name(err)));
+
+ return scope.Escape(e);
+ }
+ return scope.Escape(nparsed_obj);
+ }
Local<Array> CreateHeaders() {
// num_values_ is either -1 or the entry # of the last header
Local<Object> current_buffer_;
size_t current_buffer_len_;
char* current_buffer_data_;
+ StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
+ StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
static const struct http_parser_settings settings;
};
Integer::NewFromUnsigned(env->isolate(), kOnBody));
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"),
Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete));
+ t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"),
+ Integer::NewFromUnsigned(env->isolate(), kOnExecute));
Local<Array> methods = Array::New(env->isolate());
#define V(num, name, string) \
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
+ env->SetProtoMethod(t, "consume", Parser::Consume);
+ env->SetProtoMethod(t, "unconsume", Parser::Unconsume);
+ env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer);
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"),
t->GetFunction());