Further net2 compatibilities
authorRyan Dahl <ry@tinyclouds.org>
Wed, 10 Mar 2010 00:27:49 +0000 (16:27 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Wed, 10 Mar 2010 00:27:49 +0000 (16:27 -0800)
lib/net.js
src/node_buffer.cc
test/pummel/test-tcp-many-clients.js
test/pummel/test-tcp-pause.js
test/simple/test-tcp-reconnect.js

index 656f824..820a9a8 100644 (file)
@@ -1,4 +1,5 @@
 var sys = require("./sys");
+var fs = require("./fs");
 var debugLevel = 0;
 if ('NODE_DEBUG' in process.ENV) debugLevel = 1;
 function debug (x) {
@@ -43,13 +44,13 @@ function FreeList (name, max, constructor) {
 }
 
 FreeList.prototype.alloc = function () {
-  debug("alloc " + this.name + " " + this.list.length);
+  //debug("alloc " + this.name + " " + this.list.length);
   return this.list.length ? this.list.shift()
                           : this.constructor.apply(this, arguments);
 }
 
 FreeList.prototype.free = function (obj) {
-  debug("free " + this.name + " " + this.list.length);
+  //debug("free " + this.name + " " + this.list.length);
   if (this.list.length < this.max) {
     this.list.push(obj);
   }
@@ -88,7 +89,7 @@ function initSocket (self) {
       allocRecvBuffer();
     }
 
-    debug('recvBuffer.used ' + recvBuffer.used);
+    //debug('recvBuffer.used ' + recvBuffer.used);
     var bytesRead;
 
     try {
@@ -97,7 +98,7 @@ function initSocket (self) {
                             recvBuffer,
                             recvBuffer.used,
                             recvBuffer.length - recvBuffer.used);
-        debug('recvMsg.fd ' + recvMsg.fd);
+        //debug('recvMsg.fd ' + recvMsg.fd);
         if (recvMsg.fd) {
           self.emit('fd', recvMsg.fd);
         }
@@ -112,7 +113,7 @@ function initSocket (self) {
       return;
     }
 
-    debug('bytesRead ' + bytesRead + '\n');
+    //debug('bytesRead ' + bytesRead + '\n');
 
     if (!recvMsg.fd && bytesRead == 0) {
       self.readable = false;
@@ -125,14 +126,32 @@ function initSocket (self) {
     } else if (bytesRead > 0) {
       var start = recvBuffer.used;
       var end = recvBuffer.used + bytesRead;
-      
-      if (self._events && self._events['data']) {
-        // emit a slice
-        self.emit('data', recvBuffer.slice(start, end));
+
+      if (!self._encoding) {
+        if (self._events && self._events['data']) {
+          // emit a slice
+          self.emit('data', recvBuffer.slice(start, end));
+        }
+
+        // Optimization: emit the original buffer with end points
+        if (self.ondata) self.ondata(recvBuffer, start, end);
+      } else {
+        // TODO remove me - we should only output Buffer
+
+        var string;
+        switch (self._encoding) {
+          case 'utf8':
+            string = recvBuffer.utf8Slice(start, end);
+            break;
+          case 'ascii':
+            string = recvBuffer.asciiSlice(start, end);
+            break;
+          default:
+            throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer'); 
+        }
+        self.emit('data', string);
       }
 
-      // Optimization: emit the original buffer with end points
-      if (self.ondata) self.ondata(recvBuffer, start, end);
 
       recvBuffer.used += bytesRead;
     }
@@ -173,8 +192,7 @@ function Socket (peerInfo) {
     this.remoteAddress = peerInfo.remoteAddress;
     this.remotePort = peerInfo.remotePort;
 
-    this._readWatcher.set(this.fd, true, false);
-    this._readWatcher.start();
+    this.resume();
     this.readable = true;
 
     this._writeWatcher.set(this.fd, false, true);
@@ -206,6 +224,7 @@ Socket.prototype._writeString = function (data, encoding) {
   var self = this;
   if (!self.writable) throw new Error('Socket is not writable');
   var buffer;
+
   if (self._writeQueue.length == 0) {
     buffer = self._allocateSendBuffer();
   } else {
@@ -229,16 +248,12 @@ Socket.prototype._writeString = function (data, encoding) {
     bytesWritten = charsWritten;
   } else if (encoding.toLowerCase() == 'utf8') {
     buffer.isFd = false;
-    charsWritten = buffer.utf8Write(data,
-                                    buffer.used,
-                                    buffer.length - buffer.used);
+    charsWritten = buffer.utf8Write(data, buffer.used);
     bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten));
   } else {
     // ascii
     buffer.isFd = false;
-    charsWritten = buffer.asciiWrite(data,
-                                     buffer.used,
-                                     buffer.length - buffer.used);
+    charsWritten = buffer.asciiWrite(data, buffer.used);
     bytesWritten = charsWritten;
   }
 
@@ -249,12 +264,12 @@ Socket.prototype._writeString = function (data, encoding) {
     self._writeQueueSize += bytesWritten;
   }
 
-  debug('charsWritten ' + charsWritten);
-  debug('buffer.used ' + buffer.used);
+  //debug('charsWritten ' + charsWritten);
+  //debug('buffer.used ' + buffer.used);
 
   // If we didn't finish, then recurse with the rest of the string.
   if (charsWritten < data.length) {
-    debug('recursive write');
+    //debug('recursive write');
     self._writeString(data.slice(charsWritten), encoding);
   }
 };
@@ -270,6 +285,10 @@ Socket.prototype.send = function () {
   throw new Error('send renamed to write');
 };
 
+Socket.prototype.setEncoding = function (enc) {
+  // TODO check values, error out on bad, and deprecation message?
+  this._encoding = enc.toLowerCase();
+};
 
 // Returns true if all the data was flushed to socket. Returns false if
 // something was queued. If data was queued, then the "drain" event will
@@ -368,11 +387,11 @@ Socket.prototype.flush = function () {
     if (b.isFd) {
       b.sent = b.used;
       self._writeMessageQueueSize -= 1;
-      debug('sent fd: ' + fdToSend);
+      //debug('sent fd: ' + fdToSend);
     } else {
       b.sent += bytesWritten;
       self._writeQueueSize -= bytesWritten;
-      debug('bytes sent: ' + b.sent);
+      //debug('bytes sent: ' + b.sent);
     }
   }
   if (self._writeWatcher) self._writeWatcher.stop();
@@ -380,6 +399,39 @@ Socket.prototype.flush = function () {
 };
 
 
+function doConnect (socket, port, host) {
+  try {
+    connect(socket.fd, port, host);
+  } catch (e) {
+    socket.forceClose(e);
+  }
+
+  // Don't start the read watcher until connection is established
+  socket._readWatcher.set(socket.fd, true, false);
+
+  // How to connect on POSIX: Wait for fd to become writable, then call
+  // socketError() if there isn't an error, we're connected. AFAIK this a
+  // platform independent way determining when a non-blocking connection
+  // is established, but I have only seen it documented in the Linux
+ // Manual Page connect(2) under the error code EINPROGRESS.
+  socket._writeWatcher.set(socket.fd, false, true);
+  socket._writeWatcher.start();
+  socket._writeWatcher.callback = function () {
+    var errno = socketError(socket.fd);
+    if (errno == 0) {
+      // connection established
+      socket._readWatcher.start();
+      socket.readable = true;
+      socket.writable = true;
+      socket._writeWatcher.callback = socket._doFlush;
+      socket.emit('connect');
+    } else if (errno != EINPROGRESS) {
+      socket.forceClose(errnoException(errno, 'connect'));
+    }
+  };
+}
+
+
 // var stream = new Socket();
 // stream.connect(80)               - TCP connect to port 80 on the localhost
 // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
@@ -390,55 +442,28 @@ Socket.prototype.connect = function () {
   var self = this;
   if (self.fd) throw new Error('Socket already opened');
 
-  function doConnect () {
-    try {
-      connect(self.fd, arguments[0], arguments[1]);
-    } catch (e) {
-      close(self.fd);
-      throw e;
-    }
-
-    // Don't start the read watcher until connection is established
-    self._readWatcher.set(self.fd, true, false);
-
-    // How to connect on POSIX: Wait for fd to become writable, then call
-    // socketError() if there isn't an error, we're connected. AFAIK this a
-    // platform independent way determining when a non-blocking connection
-    // is established, but I have only seen it documented in the Linux
-   // Manual Page connect(2) under the error code EINPROGRESS.
-    self._writeWatcher.set(self.fd, false, true);
-    self._writeWatcher.start();
-    self._writeWatcher.callback = function () {
-      var errno = socketError(self.fd);
-      if (errno == 0) {
-        // connection established
-        self._readWatcher.start();
-        self.readable = true;
-        self.writable = true;
-        self._writeWatcher.callback = self._doFlush;
-        self.emit('connect');
-      } else if (errno != EINPROGRESS) {
-        self.forceClose(errnoException(errno, 'connect'));
-      }
-    };
-  }
-
   if (typeof(arguments[0]) == 'string') {
     self.fd = socket('unix');
     self.type = 'unix';
     // TODO check if sockfile exists?
-    doConnect(arguments[0]);
+    doConnect(self, arguments[0]);
   } else {
     self.fd = socket('tcp');
+    debug('new fd = ' + self.fd);
     self.type = 'tcp';
     // TODO dns resolution on arguments[1]
     var port = arguments[0];
+    var yyy = xxx++;
     lookupDomainName(arguments[1], function (ip) {
-      doConnect(port, ip);
+      debug('doConnect ' + self.fd + '   yyy=' + yyy);
+      doConnect(self, port, ip);
+      debug('doConnect done ' + self.fd + '   yyy=' + yyy);
     });
   }
 };
 
+var xxx = 0;
+
 
 Socket.prototype.address = function () {
   return getsockname(this.fd);
@@ -449,8 +474,19 @@ Socket.prototype.setNoDelay = function (v) {
 };
 
 
+Socket.prototype.pause = function () {
+  this._readWatcher.stop();
+};
+
+Socket.prototype.resume = function () {
+  if (!this.fd) throw new Error('Cannot resume() closed Socket.');
+  this._readWatcher.set(this.fd, true, false);
+  this._readWatcher.start();
+};
+
 Socket.prototype.forceClose = function (exception) {
   // recvBuffer is shared between sockets, so don't need to free it here.
+  var self = this;
 
   var b;
   while (this._writeQueue.length) {
@@ -472,8 +508,12 @@ Socket.prototype.forceClose = function (exception) {
 
   if (this.fd) {
     close(this.fd);
+    debug('close ' + this.fd);
     this.fd = null;
-    this.emit('close', exception);
+    process.nextTick(function () {
+      if (exception) self.emit('error', exception);
+      self.emit('close', exception ? true : false);
+    });
   }
 };
 
@@ -517,12 +557,14 @@ function Server (listener) {
   self.watcher.callback = function (readable, writeable) {
     while (self.fd) {
       var peerInfo = accept(self.fd);
-      debug('accept: ' + JSON.stringify(peerInfo));
       if (!peerInfo) return;
       var peer = new Socket(peerInfo);
       peer.type = self.type;
       peer.server = self;
       self.emit('connection', peer);
+      // The 'connect' event  probably should be removed for server-side
+      // sockets. It's redundent.
+      peer.emit('connect'); 
     }
   };
 }
@@ -539,7 +581,13 @@ exports.createServer = function (listener) {
  */
 function lookupDomainName (dn, callback) {
   if (!needsLookup(dn)) {
-    callback(dn);
+    // Always wait until the next tick this is so people can do
+    //
+    //   server.listen(8000);
+    //   server.addListener('listening', fn);
+    //
+    // Marginally slower, but a lot fewer WTFs.
+    process.nextTick(function () { callback(dn); })
   } else {
     debug("getaddrinfo 4 " + dn);
     getaddrinfo(dn, 4, function (r4) {
@@ -589,9 +637,9 @@ Server.prototype.listen = function () {
     var path = arguments[0];
     self.path = path;
     // unlink sockfile if it exists
-    process.fs.stat(path, function (r) {
-      if (r instanceof Error) {
-        if (r.errno == ENOENT) {
+    fs.stat(path, function (err, r) {
+      if (err) {
+        if (err.errno == ENOENT) {
           bind(self.fd, path);
           doListen();
         } else {
@@ -601,7 +649,7 @@ Server.prototype.listen = function () {
         if (!r.isFile()) {
           throw new Error("Non-file exists at  " + path);
         } else {
-          process.fs.unlink(path, function (err) {
+          fs.unlink(path, function (err) {
             if (err) {
               throw err;
             } else {
@@ -623,9 +671,7 @@ Server.prototype.listen = function () {
     self.fd = socket('tcp');
     self.type = 'tcp';
     var port = arguments[0];
-    debug("starting tcp server on port " + port);
     lookupDomainName(arguments[1], function (ip) {
-      debug("starting tcp server on ip " + ip);
       bind(self.fd, port, ip);
       doListen();
     });
@@ -648,7 +694,7 @@ Server.prototype.close = function () {
   self.fd = null;
 
   if (self.type === "unix") {
-    process.fs.unlink(self.path, function () {
+    fs.unlink(self.path, function () {
       self.emit("close");
     });
   } else {
index d5b68aa..70820dd 100644 (file)
@@ -8,6 +8,8 @@
 
 #include <node.h>
 
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
 namespace node {
 
 using namespace v8;
@@ -226,7 +228,8 @@ Handle<Value> Buffer::Utf8Write(const Arguments &args) {
             "Not enough space in Buffer for string")));
   }
 
-  int written = s->WriteUtf8((char*)p);
+  int written = s->WriteUtf8((char*)p, buffer->length_ - offset);
+
   return scope.Close(Integer::New(written));
 }
 
@@ -253,12 +256,9 @@ Handle<Value> Buffer::AsciiWrite(const Arguments &args) {
 
   const char *p = buffer->data_ + offset;
 
-  if (s->Length() + offset > buffer->length_) {
-    return ThrowException(Exception::TypeError(String::New(
-            "Not enough space in Buffer for string")));
-  }
+  size_t towrite = MIN(s->Length(), buffer->length_ - offset);
 
-  int written = s->WriteAscii((char*)p);
+  int written = s->WriteAscii((char*)p, 0, towrite);
   return scope.Close(Integer::New(written));
 }
 
index f6de67b..1c27169 100644 (file)
@@ -1,5 +1,5 @@
 require("../common");
-tcp = require("tcp");
+net = require("net");
 // settings
 var bytes = 1024*40;
 var concurrency = 100;
@@ -13,7 +13,7 @@ for (var i = 0; i < bytes; i++) {
   body += "C";
 }
 
-var server = tcp.createServer(function (c) {
+var server = net.createServer(function (c) {
   c.addListener("connect", function () {
     total_connections++;
     print("#");
@@ -24,8 +24,10 @@ var server = tcp.createServer(function (c) {
 server.listen(PORT);
 
 function runClient (callback) {
-  var client = tcp.createConnection(PORT);
+  var client = net.createConnection(PORT);
+
   client.connections = 0;
+
   client.setEncoding("utf8");
 
   client.addListener("connect", function () {
@@ -38,14 +40,25 @@ function runClient (callback) {
     this.recved += chunk;
   });
 
-  client.addListener("end", function (had_error) {
+  client.addListener("end", function () {
     client.close();
   });
 
+  client.addListener("error", function (e) {
+    puts("\n\nERROOOOOr");
+    throw e;
+  });
+
   client.addListener("close", function (had_error) {
     print(".");
     assert.equal(false, had_error);
     assert.equal(bytes, client.recved.length);
+
+    if (client.fd) {
+      puts(client.fd);
+    }
+    assert.ok(!client.fd);
+
     if (this.connections < connections_per_client) {
       this.connect(PORT);
     } else {
@@ -54,13 +67,14 @@ function runClient (callback) {
   });
 }
 
-
-var finished_clients = 0;
-for (var i = 0; i < concurrency; i++) {
-  runClient(function () {
-    if (++finished_clients == concurrency) server.close();
-  });
-}
+server.addListener('listening', function () {
+  var finished_clients = 0;
+  for (var i = 0; i < concurrency; i++) {
+    runClient(function () {
+      if (++finished_clients == concurrency) server.close();
+    });
+  }
+});
 
 process.addListener("exit", function () {
   assert.equal(connections_per_client * concurrency, total_connections);
index 71b83df..adb6154 100644 (file)
@@ -1,8 +1,8 @@
 require("../common");
-tcp = require("tcp");
+net = require("net");
 N = 200;
 
-server = tcp.createServer(function (connection) {
+server = net.createServer(function (connection) {
   function write (j) {
     if (j >= N) {
       connection.close();
@@ -21,7 +21,7 @@ server.listen(PORT);
 recv = "";
 chars_recved = 0;
 
-client = tcp.createConnection(PORT);
+client = net.createConnection(PORT);
 client.setEncoding("ascii");
 client.addListener("data", function (d) {
     print(d);
index 2a1ce65..66d93e3 100644 (file)
@@ -1,12 +1,12 @@
 require("../common");
-tcp = require("tcp");
+net = require('net');
 var N = 50;
 
 var c = 0;
 var client_recv_count = 0;
 var disconnect_count = 0;
 
-var server = tcp.createServer(function (socket) {
+var server = net.createServer(function (socket) {
   socket.addListener("connect", function () {
     socket.write("hello\r\n");
   });
@@ -20,33 +20,38 @@ var server = tcp.createServer(function (socket) {
     assert.equal(false, had_error);
   });
 });
+
 server.listen(PORT);
 
-var client = tcp.createConnection(PORT);
+server.addListener('listening', function () {
+  puts('listening');
+  var client = net.createConnection(PORT);
 
-client.setEncoding("UTF8");
+  client.setEncoding("UTF8");
 
-client.addListener("connect", function () {
-  puts("client connected.");
-});
+  client.addListener("connect", function () {
+    puts("client connected.");
+  });
 
-client.addListener("data", function (chunk) {
-  client_recv_count += 1;
-  puts("client_recv_count " + client_recv_count);
-  assert.equal("hello\r\n", chunk);
-  client.close();
-});
+  client.addListener("data", function (chunk) {
+    client_recv_count += 1;
+    puts("client_recv_count " + client_recv_count);
+    assert.equal("hello\r\n", chunk);
+    client.close();
+  });
 
-client.addListener("close", function (had_error) {
-  puts("disconnect");
-  assert.equal(false, had_error);
-  if (disconnect_count++ < N)
-    client.connect(PORT); // reconnect
-  else
-    server.close();
+  client.addListener("close", function (had_error) {
+    puts("disconnect");
+    assert.equal(false, had_error);
+    if (disconnect_count++ < N)
+      client.connect(PORT); // reconnect
+    else
+      server.close();
+  });
 });
 
 process.addListener("exit", function () {
   assert.equal(N+1, disconnect_count);
   assert.equal(N+1, client_recv_count);
 });
+