streams: 5% throughput gain when sending small chunks
authorMatteo Collina <hello@matteocollina.com>
Wed, 16 Dec 2015 08:39:11 +0000 (09:39 +0100)
committerMyles Borins <mborins@us.ibm.com>
Wed, 2 Mar 2016 22:01:11 +0000 (14:01 -0800)
Improves the performance when moving small buffers by 5%,
and it adds a benchmark to avoid regression in that area.
In all other cases it is equally performant to current master.

Full performance results available at:
https://gist.github.com/mcollina/717c35ad07d15710b6b9.

PR-URL: https://github.com/nodejs/node/pull/4354
Reviewed-By: James M Snell <jasnell@gmail.com>
benchmark/net/net-c2s-cork.js [new file with mode: 0644]
lib/_stream_writable.js

diff --git a/benchmark/net/net-c2s-cork.js b/benchmark/net/net-c2s-cork.js
new file mode 100644 (file)
index 0000000..5f8e0fa
--- /dev/null
@@ -0,0 +1,96 @@
+// test the speed of .pipe() with sockets
+
+var common = require('../common.js');
+var PORT = common.PORT;
+
+var bench = common.createBenchmark(main, {
+  len: [4, 8, 16, 32, 64, 128, 512, 1024],
+  type: ['buf'],
+  dur: [5],
+});
+
+var dur;
+var len;
+var type;
+var chunk;
+var encoding;
+
+function main(conf) {
+  dur = +conf.dur;
+  len = +conf.len;
+  type = conf.type;
+
+  switch (type) {
+    case 'buf':
+      chunk = new Buffer(len);
+      chunk.fill('x');
+      break;
+    case 'utf':
+      encoding = 'utf8';
+      chunk = new Array(len / 2 + 1).join('ΓΌ');
+      break;
+    case 'asc':
+      encoding = 'ascii';
+      chunk = new Array(len + 1).join('x');
+      break;
+    default:
+      throw new Error('invalid type: ' + type);
+      break;
+  }
+
+  server();
+}
+
+var net = require('net');
+
+function Writer() {
+  this.received = 0;
+  this.writable = true;
+}
+
+Writer.prototype.write = function(chunk, encoding, cb) {
+  this.received += chunk.length;
+
+  if (typeof encoding === 'function')
+    encoding();
+  else if (typeof cb === 'function')
+    cb();
+
+  return true;
+};
+
+// doesn't matter, never emits anything.
+Writer.prototype.on = function() {};
+Writer.prototype.once = function() {};
+Writer.prototype.emit = function() {};
+
+function server() {
+  var writer = new Writer();
+
+  // the actual benchmark.
+  var server = net.createServer(function(socket) {
+    socket.pipe(writer);
+  });
+
+  server.listen(PORT, function() {
+    var socket = net.connect(PORT);
+    socket.on('connect', function() {
+      bench.start();
+
+      socket.on('drain', send)
+      send()
+
+      setTimeout(function() {
+        var bytes = writer.received;
+        var gbits = (bytes * 8) / (1024 * 1024 * 1024);
+        bench.end(gbits);
+      }, dur * 1000);
+
+      function send() {
+        socket.cork();
+        while(socket.write(chunk, encoding)) {}
+        socket.uncork();
+      }
+    });
+  });
+}
index 9c7e263..7bbb97b 100644 (file)
@@ -108,6 +108,14 @@ function WritableState(options, stream) {
 
   // True if the error was already emitted and should not be thrown again
   this.errorEmitted = false;
+
+  // count buffered requests
+  this.bufferedRequestCount = 0;
+
+  // create the two objects needed to store the corked requests
+  // they are not a linked list, as no new elements are inserted in there
+  this.corkedRequestsFree = new CorkedRequest(this);
+  this.corkedRequestsFree.next = new CorkedRequest(this);
 }
 
 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
@@ -274,6 +282,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
     } else {
       state.bufferedRequest = state.lastBufferedRequest;
     }
+    state.bufferedRequestCount += 1;
   } else {
     doWrite(stream, state, false, len, chunk, encoding, cb);
   }
@@ -357,7 +366,6 @@ function onwriteDrain(stream, state) {
   }
 }
 
-
 // if there's something in the buffer waiting, then process it
 function clearBuffer(stream, state) {
   state.bufferProcessing = true;
@@ -365,26 +373,26 @@ function clearBuffer(stream, state) {
 
   if (stream._writev && entry && entry.next) {
     // Fast case, write everything using _writev()
-    var buffer = [];
-    var cbs = [];
+    var l = state.bufferedRequestCount;
+    var buffer = new Array(l);
+    var holder = state.corkedRequestsFree;
+    holder.entry = entry;
+
+    var count = 0;
     while (entry) {
-      cbs.push(entry.callback);
-      buffer.push(entry);
+      buffer[count] = entry;
       entry = entry.next;
+      count += 1;
     }
 
-    // count the one we are adding, as well.
-    // TODO(isaacs) clean this up
+    doWrite(stream, state, true, state.length, buffer, '', holder.finish);
+
+    // doWrite is always async, defer these to save a bit of time
+    // as the hot path ends with doWrite
     state.pendingcb++;
     state.lastBufferedRequest = null;
-    doWrite(stream, state, true, state.length, buffer, '', function(err) {
-      for (var i = 0; i < cbs.length; i++) {
-        state.pendingcb--;
-        cbs[i](err);
-      }
-    });
-
-    // Clear buffer
+    state.corkedRequestsFree = holder.next;
+    holder.next = null;
   } else {
     // Slow case, write chunks one-by-one
     while (entry) {
@@ -407,6 +415,8 @@ function clearBuffer(stream, state) {
     if (entry === null)
       state.lastBufferedRequest = null;
   }
+
+  state.bufferedRequestCount = 0;
   state.bufferedRequest = entry;
   state.bufferProcessing = false;
 }
@@ -484,3 +494,26 @@ function endWritable(stream, state, cb) {
   }
   state.ended = true;
 }
+
+// It seems a linked list but it is not
+// there will be only 2 of these for each stream
+function CorkedRequest(state) {
+  this.next = null;
+  this.entry = null;
+
+  this.finish = (err) => {
+    var entry = this.entry;
+    this.entry = null;
+    while (entry) {
+      var cb = entry.callback;
+      state.pendingcb--;
+      cb(err);
+      entry = entry.next;
+    }
+    if (state.corkedRequestsFree) {
+      state.corkedRequestsFree.next = this;
+    } else {
+      state.corkedRequestsFree = this;
+    }
+  };
+}