large http.js refactor
authorRyan <ry@tinyclouds.org>
Tue, 14 Jul 2009 16:31:50 +0000 (18:31 +0200)
committerRyan <ry@tinyclouds.org>
Tue, 14 Jul 2009 16:31:50 +0000 (18:31 +0200)
src/http.js
src/net.h

index 353437f..1352b81 100644 (file)
@@ -139,27 +139,6 @@ function toRaw(string) {
   return a;
 }
 
-function send (output, data, encoding) {
-  if (data.constructor === String)
-    encoding = encoding || "ascii";
-  else
-    encoding = "raw";
-
-  output.push([data, encoding]);
-}
-
-/* This is a wrapper around the LowLevelServer interface. It provides
- * connection handling, overflow checking, and some data buffering.
- */
-node.http.createServer = function (requestListener, options) {
-  var server = new node.http.LowLevelServer();
-  server.addListener("connection", connectionListener);
-  server.addListener("request", requestListener);
-  //server.setOptions(options);
-  return server;
-};
-
-
 
 /* Abstract base class for ServerRequest and ClientResponse. */
 var IncomingMessage = function (connection) {
@@ -168,7 +147,14 @@ var IncomingMessage = function (connection) {
   this.connection = connection;
   this.httpVersion = null;
   this.headers = [];
-  this.last_was_value = false; // TODO: remove me.
+
+  // request (server) only 
+  this.uri = "";
+  this.method = null;
+
+  // response (client) only
+  this.statusCode = null;
+  this.client = this.connection;
 };
 inherits(IncomingMessage, node.EventEmitter);
 
@@ -186,215 +172,301 @@ IncomingMessage.prototype._emitComplete = function () {
 };
 
 
-var ServerRequest = function (connection) {
-  IncomingMessage.call(this, connection);
+var OutgoingMessage = function () {
+  node.EventEmitter.call(this);
 
-  this.uri = "";
-  this.method = null;
-};
-inherits(ServerRequest, IncomingMessage);
+  this.output = [];
 
+  this.sent_connection_header = false;
+  this.sent_content_length_header = false;
+  this.sent_transfer_encoding_header = false;
 
-var ClientResponse = function (connection) {
-  IncomingMessage.call(this, connection);
+  this.closeOnFinish = false;
+  this.chunked_encoding = false;
+  this.should_keep_alive = true;
+  this.use_chunked_encoding_by_default = true;
 
-  this.statusCode = null;
-  this.client = this.connection;
+  this.finished = false;
 };
-inherits(ClientResponse, IncomingMessage);
+inherits(OutgoingMessage, node.EventEmitter);
 
+OutgoingMessage.prototype.send = function (data, encoding) {
+  if (data.constructor === String) {
+    encoding = encoding || "ascii";
+  } else {
+    encoding = "raw";
+  }
+  this.output.push([data, encoding]);
+};
 
+OutgoingMessage.prototype.sendHeaderLines = function (first_line, header_lines) {
+  header_lines = header_lines || [];
 
+  // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n"
+  // in the case of response it is: "HTTP/1.1 200 OK\r\n"
+  var header = first_line;
 
+  for (var i = 0; i < header_lines.length; i++) {
+    var field = header_lines[i][0];
+    var value = header_lines[i][1];
 
-function connectionListener (connection) {
-  // An array of responses for each connection. In pipelined connections
-  // we need to keep track of the order they were sent.
-  connection.responses = [];
+    header += field + ": " + value + CRLF;
+    
+    if (connection_expression.exec(field)) {
+      this.sent_connection_header = true;
+      if (close_expression.exec(value)) this.closeOnFinish = true;
 
-  // is this really needed?
-  connection.addListener("eof", function () {
-    if (connection.responses.length == 0) {
-      connection.close();
+    } else if (transfer_encoding_expression.exec(field)) {
+      this.sent_transfer_encoding_header = true;
+      if (chunk_expression.exec(value)) this.chunked_encoding = true;
+
+    } else if (content_length_expression.exec(field)) {
+      this.sent_content_length_header = true;
+
+    }
+  }
+
+  // keep-alive logic 
+  if (this.sent_connection_header == false) {
+    if (this.should_keep_alive) {
+      header += "Connection: keep-alive\r\n";
     } else {
-      connection.responses[connection.responses.length-1].closeOnFinish = true;
+      this.closeOnFinish = true;
+      header += "Connection: close\r\n";
     }
-  });
+  }
+
+  if (this.sent_content_length_header == false && this.sent_transfer_encoding_header == false) {
+    if (this.use_chunked_encoding_by_default) {
+      header += "Transfer-Encoding: chunked\r\n";
+      this.chunked_encoding = true;
+    }
+  }
+
+  header += CRLF;
+
+  this.send(header);
+};
+
+OutgoingMessage.prototype.sendBody = function (chunk, encoding) {
+  /*
+  if (this.sent_content_length_header == false && this.chunked_encoding == false) {
+    throw "Content-Length header (or Transfer-Encoding:chunked) not set";
+  }
+  */
+
+  if (this.chunked_encoding) {
+    this.send(chunk.length.toString(16));
+    this.send(CRLF);
+    this.send(chunk, encoding);
+    this.send(CRLF);
+  } else {
+    this.send(chunk, encoding);
+  }
 
-  var req, res;
+  this.flush();
+};
+
+OutgoingMessage.prototype.flush = function () {
+  this.emit("flush");
+};
+
+OutgoingMessage.prototype.finish = function () {
+  if (this.chunked_encoding) this.send("0\r\n\r\n"); // last chunk
+  this.finished = true;
+  this.flush();
+};
+
+
+var ServerResponse = function () {
+  OutgoingMessage.call(this);
+
+  this.should_keep_alive = true;
+  this.use_chunked_encoding_by_default = true;
+};
+inherits(ServerResponse, OutgoingMessage);
+
+ServerResponse.prototype.sendHeader = function (statusCode, headers) {
+  var reason = node.http.STATUS_CODES[statusCode] || "unknown";
+  var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
+  this.sendHeaderLines(status_line, headers);
+};
+
+
+var ClientRequest = function (method, uri, header_lines) {
+  OutgoingMessage.call(this);
+
+  this.should_keep_alive = false;
+  this.use_chunked_encoding_by_default = false;
+  this.closeOnFinish = true;
+
+  this.sendHeaderLines(method + " " + uri + " HTTP/1.1\r\n", header_lines);
+};
+inherits(ClientRequest, OutgoingMessage);
+
+ClientRequest.prototype.finish = function (responseListener) {
+  this.addListener("response", responseListener);
+  OutgoingMessage.prototype.finish.call(this);
+};
+
+
+function createIncomingMessageStream (connection, incoming_listener) {
+  var stream = new node.EventEmitter();
+
+  stream.addListener("incoming", incoming_listener);
+
+  var incoming;
+  var last_header_was_a_value = false;
 
   connection.addListener("message_begin", function () {
-    req = new ServerRequest(connection);
-    res = new node.http.ServerResponse(connection);
+    incoming = new IncomingMessage(connection);
   });
 
+  // Only servers will get URI events.
   connection.addListener("uri", function (data) {
-    req.uri += data;
+    incoming.uri += data;
   });
 
   connection.addListener("header_field", function (data) {
-    if (req.headers.length > 0 && req.last_was_value == false)
-      req.headers[req.headers.length-1][0] += data; 
-    else
-      req.headers.push([data]);
-    req.last_was_value = false;
+    if (incoming.headers.length > 0 && last_header_was_a_value == false) {
+      incoming.headers[incoming.headers.length-1][0] += data; 
+    } else {
+      incoming.headers.push([data]);
+    }
+    last_header_was_a_value = false;
   });
 
   connection.addListener("header_value", function (data) {
-    var last_pair = req.headers[req.headers.length-1];
-    if (last_pair.length == 1)
+    var last_pair = incoming.headers[incoming.headers.length-1];
+    if (last_pair.length == 1) {
       last_pair[1] = data;
-    else 
+    } else  {
       last_pair[1] += data;
-    req.last_was_value = true;
+    } 
+    last_header_was_a_value = true;
   });
 
   connection.addListener("headers_complete", function (info) {
-    req.httpVersion = info.httpVersion;
-    req.method = info.method;
-    req.uri = node.http.parseUri(req.uri); // TODO parse the URI lazily?
+    incoming.httpVersion = info.httpVersion;
 
-    res.should_keep_alive = info.should_keep_alive;
+    if (info.method) {
+      // server only
+      incoming.method = info.method; 
+      incoming.uri = node.http.parseUri(incoming.uri); // TODO parse the URI lazily?
+    } else {
+      // client only
+      incoming.statusCode = info.statusCode; 
+    }
 
-    connection.server.emit("request", [req, res]);
+    stream.emit("incoming", [incoming, info.should_keep_alive]);
   });
 
   connection.addListener("body", function (chunk) {
-    req._emitBody(chunk);
+    incoming.emit("body", [chunk]);
   });
 
   connection.addListener("message_complete", function () {
-    req._emitComplete()
+    incoming.emit("complete");
   });
+
+  return stream;
 }
 
-node.http.ServerResponse = function (connection) {
-  var responses = connection.responses;
-  responses.push(this);
-  this.connection = connection;
-  this.closeOnFinish = false;
-  var output = [];
-
-  var chunked_encoding = false;
-
-  this.sendHeader = function (statusCode, headers) {
-    var sent_connection_header = false;
-    var sent_transfer_encoding_header = false;
-    var sent_content_length_header = false;
-
-    var reason = node.http.STATUS_CODES[statusCode] || "unknown";
-    var header = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
-
-    for (var i = 0; i < headers.length; i++) {
-      var field = headers[i][0];
-      var value = headers[i][1];
-
-      header += field + ": " + value + CRLF;
-      
-      if (connection_expression.exec(field)) {
-        sent_connection_header = true;
-        if (close_expression.exec(value)) this.closeOnFinish = true;
-      } else if (transfer_encoding_expression.exec(field)) {
-        sent_transfer_encoding_header = true;
-        if (chunk_expression.exec(value)) chunked_encoding = true;
-      } else if (content_length_expression.exec(field)) {
-        sent_content_length_header = true;
-      }
-    }
+/* Returns true if the message queue is finished and the connection
+ * should be closed. */
+function flushMessageQueue (connection, queue) {
+  if (connection.readyState === "closed" || connection.readyState === "readOnly") {
+    return false;
+  }
 
-    // keep-alive logic 
-    if (sent_connection_header == false) {
-      if (this.should_keep_alive) {
-        header += "Connection: keep-alive\r\n";
-      } else {
-        this.closeOnFinish = true;
-        header += "Connection: close\r\n";
-      }
-    }
+  while (queue[0]) {
+    var message = queue[0];
 
-    if ( sent_content_length_header == false && sent_transfer_encoding_header == false ) {
-      header += "Transfer-Encoding: chunked\r\n";
-      chunked_encoding = true;
+    while (message.output.length > 0) {
+      var out = message.output.shift();
+      connection.send(out[0], out[1]);
     }
+  
+    if (!message.finished) break;
 
-    header += CRLF;
+    message.emit("sent");
+    queue.shift();
 
-    send(output, header);
-  };
+    if (message.closeOnFinish) return true;
+  }
+  return false;
+}
 
-  this.sendBody = function (chunk, encoding) {
-    if (chunked_encoding) {
-      send(output, chunk.length.toString(16));
-      send(output, CRLF);
-      send(output, chunk, encoding);
-      send(output, CRLF);
-    } else {
-      send(output, chunk, encoding);
-    }
+/* This is a wrapper around the LowLevelServer interface. It provides
+ * connection handling, overflow checking, and some data buffering.
+ */
+node.http.createServer = function (requestListener, options) {
+  var server = new node.http.LowLevelServer();
+  //server.setOptions(options);
+  server.addListener("request", requestListener);
+  server.addListener("connection", connectionListener);
+  return server;
+};
 
-    this.flush();
-  };
+function connectionListener (connection) {
+  // An array of responses for each connection. In pipelined connections
+  // we need to keep track of the order they were sent.
+  var responses = [];
 
-  this.flush = function () {
-    if (connection.readyState === "closed" || connection.readyState === "readOnly")
-    {
-      responses = [];
-      return;
-    }
-    if (responses.length > 0 && responses[0] === this) {
-      while (output.length > 0) {
-        var out = output.shift();
-        connection.send(out[0], out[1]);
-      }
+  // is this really needed?
+  connection.addListener("eof", function () {
+    if (responses.length == 0) {
+      connection.close();
+    } else {
+      responses[responses.length-1].closeOnFinish = true;
     }
-  };
-
-  this.finished = false;
-  this.finish = function () {
-    if (chunked_encoding) send(output, "0\r\n\r\n"); // last chunk
-
-    this.finished = true;
+  });
 
-    while (responses.length > 0 && responses[0].finished) {
-      var res = responses[0];
-      res.flush();
-      if (res.closeOnFinish)
-        connection.fullClose();
-      responses.shift();
+  var flushResponse = function () {
+    if(flushMessageQueue(connection, responses)) {
+      connection.fullClose();
     }
   };
-};
 
+  createIncomingMessageStream(connection, function (incoming, should_keep_alive) {
+    var req = incoming;
 
-node.http.Client = node.http.LowLevelClient; // FIXME
+    var res = new ServerResponse(connection);
+    res.should_keep_alive = should_keep_alive;
+    res.addListener("flush", flushResponse);
+    responses.push(res);
+    
+    connection.server.emit("request", [req, res]);
+  });
+}
 
-node.http.Client.prototype.flush = function (request) {
-  //p(request);
-  if (this.readyState == "closed") {
-    this.reconnect();
-    return;
-  }
-  //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
-  while ( request === this.requests[0] && request.output.length > 0 && this.readyState == "open" ) {
-    var out = request.output.shift();
-    this.send(out[0], out[1]);
-  }
-};
+node.http.Client = node.http.LowLevelClient; // FIXME
 
 node.http.createClient = function (port, host) {
   var client = new node.http.Client();
 
-  client.requests = [];
+  var requests = [];
 
-  client.reconnect = function () { return client.connect(port, host); };
+  client._pushRequest = function (req) {
+    req.addListener("flush", function () {
+      if (client.readyState == "closed") {
+        //node.debug("HTTP CLIENT request flush. reconnect.  readyState = " + client.readyState);
+        client.connect(port, host); // reconnect
+        return;
+      }
+      if (req == requests[0]) flushMessageQueue(client, [req]);
+    });
+    requests.push(req);
+  };
 
   client.addListener("connect", function () {
     //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
     //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
-    client.flush(client.requests[0]);
+    requests[0].flush();
   });
 
   client.addListener("eof", function () {
+    //node.debug("client got eof closing. readyState = " + client.readyState);
     client.close();
   });
 
@@ -405,159 +477,61 @@ node.http.createClient = function (port, host) {
     }
      
     //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
+
     // If there are more requests to handle, reconnect.
-    if (client.requests.length > 0) {
-      //node.debug("HTTP CLIENT: reconnecting");
-      client.connect(port, host);
+    if (requests.length > 0 && client.readyState != "opening") {
+      //node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
+      client.connect(port, host); // reconnect
     }
   });
 
-  var req, res;
-
-  client.addListener("message_begin", function () {
-    req = client.requests.shift();
-    res = new ClientResponse(client);
-  });
+  createIncomingMessageStream(client, function (res) {
+    //node.debug("incoming response!");
 
-  client.addListener("header_field", function (data) {
-    if (res.headers.length > 0 && res.last_was_value == false)
-      res.headers[res.headers.length-1][0] += data; 
-    else
-      res.headers.push([data]);
-    res.last_was_value = false;
-  });
-
-  client.addListener("header_value", function (data) {
-    var last_pair = res.headers[res.headers.length-1];
-    if (last_pair.length == 1) {
-      last_pair[1] = data;
-    } else {
-      last_pair[1] += data;
-    }
-    res.last_was_value = true;
-  });
-
-  client.addListener("headers_complete", function (info) {
-    res.statusCode = info.statusCode;
-    res.httpVersion = info.httpVersion;
+    res.addListener("complete", function ( ) {
+      //node.debug("request complete disconnecting. readyState = " + client.readyState);
+      client.close();
+    });
 
+    var req = requests.shift();
     req.emit("response", [res]);
   });
 
-  client.addListener("body", function (chunk) {
-    res._emitBody(chunk);
-  });
-
-  client.addListener("message_complete", function () {
-    client.close();
-    res._emitComplete();
-  });
-
   return client;
 };
 
 node.http.Client.prototype.get = function (uri, headers) {
-  var req = createClientRequest(this, "GET", uri, headers);
-  this.requests.push(req);
+  var req = new ClientRequest("GET", uri, headers);
+  this._pushRequest(req);
   return req;
 };
 
 node.http.Client.prototype.head = function (uri, headers) {
-  var req = createClientRequest(this, "HEAD", uri, headers);
-  this.requests.push(req);
+  var req = new ClientRequest("HEAD", uri, headers);
+  this._pushRequest(req);
   return req;
 };
 
 node.http.Client.prototype.post = function (uri, headers) {
-  var req = createClientRequest(this, "POST", uri, headers);
-  this.requests.push(req);
+  var req = new ClientRequest("POST", uri, headers);
+  this._pushRequest(req);
   return req;
 };
 
 node.http.Client.prototype.del = function (uri, headers) {
-  var req = createClientRequest(this, "DELETE", uri, headers);
-  this.requests.push(req);
+  var req = new ClientRequest("DELETE", uri, headers);
+  this._pushRequest(req);
   return req;
 };
 
 node.http.Client.prototype.put = function (uri, headers) {
-  var req = createClientRequest(this, "PUT", uri, headers);
-  this.requests.push(req);
+  var req = new ClientRequest("PUT", uri, headers);
+  this._pushRequest(req);
   return req;
 };
 
-function createClientRequest (connection, method, uri, header_lines) {
-  var req = new node.EventEmitter;
-
-  req.uri = uri;
-
-  var chunked_encoding = false;
-  req.closeOnFinish = false;
-
-  var sent_connection_header = false;
-  var sent_transfer_encoding_header = false;
-  var sent_content_length_header = false;
-
-  var header = method + " " + uri + " HTTP/1.1\r\n";
-
-  header_lines = header_lines || [];
-  for (var i = 0; i < header_lines.length; i++) {
-    var field = header_lines[i][0];
-    var value = header_lines[i][1];
-
-    header += field + ": " + value + CRLF;
-    
-    if (connection_expression.exec(field)) {
-      sent_connection_header = true;
-      if (close_expression.exec(value)) req.closeOnFinish = true;
-    } else if (transfer_encoding_expression.exec(field)) {
-      sent_transfer_encoding_header = true;
-      if (chunk_expression.exec(value)) chunked_encoding = true;
-    } else if (content_length_expression.exec(field)) {
-      sent_content_length_header = true;
-    }
-  }
 
-  if (sent_connection_header == false) {
-    header += "Connection: keep-alive\r\n";
-  }
-
-  header += CRLF;
-   
-  req.output = [];
-
-  send(req.output, header);
-
-  req.sendBody = function (chunk, encoding) {
-    if (sent_content_length_header == false && chunked_encoding == false) {
-      throw "Content-Length header (or Transfer-Encoding:chunked) not set";
-    }
 
-    if (chunked_encoding) {
-      send(req.output, chunk.length.toString(16));
-      send(req.output, CRLF);
-      send(req.output, chunk, encoding);
-      send(req.output, CRLF);
-    } else {
-      send(req.output, chunk, encoding);
-    }
-
-    connection.flush(req);
-  };
-
-  req.finished = false;
-
-  req.finish = function (responseListener) {
-    req.addListener("response", responseListener);
-
-    if (chunked_encoding)
-      send(req.output, "0\r\n\r\n"); // last chunk
-
-    connection.flush(req);
-  };
-
-  return req;
-}
 
 node.http.cat = function (url, encoding) {
   var promise = new node.Promise();
index 22d0975..5992feb 100644 (file)
--- a/src/net.h
+++ b/src/net.h
@@ -88,6 +88,11 @@ private:
 
   static void on_close (evnet_socket *s) {
     Connection *connection = static_cast<Connection*> (s->data);
+
+    assert(connection->socket_.fd < 0);
+    assert(connection->socket_.read_action == NULL);
+    assert(connection->socket_.write_action == NULL);
+
     connection->OnDisconnect();
 
     if (s->errorno) {
@@ -95,6 +100,7 @@ private:
     }
 
     assert(connection->attached_);
+
     connection->Detach();
   }