Implement stream.send()
authorRyan Dahl <ry@tinyclouds.org>
Wed, 16 Dec 2009 12:50:28 +0000 (13:50 +0100)
committerRyan Dahl <ry@tinyclouds.org>
Tue, 29 Dec 2009 20:12:31 +0000 (21:12 +0100)
src/node_buffer.cc
src/node_net2.cc
tcp.js

index f211283..584ed18 100644 (file)
@@ -254,6 +254,90 @@ static Handle<Value> Slice(const Arguments &args) {
   return scope.Close(slice);
 }
 
+
+// var charsWritten = buffer.utf8Write(string, offset, length);
+static Handle<Value> Utf8Write(const Arguments &args) {
+  HandleScope scope;
+
+  struct buffer *buffer = BufferUnwrap(args.This());
+
+  if (!args[0]->IsString()) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Argument must be a string")));
+  }
+
+  Local<String> s = args[0]->ToString();
+
+  size_t offset = args[1]->Int32Value();
+
+  char *p = buffer_p(buffer, offset);
+  if (buffer_p(buffer, offset) == NULL) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Offset is out of bounds")));
+  }
+
+  size_t toWrite = args[2]->Int32Value();
+
+  if (buffer_remaining(buffer, offset) < toWrite) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Length is out of bounds")));
+  }
+
+  int written = s->WriteUtf8(p, toWrite);
+
+  return scope.Close(Integer::New(written));
+}
+
+
+// var charsWritten = buffer.asciiWrite(string, offset, length);
+static Handle<Value> AsciiWrite(const Arguments &args) {
+  HandleScope scope;
+
+  struct buffer *buffer = BufferUnwrap(args.This());
+
+  if (!args[0]->IsString()) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Argument must be a string")));
+  }
+
+  Local<String> s = args[0]->ToString();
+
+  size_t offset = args[1]->Int32Value();
+
+  char *p = buffer_p(buffer, offset);
+  if (buffer_p(buffer, offset) == NULL) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Offset is out of bounds")));
+  }
+
+  size_t toWrite = args[2]->Int32Value();
+
+  if (buffer_remaining(buffer, offset) < toWrite) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Length is out of bounds")));
+  }
+
+  // TODO Expose the second argument of WriteAscii?
+  // Could avoid doing slices when the string doesn't fit in a buffer.  V8
+  // slice() does copy the string, so exposing that argument would help.
+
+  int written = s->WriteAscii(p, 0, toWrite);
+
+  return scope.Close(Integer::New(written));
+}
+
+
+static Handle<Value> Utf8Length(const Arguments &args) {
+  HandleScope scope;
+  if (!args[0]->IsString()) {
+    return ThrowException(Exception::TypeError(String::New(
+            "Argument must be a string")));
+  }
+  Local<String> s = args[0]->ToString();
+  return scope.Close(Integer::New(s->Utf8Length()));
+}
+
+
 void InitBuffer(Handle<Object> target) {
   HandleScope scope;
 
@@ -271,6 +355,11 @@ void InitBuffer(Handle<Object> target) {
   // copy 
   NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Slice", Utf8Slice);
 
+  NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Write", Utf8Write);
+  NODE_SET_PROTOTYPE_METHOD(constructor_template, "asciiWrite", AsciiWrite);
+
+  NODE_SET_METHOD(constructor_template->GetFunction(), "utf8Length", Utf8Length);
+
   target->Set(String::NewSymbol("Buffer"), constructor_template->GetFunction());
 }
 
index 3d8a1fa..7b4a788 100644 (file)
@@ -425,9 +425,9 @@ static Handle<Value> Read(const Arguments& args) {
           String::New("Length is extends beyond buffer")));
   }
 
-  size_t bytes_read = read(fd,
-                           buffer_p(buffer, off),
-                           buffer_remaining(buffer, off));
+  ssize_t bytes_read = read(fd,
+                            buffer_p(buffer, off),
+                            buffer_remaining(buffer, off));
 
   if (bytes_read < 0) {
     if (errno == EAGAIN || errno == EINTR) return Null();
@@ -457,20 +457,20 @@ static Handle<Value> Write(const Arguments& args) {
   struct buffer * buffer = BufferUnwrap(args[1]);
 
   size_t off = args[2]->Int32Value();
-  if (buffer_p(buffer, off) == NULL) {
+  char *p = buffer_p(buffer, off);
+  if (p == NULL) {
     return ThrowException(Exception::Error(
           String::New("Offset is out of bounds")));
   }
 
   size_t len = args[3]->Int32Value();
-  if (buffer_remaining(buffer, off) < len) {
+  size_t remaining = buffer_remaining(buffer, off);
+  if (remaining < len) {
     return ThrowException(Exception::Error(
           String::New("Length is extends beyond buffer")));
   }
 
-  size_t written = write(fd,
-                         buffer_p(buffer, off),
-                         buffer_remaining(buffer, off));
+  ssize_t written = write(fd, p, len);
 
   if (written < 0) {
     if (errno == EAGAIN || errno == EINTR) return Null();
diff --git a/tcp.js b/tcp.js
index 6dd29f8..36416d2 100644 (file)
--- a/tcp.js
+++ b/tcp.js
@@ -16,15 +16,18 @@ var read      = process.read;
 var write     = process.write;
 var toRead    = process.toRead;
 
-var Peer = function (peerInfo) {
+var Stream = function (peerInfo) {
   process.EventEmitter.call();
 
   var self = this;
 
-  process.mixin(self, peerInfo);
+  self.fd = peerInfo.fd;
+  self.remoteAddress = peerInfo.remoteAddress;
+  self.remotePort = peerInfo.remotePort;
 
   // Allocated on demand.
   self.recvBuffer = null;
+  self.sendQueue = [];
 
   self.readWatcher = new process.IOWatcher(function () {
     debug("\n" + self.fd + " readable");
@@ -32,25 +35,25 @@ var Peer = function (peerInfo) {
     // If this is the first recv (recvBuffer doesn't exist) or we've used up
     // most of the recvBuffer, allocate a new one.
     if (!self.recvBuffer || 
-        self.recvBuffer.length - self.recvBufferBytesUsed < 128) {
+        self.recvBuffer.length - self.recvBuffer.used < 128) {
       self._allocateNewRecvBuf();
     }
 
-    debug("recvBufferBytesUsed " + self.recvBufferBytesUsed);
+    debug("recvBuffer.used " + self.recvBuffer.used);
     var bytesRead = read(self.fd,
                          self.recvBuffer,
-                         self.recvBufferBytesUsed,
-                         self.recvBuffer.length - self.recvBufferBytesUsed);
+                         self.recvBuffer.used,
+                         self.recvBuffer.length - self.recvBuffer.used);
     debug("bytesRead " + bytesRead + "\n");
 
-    if (bytesRead == 0) { 
+    if (bytesRead == 0) {
       self.readable = false;
       self.readWatcher.stop();
       self.emit("eof");
     } else {
-      var slice = self.recvBuffer.slice(self.recvBufferBytesUsed,
-                                        self.recvBufferBytesUsed + bytesRead);
-      self.recvBufferBytesUsed += bytesRead;
+      var slice = self.recvBuffer.slice(self.recvBuffer.used,
+                                        self.recvBuffer.used + bytesRead);
+      self.recvBuffer.used += bytesRead;
       self.emit("receive", slice);
     }
   });
@@ -58,18 +61,16 @@ var Peer = function (peerInfo) {
   self.readWatcher.start();
 
   self.writeWatcher = new process.IOWatcher(function () {
-    debug(self.fd + " writable");
+    self.flush();
   });
   self.writeWatcher.set(self.fd, false, true);
 
   self.readable = true;
   self.writable = true;
-
-  self._out = [];
 };
-process.inherits(Peer, process.EventEmitter);
+process.inherits(Stream, process.EventEmitter);
 
-Peer.prototype._allocateNewRecvBuf = function () {
+Stream.prototype._allocateNewRecvBuf = function () {
   var self = this;
 
   var newBufferSize = 1024; // TODO make this adjustable from user API
@@ -83,7 +84,7 @@ Peer.prototype._allocateNewRecvBuf = function () {
     } else if (bytesToRead == 0) {
       // Probably getting an EOF - so let's not allocate so much.
       if (self.recvBuffer &&
-          self.recvBuffer.length - self.recvBufferBytesUsed > 0) {
+          self.recvBuffer.length - self.recvBuffer.used > 0) {
         return; // just recv the eof on the old buf.
       }
       newBufferSize = 128;
@@ -91,10 +92,115 @@ Peer.prototype._allocateNewRecvBuf = function () {
   }
 
   self.recvBuffer = new process.Buffer(newBufferSize);
-  self.recvBufferBytesUsed = 0;
+  self.recvBuffer.used = 0;
+};
+
+Stream.prototype._allocateSendBuffer = function () {
+  var b = new process.Buffer(1024);
+  b.used = 0;
+  b.sent = 0;
+  this.sendQueue.push(b);
+  return b;
+};
+
+Stream.prototype.send = function (data, encoding) {
+  var self = this;
+  if (typeof(data) == "string") {
+    var buffer;
+    if (self.sendQueue.length == 0) {
+      buffer = self._allocateSendBuffer();
+    } else {
+      // walk through the sendQueue, find the first empty buffer
+      for (var i = 0; i < self.sendQueue.length; i++) {
+        if (self.sendQueue[i].used == 0) {
+          buffer = self.sendQueue[i];
+          break;
+        }
+      }
+      // if we didn't find one, take the last
+      if (!buffer) {
+        buffer = self.sendQueue[self.sendQueue.length-1];
+        // if last buffer is empty
+        if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
+      }
+    }
+
+    encoding = encoding || "ascii"; // default to ascii since it's faster
+
+    var charsWritten;
+
+    if (encoding.toLowerCase() == "utf8") {
+      charsWritten = buffer.utf8Write(data,
+                                      buffer.used,
+                                      buffer.length - buffer.used);
+      buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
+    } else {
+      // ascii
+      charsWritten = buffer.asciiWrite(data,
+                                       buffer.used,
+                                       buffer.length - buffer.used);
+      buffer.used += charsWritten;
+      debug("ascii charsWritten " + charsWritten);
+      debug("ascii buffer.used " + buffer.used);
+    }
+
+
+    // If we didn't finish, then recurse with the rest of the string.
+    if (charsWritten < data.length) {
+      debug("recursive send");
+      self.send(data.slice(charsWritten), encoding);
+    }
+  } else {
+    // data is a process.Buffer
+   
+    // walk through the sendQueue, find the first empty buffer
+    var inserted = false;
+    data.sent = 0;
+    data.used = data.length;
+    for (var i = 0; i < self.sendQueue.length; i++) {
+      if (self.sendQueue[i].used == 0) {
+        // if found, insert the data there
+        self.sendQueue.splice(i, 0, data);
+        inserted = true;
+        break;
+      }
+    }
+
+    if (!inserted) self.sendQueue.push(data);
+  }
+  this.flush();
+};
+
+// returns true if flushed without getting EAGAIN
+// false if it got EAGAIN
+Stream.prototype.flush = function () {
+  var self = this;
+  var bytesWritten;
+  while (self.sendQueue.length > 0) {
+    var b = self.sendQueue[0];
+
+    if (b.sent == b.used) {
+      // this can be improved - save the buffer for later?
+      self.sendQueue.shift()
+      continue;
+    }
+
+    bytesWritten = write(self.fd,
+                         b,
+                         b.sent,
+                         b.used - b.sent);
+    if (bytesWritten === null) {
+      this.writeWatcher.start();
+      return false;
+    }
+    b.sent += bytesWritten;
+    debug("bytes sent: " + b.sent);
+  }
+  this.writeWatcher.stop();
+  return true;
 };
 
-Peer.prototype.close = function () {
+Stream.prototype.close = function () {
   this.readable = false;
   this.writable = false;
 
@@ -113,14 +219,11 @@ var Server = function (listener) {
   }
 
   self.watcher = new process.IOWatcher(function (readable, writeable) {
-    debug("readable " + readable);
-    debug("writable " + writeable);
     while (self.fd) {
-      debug("accept from " + self.fd);
       var peerInfo = accept(self.fd);
       debug("accept: " + JSON.stringify(peerInfo));
       if (!peerInfo) return;
-      var peer = new Peer(peerInfo);
+      var peer = new Stream(peerInfo);
       self.emit("connection", peer);
     }
   });
@@ -132,6 +235,7 @@ Server.prototype.listen = function () {
 
   if (self.fd) throw new Error("Already running");
 
+  var backlogIndex;
   if (typeof(arguments[0]) == "string" && arguments.length == 1) {
     // the first argument specifies a path
     self.fd = process.socket("UNIX");
@@ -141,13 +245,15 @@ Server.prototype.listen = function () {
     //   unlink(SOCKFILE);
     // }
     bind(self.fd, arguments[0]);
+    backlogIndex = 1;
   } else {
     // the first argument is the port, the second an IP
     self.fd = process.socket("TCP");
     // TODO dns resolution on arguments[1]
     bind(self.fd, arguments[0], arguments[1]);
+    backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1;
   }
-  listen(self.fd, 128); // TODO configurable backlog
+  listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
 
   self.watcher.set(self.fd, true, false); 
   self.watcher.start();
@@ -179,6 +285,7 @@ var server = new Server(function (peer) {
 
   peer.addListener("receive", function (b) {
     sys.puts("recv (" + b.length + "): " + b);
+    peer.send("pong\r\n");
   });
 });
 //server.listen(8000);