--- /dev/null
+// test the speed of .pipe() with sockets
+
+var common = require('../common.js');
+var PORT = common.PORT;
+
+var bench = common.createBenchmark(main, {
+ len: [4, 8, 16, 32, 64, 128, 512, 1024],
+ type: ['buf'],
+ dur: [5],
+});
+
+var dur;
+var len;
+var type;
+var chunk;
+var encoding;
+
+function main(conf) {
+ dur = +conf.dur;
+ len = +conf.len;
+ type = conf.type;
+
+ switch (type) {
+ case 'buf':
+ chunk = new Buffer(len);
+ chunk.fill('x');
+ break;
+ case 'utf':
+ encoding = 'utf8';
+ chunk = new Array(len / 2 + 1).join('ΓΌ');
+ break;
+ case 'asc':
+ encoding = 'ascii';
+ chunk = new Array(len + 1).join('x');
+ break;
+ default:
+ throw new Error('invalid type: ' + type);
+ break;
+ }
+
+ server();
+}
+
+var net = require('net');
+
+function Writer() {
+ this.received = 0;
+ this.writable = true;
+}
+
+Writer.prototype.write = function(chunk, encoding, cb) {
+ this.received += chunk.length;
+
+ if (typeof encoding === 'function')
+ encoding();
+ else if (typeof cb === 'function')
+ cb();
+
+ return true;
+};
+
+// doesn't matter, never emits anything.
+Writer.prototype.on = function() {};
+Writer.prototype.once = function() {};
+Writer.prototype.emit = function() {};
+
+function server() {
+ var writer = new Writer();
+
+ // the actual benchmark.
+ var server = net.createServer(function(socket) {
+ socket.pipe(writer);
+ });
+
+ server.listen(PORT, function() {
+ var socket = net.connect(PORT);
+ socket.on('connect', function() {
+ bench.start();
+
+ socket.on('drain', send)
+ send()
+
+ setTimeout(function() {
+ var bytes = writer.received;
+ var gbits = (bytes * 8) / (1024 * 1024 * 1024);
+ bench.end(gbits);
+ }, dur * 1000);
+
+ function send() {
+ socket.cork();
+ while(socket.write(chunk, encoding)) {}
+ socket.uncork();
+ }
+ });
+ });
+}
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
+
+ // count buffered requests
+ this.bufferedRequestCount = 0;
+
+ // create the two objects needed to store the corked requests
+ // they are not a linked list, as no new elements are inserted in there
+ this.corkedRequestsFree = new CorkedRequest(this);
+ this.corkedRequestsFree.next = new CorkedRequest(this);
}
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
+ state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
}
}
-
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
- var buffer = [];
- var cbs = [];
+ var l = state.bufferedRequestCount;
+ var buffer = new Array(l);
+ var holder = state.corkedRequestsFree;
+ holder.entry = entry;
+
+ var count = 0;
while (entry) {
- cbs.push(entry.callback);
- buffer.push(entry);
+ buffer[count] = entry;
entry = entry.next;
+ count += 1;
}
- // count the one we are adding, as well.
- // TODO(isaacs) clean this up
+ doWrite(stream, state, true, state.length, buffer, '', holder.finish);
+
+ // doWrite is always async, defer these to save a bit of time
+ // as the hot path ends with doWrite
state.pendingcb++;
state.lastBufferedRequest = null;
- doWrite(stream, state, true, state.length, buffer, '', function(err) {
- for (var i = 0; i < cbs.length; i++) {
- state.pendingcb--;
- cbs[i](err);
- }
- });
-
- // Clear buffer
+ state.corkedRequestsFree = holder.next;
+ holder.next = null;
} else {
// Slow case, write chunks one-by-one
while (entry) {
if (entry === null)
state.lastBufferedRequest = null;
}
+
+ state.bufferedRequestCount = 0;
state.bufferedRequest = entry;
state.bufferProcessing = false;
}
}
state.ended = true;
}
+
+// It seems a linked list but it is not
+// there will be only 2 of these for each stream
+function CorkedRequest(state) {
+ this.next = null;
+ this.entry = null;
+
+ this.finish = (err) => {
+ var entry = this.entry;
+ this.entry = null;
+ while (entry) {
+ var cb = entry.callback;
+ state.pendingcb--;
+ cb(err);
+ entry = entry.next;
+ }
+ if (state.corkedRequestsFree) {
+ state.corkedRequestsFree.next = this;
+ } else {
+ state.corkedRequestsFree = this;
+ }
+ };
+}