fs: implemented WriteStream#writev
authorRon Korving <rkorving@wizcorp.jp>
Tue, 28 Jul 2015 05:59:35 +0000 (14:59 +0900)
committerJeremiah Senkpiel <fishrock123@rocketmail.com>
Tue, 15 Sep 2015 01:51:00 +0000 (21:51 -0400)
Streams with writev allow many buffers to be pushed to underlying OS
APIs in one batch, in this case improving write throughput by an order
of magnitude. This is especially noticeable when writing many (small)
buffers.

PR-URL: https://github.com/nodejs/node/pull/2167
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
lib/fs.js
src/node_file.cc

index 9bfd14c..10e5b63 100644 (file)
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1867,6 +1867,50 @@ WriteStream.prototype._write = function(data, encoding, cb) {
 };
 
 
+function writev(fd, chunks, position, callback) {
+  function wrapper(err, written) {
+    // Retain a reference to chunks so that they can't be GC'ed too soon.
+    callback(err, written || 0, chunks);
+  }
+
+  const req = new FSReqWrap();
+  req.oncomplete = wrapper;
+  binding.writeBuffers(fd, chunks, position, req);
+}
+
+
+WriteStream.prototype._writev = function(data, cb) {
+  if (typeof this.fd !== 'number')
+    return this.once('open', function() {
+      this._writev(data, cb);
+    });
+
+  const self = this;
+  const len = data.length;
+  const chunks = new Array(len);
+  var size = 0;
+
+  for (var i = 0; i < len; i++) {
+    var chunk = data[i].chunk;
+
+    chunks[i] = chunk;
+    size += chunk.length;
+  }
+
+  writev(this.fd, chunks, this.pos, function(er, bytes) {
+    if (er) {
+      self.destroy();
+      return cb(er);
+    }
+    self.bytesWritten += bytes;
+    cb();
+  });
+
+  if (this.pos !== undefined)
+    this.pos += size;
+};
+
+
 WriteStream.prototype.destroy = ReadStream.prototype.destroy;
 WriteStream.prototype.close = ReadStream.prototype.close;
 
index e1fb1ce..2230886 100644 (file)
@@ -907,6 +907,60 @@ static void WriteBuffer(const FunctionCallbackInfo<Value>& args) {
 }
 
 
+// Wrapper for writev(2).
+//
+// bytesWritten = writev(fd, chunks, position, callback)
+// 0 fd        integer. file descriptor
+// 1 chunks    array of buffers to write
+// 2 position  if integer, position to write at in the file.
+//             if null, write from the current position
+static void WriteBuffers(const FunctionCallbackInfo<Value>& args) {
+  Environment* env = Environment::GetCurrent(args);
+
+  CHECK(args[0]->IsInt32());
+  CHECK(args[1]->IsArray());
+
+  int fd = args[0]->Int32Value();
+  Local<Array> chunks = args[1].As<Array>();
+  int64_t pos = GET_OFFSET(args[2]);
+  Local<Value> req = args[3];
+
+  uint32_t chunkCount = chunks->Length();
+
+  uv_buf_t s_iovs[1024];  // use stack allocation when possible
+  uv_buf_t* iovs;
+
+  if (chunkCount > ARRAY_SIZE(s_iovs))
+    iovs = new uv_buf_t[chunkCount];
+  else
+    iovs = s_iovs;
+
+  for (uint32_t i = 0; i < chunkCount; i++) {
+    Local<Value> chunk = chunks->Get(i);
+
+    if (!Buffer::HasInstance(chunk)) {
+      if (iovs != s_iovs)
+        delete[] iovs;
+      return env->ThrowTypeError("Array elements all need to be buffers");
+    }
+
+    iovs[i] = uv_buf_init(Buffer::Data(chunk), Buffer::Length(chunk));
+  }
+
+  if (req->IsObject()) {
+    ASYNC_CALL(write, req, fd, iovs, chunkCount, pos)
+    if (iovs != s_iovs)
+      delete[] iovs;
+    return;
+  }
+
+  SYNC_CALL(write, nullptr, fd, iovs, chunkCount, pos)
+  if (iovs != s_iovs)
+    delete[] iovs;
+  args.GetReturnValue().Set(SYNC_RESULT);
+}
+
+
 // Wrapper for write(2).
 //
 // bytesWritten = write(fd, string, position, enc, callback)
@@ -1248,6 +1302,7 @@ void InitFs(Local<Object> target,
   env->SetMethod(target, "readlink", ReadLink);
   env->SetMethod(target, "unlink", Unlink);
   env->SetMethod(target, "writeBuffer", WriteBuffer);
+  env->SetMethod(target, "writeBuffers", WriteBuffers);
   env->SetMethod(target, "writeString", WriteString);
 
   env->SetMethod(target, "chmod", Chmod);