Optimize writing strings with Socket.write
authorBert Belder <bertbelder@gmail.com>
Mon, 7 May 2012 21:30:55 +0000 (23:30 +0200)
committerBert Belder <bertbelder@gmail.com>
Wed, 9 May 2012 01:56:19 +0000 (03:56 +0200)
lib/child_process.js
lib/net.js
src/pipe_wrap.cc
src/stream_wrap.cc
src/stream_wrap.h
src/tcp_wrap.cc
src/tty_wrap.cc
test/simple/test-tcp-wrap-listen.js

index d40566f..fdea144 100644 (file)
@@ -112,12 +112,12 @@ function setupChannel(target, channel) {
       return false;
     }
 
-    var buffer = Buffer(JSON.stringify(message) + '\n');
+    var string = JSON.stringify(message) + '\n';
 
     // Update simultaneous accepts on Windows
     net._setSimultaneousAccepts(sendHandle);
 
-    var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);
+    var writeReq = channel.writeUtf8String(string, sendHandle);
 
     if (!writeReq) {
       var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
index 4ebc940..ebf3962 100644 (file)
@@ -477,9 +477,22 @@ Socket.prototype.write = function(data, arg1, arg2) {
     }
   }
 
-  // Change strings to buffers. SLOW
   if (typeof data === 'string') {
-    data = new Buffer(data, encoding);
+    encoding = (encoding || 'utf8').toLowerCase();
+    switch (encoding) {
+      case 'utf8':
+      case 'utf-8':
+      case 'ascii':
+      case 'ucs2':
+      case 'ucs-2':
+      case 'utf16le':
+      case 'utf-16le':
+        // This encoding can be handled in the binding layer.
+        break;
+
+      default:
+        data = new Buffer(data, encoding);
+    }
   } else if (!Buffer.isBuffer(data)) {
     throw new TypeError('First argument must be a buffer or a string.');
   }
@@ -509,8 +522,33 @@ Socket.prototype._write = function(data, encoding, cb) {
     return false;
   }
 
-  // `encoding` is unused right now, `data` is always a buffer.
-  var writeReq = this._handle.write(data);
+  var writeReq;
+
+  if (Buffer.isBuffer(data)) {
+    writeReq = this._handle.writeBuffer(data);
+
+  } else {
+    switch (encoding) {
+      case 'utf8':
+      case 'utf-8':
+        writeReq = this._handle.writeUtf8String(data);
+        break;
+
+      case 'ascii':
+        writeReq = this._handle.writeAsciiString(data);
+        break;
+
+      case 'ucs2':
+      case 'ucs-2':
+      case 'utf16le':
+      case 'utf-16le':
+        writeReq = this._handle.writeUcs2String(data);
+        break;
+
+      default:
+        assert(0);
+    }
+  }
 
   if (!writeReq || typeof writeReq !== 'object') {
     this._destroy(errnoException(errno, 'write'), cb);
@@ -525,7 +563,7 @@ Socket.prototype._write = function(data, encoding, cb) {
 };
 
 
-function afterWrite(status, handle, req, buffer) {
+function afterWrite(status, handle, req) {
   var self = handle.socket;
 
   // callback may come after call to destroy.
index 8065461..d98f901 100644 (file)
@@ -100,9 +100,13 @@ void PipeWrap::Initialize(Handle<Object> target) {
 
   NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
   NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
-  NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
   NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown);
 
+  NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String);
+
   NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
   NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen);
   NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
index 9fcb5b1..e67396c 100644 (file)
@@ -48,6 +48,8 @@ using v8::TryCatch;
 using v8::Context;
 using v8::Arguments;
 using v8::Integer;
+using v8::Number;
+using v8::Exception;
 
 
 #define UNWRAP \
@@ -64,10 +66,25 @@ using v8::Integer;
 
 
 typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
-typedef class ReqWrap<uv_write_t> WriteWrap;
+
+class WriteWrap: public ReqWrap<uv_write_t> {
+ public:
+  void* operator new(size_t size, char* storage) { return storage; }
+
+  // This is just to keep the compiler happy. It should never be called, since
+  // we don't use exceptions in node.
+  void operator delete(void* ptr, char* storage) { assert(0); }
+
+ protected:
+  // People should not be using the non-placement new and delete operator on a
+  // WriteWrap. Ensure this never happens.
+  void* operator new (size_t size) { assert(0); };
+  void operator delete(void* ptr) { assert(0); };
+};
 
 
 static Persistent<String> buffer_sym;
+static Persistent<String> bytes_sym;
 static Persistent<String> write_queue_size_sym;
 static Persistent<String> onread_sym;
 static Persistent<String> oncomplete_sym;
@@ -84,6 +101,7 @@ void StreamWrap::Initialize(Handle<Object> target) {
   HandleWrap::Initialize(target);
 
   buffer_sym = NODE_PSYMBOL("buffer");
+  bytes_sym = NODE_PSYMBOL("bytes");
   write_queue_size_sym = NODE_PSYMBOL("writeQueueSize");
   onread_sym = NODE_PSYMBOL("onread");
   oncomplete_sym = NODE_PSYMBOL("oncomplete");
@@ -226,7 +244,7 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
 }
 
 
-Handle<Value> StreamWrap::Write(const Arguments& args) {
+Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
   HandleScope scope;
 
   UNWRAP
@@ -248,7 +266,15 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
     length = args[2]->IntegerValue();
   }
 
-  WriteWrap* req_wrap = new WriteWrap();
+  if (length > INT_MAX) {
+    uv_err_t err;
+    err.code = UV_ENOBUFS;
+    SetErrno(err);
+    return scope.Close(v8::Null());
+  }
+
+  char* storage = new char[sizeof(WriteWrap)];
+  WriteWrap* req_wrap = new (storage) WriteWrap();
 
   req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
 
@@ -280,12 +306,153 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
   }
 
   req_wrap->Dispatched();
+  req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) length));
 
   wrap->UpdateWriteQueueSize();
 
   if (r) {
     SetErrno(uv_last_error(uv_default_loop()));
-    delete req_wrap;
+    req_wrap->~WriteWrap();
+    delete[] storage;
+    return scope.Close(v8::Null());
+  } else {
+    return scope.Close(req_wrap->object_);
+  }
+}
+
+
+enum WriteEncoding {
+  kAscii,
+  kUtf8,
+  kUcs2
+};
+
+template <WriteEncoding encoding>
+Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
+  HandleScope scope;
+  int r;
+
+  UNWRAP
+
+  if (args.Length() < 1)
+    return ThrowTypeError("Not enough arguments");
+
+  Local<String> string = args[0]->ToString();
+
+  // Compute the size of the storage that the string will be flattened into.
+  size_t storage_size;
+  switch (encoding) {
+    case kAscii:
+      storage_size = string->Length();
+      break;
+
+    case kUtf8:
+      if (!(string->MayContainNonAscii())) {
+        // If the string has only ascii characters, we know exactly how big
+        // the storage should be.
+        storage_size = string->Length();
+      } else if (string->Length() < 65536) {
+        // A single UCS2 codepoint never takes up more than 3 utf8 bytes.
+        // Unless the string is really long we just allocate so much space that
+        // we're certain the string fits in there entirely.
+        // TODO: maybe check handle->write_queue_size instead of string length?
+        storage_size = 3 * string->Length();
+      } else {
+        // The string is really long. Compute the allocation size that we
+        // actually need.
+        storage_size = string->Utf8Length();
+      }
+      break;
+
+    case kUcs2:
+      storage_size += string->Length() * sizeof(uint16_t);
+      break;
+
+    default:
+      // Unreachable.
+      assert(0);
+  }
+
+  if (storage_size > INT_MAX) {
+    uv_err_t err;
+    err.code = UV_ENOBUFS;
+    SetErrno(err);
+    return scope.Close(v8::Null());
+  }
+
+  char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
+  WriteWrap* req_wrap = new (storage) WriteWrap();
+
+  char* data = reinterpret_cast<char*>(ROUND_UP(
+      reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
+  size_t data_size;
+  switch (encoding) {
+  case kAscii:
+      data_size = string->WriteAscii(data, 0, -1,
+          String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
+      break;
+
+    case kUtf8:
+      data_size = string->WriteUtf8(data, -1, NULL,
+          String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
+      break;
+
+    case kUcs2: {
+      int chars_copied = string->Write((uint16_t*) data, 0, -1,
+          String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
+      data_size = chars_copied * sizeof(uint16_t);
+      break;
+    }
+
+    default:
+      // Unreachable
+      assert(0);
+  }
+
+  assert(data_size <= storage_size);
+
+  uv_buf_t buf;
+  buf.base = data;
+  buf.len = data_size;
+
+  bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
+                  ((uv_pipe_t*)wrap->stream_)->ipc;
+
+  if (!ipc_pipe) {
+    r = uv_write(&req_wrap->req_,
+                 wrap->stream_,
+                 &buf,
+                 1,
+                 StreamWrap::AfterWrite);
+
+  } else {
+    uv_stream_t* send_stream = NULL;
+
+    if (args[1]->IsObject()) {
+      Local<Object> send_stream_obj = args[1]->ToObject();
+      assert(send_stream_obj->InternalFieldCount() > 0);
+      StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
+          send_stream_obj->GetPointerFromInternalField(0));
+      send_stream = send_stream_wrap->GetStream();
+    }
+
+    r = uv_write2(&req_wrap->req_,
+                  wrap->stream_,
+                  &buf,
+                  1,
+                  send_stream,
+                  StreamWrap::AfterWrite);
+  }
+
+  req_wrap->Dispatched();
+  req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size));
+
+  wrap->UpdateWriteQueueSize();
+
+  if (r) {
+    SetErrno(uv_last_error(uv_default_loop()));
+    req_wrap->~WriteWrap();
+    delete[] storage;
     return scope.Close(v8::Null());
   } else {
     return scope.Close(req_wrap->object_);
@@ -293,6 +460,21 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
 }
 
 
+Handle<Value> StreamWrap::WriteAsciiString(const Arguments& args) {
+  return WriteStringImpl<kAscii>(args);
+}
+
+
+Handle<Value> StreamWrap::WriteUtf8String(const Arguments& args) {
+  return WriteStringImpl<kUtf8>(args);
+}
+
+
+Handle<Value> StreamWrap::WriteUcs2String(const Arguments& args) {
+  return WriteStringImpl<kUcs2>(args);
+}
+
+
 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
   WriteWrap* req_wrap = (WriteWrap*) req->data;
   StreamWrap* wrap = (StreamWrap*) req->handle->data;
@@ -309,16 +491,16 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) {
 
   wrap->UpdateWriteQueueSize();
 
-  Local<Value> argv[4] = {
+  Local<Value> argv[] = {
     Integer::New(status),
     Local<Value>::New(wrap->object_),
-    Local<Value>::New(req_wrap->object_),
-    req_wrap->object_->GetHiddenValue(buffer_sym),
+    Local<Value>::New(req_wrap->object_)
   };
 
   MakeCallback(req_wrap->object_, oncomplete_sym, ARRAY_SIZE(argv), argv);
 
-  delete req_wrap;
+  req_wrap->~WriteWrap();
+  delete[] reinterpret_cast<char*>(req_wrap);
 }
 
 
index 278fda7..40947d5 100644 (file)
@@ -35,11 +35,15 @@ class StreamWrap : public HandleWrap {
   static void Initialize(v8::Handle<v8::Object> target);
 
   // JavaScript functions
-  static v8::Handle<v8::Value> Write(const v8::Arguments& args);
   static v8::Handle<v8::Value> ReadStart(const v8::Arguments& args);
   static v8::Handle<v8::Value> ReadStop(const v8::Arguments& args);
   static v8::Handle<v8::Value> Shutdown(const v8::Arguments& args);
 
+  static v8::Handle<v8::Value> WriteBuffer(const v8::Arguments& args);
+  static v8::Handle<v8::Value> WriteAsciiString(const v8::Arguments& args);
+  static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
+  static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);
+
  protected:
   StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
   virtual ~StreamWrap() { }
@@ -61,6 +65,9 @@ class StreamWrap : public HandleWrap {
   static void OnReadCommon(uv_stream_t* handle, ssize_t nread,
       uv_buf_t buf, uv_handle_type pending);
 
+  template <enum WriteEncoding encoding>
+  static v8::Handle<v8::Value> WriteStringImpl(const v8::Arguments& args);
+
   size_t slab_offset_;
   uv_stream_t* stream_;
 };
index a158c45..4a38602 100644 (file)
@@ -110,9 +110,13 @@ void TCPWrap::Initialize(Handle<Object> target) {
 
   NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
   NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
-  NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
   NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown);
 
+  NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
+  NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String);
+
   NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
   NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen);
   NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
index 8647ea0..f3a956b 100644 (file)
@@ -73,7 +73,11 @@ class TTYWrap : StreamWrap {
 
     NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
     NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
-    NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
+
+    NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer);
+    NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
+    NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
+    NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String);
 
     NODE_SET_PROTOTYPE_METHOD(t, "getWindowSize", TTYWrap::GetWindowSize);
     NODE_SET_PROTOTYPE_METHOD(t, "setRawMode", SetRawMode);
index 18c2c64..a272a19 100644 (file)
@@ -55,21 +55,20 @@ server.onconnection = function(client) {
 
       assert.equal(0, client.writeQueueSize);
 
-      var req = client.write(buffer, offset, length);
+      var req = client.writeBuffer(buffer.slice(offset, offset + length));
       client.pendingWrites.push(req);
 
       console.log('client.writeQueueSize: ' + client.writeQueueSize);
       // 11 bytes should flush
       assert.equal(0, client.writeQueueSize);
 
-      req.oncomplete = function(status, client_, req_, buffer_) {
+      req.oncomplete = function(status, client_, req_) {
         assert.equal(req, client.pendingWrites.shift());
 
         // Check parameters.
         assert.equal(0, status);
         assert.equal(client, client_);
         assert.equal(req, req_);
-        assert.equal(buffer, buffer_);
 
         console.log('client.writeQueueSize: ' + client.writeQueueSize);
         assert.equal(0, client.writeQueueSize);