stream_wrap: use `uv_try_write` where possible
authorFedor Indutny <fedor.indutny@gmail.com>
Tue, 28 Jan 2014 22:48:10 +0000 (02:48 +0400)
committerFedor Indutny <fedor.indutny@gmail.com>
Tue, 28 Jan 2014 22:49:03 +0000 (02:49 +0400)
Use `uv_try_write` for string and buffer writes, thus avoiding to do
allocations and copying in some of the cases.

benchmark/net/tcp-raw-pipe.js
benchmark/net/tcp-raw-s2c.js
lib/net.js
src/env.h
src/stream_wrap.cc
src/stream_wrap.h
src/tls_wrap.cc
src/tls_wrap.h
test/simple/test-tcp-wrap-listen.js

index 91c69e9..bda6839 100644 (file)
@@ -51,7 +51,7 @@ function server() {
       if (nread < 0)
         fail(nread, 'read');
 
-      var writeReq = {};
+      var writeReq = { async: false };
       err = clientHandle.writeBuffer(writeReq, buffer);
 
       if (err)
index 6fb6568..500be1b 100644 (file)
@@ -68,7 +68,7 @@ function server() {
       write();
 
     function write() {
-      var writeReq = { oncomplete: afterWrite };
+      var writeReq = { async: false, oncomplete: afterWrite };
       var err;
       switch (type) {
         case 'buf':
@@ -82,8 +82,13 @@ function server() {
           break;
       }
 
-      if (err)
+      if (err) {
         fail(err, 'write');
+      } else if (!writeReq.async) {
+        process.nextTick(function() {
+          afterWrite(null, clientHandle, writeReq);
+        });
+      }
     }
 
     function afterWrite(err, handle, req) {
index 800bae3..8e6dcbf 100644 (file)
@@ -626,7 +626,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
     return false;
   }
 
-  var req = { oncomplete: afterWrite };
+  var req = { oncomplete: afterWrite, async: false };
   var err;
 
   if (writev) {
@@ -660,10 +660,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
 
   // If it was entirely flushed, we can write some more right now.
   // However, if more is left in the queue, then wait until that clears.
-  if (this._handle.writeQueueSize === 0)
-    cb();
-  else
+  if (req.async && this._handle.writeQueueSize != 0)
     req.cb = cb;
+  else
+    cb();
 };
 
 
index 23a3959..26630bb 100644 (file)
--- a/src/env.h
+++ b/src/env.h
@@ -53,6 +53,7 @@ namespace node {
 #define PER_ISOLATE_STRING_PROPERTIES(V)                                      \
   V(address_string, "address")                                                \
   V(atime_string, "atime")                                                    \
+  V(async, "async")                                                           \
   V(async_queue_string, "_asyncQueue")                                        \
   V(birthtime_string, "birthtime")                                            \
   V(blksize_string, "blksize")                                                \
index e0079b8..848abef 100644 (file)
@@ -33,6 +33,7 @@
 #include "util-inl.h"
 
 #include <stdlib.h>  // abort()
+#include <string.h>  // memcpy()
 #include <limits.h>  // INT_MAX
 
 
@@ -49,6 +50,7 @@ using v8::Number;
 using v8::Object;
 using v8::PropertyCallbackInfo;
 using v8::String;
+using v8::True;
 using v8::Undefined;
 using v8::Value;
 
@@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
   Local<Object> buf_obj = args[1].As<Object>();
 
   size_t length = Buffer::Length(buf_obj);
-  char* storage = new char[sizeof(WriteWrap)];
-  WriteWrap* req_wrap =
-      new(storage) WriteWrap(env, req_wrap_obj, wrap);
 
+  char* storage;
+  WriteWrap* req_wrap;
   uv_buf_t buf;
   WriteBuffer(buf_obj, &buf);
 
-  int err = wrap->callbacks()->DoWrite(req_wrap,
-                                       &buf,
-                                       1,
-                                       NULL,
-                                       StreamWrap::AfterWrite);
+  // Try writing immediately without allocation
+  uv_buf_t* bufs = &buf;
+  size_t count = 1;
+  int err = wrap->callbacks()->TryWrite(&bufs, &count);
+  if (err == 0)
+    goto done;
+  assert(count == 1);
+
+  // Allocate, or write rest
+  storage = new char[sizeof(WriteWrap)];
+  req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
+
+  err = wrap->callbacks()->DoWrite(req_wrap,
+                                   bufs,
+                                   count,
+                                   NULL,
+                                   StreamWrap::AfterWrite);
   req_wrap->Dispatched();
-  req_wrap_obj->Set(env->bytes_string(),
-                    Integer::NewFromUnsigned(length, node_isolate));
-  const char* msg = wrap->callbacks()->Error();
-  if (msg != NULL)
-    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+  req_wrap_obj->Set(env->async(), True(node_isolate));
 
   if (err) {
     req_wrap->~WriteWrap();
     delete[] storage;
   }
 
+ done:
+  const char* msg = wrap->callbacks()->Error();
+  if (msg != NULL)
+    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+  req_wrap_obj->Set(env->bytes_string(),
+                    Integer::NewFromUnsigned(length, node_isolate));
   args.GetReturnValue().Set(err);
 }
 
@@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
     return;
   }
 
-  char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
-  WriteWrap* req_wrap =
-      new(storage) WriteWrap(env, req_wrap_obj, wrap);
+  // Try writing immediately if write size isn't too big
+  char* storage;
+  WriteWrap* req_wrap;
+  char* data;
+  char stack_storage[16384];  // 16kb
+  size_t data_size;
+  uv_buf_t buf;
+
+  bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
+                   (!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
+  if (try_write) {
+    data_size = StringBytes::Write(stack_storage,
+                                   storage_size,
+                                   string,
+                                   encoding);
+    buf = uv_buf_init(stack_storage, data_size);
+
+    uv_buf_t* bufs = &buf;
+    size_t count = 1;
+    err = wrap->callbacks()->TryWrite(&bufs, &count);
+
+    // Success
+    if (err == 0)
+      goto done;
+
+    // Failure, or partial write
+    assert(count == 1);
+  }
+
+  storage = new char[sizeof(WriteWrap) + storage_size + 15];
+  req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
 
-  char* data = reinterpret_cast<char*>(ROUND_UP(
+  data = reinterpret_cast<char*>(ROUND_UP(
       reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
 
-  size_t data_size;
-  data_size = StringBytes::Write(data, storage_size, string, encoding);
+  if (try_write) {
+    // Copy partial data
+    memcpy(data, buf.base, buf.len);
+    data_size = buf.len;
+  } else {
+    // Write it
+    data_size = StringBytes::Write(data, storage_size, string, encoding);
+  }
 
   assert(data_size <= storage_size);
 
-  uv_buf_t buf;
-
-  buf.base = data;
-  buf.len = data_size;
+  buf = uv_buf_init(data, data_size);
 
   if (!wrap->is_named_pipe_ipc()) {
     err = wrap->callbacks()->DoWrite(req_wrap,
@@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
   }
 
   req_wrap->Dispatched();
-  req_wrap->object()->Set(env->bytes_string(),
-                          Integer::NewFromUnsigned(data_size, node_isolate));
-  const char* msg = wrap->callbacks()->Error();
-  if (msg != NULL)
-    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+  req_wrap->object()->Set(env->async(), True(node_isolate));
 
   if (err) {
     req_wrap->~WriteWrap();
     delete[] storage;
   }
 
+ done:
+  const char* msg = wrap->callbacks()->Error();
+  if (msg != NULL)
+    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+  req_wrap_obj->Set(env->bytes_string(),
+                    Integer::NewFromUnsigned(data_size, node_isolate));
   args.GetReturnValue().Set(err);
 }
 
@@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
     delete[] bufs;
 
   req_wrap->Dispatched();
+  req_wrap->object()->Set(env->async(), True(node_isolate));
   req_wrap->object()->Set(env->bytes_string(),
                           Number::New(node_isolate, bytes));
   const char* msg = wrap->callbacks()->Error();
@@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() {
 }
 
 
+// NOTE: Call to this function could change both `buf`'s and `count`'s
+// values, shifting their base and decrementing their length. This is
+// required in order to skip the data that was successfully written via
+// uv_try_write().
+int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+  int err;
+  size_t written;
+  uv_buf_t* vbufs = *bufs;
+  size_t vcount = *count;
+
+  err = uv_try_write(wrap()->stream(), vbufs, vcount);
+  if (err < 0)
+    return err;
+
+  // Slice off the buffers: skip all written buffers and slice the one that
+  // was partially written.
+  written = err;
+  for (; written != 0 && vcount > 0; vbufs++, vcount--) {
+    // Slice
+    if (vbufs[0].len > written) {
+      vbufs[0].base += written;
+      vbufs[0].len -= written;
+      written = 0;
+      break;
+
+    // Discard
+    } else {
+      written -= vbufs[0].len;
+    }
+  }
+
+  *bufs = vbufs;
+  *count = vcount;
+
+  if (vcount == 0)
+    return 0;
+  else
+    return -1;
+}
+
+
 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
                                  uv_buf_t* bufs,
                                  size_t count,
index d1a94fb..f91bb8b 100644 (file)
@@ -74,6 +74,9 @@ class StreamWrapCallbacks {
   }
 
   virtual const char* Error();
+
+  virtual int TryWrite(uv_buf_t** bufs, size_t* count);
+
   virtual int DoWrite(WriteWrap* w,
                       uv_buf_t* bufs,
                       size_t count,
index 6d9425c..92febc1 100644 (file)
@@ -511,6 +511,12 @@ const char* TLSCallbacks::Error() {
 }
 
 
+int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
+  // TODO(indutny): Support it
+  return -1;
+}
+
+
 int TLSCallbacks::DoWrite(WriteWrap* w,
                           uv_buf_t* bufs,
                           size_t count,
index db78009..946cc1c 100644 (file)
@@ -51,6 +51,7 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
                          v8::Handle<v8::Context> context);
 
   const char* Error();
+  int TryWrite(uv_buf_t** bufs, size_t* count);
   int DoWrite(WriteWrap* w,
               uv_buf_t* bufs,
               size_t count,
index 1a3dc12..940ea8b 100644 (file)
@@ -55,7 +55,7 @@ server.onconnection = function(err, client) {
 
       assert.equal(0, client.writeQueueSize);
 
-      var req = {};
+      var req = { async: false };
       var err = client.writeBuffer(req, buffer);
       assert.equal(err, 0);
       client.pendingWrites.push(req);
@@ -64,7 +64,12 @@ server.onconnection = function(err, client) {
       // 11 bytes should flush
       assert.equal(0, client.writeQueueSize);
 
-      req.oncomplete = function(status, client_, req_) {
+      if (req.async && client.writeQueueSize != 0)
+        req.oncomplete = done;
+      else
+        process.nextTick(done.bind(null, 0, client, req));
+
+      function done(status, client_, req_) {
         assert.equal(req, client.pendingWrites.shift());
 
         // Check parameters.