From ef09b2c65dea6c2992d122f93c88dc743912fb06 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 14 Jul 2009 18:31:50 +0200 Subject: [PATCH] large http.js refactor --- src/http.js | 538 +++++++++++++++++++++++++++++------------------------------- src/net.h | 6 + 2 files changed, 262 insertions(+), 282 deletions(-) diff --git a/src/http.js b/src/http.js index 353437f..1352b81 100644 --- a/src/http.js +++ b/src/http.js @@ -139,27 +139,6 @@ function toRaw(string) { return a; } -function send (output, data, encoding) { - if (data.constructor === String) - encoding = encoding || "ascii"; - else - encoding = "raw"; - - output.push([data, encoding]); -} - -/* This is a wrapper around the LowLevelServer interface. It provides - * connection handling, overflow checking, and some data buffering. - */ -node.http.createServer = function (requestListener, options) { - var server = new node.http.LowLevelServer(); - server.addListener("connection", connectionListener); - server.addListener("request", requestListener); - //server.setOptions(options); - return server; -}; - - /* Abstract base class for ServerRequest and ClientResponse. */ var IncomingMessage = function (connection) { @@ -168,7 +147,14 @@ var IncomingMessage = function (connection) { this.connection = connection; this.httpVersion = null; this.headers = []; - this.last_was_value = false; // TODO: remove me. + + // request (server) only + this.uri = ""; + this.method = null; + + // response (client) only + this.statusCode = null; + this.client = this.connection; }; inherits(IncomingMessage, node.EventEmitter); @@ -186,215 +172,301 @@ IncomingMessage.prototype._emitComplete = function () { }; -var ServerRequest = function (connection) { - IncomingMessage.call(this, connection); +var OutgoingMessage = function () { + node.EventEmitter.call(this); - this.uri = ""; - this.method = null; -}; -inherits(ServerRequest, IncomingMessage); + this.output = []; + this.sent_connection_header = false; + this.sent_content_length_header = false; + this.sent_transfer_encoding_header = false; -var ClientResponse = function (connection) { - IncomingMessage.call(this, connection); + this.closeOnFinish = false; + this.chunked_encoding = false; + this.should_keep_alive = true; + this.use_chunked_encoding_by_default = true; - this.statusCode = null; - this.client = this.connection; + this.finished = false; }; -inherits(ClientResponse, IncomingMessage); +inherits(OutgoingMessage, node.EventEmitter); +OutgoingMessage.prototype.send = function (data, encoding) { + if (data.constructor === String) { + encoding = encoding || "ascii"; + } else { + encoding = "raw"; + } + this.output.push([data, encoding]); +}; +OutgoingMessage.prototype.sendHeaderLines = function (first_line, header_lines) { + header_lines = header_lines || []; + // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n" + // in the case of response it is: "HTTP/1.1 200 OK\r\n" + var header = first_line; + for (var i = 0; i < header_lines.length; i++) { + var field = header_lines[i][0]; + var value = header_lines[i][1]; -function connectionListener (connection) { - // An array of responses for each connection. In pipelined connections - // we need to keep track of the order they were sent. - connection.responses = []; + header += field + ": " + value + CRLF; + + if (connection_expression.exec(field)) { + this.sent_connection_header = true; + if (close_expression.exec(value)) this.closeOnFinish = true; - // is this really needed? - connection.addListener("eof", function () { - if (connection.responses.length == 0) { - connection.close(); + } else if (transfer_encoding_expression.exec(field)) { + this.sent_transfer_encoding_header = true; + if (chunk_expression.exec(value)) this.chunked_encoding = true; + + } else if (content_length_expression.exec(field)) { + this.sent_content_length_header = true; + + } + } + + // keep-alive logic + if (this.sent_connection_header == false) { + if (this.should_keep_alive) { + header += "Connection: keep-alive\r\n"; } else { - connection.responses[connection.responses.length-1].closeOnFinish = true; + this.closeOnFinish = true; + header += "Connection: close\r\n"; } - }); + } + + if (this.sent_content_length_header == false && this.sent_transfer_encoding_header == false) { + if (this.use_chunked_encoding_by_default) { + header += "Transfer-Encoding: chunked\r\n"; + this.chunked_encoding = true; + } + } + + header += CRLF; + + this.send(header); +}; + +OutgoingMessage.prototype.sendBody = function (chunk, encoding) { + /* + if (this.sent_content_length_header == false && this.chunked_encoding == false) { + throw "Content-Length header (or Transfer-Encoding:chunked) not set"; + } + */ + + if (this.chunked_encoding) { + this.send(chunk.length.toString(16)); + this.send(CRLF); + this.send(chunk, encoding); + this.send(CRLF); + } else { + this.send(chunk, encoding); + } - var req, res; + this.flush(); +}; + +OutgoingMessage.prototype.flush = function () { + this.emit("flush"); +}; + +OutgoingMessage.prototype.finish = function () { + if (this.chunked_encoding) this.send("0\r\n\r\n"); // last chunk + this.finished = true; + this.flush(); +}; + + +var ServerResponse = function () { + OutgoingMessage.call(this); + + this.should_keep_alive = true; + this.use_chunked_encoding_by_default = true; +}; +inherits(ServerResponse, OutgoingMessage); + +ServerResponse.prototype.sendHeader = function (statusCode, headers) { + var reason = node.http.STATUS_CODES[statusCode] || "unknown"; + var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF; + this.sendHeaderLines(status_line, headers); +}; + + +var ClientRequest = function (method, uri, header_lines) { + OutgoingMessage.call(this); + + this.should_keep_alive = false; + this.use_chunked_encoding_by_default = false; + this.closeOnFinish = true; + + this.sendHeaderLines(method + " " + uri + " HTTP/1.1\r\n", header_lines); +}; +inherits(ClientRequest, OutgoingMessage); + +ClientRequest.prototype.finish = function (responseListener) { + this.addListener("response", responseListener); + OutgoingMessage.prototype.finish.call(this); +}; + + +function createIncomingMessageStream (connection, incoming_listener) { + var stream = new node.EventEmitter(); + + stream.addListener("incoming", incoming_listener); + + var incoming; + var last_header_was_a_value = false; connection.addListener("message_begin", function () { - req = new ServerRequest(connection); - res = new node.http.ServerResponse(connection); + incoming = new IncomingMessage(connection); }); + // Only servers will get URI events. connection.addListener("uri", function (data) { - req.uri += data; + incoming.uri += data; }); connection.addListener("header_field", function (data) { - if (req.headers.length > 0 && req.last_was_value == false) - req.headers[req.headers.length-1][0] += data; - else - req.headers.push([data]); - req.last_was_value = false; + if (incoming.headers.length > 0 && last_header_was_a_value == false) { + incoming.headers[incoming.headers.length-1][0] += data; + } else { + incoming.headers.push([data]); + } + last_header_was_a_value = false; }); connection.addListener("header_value", function (data) { - var last_pair = req.headers[req.headers.length-1]; - if (last_pair.length == 1) + var last_pair = incoming.headers[incoming.headers.length-1]; + if (last_pair.length == 1) { last_pair[1] = data; - else + } else { last_pair[1] += data; - req.last_was_value = true; + } + last_header_was_a_value = true; }); connection.addListener("headers_complete", function (info) { - req.httpVersion = info.httpVersion; - req.method = info.method; - req.uri = node.http.parseUri(req.uri); // TODO parse the URI lazily? + incoming.httpVersion = info.httpVersion; - res.should_keep_alive = info.should_keep_alive; + if (info.method) { + // server only + incoming.method = info.method; + incoming.uri = node.http.parseUri(incoming.uri); // TODO parse the URI lazily? + } else { + // client only + incoming.statusCode = info.statusCode; + } - connection.server.emit("request", [req, res]); + stream.emit("incoming", [incoming, info.should_keep_alive]); }); connection.addListener("body", function (chunk) { - req._emitBody(chunk); + incoming.emit("body", [chunk]); }); connection.addListener("message_complete", function () { - req._emitComplete() + incoming.emit("complete"); }); + + return stream; } -node.http.ServerResponse = function (connection) { - var responses = connection.responses; - responses.push(this); - this.connection = connection; - this.closeOnFinish = false; - var output = []; - - var chunked_encoding = false; - - this.sendHeader = function (statusCode, headers) { - var sent_connection_header = false; - var sent_transfer_encoding_header = false; - var sent_content_length_header = false; - - var reason = node.http.STATUS_CODES[statusCode] || "unknown"; - var header = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF; - - for (var i = 0; i < headers.length; i++) { - var field = headers[i][0]; - var value = headers[i][1]; - - header += field + ": " + value + CRLF; - - if (connection_expression.exec(field)) { - sent_connection_header = true; - if (close_expression.exec(value)) this.closeOnFinish = true; - } else if (transfer_encoding_expression.exec(field)) { - sent_transfer_encoding_header = true; - if (chunk_expression.exec(value)) chunked_encoding = true; - } else if (content_length_expression.exec(field)) { - sent_content_length_header = true; - } - } +/* Returns true if the message queue is finished and the connection + * should be closed. */ +function flushMessageQueue (connection, queue) { + if (connection.readyState === "closed" || connection.readyState === "readOnly") { + return false; + } - // keep-alive logic - if (sent_connection_header == false) { - if (this.should_keep_alive) { - header += "Connection: keep-alive\r\n"; - } else { - this.closeOnFinish = true; - header += "Connection: close\r\n"; - } - } + while (queue[0]) { + var message = queue[0]; - if ( sent_content_length_header == false && sent_transfer_encoding_header == false ) { - header += "Transfer-Encoding: chunked\r\n"; - chunked_encoding = true; + while (message.output.length > 0) { + var out = message.output.shift(); + connection.send(out[0], out[1]); } + + if (!message.finished) break; - header += CRLF; + message.emit("sent"); + queue.shift(); - send(output, header); - }; + if (message.closeOnFinish) return true; + } + return false; +} - this.sendBody = function (chunk, encoding) { - if (chunked_encoding) { - send(output, chunk.length.toString(16)); - send(output, CRLF); - send(output, chunk, encoding); - send(output, CRLF); - } else { - send(output, chunk, encoding); - } +/* This is a wrapper around the LowLevelServer interface. It provides + * connection handling, overflow checking, and some data buffering. + */ +node.http.createServer = function (requestListener, options) { + var server = new node.http.LowLevelServer(); + //server.setOptions(options); + server.addListener("request", requestListener); + server.addListener("connection", connectionListener); + return server; +}; - this.flush(); - }; +function connectionListener (connection) { + // An array of responses for each connection. In pipelined connections + // we need to keep track of the order they were sent. + var responses = []; - this.flush = function () { - if (connection.readyState === "closed" || connection.readyState === "readOnly") - { - responses = []; - return; - } - if (responses.length > 0 && responses[0] === this) { - while (output.length > 0) { - var out = output.shift(); - connection.send(out[0], out[1]); - } + // is this really needed? + connection.addListener("eof", function () { + if (responses.length == 0) { + connection.close(); + } else { + responses[responses.length-1].closeOnFinish = true; } - }; - - this.finished = false; - this.finish = function () { - if (chunked_encoding) send(output, "0\r\n\r\n"); // last chunk - - this.finished = true; + }); - while (responses.length > 0 && responses[0].finished) { - var res = responses[0]; - res.flush(); - if (res.closeOnFinish) - connection.fullClose(); - responses.shift(); + var flushResponse = function () { + if(flushMessageQueue(connection, responses)) { + connection.fullClose(); } }; -}; + createIncomingMessageStream(connection, function (incoming, should_keep_alive) { + var req = incoming; -node.http.Client = node.http.LowLevelClient; // FIXME + var res = new ServerResponse(connection); + res.should_keep_alive = should_keep_alive; + res.addListener("flush", flushResponse); + responses.push(res); + + connection.server.emit("request", [req, res]); + }); +} -node.http.Client.prototype.flush = function (request) { - //p(request); - if (this.readyState == "closed") { - this.reconnect(); - return; - } - //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState); - while ( request === this.requests[0] && request.output.length > 0 && this.readyState == "open" ) { - var out = request.output.shift(); - this.send(out[0], out[1]); - } -}; +node.http.Client = node.http.LowLevelClient; // FIXME node.http.createClient = function (port, host) { var client = new node.http.Client(); - client.requests = []; + var requests = []; - client.reconnect = function () { return client.connect(port, host); }; + client._pushRequest = function (req) { + req.addListener("flush", function () { + if (client.readyState == "closed") { + //node.debug("HTTP CLIENT request flush. reconnect. readyState = " + client.readyState); + client.connect(port, host); // reconnect + return; + } + if (req == requests[0]) flushMessageQueue(client, [req]); + }); + requests.push(req); + }; client.addListener("connect", function () { //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState); //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'"); - client.flush(client.requests[0]); + requests[0].flush(); }); client.addListener("eof", function () { + //node.debug("client got eof closing. readyState = " + client.readyState); client.close(); }); @@ -405,159 +477,61 @@ node.http.createClient = function (port, host) { } //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState); + // If there are more requests to handle, reconnect. - if (client.requests.length > 0) { - //node.debug("HTTP CLIENT: reconnecting"); - client.connect(port, host); + if (requests.length > 0 && client.readyState != "opening") { + //node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState); + client.connect(port, host); // reconnect } }); - var req, res; - - client.addListener("message_begin", function () { - req = client.requests.shift(); - res = new ClientResponse(client); - }); + createIncomingMessageStream(client, function (res) { + //node.debug("incoming response!"); - client.addListener("header_field", function (data) { - if (res.headers.length > 0 && res.last_was_value == false) - res.headers[res.headers.length-1][0] += data; - else - res.headers.push([data]); - res.last_was_value = false; - }); - - client.addListener("header_value", function (data) { - var last_pair = res.headers[res.headers.length-1]; - if (last_pair.length == 1) { - last_pair[1] = data; - } else { - last_pair[1] += data; - } - res.last_was_value = true; - }); - - client.addListener("headers_complete", function (info) { - res.statusCode = info.statusCode; - res.httpVersion = info.httpVersion; + res.addListener("complete", function ( ) { + //node.debug("request complete disconnecting. readyState = " + client.readyState); + client.close(); + }); + var req = requests.shift(); req.emit("response", [res]); }); - client.addListener("body", function (chunk) { - res._emitBody(chunk); - }); - - client.addListener("message_complete", function () { - client.close(); - res._emitComplete(); - }); - return client; }; node.http.Client.prototype.get = function (uri, headers) { - var req = createClientRequest(this, "GET", uri, headers); - this.requests.push(req); + var req = new ClientRequest("GET", uri, headers); + this._pushRequest(req); return req; }; node.http.Client.prototype.head = function (uri, headers) { - var req = createClientRequest(this, "HEAD", uri, headers); - this.requests.push(req); + var req = new ClientRequest("HEAD", uri, headers); + this._pushRequest(req); return req; }; node.http.Client.prototype.post = function (uri, headers) { - var req = createClientRequest(this, "POST", uri, headers); - this.requests.push(req); + var req = new ClientRequest("POST", uri, headers); + this._pushRequest(req); return req; }; node.http.Client.prototype.del = function (uri, headers) { - var req = createClientRequest(this, "DELETE", uri, headers); - this.requests.push(req); + var req = new ClientRequest("DELETE", uri, headers); + this._pushRequest(req); return req; }; node.http.Client.prototype.put = function (uri, headers) { - var req = createClientRequest(this, "PUT", uri, headers); - this.requests.push(req); + var req = new ClientRequest("PUT", uri, headers); + this._pushRequest(req); return req; }; -function createClientRequest (connection, method, uri, header_lines) { - var req = new node.EventEmitter; - - req.uri = uri; - - var chunked_encoding = false; - req.closeOnFinish = false; - - var sent_connection_header = false; - var sent_transfer_encoding_header = false; - var sent_content_length_header = false; - - var header = method + " " + uri + " HTTP/1.1\r\n"; - - header_lines = header_lines || []; - for (var i = 0; i < header_lines.length; i++) { - var field = header_lines[i][0]; - var value = header_lines[i][1]; - - header += field + ": " + value + CRLF; - - if (connection_expression.exec(field)) { - sent_connection_header = true; - if (close_expression.exec(value)) req.closeOnFinish = true; - } else if (transfer_encoding_expression.exec(field)) { - sent_transfer_encoding_header = true; - if (chunk_expression.exec(value)) chunked_encoding = true; - } else if (content_length_expression.exec(field)) { - sent_content_length_header = true; - } - } - if (sent_connection_header == false) { - header += "Connection: keep-alive\r\n"; - } - - header += CRLF; - - req.output = []; - - send(req.output, header); - - req.sendBody = function (chunk, encoding) { - if (sent_content_length_header == false && chunked_encoding == false) { - throw "Content-Length header (or Transfer-Encoding:chunked) not set"; - } - if (chunked_encoding) { - send(req.output, chunk.length.toString(16)); - send(req.output, CRLF); - send(req.output, chunk, encoding); - send(req.output, CRLF); - } else { - send(req.output, chunk, encoding); - } - - connection.flush(req); - }; - - req.finished = false; - - req.finish = function (responseListener) { - req.addListener("response", responseListener); - - if (chunked_encoding) - send(req.output, "0\r\n\r\n"); // last chunk - - connection.flush(req); - }; - - return req; -} node.http.cat = function (url, encoding) { var promise = new node.Promise(); diff --git a/src/net.h b/src/net.h index 22d0975..5992feb 100644 --- a/src/net.h +++ b/src/net.h @@ -88,6 +88,11 @@ private: static void on_close (evnet_socket *s) { Connection *connection = static_cast (s->data); + + assert(connection->socket_.fd < 0); + assert(connection->socket_.read_action == NULL); + assert(connection->socket_.write_action == NULL); + connection->OnDisconnect(); if (s->errorno) { @@ -95,6 +100,7 @@ private: } assert(connection->attached_); + connection->Detach(); } -- 2.7.4