From 9836a4eeda1e2d43aad0923f1f72b364792629bc Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 29 Jan 2014 02:48:10 +0400 Subject: [PATCH] stream_wrap: use `uv_try_write` where possible Use `uv_try_write` for string and buffer writes, thus avoiding to do allocations and copying in some of the cases. --- benchmark/net/tcp-raw-pipe.js | 2 +- benchmark/net/tcp-raw-s2c.js | 9 ++- lib/net.js | 8 +- src/env.h | 1 + src/stream_wrap.cc | 146 +++++++++++++++++++++++++++++------- src/stream_wrap.h | 3 + src/tls_wrap.cc | 6 ++ src/tls_wrap.h | 1 + test/simple/test-tcp-wrap-listen.js | 9 ++- 9 files changed, 148 insertions(+), 37 deletions(-) diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js index 91c69e9..bda6839 100644 --- a/benchmark/net/tcp-raw-pipe.js +++ b/benchmark/net/tcp-raw-pipe.js @@ -51,7 +51,7 @@ function server() { if (nread < 0) fail(nread, 'read'); - var writeReq = {}; + var writeReq = { async: false }; err = clientHandle.writeBuffer(writeReq, buffer); if (err) diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js index 6fb6568..500be1b 100644 --- a/benchmark/net/tcp-raw-s2c.js +++ b/benchmark/net/tcp-raw-s2c.js @@ -68,7 +68,7 @@ function server() { write(); function write() { - var writeReq = { oncomplete: afterWrite }; + var writeReq = { async: false, oncomplete: afterWrite }; var err; switch (type) { case 'buf': @@ -82,8 +82,13 @@ function server() { break; } - if (err) + if (err) { fail(err, 'write'); + } else if (!writeReq.async) { + process.nextTick(function() { + afterWrite(null, clientHandle, writeReq); + }); + } } function afterWrite(err, handle, req) { diff --git a/lib/net.js b/lib/net.js index 800bae3..8e6dcbf 100644 --- a/lib/net.js +++ b/lib/net.js @@ -626,7 +626,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } - var req = { oncomplete: afterWrite }; + var req = { oncomplete: afterWrite, async: false }; var err; if (writev) { @@ -660,10 +660,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // If it was entirely flushed, we can write some more right now. // However, if more is left in the queue, then wait until that clears. - if (this._handle.writeQueueSize === 0) - cb(); - else + if (req.async && this._handle.writeQueueSize != 0) req.cb = cb; + else + cb(); }; diff --git a/src/env.h b/src/env.h index 23a3959..26630bb 100644 --- a/src/env.h +++ b/src/env.h @@ -53,6 +53,7 @@ namespace node { #define PER_ISOLATE_STRING_PROPERTIES(V) \ V(address_string, "address") \ V(atime_string, "atime") \ + V(async, "async") \ V(async_queue_string, "_asyncQueue") \ V(birthtime_string, "birthtime") \ V(blksize_string, "blksize") \ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index e0079b8..848abef 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -33,6 +33,7 @@ #include "util-inl.h" #include // abort() +#include // memcpy() #include // INT_MAX @@ -49,6 +50,7 @@ using v8::Number; using v8::Object; using v8::PropertyCallbackInfo; using v8::String; +using v8::True; using v8::Undefined; using v8::Value; @@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo& args) { Local buf_obj = args[1].As(); size_t length = Buffer::Length(buf_obj); - char* storage = new char[sizeof(WriteWrap)]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + char* storage; + WriteWrap* req_wrap; uv_buf_t buf; WriteBuffer(buf_obj, &buf); - int err = wrap->callbacks()->DoWrite(req_wrap, - &buf, - 1, - NULL, - StreamWrap::AfterWrite); + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = wrap->callbacks()->TryWrite(&bufs, &count); + if (err == 0) + goto done; + assert(count == 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); + + err = wrap->callbacks()->DoWrite(req_wrap, + bufs, + count, + NULL, + StreamWrap::AfterWrite); req_wrap->Dispatched(); - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(length, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(length, node_isolate)); args.GetReturnValue().Set(err); } @@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo& args) { return; } - char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); + if (try_write) { + data_size = StringBytes::Write(stack_storage, + storage_size, + string, + encoding); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = wrap->callbacks()->TryWrite(&bufs, &count); + + // Success + if (err == 0) + goto done; + + // Failure, or partial write + assert(count == 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - char* data = reinterpret_cast(ROUND_UP( + data = reinterpret_cast(ROUND_UP( reinterpret_cast(storage) + sizeof(WriteWrap), 16)); - size_t data_size; - data_size = StringBytes::Write(data, storage_size, string, encoding); + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(data, storage_size, string, encoding); + } assert(data_size <= storage_size); - uv_buf_t buf; - - buf.base = data; - buf.len = data_size; + buf = uv_buf_init(data, data_size); if (!wrap->is_named_pipe_ipc()) { err = wrap->callbacks()->DoWrite(req_wrap, @@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo& args) { } req_wrap->Dispatched(); - req_wrap->object()->Set(env->bytes_string(), - Integer::NewFromUnsigned(data_size, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap->object()->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(data_size, node_isolate)); args.GetReturnValue().Set(err); } @@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo& args) { delete[] bufs; req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(node_isolate)); req_wrap->object()->Set(env->bytes_string(), Number::New(node_isolate, bytes)); const char* msg = wrap->callbacks()->Error(); @@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() { } +// NOTE: Call to this function could change both `buf`'s and `count`'s +// values, shifting their base and decrementing their length. This is +// required in order to skip the data that was successfully written via +// uv_try_write(). +int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + int err; + size_t written; + uv_buf_t* vbufs = *bufs; + size_t vcount = *count; + + err = uv_try_write(wrap()->stream(), vbufs, vcount); + if (err < 0) + return err; + + // Slice off the buffers: skip all written buffers and slice the one that + // was partially written. + written = err; + for (; written != 0 && vcount > 0; vbufs++, vcount--) { + // Slice + if (vbufs[0].len > written) { + vbufs[0].base += written; + vbufs[0].len -= written; + written = 0; + break; + + // Discard + } else { + written -= vbufs[0].len; + } + } + + *bufs = vbufs; + *count = vcount; + + if (vcount == 0) + return 0; + else + return -1; +} + + int StreamWrapCallbacks::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/stream_wrap.h b/src/stream_wrap.h index d1a94fb..f91bb8b 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -74,6 +74,9 @@ class StreamWrapCallbacks { } virtual const char* Error(); + + virtual int TryWrite(uv_buf_t** bufs, size_t* count); + virtual int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 6d9425c..92febc1 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -511,6 +511,12 @@ const char* TLSCallbacks::Error() { } +int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + // TODO(indutny): Support it + return -1; +} + + int TLSCallbacks::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/tls_wrap.h b/src/tls_wrap.h index db78009..946cc1c 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -51,6 +51,7 @@ class TLSCallbacks : public crypto::SSLWrap, v8::Handle context); const char* Error(); + int TryWrite(uv_buf_t** bufs, size_t* count); int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/test/simple/test-tcp-wrap-listen.js b/test/simple/test-tcp-wrap-listen.js index 1a3dc12..940ea8b 100644 --- a/test/simple/test-tcp-wrap-listen.js +++ b/test/simple/test-tcp-wrap-listen.js @@ -55,7 +55,7 @@ server.onconnection = function(err, client) { assert.equal(0, client.writeQueueSize); - var req = {}; + var req = { async: false }; var err = client.writeBuffer(req, buffer); assert.equal(err, 0); client.pendingWrites.push(req); @@ -64,7 +64,12 @@ server.onconnection = function(err, client) { // 11 bytes should flush assert.equal(0, client.writeQueueSize); - req.oncomplete = function(status, client_, req_) { + if (req.async && client.writeQueueSize != 0) + req.oncomplete = done; + else + process.nextTick(done.bind(null, 0, client, req)); + + function done(status, client_, req_) { assert.equal(req, client.pendingWrites.shift()); // Check parameters. -- 2.7.4