async-wrap: add methods to udp/tcp/pipe/timers
authorTrevor Norris <trev.norris@gmail.com>
Mon, 7 Oct 2013 19:39:52 +0000 (12:39 -0700)
committerTrevor Norris <trev.norris@gmail.com>
Thu, 31 Oct 2013 23:34:11 +0000 (16:34 -0700)
Now it's possible to add/remove an async listener to an individual
handle created by UDP, TCP, Pipe or Timer.

lib/timers.js
src/pipe_wrap.cc
src/tcp_wrap.cc
src/udp_wrap.cc
test/simple/test-asynclistener-error-add-after.js [new file with mode: 0644]
test/simple/test-asynclistener-error-net.js [new file with mode: 0644]
test/simple/test-asynclistener-remove-after.js [new file with mode: 0644]

index 0fce78d..a94aa53 100644 (file)
@@ -200,6 +200,33 @@ exports.active = function(item) {
 };
 
 
+function timerAddAsyncListener(obj) {
+  if (!this._asyncQueue)
+    this._asyncQueue = [];
+  var queue = this._asyncQueue;
+  // This queue will be small. Probably always <= 3 items.
+  for (var i = 0; i < queue.length; i++) {
+    if (queue[i].uid === obj.uid)
+      return;
+  }
+  this._asyncQueue.push(obj);
+}
+
+
+function timerRemoveAsyncListener(obj) {
+  if (!this._asyncQueue)
+    return;
+  var queue = this._asyncQueue;
+  // This queue will be small. Probably always <= 3 items.
+  for (var i = 0; i < queue.length; i++) {
+    if (queue[i].uid === obj.uid) {
+      queue.splice(i, 1);
+      return;
+    }
+  }
+}
+
+
 /*
  * DOM-style timers
  */
@@ -335,6 +362,10 @@ Timeout.prototype.close = function() {
   }
 };
 
+// For domain compatibility need to attach this API.
+Timeout.prototype.addAsyncListener = timerAddAsyncListener;
+Timeout.prototype.removeAsyncListener = timerRemoveAsyncListener;
+
 
 var immediateQueue = {};
 L.init(immediateQueue);
@@ -390,8 +421,20 @@ function processImmediate() {
 }
 
 
+function Immediate() { }
+
+Immediate.prototype.addAsyncListener = timerAddAsyncListener;
+Immediate.prototype.removeAsyncListener = timerRemoveAsyncListener;
+Immediate.prototype.domain = undefined;
+Immediate.prototype._onImmediate = undefined;
+Immediate.prototype._asyncQueue = undefined;
+Immediate.prototype._idleNext = undefined;
+Immediate.prototype._idlePrev = undefined;
+
+
 exports.setImmediate = function(callback) {
-  var immediate = {}, args;
+  var immediate = new Immediate();
+  var args;
 
   L.init(immediate);
 
index a4e4ed1..a522775 100644 (file)
@@ -111,6 +111,8 @@ void PipeWrap::Initialize(Handle<Object> target,
   NODE_SET_PROTOTYPE_METHOD(t, "setPendingInstances", SetPendingInstances);
 #endif
 
+  AsyncWrap::AddMethods<PipeWrap>(t);
+
   target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "Pipe"), t->GetFunction());
   env->set_pipe_constructor_template(t);
 }
index 73058db..85fff07 100644 (file)
@@ -116,6 +116,8 @@ void TCPWrap::Initialize(Handle<Object> target,
                             SetSimultaneousAccepts);
 #endif
 
+  AsyncWrap::AddMethods<TCPWrap>(t);
+
   target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "TCP"), t->GetFunction());
   env->set_tcp_constructor_template(t);
 }
index 2af278d..5a16a55 100644 (file)
@@ -119,6 +119,9 @@ void UDPWrap::Initialize(Handle<Object> target,
   NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
   NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
 
+
+  AsyncWrap::AddMethods<UDPWrap>(t);
+
   target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "UDP"), t->GetFunction());
   env->set_udp_constructor_function(t->GetFunction());
 }
diff --git a/test/simple/test-asynclistener-error-add-after.js b/test/simple/test-asynclistener-error-add-after.js
new file mode 100644 (file)
index 0000000..1f8f9d5
--- /dev/null
@@ -0,0 +1,161 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var dns = require('dns');
+var fs = require('fs');
+var net = require('net');
+
+var errorMsgs = [];
+var caught = 0;
+var expectCaught = 0;
+var exitCbRan = false;
+
+function asyncL() { }
+
+var callbacksObj = {
+  error: function(value, er) {
+    var idx = errorMsgs.indexOf(er.message);
+    caught++;
+
+    process._rawDebug('Handling error: ' + er.message);
+
+    if (-1 < idx)
+      errorMsgs.splice(idx, 1);
+    else
+      throw new Error('Message not found: ' + er.message);
+
+    return true;
+  }
+};
+
+var listener = process.createAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+  // Just in case.
+  process.removeAsyncListener(listener);
+
+  if (code > 0)
+    return;
+
+  assert.ok(!exitCbRan);
+  exitCbRan = true;
+
+  if (errorMsgs.length > 0)
+    throw new Error('Errors not fired: ' + errorMsgs);
+
+  assert.equal(caught, expectCaught);
+  process._rawDebug('ok');
+});
+
+
+// Simple cases
+errorMsgs.push('setTimeout - simple');
+errorMsgs.push('setImmediate - simple');
+errorMsgs.push('setInterval - simple');
+setTimeout(function() {
+  throw new Error('setTimeout - simple');
+}).addAsyncListener(listener);
+expectCaught++;
+
+setImmediate(function() {
+  throw new Error('setImmediate - simple');
+}).addAsyncListener(listener);
+expectCaught++;
+
+var b = setInterval(function() {
+  clearInterval(b);
+  throw new Error('setInterval - simple');
+}).addAsyncListener(listener);
+expectCaught++;
+
+
+// Deeply nested
+errorMsgs.push('setInterval - nested');
+errorMsgs.push('setImmediate - nested');
+errorMsgs.push('setTimeout - nested');
+setTimeout(function() {
+  setImmediate(function() {
+    var b = setInterval(function() {
+      clearInterval(b);
+      throw new Error('setInterval - nested');
+    }).addAsyncListener(listener);
+    expectCaught++;
+    throw new Error('setImmediate - nested');
+  }).addAsyncListener(listener);
+  expectCaught++;
+  throw new Error('setTimeout - nested');
+}).addAsyncListener(listener);
+expectCaught++;
+
+
+// Net
+var iter = 3;
+for (var i = 0; i < iter; i++) {
+  errorMsgs.push('net - error: server connection');
+  errorMsgs.push('net - error: client data');
+  errorMsgs.push('net - error: server data');
+}
+errorMsgs.push('net - error: server closed');
+
+var server = net.createServer(function(c) {
+  c._handle.addAsyncListener(listener);
+
+  c.on('data', function() {
+    if (0 === --iter) {
+      server.close(function() {
+        process._rawDebug('net - server closing');
+        throw new Error('net - error: server closed');
+      });
+      expectCaught++;
+    }
+    process._rawDebug('net - connection received data');
+    throw new Error('net - error: server data');
+  });
+  expectCaught++;
+
+  c.end('bye');
+  process._rawDebug('net - connection received');
+  throw new Error('net - error: server connection');
+});
+expectCaught += iter;
+
+server.listen(common.PORT, function() {
+  // Test adding the async listener after server creation. Though it
+  // won't catch errors that originate synchronously from this point.
+  server._handle.addAsyncListener(listener);
+  for (var i = 0; i < iter; i++)
+    clientConnect();
+});
+
+function clientConnect() {
+  var client = net.connect(common.PORT, function() {
+    client._handle.addAsyncListener(listener);
+  });
+
+  client.on('data', function() {
+    client.end('see ya');
+    process._rawDebug('net - client received data');
+    throw new Error('net - error: client data');
+  });
+  expectCaught++;
+}
diff --git a/test/simple/test-asynclistener-error-net.js b/test/simple/test-asynclistener-error-net.js
new file mode 100644 (file)
index 0000000..ed83790
--- /dev/null
@@ -0,0 +1,109 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var dns = require('dns');
+var fs = require('fs');
+var net = require('net');
+
+var errorMsgs = [];
+var caught = 0;
+var expectCaught = 0;
+
+function asyncL() { }
+
+var callbacksObj = {
+  error: function(value, er) {
+    var idx = errorMsgs.indexOf(er.message);
+    caught++;
+
+    process._rawDebug('Handling error: ' + er.message);
+
+    if (-1 < idx)
+      errorMsgs.splice(idx, 1);
+    else
+      throw new Error('Message not found: ' + er.message);
+
+    return true;
+  }
+};
+
+var listener = process.addAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+  process.removeAsyncListener(listener);
+
+  if (code > 0)
+    return;
+
+  if (errorMsgs.length > 0)
+    throw new Error('Errors not fired: ' + errorMsgs);
+
+  assert.equal(caught, expectCaught);
+  process._rawDebug('ok');
+});
+
+
+// Net
+var iter = 3;
+for (var i = 0; i < iter; i++) {
+  errorMsgs.push('net - error: server connection');
+  errorMsgs.push('net - error: client data');
+  errorMsgs.push('net - error: server data');
+}
+errorMsgs.push('net - error: server closed');
+
+var server = net.createServer(function(c) {
+  c.on('data', function() {
+    if (0 === --iter) {
+      server.close(function() {
+        process._rawDebug('net - server closing');
+        throw new Error('net - error: server closed');
+      });
+      expectCaught++;
+    }
+    process._rawDebug('net - connection received data');
+    throw new Error('net - error: server data');
+  });
+  expectCaught++;
+
+  c.end('bye');
+  process._rawDebug('net - connection received');
+  throw new Error('net - error: server connection');
+});
+expectCaught += iter;
+
+server.listen(common.PORT, function() {
+  for (var i = 0; i < iter; i++)
+    clientConnect();
+});
+
+function clientConnect() {
+  var client = net.connect(common.PORT, function() { });
+
+  client.on('data', function() {
+    client.end('see ya');
+    process._rawDebug('net - client received data');
+    throw new Error('net - error: client data');
+  });
+  expectCaught++;
+}
diff --git a/test/simple/test-asynclistener-remove-after.js b/test/simple/test-asynclistener-remove-after.js
new file mode 100644 (file)
index 0000000..1a1cca2
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var net = require('net');
+
+
+// TODO(trevnorris): Test has the flaw that it's not checking if the async
+// flag has been removed on the class instance.
+var listener = process.addAsyncListener(function() { });
+
+
+// Test timers
+
+setImmediate(function() {
+  assert.equal(this._asyncQueue.length, 0);
+}).removeAsyncListener(listener);
+
+setTimeout(function() {
+  assert.equal(this._asyncQueue.length, 0);
+}).removeAsyncListener(listener);
+
+setInterval(function() {
+  clearInterval(this);
+  assert.equal(this._asyncQueue.length, 0);
+}).removeAsyncListener(listener);
+
+
+// Test net
+
+var server = net.createServer(function(c) {
+  c._handle.removeAsyncListener(listener);
+  assert.equal(c._handle._asyncQueue.length, 0);
+});
+
+server.listen(common.PORT, function() {
+  server._handle.removeAsyncListener(listener);
+  assert.equal(server._handle._asyncQueue.length, 0);
+
+  var client = net.connect(common.PORT, function() {
+    client._handle.removeAsyncListener(listener);
+    assert.equal(client._handle._asyncQueue.length, 0);
+    client.end();
+    server.close();
+  });
+});