net2 HTTPClient work
authorRyan Dahl <ry@tinyclouds.org>
Thu, 18 Mar 2010 20:21:33 +0000 (13:21 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Thu, 18 Mar 2010 20:21:52 +0000 (13:21 -0700)
lib/http2.js
test/simple/test-http-client-upload.js

index ed31454..995a585 100644 (file)
@@ -4,6 +4,108 @@ var events = require('events');
 
 var HTTPParser = process.binding('http_parser').HTTPParser;
 
+var parserFreeList = [];
+
+function newParser (type) {
+  var parser;
+  if (parserFreeList.length) {
+    parser = parserFreeList.shift();
+    parser.reinitialize(type);
+  } else {
+    parser = new HTTPParser(type);
+
+    parser.onMessageBegin = function () {
+      parser.incoming = new IncomingMessage(parser.socket);
+      parser.field = null;
+      parser.value = null;
+    };
+
+    // Only servers will get URL events.
+    parser.onURL = function (b, start, len) {
+      var slice = b.asciiSlice(start, start+len);
+      if (parser.incoming.url) {
+        parser.incoming.url += slice;
+      } else {
+        // Almost always will branch here.
+        parser.incoming.url = slice;
+      }
+    };
+
+    parser.onHeaderField = function (b, start, len) {
+      var slice = b.asciiSlice(start, start+len).toLowerCase();
+      if (parser.value) {
+        parser.incoming._addHeaderLine(parser.field, parser.value);
+        parser.field = null;
+        parser.value = null;
+      }
+      if (parser.field) {
+        parser.field += slice;
+      } else {
+        parser.field = slice;
+      }
+    };
+
+    parser.onHeaderValue = function (b, start, len) {
+      var slice = b.asciiSlice(start, start+len);
+      if (parser.value) {
+        parser.value += slice;
+      } else {
+        parser.value = slice;
+      }
+    };
+
+    parser.onHeadersComplete = function (info) {
+      if (parser.field && parser.value) {
+        parser.incoming._addHeaderLine(parser.field, parser.value);
+      }
+
+      parser.incoming.httpVersionMajor = info.versionMajor;
+      parser.incoming.httpVersionMinor = info.versionMinor;
+
+      if (info.method) {
+        // server only
+        parser.incoming.method = info.method;
+      } else {
+        // client only
+        parser.incoming.statusCode = info.statusCode;
+      }
+
+      parser.onIncoming(parser.incoming, info.shouldKeepAlive);
+    };
+
+    parser.onBody = function (b, start, len) {
+      // TODO body encoding?
+      var enc = parser.incoming._encoding;
+      if (!enc) {
+        parser.incoming.emit('data', b.slice(start, start+len));
+      } else {
+        var string;
+        switch (enc) {
+          case 'utf8':
+            string = b.utf8Slice(start, start+len);
+            break;
+          case 'ascii':
+            string = b.asciiSlice(start, start+len);
+            break;
+          default:
+            throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer');
+        }
+        parser.incoming.emit('data', string);
+      }
+    };
+
+    parser.onMessageComplete = function () {
+      parser.incoming.emit("end");
+    };
+  }
+  return parser;
+}
+
+function freeParser (parser) {
+  if (parserFreeList.length < 1000) parserFreeList.push(parser);
+}
+
+
 var CRLF = "\r\n";
 var STATUS_CODES = exports.STATUS_CODES = {
   100 : 'Continue',
@@ -45,11 +147,11 @@ var STATUS_CODES = exports.STATUS_CODES = {
   505 : 'HTTP Version not supported'
 };
 
-var connectionExpression = /Connection/i;
-var transferEncodingExpression = /Transfer-Encoding/i;
-var closeExpression = /close/i;
-var chunkExpression = /chunk/i;
-var contentLengthExpression = /Content-Length/i;
+var connection_expression = /Connection/i;
+var transfer_encoding_expression = /Transfer-Encoding/i;
+var close_expression = /close/i;
+var chunk_expression = /chunk/i;
+var content_length_expression = /Content-Length/i;
 
 
 /* Abstract base class for ServerRequest and ClientResponse. */
@@ -60,10 +162,14 @@ function IncomingMessage (socket) {
   this.httpVersion = null;
   this.headers = {};
 
+  // request (server) only
+  this.url = "";
+
   this.method = null;
 
   // response (client) only
   this.statusCode = null;
+  this.client = this.socket;
 }
 sys.inherits(IncomingMessage, events.EventEmitter);
 exports.IncomingMessage = IncomingMessage;
@@ -73,16 +179,21 @@ IncomingMessage.prototype._parseQueryString = function () {
 };
 
 IncomingMessage.prototype.setBodyEncoding = function (enc) {
-  // TODO: Find a cleaner way of doing this.
-  this.socket.setEncoding(enc);
+  // TODO deprecation message?
+  this.setEncoding(enc);
+};
+
+IncomingMessage.prototype.setEncoding = function (enc) {
+  // TODO check values, error out on bad, and deprecation message?
+  this._encoding = enc.toLowerCase();
 };
 
 IncomingMessage.prototype.pause = function () {
-  this.socket.readPause();
+  this.socket.pause();
 };
 
 IncomingMessage.prototype.resume = function () {
-  this.socket.readResume();
+  this.socket.resume();
 };
 
 IncomingMessage.prototype._addHeaderLine = function (field, value) {
@@ -95,18 +206,21 @@ IncomingMessage.prototype._addHeaderLine = function (field, value) {
   }
 };
 
-function OutgoingMessage () {
-  events.EventEmitter.call(this);
+function OutgoingMessage (socket) {
+  events.EventEmitter.call(this, socket);
+
+  this.socket = socket;
 
   this.output = [];
   this.outputEncodings = [];
 
   this.closeOnFinish = false;
-  this.chunkEncoding = false;
-  this.shouldKeepAlive = true;
-  this.useChunkedEncodingByDefault = true;
+  this.chunked_encoding = false;
+  this.should_keep_alive = true;
+  this.use_chunked_encoding_by_default = true;
 
   this.flushing = false;
+  this.headWritten = false;
 
   this.finished = false;
 }
@@ -141,14 +255,14 @@ OutgoingMessage.prototype._send = function (data, encoding) {
   this.outputEncodings.push(encoding);
 };
 
-OutgoingMessage.prototype._sendHeaderLines = function (first_line, headers) {
-  var sentConnectionHeader = false;
-  var sendContentLengthHeader = false;
-  var sendTransferEncodingHeader = false;
+OutgoingMessage.prototype.sendHeaderLines = function (first_line, headers) {
+  var sent_connection_header = false;
+  var sent_content_length_header = false;
+  var sent_transfer_encoding_header = false;
 
   // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n"
   // in the case of response it is: "HTTP/1.1 200 OK\r\n"
-  var messageHeader = first_line;
+  var message_header = first_line;
   var field, value;
   for (var i in headers) {
     if (headers[i] instanceof Array) {
@@ -160,52 +274,64 @@ OutgoingMessage.prototype._sendHeaderLines = function (first_line, headers) {
       value = headers[i];
     }
 
-    messageHeader += field + ": " + value + CRLF;
+    message_header += field + ": " + value + CRLF;
 
-    if (connectionExpression.test(field)) {
-      sentConnectionHeader = true;
-      if (closeExpression.test(value)) this.closeOnFinish = true;
+    if (connection_expression.test(field)) {
+      sent_connection_header = true;
+      if (close_expression.test(value)) this.closeOnFinish = true;
 
-    } else if (transferEncodingExpression.test(field)) {
-      sendTransferEncodingHeader = true;
-      if (chunkExpression.test(value)) this.chunkEncoding = true;
+    } else if (transfer_encoding_expression.test(field)) {
+      sent_transfer_encoding_header = true;
+      if (chunk_expression.test(value)) this.chunked_encoding = true;
 
-    } else if (contentLengthExpression.test(field)) {
-      sendContentLengthHeader = true;
+    } else if (content_length_expression.test(field)) {
+      sent_content_length_header = true;
 
     }
   }
 
   // keep-alive logic
-  if (sentConnectionHeader == false) {
-    if (this.shouldKeepAlive &&
-        (sendContentLengthHeader || this.useChunkedEncodingByDefault)) {
-      messageHeader += "Connection: keep-alive\r\n";
+  if (sent_connection_header == false) {
+    if (this.should_keep_alive &&
+        (sent_content_length_header || this.use_chunked_encoding_by_default)) {
+      message_header += "Connection: keep-alive\r\n";
     } else {
       this.closeOnFinish = true;
-      messageHeader += "Connection: close\r\n";
+      message_header += "Connection: close\r\n";
     }
   }
 
-  if (sendContentLengthHeader == false && sendTransferEncodingHeader == false) {
-    if (this.useChunkedEncodingByDefault) {
-      messageHeader += "Transfer-Encoding: chunked\r\n";
-      this.chunkEncoding = true;
+  if (sent_content_length_header == false && sent_transfer_encoding_header == false) {
+    if (this.use_chunked_encoding_by_default) {
+      message_header += "Transfer-Encoding: chunked\r\n";
+      this.chunked_encoding = true;
     }
     else {
       this.closeOnFinish = true;
     }
   }
 
-  messageHeader += CRLF;
+  message_header += CRLF;
 
-  this._send(messageHeader);
-  // wait until the first body chunk, or finish(), is sent to flush.
+  this._send(message_header);
+  // wait until the first body chunk, or close(), is sent to flush.
 };
 
+
+OutgoingMessage.prototype.sendBody = function () {
+  throw new Error("sendBody() has been renamed to write(). " + 
+                  "The 'body' event has been renamed to 'data' and " +
+                  "the 'complete' event has been renamed to 'end'.");
+};
+
+
 OutgoingMessage.prototype.write = function (chunk, encoding) {
+  if ( (this instanceof ServerResponse) && !this.headWritten) {
+    throw new Error("writeHead() must be called before write()")
+  }
+
   encoding = encoding || "ascii";
-  if (this.chunkEncoding) {
+  if (this.chunked_encoding) {
     this._send(process._byteLength(chunk, encoding).toString(16));
     this._send(CRLF);
     this._send(chunk, encoding);
@@ -221,65 +347,90 @@ OutgoingMessage.prototype.write = function (chunk, encoding) {
   }
 };
 
-OutgoingMessage.prototype.sendBody = function ()  {
-  throw new Error('sendBody() renamed to write()');
-};
-  
-
 OutgoingMessage.prototype.flush = function () {
   this.emit("flush");
 };
 
+OutgoingMessage.prototype.finish = function () {
+  throw new Error("finish() has been renamed to close().");
+};
+
 OutgoingMessage.prototype.close = function () {
-  if (this.chunkEncoding) this._send("0\r\n\r\n"); // last chunk
+  if (this.chunked_encoding) this._send("0\r\n\r\n"); // last chunk
   this.finished = true;
   this.flush();
 };
 
 
 function ServerResponse (req) {
-  OutgoingMessage.call(this);
+  OutgoingMessage.call(this, req.socket);
 
   if (req.httpVersionMajor < 1 || req.httpVersionMinor < 1) {
-    this.useChunkedEncodingByDefault = false;
-    this.shouldKeepAlive = false;
+    this.use_chunked_encoding_by_default = false;
+    this.should_keep_alive = false;
   }
 }
 sys.inherits(ServerResponse, OutgoingMessage);
 exports.ServerResponse = ServerResponse;
 
-ServerResponse.prototype.writeHead = function (statusCode, headers) {
-  var reason = STATUS_CODES[statusCode] || "unknown";
-  var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
-  this._sendHeaderLines(status_line, headers);
-};
 
-ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead;
+ServerResponse.prototype.writeHead = function (statusCode) {
+  var reasonPhrase, headers, headerIndex;
 
-ServerResponse.prototype.sendHeader = function () {
-  throw new Error('sendHeader renamed to writeHead()');
+  if (typeof arguments[1] == 'string') {
+    reasonPhrase = arguments[1];
+    headerIndex = 2;
+  } else {
+    reasonPhrase = STATUS_CODES[statusCode] || "unknown";
+    headerIndex = 1;
+  }
+
+  if (typeof arguments[headerIndex] == 'object') {
+    headers = arguments[headerIndex];
+  } else {
+    headers = {};
+  }
+
+  var status_line = "HTTP/1.1 " + statusCode.toString() + " "
+                  + reasonPhrase + CRLF;
+  this.sendHeaderLines(status_line, headers);
+  this.headWritten = true;
 };
 
+// TODO eventually remove sendHeader(), writeHeader()
+ServerResponse.prototype.sendHeader = ServerResponse.prototype.writeHead;
+ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead;
 
-function ClientRequest (method, url, headers) {
-  OutgoingMessage.call(this);
+function ClientRequest (socket, method, url, headers) {
+  OutgoingMessage.call(this, socket);
 
-  this.shouldKeepAlive = false;
+  this.should_keep_alive = false;
   if (method === "GET" || method === "HEAD") {
-    this.useChunkedEncodingByDefault = false;
+    this.use_chunked_encoding_by_default = false;
   } else {
-    this.useChunkedEncodingByDefault = true;
+    this.use_chunked_encoding_by_default = true;
   }
   this.closeOnFinish = true;
 
-  this._sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers);
+  this.sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers);
 }
 sys.inherits(ClientRequest, OutgoingMessage);
 exports.ClientRequest = ClientRequest;
 
-ClientRequest.prototype.finish = function (responseListener) {
-  this.addListener("response", responseListener);
-  OutgoingMessage.prototype.finish.call(this);
+ClientRequest.prototype.finish = function () {
+  throw new Error( "finish() has been renamed to close() and no longer takes "
+                 + "a response handler as an argument. Manually add a 'response' listener "
+                 + "to the request object."
+                 );
+};
+
+ClientRequest.prototype.close = function () {
+  if (arguments.length > 0) {
+    throw new Error( "ClientRequest.prototype.close does not take any arguments. "
+                   + "Add a response listener manually to the request object."
+                   );
+  }
+  OutgoingMessage.prototype.close.call(this);
 };
 
 
@@ -309,97 +460,27 @@ function flushMessageQueue (socket, queue) {
 }
 
 
-var parserFreeList = [];
-
-function newParser (type) {
-  var parser;
-  if (parserFreeList.length) {
-    parser = parserFreeList.shift();
-    parser.reinitialize(type);
-  } else {
-    parser = new HTTPParser(type);
-
-    parser.onMessageBegin = function () {
-      parser.incoming = new IncomingMessage(parser.socket);
-      parser.field = null;
-      parser.value = null;
-    };
-
-    // Only servers will get URL events.
-    parser.onURL = function (b, start, len) {
-      var slice = b.asciiSlice(start, start+len);
-      if (parser.incoming.url) {
-        parser.incoming.url += slice;
-      } else {
-        // Almost always will branch here.
-        parser.incoming.url = slice;
-      }
-    };
-
-    parser.onHeaderField = function (b, start, len) {
-      var slice = b.asciiSlice(start, start+len).toLowerCase();
-      if (parser.value) {
-        parser.incoming._addHeaderLine(parser.field, parser.value);
-        parser.field = null;
-        parser.value = null;
-      }
-      if (parser.field) {
-        parser.field += slice;
-      } else {
-        parser.field = slice;
-      }
-    };
-
-    parser.onHeaderValue = function (b, start, len) {
-      var slice = b.asciiSlice(start, start+len);
-      if (parser.value) {
-        parser.value += slice;
-      } else {
-        parser.value = slice;
-      }
-    };
-
-    parser.onHeadersComplete = function (info) {
-      if (parser.field && parser.value) {
-        parser.incoming._addHeaderLine(parser.field, parser.value);
-      }
-
-      parser.incoming.httpVersionMajor = info.versionMajor;
-      parser.incoming.httpVersionMinor = info.versionMinor;
-
-      if (info.method) {
-        // server only
-        parser.incoming.method = info.method;
-      } else {
-        // client only
-        parser.incoming.statusCode = info.statusCode;
-      }
-
-      parser.onIncoming(parser.incoming, info.shouldKeepAlive);
-    };
-
-    parser.onBody = function (b, start, len) {
-      parser.incoming.emit("data", b.slice(start, start+len));
-    };
-
-    parser.onMessageComplete = function () {
-      parser.incoming.emit("end");
-    };
-  }
-  return parser;
+function Server (requestListener) {
+  net.Server.call(this);
+  this.addListener("request", requestListener);
+  this.addListener("connection", connectionListener);
 }
+sys.inherits(Server, net.Server);
 
-function freeParser (parser) {
-  if (parserFreeList.length < 1000) parserFreeList.push(parser);
-}
+exports.Server = Server;
+
+exports.createServer = function (requestListener) {
+  return new Server(requestListener);
+};
 
 function connectionListener (socket) {
   var self = this;
-  var parser = newParser('request');
   // An array of responses for each socket. In pipelined connections
   // we need to keep track of the order they were sent.
   var responses = [];
 
+  var parser = newParser('request');
+
   socket.ondata = function (d, start, end) {
     parser.execute(d, start, end - start);
   };
@@ -437,64 +518,56 @@ function connectionListener (socket) {
 }
 
 
-function Server (requestListener, options) {
-  net.Server.call(this, connectionListener);
-  //server.setOptions(options);
-  this.addListener('request', requestListener);
-}
-sys.inherits(Server, net.Server);
-exports.Server = Server;
-exports.createServer = function (requestListener, options) {
-  return new Server(requestListener, options);
-};
-
-
-
-function Client () {
+function Client ( ) {
   net.Stream.call(this);
 
   var self = this;
+
   var requests = [];
   var currentRequest;
 
   var parser = newParser('response');
-  parser.socket = self;
-
-  self.addListener("connect", function () {
-    self.resetParser();
-    currentRequest = requests.shift();
-    currentRequest.flush();
-  });
+  parser.socket = this;
 
-  self.ondata = function (d, start, end) {
-    parser.execute(d, start, end - start);
+  self._reconnect = function () {
+    if (self.readyState != "opening") {
+      //sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState);
+      self.connect(self.port, self.host);
+    }
   };
 
-  parser.onIncoming = function (res) {
-    //sys.debug("incoming response!");
-
-    res.addListener('end', function ( ) {
-      //sys.debug("request complete disconnecting. readyState = " + self.readyState);
-      self.close();
+  self._pushRequest = function (req) {
+    req.addListener("flush", function () {
+        /*
+      if (self.readyState == "closed") {
+        //sys.debug("HTTP CLIENT request flush. reconnect.  readyState = " + self.readyState);
+        self._reconnect();
+        return;
+      }
+        */
+      //sys.debug("self flush  readyState = " + self.readyState);
+      if (req == currentRequest) flushMessageQueue(self, [req]);
     });
-
-    currentRequest.emit("response", res);
+    requests.push(req);
   };
 
-  self._pushRequest = function (req) {
+  this.ondata = function (d, start, end) {
+    parser.execute(d, start, end - start);
   };
 
-  self.addListener("end", function () {
-    self.close();
+  self.addListener("connect", function () {
+    parser.reinitialize('response');
+    currentRequest = requests.shift();
+    currentRequest.flush();
   });
 
-  self.onend = function () {
+  self.addListener("end", function () {
     parser.finish();
-    // unref the parser for easy gc
     freeParser(parser);
+
     //sys.debug("self got end closing. readyState = " + self.readyState);
     self.close();
-  };
+  });
 
   self.addListener("close", function (had_error) {
     if (had_error) {
@@ -509,53 +582,30 @@ function Client () {
       self._reconnect();
     }
   });
-}
-sys.inherits(Client, net.Stream);
 
+  parser.onIncoming = function (res) {
+    sys.debug("incoming response!");
 
-exports.Client = Client;
-
-
-exports.createClient = function (port, host) {
-  var client = new Client();
-  client.port = port;
-  client.host = host;
-  client.connect(port, host);
-  return client;
-};
-
+    res.addListener('end', function ( ) {
+      //sys.debug("request complete disconnecting. readyState = " + self.readyState);
+      self.close();
+    });
 
-Client.prototype._reconnect = function () {
-  if (this.readyState != "opening") {
-    //sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState);
-    this.connect(this.port, this.host);
-  }
+    currentRequest.emit("response", res);
+  };
 };
+sys.inherits(Client, net.Stream);
 
+exports.Client = Client;
 
-Client.prototype.request = function (method, url, headers) {
-  var self = this;
-
-  if (typeof(url) != "string") { // assume method was omitted, shift arguments
-    headers = url;
-    url = method;
-    method = null;
-  }
-  var req = new ClientRequest(this, method || "GET", url, headers);
-
-  req.addListener("flush", function () {
-    if (self.readyState == "closed") {
-      //sys.debug("HTTP CLIENT request flush. reconnect.  readyState = " + self.readyState);
-      self._reconnect();
-      return;
-    }
-    //sys.debug("self flush  readyState = " + self.readyState);
-    if (req == currentRequest) flushMessageQueue(self, [req]);
-  });
-  requests.push(req);
+exports.createClient = function (port, host) {
+  var c = new Client;
+  c.port = port;
+  c.host = host;
+  c.connect(port, host);
+  return c;
+}
 
-  return req;
-};
 
 Client.prototype.get = function () {
   throw new Error("client.get(...) is now client.request('GET', ...)");
@@ -577,9 +627,20 @@ Client.prototype.put = function () {
   throw new Error("client.put(...) is now client.request('PUT', ...)");
 };
 
+Client.prototype.request = function (method, url, headers) {
+  if (typeof(url) != "string") { // assume method was omitted, shift arguments
+    headers = url;
+    url = method;
+    method = null;
+  }
+  var req = new ClientRequest(this, method || "GET", url, headers);
+  this._pushRequest(req);
+  return req;
+};
+
 
 exports.cat = function (url, encoding_, headers_) {
-  var encoding = 'utf8', 
+  var encoding = 'utf8',
       headers = {},
       callback = null;
 
index 4c11bd4..e1d746c 100644 (file)
@@ -1,5 +1,5 @@
 require("../common");
-http = require("http");
+http = require("http2");
 
 var sent_body = "";
 var server_req_complete = false;
@@ -33,7 +33,7 @@ req.write('3\n');
 
 puts("client finished sending request");
 req.addListener('response', function(res) {
-  res.setBodyEncoding("utf8");
+  res.setEncoding("utf8");
   res.addListener('data', function(chunk) {
     puts(chunk);
   });