From ccec14b5689b954cbc3b7c409fa36ffb4472226f Mon Sep 17 00:00:00 2001 From: Trevor Norris Date: Mon, 7 Oct 2013 12:39:52 -0700 Subject: [PATCH] async-wrap: add methods to udp/tcp/pipe/timers Now it's possible to add/remove an async listener to an individual handle created by UDP, TCP, Pipe or Timer. --- lib/timers.js | 45 +++++- src/pipe_wrap.cc | 2 + src/tcp_wrap.cc | 2 + src/udp_wrap.cc | 3 + test/simple/test-asynclistener-error-add-after.js | 161 ++++++++++++++++++++++ test/simple/test-asynclistener-error-net.js | 109 +++++++++++++++ test/simple/test-asynclistener-remove-after.js | 66 +++++++++ 7 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 test/simple/test-asynclistener-error-add-after.js create mode 100644 test/simple/test-asynclistener-error-net.js create mode 100644 test/simple/test-asynclistener-remove-after.js diff --git a/lib/timers.js b/lib/timers.js index 0fce78d..a94aa53 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -200,6 +200,33 @@ exports.active = function(item) { }; +function timerAddAsyncListener(obj) { + if (!this._asyncQueue) + this._asyncQueue = []; + var queue = this._asyncQueue; + // This queue will be small. Probably always <= 3 items. + for (var i = 0; i < queue.length; i++) { + if (queue[i].uid === obj.uid) + return; + } + this._asyncQueue.push(obj); +} + + +function timerRemoveAsyncListener(obj) { + if (!this._asyncQueue) + return; + var queue = this._asyncQueue; + // This queue will be small. Probably always <= 3 items. + for (var i = 0; i < queue.length; i++) { + if (queue[i].uid === obj.uid) { + queue.splice(i, 1); + return; + } + } +} + + /* * DOM-style timers */ @@ -335,6 +362,10 @@ Timeout.prototype.close = function() { } }; +// For domain compatibility need to attach this API. +Timeout.prototype.addAsyncListener = timerAddAsyncListener; +Timeout.prototype.removeAsyncListener = timerRemoveAsyncListener; + var immediateQueue = {}; L.init(immediateQueue); @@ -390,8 +421,20 @@ function processImmediate() { } +function Immediate() { } + +Immediate.prototype.addAsyncListener = timerAddAsyncListener; +Immediate.prototype.removeAsyncListener = timerRemoveAsyncListener; +Immediate.prototype.domain = undefined; +Immediate.prototype._onImmediate = undefined; +Immediate.prototype._asyncQueue = undefined; +Immediate.prototype._idleNext = undefined; +Immediate.prototype._idlePrev = undefined; + + exports.setImmediate = function(callback) { - var immediate = {}, args; + var immediate = new Immediate(); + var args; L.init(immediate); diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index a4e4ed1..a522775 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -111,6 +111,8 @@ void PipeWrap::Initialize(Handle target, NODE_SET_PROTOTYPE_METHOD(t, "setPendingInstances", SetPendingInstances); #endif + AsyncWrap::AddMethods(t); + target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "Pipe"), t->GetFunction()); env->set_pipe_constructor_template(t); } diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 73058db..85fff07 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -116,6 +116,8 @@ void TCPWrap::Initialize(Handle target, SetSimultaneousAccepts); #endif + AsyncWrap::AddMethods(t); + target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "TCP"), t->GetFunction()); env->set_tcp_constructor_template(t); } diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 2af278d..5a16a55 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -119,6 +119,9 @@ void UDPWrap::Initialize(Handle target, NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref); NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref); + + AsyncWrap::AddMethods(t); + target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "UDP"), t->GetFunction()); env->set_udp_constructor_function(t->GetFunction()); } diff --git a/test/simple/test-asynclistener-error-add-after.js b/test/simple/test-asynclistener-error-add-after.js new file mode 100644 index 0000000..1f8f9d5 --- /dev/null +++ b/test/simple/test-asynclistener-error-add-after.js @@ -0,0 +1,161 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var dns = require('dns'); +var fs = require('fs'); +var net = require('net'); + +var errorMsgs = []; +var caught = 0; +var expectCaught = 0; +var exitCbRan = false; + +function asyncL() { } + +var callbacksObj = { + error: function(value, er) { + var idx = errorMsgs.indexOf(er.message); + caught++; + + process._rawDebug('Handling error: ' + er.message); + + if (-1 < idx) + errorMsgs.splice(idx, 1); + else + throw new Error('Message not found: ' + er.message); + + return true; + } +}; + +var listener = process.createAsyncListener(asyncL, callbacksObj); + +process.on('exit', function(code) { + // Just in case. + process.removeAsyncListener(listener); + + if (code > 0) + return; + + assert.ok(!exitCbRan); + exitCbRan = true; + + if (errorMsgs.length > 0) + throw new Error('Errors not fired: ' + errorMsgs); + + assert.equal(caught, expectCaught); + process._rawDebug('ok'); +}); + + +// Simple cases +errorMsgs.push('setTimeout - simple'); +errorMsgs.push('setImmediate - simple'); +errorMsgs.push('setInterval - simple'); +setTimeout(function() { + throw new Error('setTimeout - simple'); +}).addAsyncListener(listener); +expectCaught++; + +setImmediate(function() { + throw new Error('setImmediate - simple'); +}).addAsyncListener(listener); +expectCaught++; + +var b = setInterval(function() { + clearInterval(b); + throw new Error('setInterval - simple'); +}).addAsyncListener(listener); +expectCaught++; + + +// Deeply nested +errorMsgs.push('setInterval - nested'); +errorMsgs.push('setImmediate - nested'); +errorMsgs.push('setTimeout - nested'); +setTimeout(function() { + setImmediate(function() { + var b = setInterval(function() { + clearInterval(b); + throw new Error('setInterval - nested'); + }).addAsyncListener(listener); + expectCaught++; + throw new Error('setImmediate - nested'); + }).addAsyncListener(listener); + expectCaught++; + throw new Error('setTimeout - nested'); +}).addAsyncListener(listener); +expectCaught++; + + +// Net +var iter = 3; +for (var i = 0; i < iter; i++) { + errorMsgs.push('net - error: server connection'); + errorMsgs.push('net - error: client data'); + errorMsgs.push('net - error: server data'); +} +errorMsgs.push('net - error: server closed'); + +var server = net.createServer(function(c) { + c._handle.addAsyncListener(listener); + + c.on('data', function() { + if (0 === --iter) { + server.close(function() { + process._rawDebug('net - server closing'); + throw new Error('net - error: server closed'); + }); + expectCaught++; + } + process._rawDebug('net - connection received data'); + throw new Error('net - error: server data'); + }); + expectCaught++; + + c.end('bye'); + process._rawDebug('net - connection received'); + throw new Error('net - error: server connection'); +}); +expectCaught += iter; + +server.listen(common.PORT, function() { + // Test adding the async listener after server creation. Though it + // won't catch errors that originate synchronously from this point. + server._handle.addAsyncListener(listener); + for (var i = 0; i < iter; i++) + clientConnect(); +}); + +function clientConnect() { + var client = net.connect(common.PORT, function() { + client._handle.addAsyncListener(listener); + }); + + client.on('data', function() { + client.end('see ya'); + process._rawDebug('net - client received data'); + throw new Error('net - error: client data'); + }); + expectCaught++; +} diff --git a/test/simple/test-asynclistener-error-net.js b/test/simple/test-asynclistener-error-net.js new file mode 100644 index 0000000..ed83790 --- /dev/null +++ b/test/simple/test-asynclistener-error-net.js @@ -0,0 +1,109 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var dns = require('dns'); +var fs = require('fs'); +var net = require('net'); + +var errorMsgs = []; +var caught = 0; +var expectCaught = 0; + +function asyncL() { } + +var callbacksObj = { + error: function(value, er) { + var idx = errorMsgs.indexOf(er.message); + caught++; + + process._rawDebug('Handling error: ' + er.message); + + if (-1 < idx) + errorMsgs.splice(idx, 1); + else + throw new Error('Message not found: ' + er.message); + + return true; + } +}; + +var listener = process.addAsyncListener(asyncL, callbacksObj); + +process.on('exit', function(code) { + process.removeAsyncListener(listener); + + if (code > 0) + return; + + if (errorMsgs.length > 0) + throw new Error('Errors not fired: ' + errorMsgs); + + assert.equal(caught, expectCaught); + process._rawDebug('ok'); +}); + + +// Net +var iter = 3; +for (var i = 0; i < iter; i++) { + errorMsgs.push('net - error: server connection'); + errorMsgs.push('net - error: client data'); + errorMsgs.push('net - error: server data'); +} +errorMsgs.push('net - error: server closed'); + +var server = net.createServer(function(c) { + c.on('data', function() { + if (0 === --iter) { + server.close(function() { + process._rawDebug('net - server closing'); + throw new Error('net - error: server closed'); + }); + expectCaught++; + } + process._rawDebug('net - connection received data'); + throw new Error('net - error: server data'); + }); + expectCaught++; + + c.end('bye'); + process._rawDebug('net - connection received'); + throw new Error('net - error: server connection'); +}); +expectCaught += iter; + +server.listen(common.PORT, function() { + for (var i = 0; i < iter; i++) + clientConnect(); +}); + +function clientConnect() { + var client = net.connect(common.PORT, function() { }); + + client.on('data', function() { + client.end('see ya'); + process._rawDebug('net - client received data'); + throw new Error('net - error: client data'); + }); + expectCaught++; +} diff --git a/test/simple/test-asynclistener-remove-after.js b/test/simple/test-asynclistener-remove-after.js new file mode 100644 index 0000000..1a1cca2 --- /dev/null +++ b/test/simple/test-asynclistener-remove-after.js @@ -0,0 +1,66 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common'); +var assert = require('assert'); +var net = require('net'); + + +// TODO(trevnorris): Test has the flaw that it's not checking if the async +// flag has been removed on the class instance. +var listener = process.addAsyncListener(function() { }); + + +// Test timers + +setImmediate(function() { + assert.equal(this._asyncQueue.length, 0); +}).removeAsyncListener(listener); + +setTimeout(function() { + assert.equal(this._asyncQueue.length, 0); +}).removeAsyncListener(listener); + +setInterval(function() { + clearInterval(this); + assert.equal(this._asyncQueue.length, 0); +}).removeAsyncListener(listener); + + +// Test net + +var server = net.createServer(function(c) { + c._handle.removeAsyncListener(listener); + assert.equal(c._handle._asyncQueue.length, 0); +}); + +server.listen(common.PORT, function() { + server._handle.removeAsyncListener(listener); + assert.equal(server._handle._asyncQueue.length, 0); + + var client = net.connect(common.PORT, function() { + client._handle.removeAsyncListener(listener); + assert.equal(client._handle._asyncQueue.length, 0); + client.end(); + server.close(); + }); +}); -- 2.7.4