Use events for all HTTP messages.
authorRyan <ry@tinyclouds.org>
Fri, 26 Jun 2009 16:30:55 +0000 (18:30 +0200)
committerRyan <ry@tinyclouds.org>
Sun, 28 Jun 2009 17:08:26 +0000 (19:08 +0200)
This is a rather large refactor! Mostly for the better side. I've had to
remove some functionality like req.interrupt(). A lot of other work is left
messy or incomplete.

benchmark/http_simple.js
src/events.js
src/http.cc
src/http.js
src/node.cc
test/mjsunit/test-http-client-race.js
test/mjsunit/test-http-proxy.js
test/mjsunit/test-http.js

index 2e3bbb3..099a641 100644 (file)
@@ -3,7 +3,7 @@ for (var i = 0; i < 20*1024; i++) {
   fixed += "C";
 }
 stored = {};
-new node.http.Server(function (req, res) {
+node.http.createServer(function (req, res) {
   var commands = req.uri.path.split("/");
   var command = commands[1];
   var body = "";
index f665cec..f63641c 100644 (file)
@@ -4,9 +4,11 @@
 var emitter = node.EventEmitter.prototype;
 
 emitter.addListener = function (type, listener) {
-  if (!this._events) this._events = {};
-  if (!this._events.hasOwnProperty(type)) this._events[type] = [];
-  this._events[type].push(listener);
+  if (listener instanceof Function) {
+    if (!this._events) this._events = {};
+    if (!this._events.hasOwnProperty(type)) this._events[type] = [];
+    this._events[type].push(listener);
+  }
 };
 
 emitter.listeners = function (type, listener) {
index 5e43ebf..a6d8efc 100644 (file)
@@ -87,52 +87,47 @@ int
 HTTPConnection::on_message_begin (http_parser *parser)
 {
   HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
-  HandleScope scope;
-
-  Local<Value> on_message_v = connection->handle_->Get(ON_MESSAGE_SYMBOL);
-  if (!on_message_v->IsFunction()) return -1;
-  Handle<Function> on_message = Handle<Function>::Cast(on_message_v);
+  connection->Emit("MessageBegin", 0, NULL);
+  return 0;
+}
 
-  TryCatch try_catch;
-  Local<Object> message_handler = on_message->NewInstance();
-  if (try_catch.HasCaught()) {
-    FatalException(try_catch);
-    return -1;
-  }
+int
+HTTPConnection::on_message_complete (http_parser *parser)
+{
+  HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
+  connection->Emit("MessageComplete", 0, NULL);
+  return 0;
+}
 
-  connection->handle_->SetHiddenValue(MESSAGE_HANDLER_SYMBOL, message_handler);
+int
+HTTPConnection::on_uri (http_parser *parser, const char *buf, size_t len)
+{
+  HandleScope scope;
+  HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
+  Local<Value> argv[1] = { String::New(buf, len) };
+  connection->Emit("URI", 1, argv);
   return 0;
 }
 
-#define DEFINE_PARSER_CALLBACK(name, symbol)                                  \
-int                                                                           \
-HTTPConnection::name (http_parser *parser, const char *buf, size_t len)       \
-{                                                                             \
-  HandleScope scope;                                                          \
-  HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);   \
-  Local<Value> message_handler_v =                                            \
-    connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);              \
-  if (message_handler_v->IsObject() == false)                                 \
-    return -1;                                                                \
-  Local<Object> message_handler = message_handler_v->ToObject();              \
-  Local<Value> callback_v = message_handler->Get(symbol);                     \
-  if (callback_v->IsFunction() == false)                                      \
-    return 0;                                                                 \
-  Local<Function> callback = Local<Function>::Cast(callback_v);               \
-  TryCatch try_catch;                                                         \
-  Local<Value> argv[1] = { String::New(buf, len) };                           \
-  Local<Value> ret = callback->Call(message_handler, 1, argv);                \
-  if (ret.IsEmpty()) {                                                        \
-    FatalException(try_catch);                                               \
-    return -2;                                                                \
-  }                                                                           \
-  if (ret->IsFalse()) return -3;                                              \
-  return 0;                                                                   \
+int
+HTTPConnection::on_header_field (http_parser *parser, const char *buf, size_t len)
+{
+  HandleScope scope;
+  HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
+  Local<Value> argv[1] = { String::New(buf, len) };
+  connection->Emit("HeaderField", 1, argv);
+  return 0;
 }
 
-DEFINE_PARSER_CALLBACK(on_uri,          ON_URI_SYMBOL)
-DEFINE_PARSER_CALLBACK(on_header_field, ON_HEADER_FIELD_SYMBOL)
-DEFINE_PARSER_CALLBACK(on_header_value, ON_HEADER_VALUE_SYMBOL)
+int
+HTTPConnection::on_header_value (http_parser *parser, const char *buf, size_t len)
+{
+  HandleScope scope;
+  HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
+  Local<Value> argv[1] = { String::New(buf, len) };
+  connection->Emit("HeaderValue", 1, argv);
+  return 0;
+}
 
 static inline Local<String>
 GetMethod (int method)
@@ -162,17 +157,15 @@ HTTPConnection::on_headers_complete (http_parser *parser)
   HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
   HandleScope scope;
 
-  Local<Value> message_handler_v = 
-    connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
-  Local<Object> message_handler = message_handler_v->ToObject();
+  Local<Object> message_info = Object::New();
 
   // METHOD 
   if (connection->parser_.type == HTTP_REQUEST)
-    message_handler->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method));
+    message_info->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method));
 
   // STATUS 
   if (connection->parser_.type == HTTP_RESPONSE)
-    message_handler->Set(STATUS_CODE_SYMBOL, 
+    message_info->Set(STATUS_CODE_SYMBOL, 
         Integer::New(connection->parser_.status_code));
 
   // VERSION
@@ -183,25 +176,14 @@ HTTPConnection::on_headers_complete (http_parser *parser)
           , connection->parser_.version_major
           , connection->parser_.version_minor
           ); 
-  message_handler->Set(HTTP_VERSION_SYMBOL, String::New(version));
+  message_info->Set(HTTP_VERSION_SYMBOL, String::New(version));
 
-  message_handler->Set(SHOULD_KEEP_ALIVE_SYMBOL, 
+  message_info->Set(SHOULD_KEEP_ALIVE_SYMBOL, 
       http_parser_should_keep_alive(&connection->parser_) ? True() : False());
 
-  Local<Value> on_headers_complete_v =
-    message_handler->Get(ON_HEADERS_COMPLETE_SYMBOL);
-  if (on_headers_complete_v->IsFunction() == false) return 0;
-
-  Handle<Function> on_headers_complete =
-    Handle<Function>::Cast(on_headers_complete_v);
+  Local<Value> argv[1] = { message_info };
 
-  TryCatch try_catch;
-  Local<Value> ret = on_headers_complete->Call(message_handler, 0, NULL);
-  if (ret.IsEmpty()) {
-    FatalException(try_catch);
-    return -2;
-  }
-  if (ret->IsFalse()) return -3;
+  connection->Emit("HeadersComplete", 1, argv);
 
   return 0;
 }
@@ -214,17 +196,11 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len)
   HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
   HandleScope scope;
 
-  Local<Value> message_handler_v = 
-    connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
-  Local<Object> message_handler = message_handler_v->ToObject();
-
-  Local<Value> on_body_v = message_handler->Get(ON_BODY_SYMBOL);
-  if (on_body_v->IsFunction() == false) return 0;
-  Handle<Function> on_body = Handle<Function>::Cast(on_body_v);
-
   Handle<Value> argv[1];
+
   // TODO each message should have their encoding. 
   // don't look at the conneciton for encoding
+
   if (connection->encoding_ == UTF8) {
     // utf8 encoding
     Handle<String> chunk = String::New((const char*)buf, len);
@@ -240,40 +216,7 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len)
     argv[0] = array;
   }
 
-  TryCatch try_catch;
-  Local<Value> ret = on_body->Call(message_handler, 1, argv);
-  if (ret.IsEmpty()) {
-    FatalException(try_catch);
-    return -2;
-  }
-  if (ret->IsFalse()) return -3;
-
-  return 0;
-}
-
-int
-HTTPConnection::on_message_complete (http_parser *parser)
-{
-  HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
-  HandleScope scope;
-
-  Local<Value> message_handler_v = 
-    connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
-  connection->handle_->DeleteHiddenValue(MESSAGE_HANDLER_SYMBOL);
-
-  Local<Object> message_handler = message_handler_v->ToObject();
-
-  Local<Value> on_msg_complete_v = message_handler->Get(ON_MESSAGE_COMPLETE_SYMBOL);
-  if (on_msg_complete_v->IsFunction() == false) return 0;
-  Handle<Function> on_msg_complete = Handle<Function>::Cast(on_msg_complete_v);
-
-  TryCatch try_catch;
-  Local<Value> ret = on_msg_complete->Call(message_handler, 0, NULL);
-  if (ret.IsEmpty()) {
-    FatalException(try_catch);
-    return -2;
-  }
-  if (ret->IsFalse()) return -3;
+  connection->Emit("Body", 1, argv);
 
   return 0;
 }
index 848265a..a9c8ff6 100644 (file)
@@ -341,16 +341,34 @@ node.http.ServerResponse = function (connection) {
 
 node.http.Client = node.http.LowLevelClient; // FIXME
 
+node.http.Client.prototype.flush = function (request) {
+  //p(request);
+  if (this.readyState == "closed") {
+    this.reconnect();
+    return;
+  }
+  //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
+  while ( request === this.requests[0]
+       && request.output.length > 0
+       && this.readyState == "open"
+        )
+  {
+    var out = request.output.shift();
+    this.send(out[0], out[1]);
+  }
+};
+
 node.http.createClient = function (port, host) {
   var client = new node.http.Client();
-  var requests = client.requests = [];
+
+  client.requests = [];
 
   client.reconnect = function () { return client.connect(port, host) };
 
   client.addListener("Connect", function () {
     //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
-    //node.debug("requests[0].uri = '" + requests[0].uri + "'");
-    requests[0].flush();
+    //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
+    client.flush(client.requests[0]);
   });
 
   client.addListener("EOF", function () {
@@ -365,7 +383,7 @@ node.http.createClient = function (port, host) {
      
     //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
     // If there are more requests to handle, reconnect.
-    if (requests.length > 0) {
+    if (client.requests.length > 0) {
       //node.debug("HTTP CLIENT: reconnecting");
       client.connect(port, host);
     }
@@ -374,7 +392,7 @@ node.http.createClient = function (port, host) {
   var req, res;
 
   client.addListener("MessageBegin", function () {
-    req = requests.shift();
+    req = client.requests.shift();
     res = createClientResponse(client);
   });
 
@@ -416,30 +434,37 @@ node.http.createClient = function (port, host) {
 };
 
 node.http.Client.prototype.get = function (uri, headers) {
-  return createClientRequest(this, "GET", uri, headers);
+  var req = createClientRequest(this, "GET", uri, headers);
+  this.requests.push(req);
+  return req;
 };
 
 node.http.Client.prototype.head = function (uri, headers) {
-  return createClientRequest(this, "HEAD", uri, headers);
+  var req = createClientRequest(this, "HEAD", uri, headers);
+  this.requests.push(req);
+  return req;
 };
 
 node.http.Client.prototype.post = function (uri, headers) {
-  return createClientRequest(this, "POST", uri, headers);
+  var req = createClientRequest(this, "POST", uri, headers);
+  this.requests.push(req);
+  return req;
 };
 
 node.http.Client.prototype.del = function (uri, headers) {
-  return createClientRequest(this, "DELETE", uri, headers);
+  var req = createClientRequest(this, "DELETE", uri, headers);
+  this.requests.push(req);
+  return req;
 };
 
 node.http.Client.prototype.put = function (uri, headers) {
-  return createClientRequest(this, "PUT", uri, headers);
+  var req = createClientRequest(this, "PUT", uri, headers);
+  this.requests.push(req);
+  return req;
 };
 
 function createClientRequest (connection, method, uri, header_lines) {
   var req = new node.EventEmitter;
-  var requests = connection.requests;
-
-  requests.push(this);
 
   req.uri = uri;
 
@@ -476,8 +501,9 @@ function createClientRequest (connection, method, uri, header_lines) {
 
   header += CRLF;
    
-  var output = [];
-  send(output, header);
+  req.output = [];
+
+  send(req.output, header);
 
   req.sendBody = function (chunk, encoding) {
     if (sent_content_length_header == false && chunked_encoding == false) {
@@ -486,31 +512,15 @@ function createClientRequest (connection, method, uri, header_lines) {
     }
 
     if (chunked_encoding) {
-      send(output, chunk.length.toString(16));
-      send(output, CRLF);
-      send(output, chunk, encoding);
-      send(output, CRLF);
+      send(req.output, chunk.length.toString(16));
+      send(req.output, CRLF);
+      send(req.output, chunk, encoding);
+      send(req.output, CRLF);
     } else {
-      send(output, chunk, encoding);
+      send(req.output, chunk, encoding);
     }
 
-    req.flush();
-  };
-
-  req.flush = function ( ) {
-    if (connection.readyState == "closed") {
-      connection.reconnect();
-      return;
-    }
-    //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
-    while ( req === requests[0]
-         && output.length > 0
-         && connection.readyState == "open"
-          )
-    {
-      var out = output.shift();
-      connection.send(out[0], out[1]);
-    }
+    connection.flush(req);
   };
 
   req.finished = false;
@@ -519,9 +529,9 @@ function createClientRequest (connection, method, uri, header_lines) {
     req.addListener("Response", responseListener);
 
     if (chunked_encoding)
-      send(output, "0\r\n\r\n"); // last chunk
+      send(req.output, "0\r\n\r\n"); // last chunk
 
-    req.flush();
+    connection.flush(req);
   };
 
   return req;
index c1375d4..405a80e 100644 (file)
@@ -237,7 +237,6 @@ OnFatalError (const char* location, const char* message)
   exit(1);
 }
 
-
 void
 node::FatalException (TryCatch &try_catch)
 {
index 379cce4..875f53f 100644 (file)
@@ -15,7 +15,7 @@ var server = node.http.createServer(function (req, res) {
 });
 server.listen(PORT);
 
-var client = new node.http.Client(PORT);
+var client = node.http.createClient(PORT);
 
 var body1 = "";
 var body2 = "";
index 620f601..5a3e88f 100644 (file)
@@ -12,7 +12,7 @@ var backend = node.http.createServer(function (req, res) {
 // node.debug("listen backend")
 backend.listen(BACKEND_PORT);
 
-var proxy_client = new node.http.Client(BACKEND_PORT);
+var proxy_client = node.http.createClient(BACKEND_PORT);
 var proxy = node.http.createServer(function (req, res) {
   // node.debug("proxy req");
   var proxy_req = proxy_client.get(req.uri.path);
@@ -33,7 +33,7 @@ proxy.listen(PROXY_PORT);
 var body = "";
 
 function onLoad () {
-  var client = new node.http.Client(PROXY_PORT);
+  var client = node.http.createClient(PROXY_PORT);
   var req = client.get("/test");
   // node.debug("client req")
   req.finish(function (res) {
index f90b73a..2dea8aa 100644 (file)
@@ -29,7 +29,7 @@ function onLoad () {
     //assertEquals("127.0.0.1", res.connection.remoteAddress);
   }).listen(PORT);
 
-  var client = new node.http.Client(PORT);
+  var client = node.http.createClient(PORT);
   var req = client.get("/hello");
   req.finish(function (res) {
     assertEquals(200, res.statusCode);