node: add AsyncListener support
authorTrevor Norris <trev.norris@gmail.com>
Tue, 24 Sep 2013 21:12:11 +0000 (14:12 -0700)
committerTrevor Norris <trev.norris@gmail.com>
Thu, 31 Oct 2013 21:17:51 +0000 (14:17 -0700)
AsyncListener is a JS API that works in tandem with the AsyncWrap class
to allow the user to be alerted to key events in the life cycle of an
asynchronous event. The AsyncWrap class has its own MakeCallback
implementation that core will be migrated to use, and uses state sharing
techniques to allow quicker communication between JS and C++ whether the
async event callbacks need to be called.

26 files changed:
lib/timers.js
node.gyp
src/async-wrap-inl.h [new file with mode: 0644]
src/async-wrap.h [new file with mode: 0644]
src/env-inl.h
src/env.h
src/fs_event_wrap.cc
src/handle_wrap.cc
src/handle_wrap.h
src/node.cc
src/node.js
src/node_file.cc
src/node_internals.h
src/pipe_wrap.cc
src/process_wrap.cc
src/req_wrap.h
src/signal_wrap.cc
src/stream_wrap.cc
src/tcp_wrap.cc
src/timer_wrap.cc
src/udp_wrap.cc
test/simple/test-asynclistener-error.js [new file with mode: 0644]
test/simple/test-asynclistener-multi-timeout.js [new file with mode: 0644]
test/simple/test-asynclistener-throw-before-infinite-recursion.js [new file with mode: 0644]
test/simple/test-asynclistener.js [new file with mode: 0644]
test/simple/test-domain-http-server.js

index 93dd65e..0fce78d 100644 (file)
@@ -30,6 +30,17 @@ var TIMEOUT_MAX = 2147483647; // 2^31-1
 
 var debug = require('util').debuglog('timer');
 
+var asyncFlags = process._asyncFlags;
+var runAsyncQueue = process._runAsyncQueue;
+var loadAsyncQueue = process._loadAsyncQueue;
+var unloadAsyncQueue = process._unloadAsyncQueue;
+
+// Do a little housekeeping.
+delete process._asyncFlags;
+delete process._runAsyncQueue;
+delete process._loadAsyncQueue;
+delete process._unloadAsyncQueue;
+
 
 // IDLE TIMEOUTS
 //
@@ -44,6 +55,9 @@ var debug = require('util').debuglog('timer');
 // value = list
 var lists = {};
 
+// Make Timer as monomorphic as possible.
+Timer.prototype._asyncQueue = undefined;
+
 // the main function - creates lists on demand and the watchers associated
 // with them.
 function insert(item, msecs) {
@@ -80,9 +94,9 @@ function listOnTimeout() {
   var now = Timer.now();
   debug('now: %s', now);
 
-  var first;
+  var diff, first, hasQueue, threw;
   while (first = L.peek(list)) {
-    var diff = now - first._idleStart;
+    diff = now - first._idleStart;
     if (diff < msecs) {
       list.start(msecs - diff, 0);
       debug('%d list wait because diff is %d', msecs, diff);
@@ -99,12 +113,20 @@ function listOnTimeout() {
       //
       // https://github.com/joyent/node/issues/2631
       var domain = first.domain;
-      if (domain && domain._disposed) continue;
+      if (domain && domain._disposed)
+        continue;
+
+      hasQueue = !!first._asyncQueue;
+
       try {
+        if (hasQueue)
+          loadAsyncQueue(first);
         if (domain)
           domain.enter();
-        var threw = true;
+        threw = true;
         first._onTimeout();
+        if (hasQueue)
+          unloadAsyncQueue(first);
         if (domain)
           domain.exit();
         threw = false;
@@ -162,7 +184,6 @@ exports.enroll = function(item, msecs) {
 exports.active = function(item) {
   var msecs = item._idleTimeout;
   if (msecs >= 0) {
-
     var list = lists[msecs];
     if (!list || L.isEmpty(list)) {
       insert(item, msecs);
@@ -171,6 +192,11 @@ exports.active = function(item) {
       L.append(list, item);
     }
   }
+  // Whether or not a new TimerWrap needed to be created, this should run
+  // for each item. This way each "item" (i.e. timer) can properly have
+  // their own domain assigned.
+  if (asyncFlags[0] > 0)
+    runAsyncQueue(item);
 };
 
 
@@ -316,16 +342,43 @@ L.init(immediateQueue);
 
 function processImmediate() {
   var queue = immediateQueue;
+  var domain, hasQueue, immediate;
 
   immediateQueue = {};
   L.init(immediateQueue);
 
   while (L.isEmpty(queue) === false) {
-    var immediate = L.shift(queue);
-    var domain = immediate.domain;
-    if (domain) domain.enter();
-    immediate._onImmediate();
-    if (domain) domain.exit();
+    immediate = L.shift(queue);
+    hasQueue = !!immediate._asyncQueue;
+    domain = immediate.domain;
+
+    if (hasQueue)
+      loadAsyncQueue(immediate);
+    if (domain)
+      domain.enter();
+
+    var threw = true;
+    try {
+      immediate._onImmediate();
+      threw = false;
+    } finally {
+      if (threw) {
+        if (!L.isEmpty(queue)) {
+          // Handle any remaining on next tick, assuming we're still
+          // alive to do so.
+          while (!L.isEmpty(immediateQueue)) {
+            L.append(queue, L.shift(immediateQueue));
+          }
+          immediateQueue = queue;
+          process.nextTick(processImmediate);
+        }
+      }
+    }
+
+    if (hasQueue)
+      unloadAsyncQueue(immediate);
+    if (domain)
+      domain.exit();
   }
 
   // Only round-trip to C++ land if we have to. Calling clearImmediate() on an
@@ -357,7 +410,11 @@ exports.setImmediate = function(callback) {
     process._immediateCallback = processImmediate;
   }
 
-  if (process.domain) immediate.domain = process.domain;
+  // setImmediates are handled more like nextTicks.
+  if (asyncFlags[0] > 0)
+    runAsyncQueue(immediate);
+  if (process.domain)
+    immediate.domain = process.domain;
 
   L.append(immediateQueue, immediate);
 
@@ -389,9 +446,10 @@ function unrefTimeout() {
 
   debug('unrefTimer fired');
 
-  var first;
+  var diff, domain, first, hasQueue, threw;
   while (first = L.peek(unrefList)) {
-    var diff = now - first._idleStart;
+    diff = now - first._idleStart;
+    hasQueue = !!first._asyncQueue;
 
     if (diff < first._idleTimeout) {
       diff = first._idleTimeout - diff;
@@ -403,17 +461,21 @@ function unrefTimeout() {
 
     L.remove(first);
 
-    var domain = first.domain;
+    domain = first.domain;
 
     if (!first._onTimeout) continue;
     if (domain && domain._disposed) continue;
 
     try {
+      if (hasQueue)
+        loadAsyncQueue(first);
       if (domain) domain.enter();
-      var threw = true;
+      threw = true;
       debug('unreftimer firing timeout');
       first._onTimeout();
       threw = false;
+      if (hasQueue)
+        unloadAsyncQueue(first);
       if (domain) domain.exit();
     } finally {
       if (threw) process.nextTick(unrefTimeout);
index 50c1394..4bb7398 100644 (file)
--- a/node.gyp
+++ b/node.gyp
         'src/udp_wrap.cc',
         'src/uv.cc',
         # headers to make for a more pleasant IDE experience
+        'src/async-wrap.h',
+        'src/async-wrap-inl.h',
         'src/env.h',
         'src/env-inl.h',
         'src/handle_wrap.h',
diff --git a/src/async-wrap-inl.h b/src/async-wrap-inl.h
new file mode 100644 (file)
index 0000000..055e31e
--- /dev/null
@@ -0,0 +1,324 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#ifndef SRC_ASYNC_WRAP_INL_H_
+#define SRC_ASYNC_WRAP_INL_H_
+
+#include "async-wrap.h"
+#include "env.h"
+#include "env-inl.h"
+#include "util.h"
+#include "util-inl.h"
+#include "v8.h"
+#include <assert.h>
+
+namespace node {
+
+inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle<v8::Object> object)
+    : object_(env->isolate(), object),
+      env_(env),
+      async_flags_(NO_OPTIONS) {
+  assert(!object.IsEmpty());
+
+  if (!env->has_async_listeners())
+    return;
+
+  // TODO(trevnorris): Do we really need to TryCatch this call?
+  v8::TryCatch try_catch;
+  try_catch.SetVerbose(true);
+
+  v8::Local<v8::Value> val = object.As<v8::Value>();
+  env->async_listener_run_function()->Call(env->process_object(), 1, &val);
+
+  if (!try_catch.HasCaught())
+    async_flags_ |= ASYNC_LISTENERS;
+}
+
+
+inline AsyncWrap::~AsyncWrap() {
+  assert(persistent().IsEmpty());
+}
+
+
+template<typename TYPE>
+inline void AsyncWrap::AddMethods(v8::Handle<v8::FunctionTemplate> t) {
+  NODE_SET_PROTOTYPE_METHOD(t,
+                            "addAsyncListener",
+                            AddAsyncListener<TYPE>);
+  NODE_SET_PROTOTYPE_METHOD(t,
+                            "removeAsyncListener",
+                            RemoveAsyncListener<TYPE>);
+}
+
+
+inline uint32_t AsyncWrap::async_flags() const {
+  return async_flags_;
+}
+
+
+inline void AsyncWrap::set_flag(unsigned int flag) {
+  async_flags_ |= flag;
+}
+
+
+inline void AsyncWrap::remove_flag(unsigned int flag) {
+  async_flags_ &= ~flag;
+}
+
+
+inline bool AsyncWrap::has_async_queue() {
+  return async_flags() & ASYNC_LISTENERS;
+}
+
+
+inline Environment* AsyncWrap::env() const {
+  return env_;
+}
+
+
+inline v8::Local<v8::Object> AsyncWrap::object() {
+  return PersistentToLocal(env()->isolate(), persistent());
+}
+
+
+inline v8::Persistent<v8::Object>& AsyncWrap::persistent() {
+  return object_;
+}
+
+
+// I hate you domains.
+inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
+    const v8::Handle<v8::Function> cb,
+    int argc,
+    v8::Handle<v8::Value>* argv) {
+  assert(env()->context() == env()->isolate()->GetCurrentContext());
+
+  v8::Local<v8::Object> context = object();
+  v8::Local<v8::Object> process = env()->process_object();
+  v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
+  v8::Local<v8::Object> domain;
+
+  v8::TryCatch try_catch;
+  try_catch.SetVerbose(true);
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_load_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return v8::Undefined(env()->isolate());
+  }
+
+  bool has_domain = domain_v->IsObject();
+  if (has_domain) {
+    domain = domain_v.As<v8::Object>();
+
+    if (domain->Get(env()->disposed_string())->IsTrue())
+      return Undefined(env()->isolate());
+
+    v8::Local<v8::Function> enter =
+      domain->Get(env()->enter_string()).As<v8::Function>();
+    assert(enter->IsFunction());
+    enter->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught())
+      return Undefined(env()->isolate());
+  }
+
+  v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
+
+  if (try_catch.HasCaught()) {
+    return Undefined(env()->isolate());
+  }
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_unload_function()->Call(process, 1, &val);
+  }
+
+  if (has_domain) {
+    v8::Local<v8::Function> exit =
+      domain->Get(env()->exit_string()).As<v8::Function>();
+    assert(exit->IsFunction());
+    exit->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught())
+      return Undefined(env()->isolate());
+  }
+
+  Environment::TickInfo* tick_info = env()->tick_info();
+
+  if (tick_info->in_tick()) {
+    return ret;
+  }
+
+  if (tick_info->length() == 0) {
+    tick_info->set_index(0);
+    return ret;
+  }
+
+  tick_info->set_in_tick(true);
+
+  env()->tick_callback_function()->Call(process, 0, NULL);
+
+  tick_info->set_in_tick(false);
+
+  if (try_catch.HasCaught()) {
+    tick_info->set_last_threw(true);
+    return Undefined(env()->isolate());
+  }
+
+  return ret;
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+    const v8::Handle<v8::Function> cb,
+    int argc,
+    v8::Handle<v8::Value>* argv) {
+  if (env()->using_domains())
+    return MakeDomainCallback(cb, argc, argv);
+
+  assert(env()->context() == env()->isolate()->GetCurrentContext());
+
+  v8::Local<v8::Object> context = object();
+  v8::Local<v8::Object> process = env()->process_object();
+
+  v8::TryCatch try_catch;
+  try_catch.SetVerbose(true);
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_load_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return v8::Undefined(env()->isolate());
+  }
+
+  v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
+
+  if (try_catch.HasCaught()) {
+    return Undefined(env()->isolate());
+  }
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_unload_function()->Call(process, 1, &val);
+  }
+
+  Environment::TickInfo* tick_info = env()->tick_info();
+
+  if (tick_info->in_tick()) {
+    return ret;
+  }
+
+  if (tick_info->length() == 0) {
+    tick_info->set_index(0);
+    return ret;
+  }
+
+  tick_info->set_in_tick(true);
+
+  // TODO(trevnorris): Consider passing "context" to _tickCallback so it
+  // can then be passed as the first argument to the nextTick callback.
+  // That should greatly help needing to create closures.
+  env()->tick_callback_function()->Call(process, 0, NULL);
+
+  tick_info->set_in_tick(false);
+
+  if (try_catch.HasCaught()) {
+    tick_info->set_last_threw(true);
+    return Undefined(env()->isolate());
+  }
+
+  return ret;
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+    const v8::Handle<v8::String> symbol,
+    int argc,
+    v8::Handle<v8::Value>* argv) {
+  v8::Local<v8::Value> cb_v = object()->Get(symbol);
+  v8::Local<v8::Function> cb = cb_v.As<v8::Function>();
+  assert(cb->IsFunction());
+
+  return MakeCallback(cb, argc, argv);
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+    uint32_t index,
+    int argc,
+    v8::Handle<v8::Value>* argv) {
+  v8::Local<v8::Value> cb_v = object()->Get(index);
+  v8::Local<v8::Function> cb = cb_v.As<v8::Function>();
+  assert(cb->IsFunction());
+
+  return MakeCallback(cb, argc, argv);
+}
+
+
+template <typename TYPE>
+inline void AsyncWrap::AddAsyncListener(
+    const v8::FunctionCallbackInfo<v8::Value>& args) {
+  Environment* env = Environment::GetCurrent(args.GetIsolate());
+  v8::HandleScope handle_scope(args.GetIsolate());
+
+  v8::Local<v8::Object> handle = args.This();
+  v8::Local<v8::Value> listener = args[0];
+  assert(listener->IsObject());
+  assert(handle->InternalFieldCount() > 0);
+
+  env->async_listener_push_function()->Call(handle, 1, &listener);
+
+  TYPE* wrap = static_cast<TYPE*>(
+      handle->GetAlignedPointerFromInternalField(0));
+  assert(wrap != NULL);
+  wrap->set_flag(ASYNC_LISTENERS);
+}
+
+
+template <typename TYPE>
+inline void AsyncWrap::RemoveAsyncListener(
+    const v8::FunctionCallbackInfo<v8::Value>& args) {
+  Environment* env = Environment::GetCurrent(args.GetIsolate());
+  v8::HandleScope handle_scope(args.GetIsolate());
+
+  v8::Local<v8::Object> handle = args.This();
+  v8::Local<v8::Value> listener = args[0];
+  assert(listener->IsObject());
+  assert(handle->InternalFieldCount() > 0);
+
+  v8::Local<v8::Value> ret =
+      env->async_listener_strip_function()->Call(handle, 1, &listener);
+
+  if (ret->IsFalse()) {
+    TYPE* wrap = static_cast<TYPE*>(
+        handle->GetAlignedPointerFromInternalField(0));
+    assert(wrap != NULL);
+    wrap->remove_flag(ASYNC_LISTENERS);
+  }
+}
+
+}  // namespace node
+
+#endif  // SRC_ASYNC_WRAP_INL_H_
diff --git a/src/async-wrap.h b/src/async-wrap.h
new file mode 100644 (file)
index 0000000..4797386
--- /dev/null
@@ -0,0 +1,97 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#ifndef SRC_ASYNC_WRAP_H_
+#define SRC_ASYNC_WRAP_H_
+
+#include "env.h"
+#include "v8.h"
+
+namespace node {
+
+class AsyncWrap {
+ public:
+  enum AsyncFlags {
+    NO_OPTIONS = 0,
+    ASYNC_LISTENERS = 1
+  };
+
+  inline AsyncWrap(Environment* env, v8::Handle<v8::Object> object);
+
+  inline ~AsyncWrap();
+
+  template <typename Type>
+  static inline void AddMethods(v8::Handle<v8::FunctionTemplate> t);
+
+  inline uint32_t async_flags() const;
+
+  inline void set_flag(unsigned int flag);
+
+  inline void remove_flag(unsigned int flag);
+
+  inline bool has_async_queue();
+
+  inline Environment* env() const;
+
+  // Returns the wrapped object.  Illegal to call in your destructor.
+  inline v8::Local<v8::Object> object();
+
+  // Parent class is responsible to Dispose.
+  inline v8::Persistent<v8::Object>& persistent();
+
+  // Only call these within a valid HandleScope.
+  inline v8::Handle<v8::Value> MakeCallback(const v8::Handle<v8::Function> cb,
+                                            int argc,
+                                            v8::Handle<v8::Value>* argv);
+  inline v8::Handle<v8::Value> MakeCallback(const v8::Handle<v8::String> symbol,
+                                            int argc,
+                                            v8::Handle<v8::Value>* argv);
+  inline v8::Handle<v8::Value> MakeCallback(uint32_t index,
+                                            int argc,
+                                            v8::Handle<v8::Value>* argv);
+
+ private:
+  // TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
+  // replacement is committed.
+  inline v8::Handle<v8::Value> MakeDomainCallback(
+      const v8::Handle<v8::Function> cb,
+      int argc,
+      v8::Handle<v8::Value>* argv);
+
+  // Add an async listener to an existing handle.
+  template <typename Type>
+  static inline void AddAsyncListener(
+      const v8::FunctionCallbackInfo<v8::Value>& args);
+
+  // Remove an async listener to an existing handle.
+  template <typename Type>
+  static inline void RemoveAsyncListener(
+      const v8::FunctionCallbackInfo<v8::Value>& args);
+
+  v8::Persistent<v8::Object> object_;
+  Environment* const env_;
+  uint32_t async_flags_;
+};
+
+}  // namespace node
+
+
+#endif  // SRC_ASYNC_WRAP_H_
index b520e07..b57cf99 100644 (file)
@@ -69,6 +69,23 @@ inline v8::Isolate* Environment::IsolateData::isolate() const {
   return isolate_;
 }
 
+inline Environment::AsyncListener::AsyncListener() {
+  for (int i = 0; i < kFieldsCount; ++i)
+    fields_[i] = 0;
+}
+
+inline uint32_t* Environment::AsyncListener::fields() {
+  return fields_;
+}
+
+inline int Environment::AsyncListener::fields_count() const {
+  return kFieldsCount;
+}
+
+inline uint32_t Environment::AsyncListener::count() const {
+  return fields_[kCount];
+}
+
 inline Environment::DomainFlag::DomainFlag() {
   for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
 }
@@ -86,7 +103,8 @@ inline uint32_t Environment::DomainFlag::count() const {
 }
 
 inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
-  for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
+  for (int i = 0; i < kFieldsCount; ++i)
+    fields_[i] = 0;
 }
 
 inline uint32_t* Environment::TickInfo::fields() {
@@ -187,6 +205,11 @@ inline v8::Isolate* Environment::isolate() const {
   return isolate_;
 }
 
+inline bool Environment::has_async_listeners() const {
+  // The const_cast is okay, it doesn't violate conceptual const-ness.
+  return const_cast<Environment*>(this)->async_listener()->count() > 0;
+}
+
 inline bool Environment::in_domain() const {
   // The const_cast is okay, it doesn't violate conceptual const-ness.
   return using_domains() &&
@@ -227,6 +250,10 @@ inline uv_loop_t* Environment::event_loop() const {
   return isolate_data()->event_loop();
 }
 
+inline Environment::AsyncListener* Environment::async_listener() {
+  return &async_listener_count_;
+}
+
 inline Environment::DomainFlag* Environment::domain_flag() {
   return &domain_flag_;
 }
index b45d250..37e4ae6 100644 (file)
--- a/src/env.h
+++ b/src/env.h
@@ -53,6 +53,7 @@ namespace node {
 #define PER_ISOLATE_STRING_PROPERTIES(V)                                      \
   V(address_string, "address")                                                \
   V(atime_string, "atime")                                                    \
+  V(async_queue_string, "_asyncQueue")                                        \
   V(birthtime_string, "birthtime")                                            \
   V(blksize_string, "blksize")                                                \
   V(blocks_string, "blocks")                                                  \
@@ -131,6 +132,11 @@ namespace node {
   V(write_queue_size_string, "writeQueueSize")                                \
 
 #define ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)                           \
+  V(async_listener_load_function, v8::Function)                               \
+  V(async_listener_push_function, v8::Function)                               \
+  V(async_listener_run_function, v8::Function)                                \
+  V(async_listener_strip_function, v8::Function)                              \
+  V(async_listener_unload_function, v8::Function)                             \
   V(binding_cache_object, v8::Object)                                         \
   V(buffer_constructor_function, v8::Function)                                \
   V(context, v8::Context)                                                     \
@@ -163,6 +169,26 @@ RB_HEAD(ares_task_list, ares_task_t);
 
 class Environment {
  public:
+  class AsyncListener {
+   public:
+    inline uint32_t* fields();
+    inline int fields_count() const;
+    inline uint32_t count() const;
+
+   private:
+    friend class Environment;  // So we can call the constructor.
+    inline AsyncListener();
+
+    enum Fields {
+      kCount,
+      kFieldsCount
+    };
+
+    uint32_t fields_[kFieldsCount];
+
+    DISALLOW_COPY_AND_ASSIGN(AsyncListener);
+  };
+
   class DomainFlag {
    public:
     inline uint32_t* fields();
@@ -223,6 +249,7 @@ class Environment {
 
   inline v8::Isolate* isolate() const;
   inline uv_loop_t* event_loop() const;
+  inline bool has_async_listeners() const;
   inline bool in_domain() const;
 
   static inline Environment* from_immediate_check_handle(uv_check_t* handle);
@@ -235,6 +262,7 @@ class Environment {
   static inline Environment* from_idle_check_handle(uv_check_t* handle);
   inline uv_check_t* idle_check_handle();
 
+  inline AsyncListener* async_listener();
   inline DomainFlag* domain_flag();
   inline TickInfo* tick_info();
 
@@ -279,6 +307,7 @@ class Environment {
   uv_idle_t immediate_idle_handle_;
   uv_prepare_t idle_prepare_handle_;
   uv_check_t idle_check_handle_;
+  AsyncListener async_listener_count_;
   DomainFlag domain_flag_;
   TickInfo tick_info_;
   uv_timer_t cares_timer_handle_;
index 1fc901e..076a224 100644 (file)
@@ -19,6 +19,8 @@
 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "util.h"
@@ -175,11 +177,7 @@ void FSEventWrap::OnEvent(uv_fs_event_t* handle, const char* filename,
     argv[2] = OneByteString(node_isolate, filename);
   }
 
-  MakeCallback(env,
-               wrap->object(),
-               env->onchange_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  wrap->MakeCallback(env->onchange_string(), ARRAY_SIZE(argv), argv);
 }
 
 
index d930bd3..be37d63 100644 (file)
@@ -20,6 +20,8 @@
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 #include "handle_wrap.h"
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "util.h"
@@ -89,12 +91,11 @@ void HandleWrap::Close(const FunctionCallbackInfo<Value>& args) {
 HandleWrap::HandleWrap(Environment* env,
                        Handle<Object> object,
                        uv_handle_t* handle)
-    : env_(env),
+    : AsyncWrap(env, object),
       flags_(0),
       handle__(handle) {
   handle__->data = this;
   HandleScope scope(node_isolate);
-  persistent().Reset(node_isolate, object);
   Wrap<HandleWrap>(object, this);
   QUEUE_INSERT_TAIL(&handle_wrap_queue, &handle_wrap_queue_);
 }
@@ -121,7 +122,7 @@ void HandleWrap::OnClose(uv_handle_t* handle) {
   Local<Object> object = wrap->object();
 
   if (wrap->flags_ & kCloseCallback) {
-    MakeCallback(env, object, env->close_string());
+    wrap->MakeCallback(env->close_string(), 0, NULL);
   }
 
   object->SetAlignedPointerInInternalField(0, NULL);
index 73a43c3..47cc44f 100644 (file)
@@ -22,6 +22,7 @@
 #ifndef SRC_HANDLE_WRAP_H_
 #define SRC_HANDLE_WRAP_H_
 
+#include "async-wrap.h"
 #include "env.h"
 #include "node.h"
 #include "queue.h"
@@ -50,7 +51,7 @@ namespace node {
 //   js/c++ boundary crossing. At the javascript layer that should all be
 //   taken care of.
 
-class HandleWrap {
+class HandleWrap : public AsyncWrap {
  public:
   static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);
   static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -64,24 +65,10 @@ class HandleWrap {
              uv_handle_t* handle);
   virtual ~HandleWrap();
 
-  inline Environment* env() const {
-    return env_;
-  }
-
-  inline v8::Local<v8::Object> object() {
-    return PersistentToLocal(env()->isolate(), persistent());
-  }
-
-  inline v8::Persistent<v8::Object>& persistent() {
-    return object_;
-  }
-
  private:
   friend void GetActiveHandles(const v8::FunctionCallbackInfo<v8::Value>&);
   static void OnClose(uv_handle_t* handle);
-  v8::Persistent<v8::Object> object_;
   QUEUE handle_wrap_queue_;
-  Environment* const env_;
   unsigned int flags_;
   // Using double underscore due to handle_ member in tcp_wrap. Probably
   // tcp_wrap should rename it's member to 'handle'.
index 4e30269..2e8a013 100644 (file)
@@ -40,6 +40,8 @@
 #endif
 
 #include "ares.h"
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "handle_wrap.h"
@@ -841,6 +843,36 @@ Local<Value> WinapiErrnoException(int errorno,
 #endif
 
 
+void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
+  Environment* env = Environment::GetCurrent(args.GetIsolate());
+  HandleScope handle_scope(args.GetIsolate());
+
+  assert(args[0]->IsObject() &&
+         args[1]->IsFunction() &&
+         args[2]->IsFunction() &&
+         args[3]->IsFunction() &&
+         args[4]->IsFunction() &&
+         args[5]->IsFunction());
+
+  env->set_async_listener_run_function(args[1].As<Function>());
+  env->set_async_listener_load_function(args[2].As<Function>());
+  env->set_async_listener_unload_function(args[3].As<Function>());
+  env->set_async_listener_push_function(args[4].As<Function>());
+  env->set_async_listener_strip_function(args[5].As<Function>());
+
+  Local<Object> async_listener_flag_obj = args[0].As<Object>();
+  Environment::AsyncListener* async_listener = env->async_listener();
+  async_listener_flag_obj->SetIndexedPropertiesToExternalArrayData(
+      async_listener->fields(),
+      kExternalUnsignedIntArray,
+      async_listener->fields_count());
+
+  // Do a little housekeeping.
+  env->process_object()->Delete(
+      FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupAsyncListener"));
+}
+
+
 void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
   Environment* env = Environment::GetCurrent(args.GetIsolate());
 
@@ -882,25 +914,60 @@ void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
       domain_flag->fields(),
       kExternalUnsignedIntArray,
       domain_flag->fields_count());
+
+  // Do a little housekeeping.
+  env->process_object()->Delete(
+      FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
+}
+
+
+void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
+  Environment* env = Environment::GetCurrent(args.GetIsolate());
+  HandleScope handle_scope(args.GetIsolate());
+
+  if (!args[0]->IsObject() || !args[1]->IsFunction())
+    abort();
+
+  // Values use to cross communicate with processNextTick.
+  Local<Object> tick_info_obj = args[0].As<Object>();
+  tick_info_obj->SetIndexedPropertiesToExternalArrayData(
+      env->tick_info()->fields(),
+      kExternalUnsignedIntArray,
+      env->tick_info()->fields_count());
+
+  env->set_tick_callback_function(args[1].As<Function>());
+
+  // Do a little housekeeping.
+  env->process_object()->Delete(
+      FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupNextTick"));
 }
 
 
 Handle<Value> MakeDomainCallback(Environment* env,
-                                 const Handle<Object> object,
+                                 Handle<Object> object,
                                  const Handle<Function> callback,
                                  int argc,
                                  Handle<Value> argv[]) {
   // If you hit this assertion, you forgot to enter the v8::Context first.
   assert(env->context() == env->isolate()->GetCurrentContext());
 
-  // TODO(trevnorris) Hook for long stack traces to be made here.
-
+  Local<Object> process = env->process_object();
   Local<Value> domain_v = object->Get(env->domain_string());
   Local<Object> domain;
 
   TryCatch try_catch;
   try_catch.SetVerbose(true);
 
+  // TODO(trevnorris): This is sucky for performance. Fix it.
+  bool has_async_queue = object->Has(env->async_queue_string());
+  if (has_async_queue) {
+    Local<Value> argv[] = { object };
+    env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
   bool has_domain = domain_v->IsObject();
   if (has_domain) {
     domain = domain_v.As<Object>();
@@ -926,6 +993,14 @@ Handle<Value> MakeDomainCallback(Environment* env,
     return Undefined(node_isolate);
   }
 
+  if (has_async_queue) {
+    Local<Value> val = object.As<Value>();
+    env->async_listener_unload_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
   if (has_domain) {
     Local<Function> exit =
         domain->Get(env->exit_string()).As<Function>();
@@ -955,10 +1030,7 @@ Handle<Value> MakeDomainCallback(Environment* env,
 
   tick_info->set_in_tick(true);
 
-  // process nextTicks after call
-  Local<Object> process_object = env->process_object();
-  Local<Function> tick_callback_function = env->tick_callback_function();
-  tick_callback_function->Call(process_object, 0, NULL);
+  env->tick_callback_function()->Call(process, 0, NULL);
 
   tick_info->set_in_tick(false);
 
@@ -972,31 +1044,48 @@ Handle<Value> MakeDomainCallback(Environment* env,
 
 
 Handle<Value> MakeCallback(Environment* env,
-                           const Handle<Object> object,
+                           Handle<Object> object,
                            const Handle<Function> callback,
                            int argc,
                            Handle<Value> argv[]) {
+  if (env->using_domains())
+    return MakeDomainCallback(env, object, callback, argc, argv);
+
   // If you hit this assertion, you forgot to enter the v8::Context first.
   assert(env->context() == env->isolate()->GetCurrentContext());
 
-  // TODO(trevnorris) Hook for long stack traces to be made here.
-  Local<Object> process_object = env->process_object();
-
-  if (env->using_domains())
-    return MakeDomainCallback(env, object, callback, argc, argv);
+  Local<Object> process = env->process_object();
 
   TryCatch try_catch;
   try_catch.SetVerbose(true);
 
+  // TODO(trevnorris): This is sucky for performance. Fix it.
+  bool has_async_queue = object->Has(env->async_queue_string());
+  if (has_async_queue) {
+    Local<Value> argv[] = { object };
+    env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
   Local<Value> ret = callback->Call(object, argc, argv);
 
   if (try_catch.HasCaught()) {
     return Undefined(node_isolate);
   }
 
+  if (has_async_queue) {
+    Local<Value> val = object.As<Value>();
+    env->async_listener_unload_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
   Environment::TickInfo* tick_info = env->tick_info();
 
-  if (tick_info->in_tick() == 1) {
+  if (tick_info->in_tick()) {
     return ret;
   }
 
@@ -1007,22 +1096,8 @@ Handle<Value> MakeCallback(Environment* env,
 
   tick_info->set_in_tick(true);
 
-  // lazy load no domain next tick callbacks
-  Local<Function> tick_callback_function = env->tick_callback_function();
-  if (tick_callback_function.IsEmpty()) {
-    Local<String> tick_callback_function_key =
-        FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback");
-    tick_callback_function =
-        process_object->Get(tick_callback_function_key).As<Function>();
-    if (!tick_callback_function->IsFunction()) {
-      fprintf(stderr, "process._tickCallback assigned to non-function\n");
-      abort();
-    }
-    env->set_tick_callback_function(tick_callback_function);
-  }
-
   // process nextTicks after call
-  tick_callback_function->Call(process_object, 0, NULL);
+  env->tick_callback_function()->Call(process, 0, NULL);
 
   tick_info->set_in_tick(false);
 
@@ -1041,16 +1116,9 @@ Handle<Value> MakeCallback(Environment* env,
                            uint32_t index,
                            int argc,
                            Handle<Value> argv[]) {
-  // If you hit this assertion, you forgot to enter the v8::Context first.
-  assert(env->context() == env->isolate()->GetCurrentContext());
-
   Local<Function> callback = object->Get(index).As<Function>();
   assert(callback->IsFunction());
 
-  if (env->using_domains()) {
-    return MakeDomainCallback(env, object, callback, argc, argv);
-  }
-
   return MakeCallback(env, object, callback, argc, argv);
 }
 
@@ -1060,16 +1128,8 @@ Handle<Value> MakeCallback(Environment* env,
                            const Handle<String> symbol,
                            int argc,
                            Handle<Value> argv[]) {
-  // If you hit this assertion, you forgot to enter the v8::Context first.
-  assert(env->context() == env->isolate()->GetCurrentContext());
-
   Local<Function> callback = object->Get(symbol).As<Function>();
   assert(callback->IsFunction());
-
-  if (env->using_domains()) {
-    return MakeDomainCallback(env, object, callback, argc, argv);
-  }
-
   return MakeCallback(env, object, callback, argc, argv);
 }
 
@@ -1079,8 +1139,6 @@ Handle<Value> MakeCallback(Environment* env,
                            const char* method,
                            int argc,
                            Handle<Value> argv[]) {
-  // If you hit this assertion, you forgot to enter the v8::Context first.
-  assert(env->context() == env->isolate()->GetCurrentContext());
   Local<String> method_string = OneByteString(node_isolate, method);
   return MakeCallback(env, object, method_string, argc, argv);
 }
@@ -2570,6 +2628,8 @@ void SetupProcessObject(Environment* env,
 
   NODE_SET_METHOD(process, "binding", Binding);
 
+  NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
+  NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
   NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
 
   // values use to cross communicate with processNextTick
index 8692a2c..d878c06 100644 (file)
@@ -26,6 +26,7 @@
 // of the startup process, so many dependencies are invoked lazily.
 (function(process) {
   this.global = this;
+  var _errorHandler;
 
   function startup() {
     var EventEmitter = NativeModule.require('events').EventEmitter;
@@ -46,6 +47,7 @@
     startup.globalTimeouts();
     startup.globalConsole();
 
+    startup.processAsyncListener();
     startup.processAssert();
     startup.processConfig();
     startup.processNextTick();
 
   startup.processFatal = function() {
     process._fatalException = function(er) {
-      var caught = false;
+      // First run through error handlers from asyncListener.
+      var caught = _errorHandler(er);
 
-      if (process.domain && process.domain._errorHandler) {
-        caught = process.domain._errorHandler(er);
-      } else {
+      if (process.domain && process.domain._errorHandler)
+        caught = process.domain._errorHandler(er) || caught;
+
+      if (!caught)
         caught = process.emit('uncaughtException', er);
-      }
 
-      // if someone handled it, then great.  otherwise, die in C++ land
+      // If someone handled it, then great.  otherwise, die in C++ land
       // since that means that we'll exit the process, emit the 'exit' event
       if (!caught) {
         try {
 
       // if we handled an error, then make sure any ticks get processed
       } else {
-        setImmediate(process._tickCallback);
+        var t = setImmediate(process._tickCallback);
+        // Complete hack to make sure any errors thrown from async
+        // listeners don't cause an infinite loop.
+        if (t._asyncQueue)
+          t._asyncQueue = [];
       }
 
       return caught;
     };
   };
 
+  startup.processAsyncListener = function() {
+    var asyncStack = [];
+    var asyncQueue = [];
+    var uid = 0;
+
+    // Stateful flags shared with Environment for quick JS/C++
+    // communication.
+    var asyncFlags = {};
+
+    // Prevent accidentally suppressed thrown errors from before/after.
+    var inAsyncTick = false;
+
+    // To prevent infinite recursion when an error handler also throws
+    // flag when an error is currenly being handled.
+    var inErrorTick = false;
+
+    // Needs to be the same as src/env.h
+    var kCount = 0;
+
+    // _errorHandler is scoped so it's also accessible by _fatalException.
+    _errorHandler = errorHandler;
+
+    // Needs to be accessible from lib/timers.js so they know when async
+    // listeners are currently in queue. They'll be cleaned up once
+    // references there are made.
+    process._asyncFlags = asyncFlags;
+    process._runAsyncQueue = runAsyncQueue;
+    process._loadAsyncQueue = loadAsyncQueue;
+    process._unloadAsyncQueue = unloadAsyncQueue;
+
+    // Public API.
+    process.createAsyncListener = createAsyncListener;
+    process.addAsyncListener = addAsyncListener;
+    process.removeAsyncListener = removeAsyncListener;
+
+    // Setup shared objects/callbacks with native layer.
+    process._setupAsyncListener(asyncFlags,
+                                runAsyncQueue,
+                                loadAsyncQueue,
+                                unloadAsyncQueue,
+                                pushListener,
+                                stripListener);
+
+    function popQueue() {
+      if (asyncStack.length > 0)
+        asyncQueue = asyncStack.pop();
+      else
+        asyncQueue = [];
+    }
+
+    // Run all the async listeners attached when an asynchronous event is
+    // instantiated.
+    function runAsyncQueue(context) {
+      var queue = [];
+      var queueItem, item, i, value;
+
+      inAsyncTick = true;
+      for (i = 0; i < asyncQueue.length; i++) {
+        queueItem = asyncQueue[i];
+        // Not passing "this" context because it hasn't actually been
+        // instantiated yet, so accessing some of the object properties
+        // can cause a segfault.
+        // Passing the original value will allow users to manipulate the
+        // original value object, while also allowing them to return a
+        // new value for current async call tracking.
+        value = queueItem.listener(queueItem.value);
+        if (typeof value !== 'undefined') {
+          item = {
+            callbacks: queueItem.callbacks,
+            value: value,
+            listener: queueItem.listener,
+            uid: queueItem.uid
+          };
+        } else {
+          item = queueItem;
+        }
+        queue[i] = item;
+      }
+      inAsyncTick = false;
+
+      context._asyncQueue = queue;
+    }
+
+    // Uses the _asyncQueue object attached by runAsyncQueue.
+    function loadAsyncQueue(context) {
+      var queue = context._asyncQueue;
+      var item, before, i;
+
+      asyncStack.push(asyncQueue);
+      asyncQueue = queue;
+      // Since the async listener callback is required, the number of
+      // objects in the asyncQueue implies the number of async listeners
+      // there are to be processed.
+      asyncFlags[kCount] = queue.length;
+
+      // Run "before" callbacks.
+      inAsyncTick = true;
+      for (i = 0; i < queue.length; i++) {
+        item = queue[i];
+        if (!item.callbacks)
+          continue;
+        before = item.callbacks.before;
+        if (typeof before === 'function')
+          before(context, item.value);
+      }
+      inAsyncTick = false;
+    }
+
+    // Unload one level of the async stack. Returns true if there are
+    // still listeners somewhere in the stack.
+    function unloadAsyncQueue(context) {
+      var item, after, i;
+
+      // Run "after" callbacks.
+      inAsyncTick = true;
+      for (i = 0; i < asyncQueue.length; i++) {
+        item = asyncQueue[i];
+        if (!item.callbacks)
+          continue;
+        after = item.callbacks.after;
+        if (typeof after === 'function')
+          after(context, item.value);
+      }
+      inAsyncTick = false;
+
+      // Unload the current queue from the stack.
+      popQueue();
+
+      asyncFlags[kCount] = asyncQueue.length;
+
+      return asyncQueue.length > 0 || asyncStack.length > 0;
+    }
+
+    // Create new async listener object. Useful when instantiating a new
+    // object and want the listener instance, but not add it to the stack.
+    function createAsyncListener(listener, callbacks, value) {
+      return {
+        callbacks: callbacks,
+        value: value,
+        listener: listener,
+        uid: uid++
+      };
+    }
+
+    // Add a listener to the current queue.
+    function addAsyncListener(listener, callbacks, value) {
+      // Accept new listeners or previous created listeners.
+      if (typeof listener === 'function')
+        callbacks = createAsyncListener(listener, callbacks, value);
+      else
+        callbacks = listener;
+
+      var inQueue = false;
+      // The asyncQueue will be small. Probably always <= 3 items.
+      for (var i = 0; i < asyncQueue.length; i++) {
+        if (callbacks.uid === asyncQueue[i].uid) {
+          inQueue = true;
+          break;
+        }
+      }
+
+      // Make sure the callback doesn't already exist in the queue.
+      if (!inQueue)
+        asyncQueue.push(callbacks);
+
+      asyncFlags[kCount] = asyncQueue.length;
+      return callbacks;
+    }
+
+    // Remove listener from the current queue and the entire stack.
+    function removeAsyncListener(obj) {
+      var i, j;
+
+      for (i = 0; i < asyncQueue.length; i++) {
+        if (obj.uid === asyncQueue[i].uid) {
+          asyncQueue.splice(i, 1);
+          break;
+        }
+      }
+
+      for (i = 0; i < asyncStack.length; i++) {
+        for (j = 0; j < asyncStack[i].length; j++) {
+          if (obj.uid === asyncStack[i][j].uid) {
+            asyncStack[i].splice(j, 1);
+            break;
+          }
+        }
+      }
+
+      asyncFlags[kCount] = asyncQueue.length;
+    }
+
+    // Error handler used by _fatalException to run through all error
+    // callbacks in the current asyncQueue.
+    function errorHandler(er) {
+      var handled = false;
+      var error, item, i;
+
+      if (inErrorTick)
+        return false;
+
+      inErrorTick = true;
+      for (i = 0; i < asyncQueue.length; i++) {
+        item = asyncQueue[i];
+        if (!item.callbacks)
+          continue;
+        error = item.callbacks.error;
+        if (typeof error === 'function') {
+          try {
+            var threw = true;
+            handled = error(item.value, er) || handled;
+            threw = false;
+          } finally {
+            // If the error callback throws then we're going to die
+            // quickly with no chance of recovery. Only thing we're going
+            // to allow is execution of process exit event callbacks.
+            if (threw) {
+              process._exiting = true;
+              process.emit('exit', 1);
+            }
+          }
+        }
+      }
+      inErrorTick = false;
+
+      // Unload the current queue from the stack.
+      popQueue();
+
+      return handled && !inAsyncTick;
+    }
+
+    // Used by AsyncWrap::AddAsyncListener() to add an individual listener
+    // to the async queue. It will check the uid of the listener and only
+    // allow it to be added once.
+    function pushListener(obj) {
+      if (!this._asyncQueue)
+        this._asyncQueue = [];
+
+      var queue = this._asyncQueue;
+      var inQueue = false;
+      // The asyncQueue will be small. Probably always <= 3 items.
+      for (var i = 0; i < queue.length; i++) {
+        if (obj.uid === queue.uid) {
+          inQueue = true;
+          break;
+        }
+      }
+
+      if (!inQueue)
+        queue.push(obj);
+    }
+
+    // Used by AsyncWrap::RemoveAsyncListener() to remove an individual
+    // listener from the async queue, and return whether there are still
+    // listeners in the queue.
+    function stripListener(obj) {
+      if (!this._asyncQueue || this._asyncQueue.length === 0)
+        return false;
+
+      // The asyncQueue will be small. Probably always <= 3 items.
+      for (var i = 0; i < this._asyncQueue.length; i++) {
+        if (obj.uid === this._asyncQueue[i].uid) {
+          this._asyncQueue.splice(i, 1);
+          break;
+        }
+      }
+
+      return this._asyncQueue.length > 0;
+    }
+  };
+
   var assert;
   startup.processAssert = function() {
     assert = process.assert = function(x, msg) {
 
   startup.processNextTick = function() {
     var nextTickQueue = [];
+    var asyncFlags = process._asyncFlags;
+    var _runAsyncQueue = process._runAsyncQueue;
+    var _loadAsyncQueue = process._loadAsyncQueue;
+    var _unloadAsyncQueue = process._unloadAsyncQueue;
 
     // This tickInfo thing is used so that the C++ code in src/node.cc
     // can have easy accesss to our nextTick state, and avoid unnecessary
-    var tickInfo = process._tickInfo;
+    var tickInfo = {};
 
     // *Must* match Environment::TickInfo::Fields in src/env.h.
     var kIndex = 0;
     var kLength = 1;
 
+    // For asyncFlags.
+    // *Must* match Environment::AsyncListeners::Fields in src/env.h
+    var kCount = 0;
+
     process.nextTick = nextTick;
-    // needs to be accessible from cc land
+    // Needs to be accessible from beyond this scope.
     process._tickCallback = _tickCallback;
     process._tickDomainCallback = _tickDomainCallback;
 
+    process._setupNextTick(tickInfo, _tickCallback);
+
     function tickDone() {
       if (tickInfo[kLength] !== 0) {
         if (tickInfo[kLength] <= tickInfo[kIndex]) {
       tickInfo[kIndex] = 0;
     }
 
-    // run callbacks that have no domain
-    // using domains will cause this to be overridden
+    // Run callbacks that have no domain.
+    // Using domains will cause this to be overridden.
     function _tickCallback() {
-      var callback, threw;
+      var callback, hasQueue, threw, tock;
 
       while (tickInfo[kIndex] < tickInfo[kLength]) {
-        callback = nextTickQueue[tickInfo[kIndex]++].callback;
+        tock = nextTickQueue[tickInfo[kIndex]++];
+        callback = tock.callback;
         threw = true;
+        hasQueue = !!tock._asyncQueue;
+        if (hasQueue)
+          _loadAsyncQueue(tock);
         try {
           callback();
           threw = false;
         } finally {
-          if (threw) tickDone();
+          if (threw)
+            tickDone();
         }
+        if (hasQueue)
+          _unloadAsyncQueue(tock);
       }
 
       tickDone();
     }
 
     function _tickDomainCallback() {
-      var tock, callback, threw, domain;
+      var callback, domain, hasQueue, threw, tock;
 
       while (tickInfo[kIndex] < tickInfo[kLength]) {
         tock = nextTickQueue[tickInfo[kIndex]++];
         callback = tock.callback;
         domain = tock.domain;
-        if (domain) {
-          if (domain._disposed) continue;
+        hasQueue = !!tock._asyncQueue;
+        if (hasQueue)
+          _loadAsyncQueue(tock);
+        if (domain)
           domain.enter();
-        }
         threw = true;
         try {
           callback();
           threw = false;
         } finally {
-          if (threw) tickDone();
+          if (threw)
+            tickDone();
         }
+        if (hasQueue)
+          _unloadAsyncQueue(tock);
         if (domain)
           domain.exit();
       }
       if (process._exiting)
         return;
 
-      nextTickQueue.push({
+      var obj = {
         callback: callback,
-        domain: process.domain || null
-      });
+        domain: process.domain || null,
+        _asyncQueue: undefined
+      };
+
+      if (asyncFlags[kCount] > 0)
+        _runAsyncQueue(obj);
+
+      nextTickQueue.push(obj);
       tickInfo[kLength]++;
     }
   };
index cb37124..5650e4c 100644 (file)
@@ -67,7 +67,7 @@ using v8::Value;
 class FSReqWrap: public ReqWrap<uv_fs_t> {
  public:
   FSReqWrap(Environment* env, const char* syscall, char* data = NULL)
-    : ReqWrap<uv_fs_t>(env),
+    : ReqWrap<uv_fs_t>(env, Object::New()),
       syscall_(syscall),
       data_(data) {
   }
@@ -214,7 +214,7 @@ static void After(uv_fs_t *req) {
     }
   }
 
-  MakeCallback(env, req_wrap->object(), env->oncomplete_string(), argc, argv);
+  req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
 
   uv_fs_req_cleanup(&req_wrap->req_);
   delete req_wrap;
index a484779..84c523b 100644 (file)
@@ -49,7 +49,7 @@ inline v8::Local<TypeName> PersistentToLocal(
 
 // Call with valid HandleScope and while inside Context scope.
 v8::Handle<v8::Value> MakeCallback(Environment* env,
-                                   const v8::Handle<v8::Object> object,
+                                   v8::Handle<v8::Object> object,
                                    const char* method,
                                    int argc = 0,
                                    v8::Handle<v8::Value>* argv = NULL);
index 08ab719..a4e4ed1 100644 (file)
@@ -192,11 +192,7 @@ void PipeWrap::OnConnection(uv_stream_t* handle, int status) {
   };
 
   if (status != 0) {
-    MakeCallback(env,
-                 pipe_wrap->object(),
-                 env->onconnection_string(),
-                 ARRAY_SIZE(argv),
-                 argv);
+    pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
     return;
   }
 
@@ -212,11 +208,7 @@ void PipeWrap::OnConnection(uv_stream_t* handle, int status) {
 
   // Successful accept. Call the onconnection callback in JavaScript land.
   argv[1] = client_obj;
-  MakeCallback(env,
-               pipe_wrap->object(),
-               env->onconnection_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
 }
 
 // TODO(bnoordhuis) Maybe share this with TCPWrap?
@@ -251,11 +243,7 @@ void PipeWrap::AfterConnect(uv_connect_t* req, int status) {
     Boolean::New(writable)
   };
 
-  MakeCallback(env,
-               req_wrap_obj,
-               env->oncomplete_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
 
   delete req_wrap;
 }
index ba09745..7b8125c 100644 (file)
@@ -284,11 +284,7 @@ class ProcessWrap : public HandleWrap {
       OneByteString(node_isolate, signo_string(term_signal))
     };
 
-    MakeCallback(env,
-                 wrap->object(),
-                 env->onexit_string(),
-                 ARRAY_SIZE(argv),
-                 argv);
+    wrap->MakeCallback(env->onexit_string(), ARRAY_SIZE(argv), argv);
   }
 
   uv_process_t process_;
index 8a701e5..da3abd8 100644 (file)
@@ -22,6 +22,8 @@
 #ifndef SRC_REQ_WRAP_H_
 #define SRC_REQ_WRAP_H_
 
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "queue.h"
@@ -33,21 +35,14 @@ namespace node {
 extern QUEUE req_wrap_queue;
 
 template <typename T>
-class ReqWrap {
+class ReqWrap : public AsyncWrap {
  public:
-  ReqWrap(Environment* env,
-          v8::Handle<v8::Object> object = v8::Handle<v8::Object>())
-      : env_(env) {
-    v8::HandleScope handle_scope(env->isolate());
+  ReqWrap(Environment* env, v8::Handle<v8::Object> object)
+      : AsyncWrap(env, object) {
+    assert(!object.IsEmpty());
 
-    if (object.IsEmpty()) {
-      object = v8::Object::New();
-    }
-    persistent().Reset(env->isolate(), object);
-
-    if (env->in_domain()) {
+    if (env->in_domain())
       object->Set(env->domain_string(), env->domain_array()->Get(0));
-    }
 
     QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
   }
@@ -66,25 +61,9 @@ class ReqWrap {
     req_.data = this;
   }
 
-  inline Environment* env() const {
-    return env_;
-  }
-
-  inline v8::Local<v8::Object> object() {
-    return PersistentToLocal(env()->isolate(), persistent());
-  }
-
-  inline v8::Persistent<v8::Object>& persistent() {
-    return object_;
-  }
-
   // TODO(bnoordhuis) Make these private.
   QUEUE req_wrap_queue_;
   T req_;  // *must* be last, GetActiveRequests() in node.cc depends on it
-
- private:
-  v8::Persistent<v8::Object> object_;
-  Environment* const env_;
 };
 
 
index 3a1e5bf..b0f1619 100644 (file)
@@ -19,6 +19,8 @@
 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "handle_wrap.h"
@@ -100,8 +102,9 @@ class SignalWrap : public HandleWrap {
     Environment* env = wrap->env();
     Context::Scope context_scope(env->context());
     HandleScope handle_scope(env->isolate());
+
     Local<Value> arg = Integer::New(signum, env->isolate());
-    MakeCallback(env, wrap->object(), env->onsignal_string(), 1, &arg);
+    wrap->MakeCallback(env->onsignal_string(), 1, &arg);
   }
 
   uv_signal_t handle_;
index f98fcde..76e667b 100644 (file)
@@ -451,11 +451,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) {
     req_wrap_obj
   };
 
-  MakeCallback(env,
-               req_wrap_obj,
-               env->oncomplete_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
 
   req_wrap->~WriteWrap();
   delete[] reinterpret_cast<char*>(req_wrap);
@@ -499,11 +495,7 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
     req_wrap_obj
   };
 
-  MakeCallback(env,
-               req_wrap_obj,
-               env->oncomplete_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
 
   delete req_wrap;
 }
@@ -574,7 +566,7 @@ void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
   if (nread < 0)  {
     if (buf->base != NULL)
       free(buf->base);
-    MakeCallback(env, Self(), env->onread_string(), ARRAY_SIZE(argv), argv);
+    wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
     return;
   }
 
@@ -603,11 +595,7 @@ void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
     argv[2] = pending_obj;
   }
 
-  MakeCallback(env,
-               wrap()->object(),
-               env->onread_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
 }
 
 
index 840dd92..73058db 100644 (file)
@@ -321,11 +321,7 @@ void TCPWrap::OnConnection(uv_stream_t* handle, int status) {
     argv[1] = client_obj;
   }
 
-  MakeCallback(env,
-               tcp_wrap->object(),
-               env->onconnection_string(),
-               ARRAY_SIZE(argv),
-               argv);
+  tcp_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
 }
 
 
@@ -350,11 +346,8 @@ void TCPWrap::AfterConnect(uv_connect_t* req, int status) {
     v8::True(node_isolate),
     v8::True(node_isolate)
   };
-  MakeCallback(env,
-               req_wrap_obj,
-               env->oncomplete_string(),
-               ARRAY_SIZE(argv),
-               argv);
+
+  req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
 
   delete req_wrap;
 }
index d6e32bd..393def3 100644 (file)
@@ -19,6 +19,8 @@
 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
 #include "env.h"
 #include "env-inl.h"
 #include "handle_wrap.h"
@@ -138,7 +140,7 @@ class TimerWrap : public HandleWrap {
     Context::Scope context_scope(env->context());
     HandleScope handle_scope(env->isolate());
     Local<Value> argv[1] = { Integer::New(status, node_isolate) };
-    MakeCallback(env, wrap->object(), kOnTimeout, ARRAY_SIZE(argv), argv);
+    wrap->MakeCallback(kOnTimeout, ARRAY_SIZE(argv), argv);
   }
 
   static void Now(const FunctionCallbackInfo<Value>& args) {
index 3b50449..2af278d 100644 (file)
@@ -363,9 +363,8 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
     Environment* env = req_wrap->env();
     Context::Scope context_scope(env->context());
     HandleScope handle_scope(env->isolate());
-    Local<Object> req_wrap_obj = req_wrap->object();
     Local<Value> arg = Integer::New(status, node_isolate);
-    MakeCallback(env, req_wrap_obj, env->oncomplete_string(), 1, &arg);
+    req_wrap->MakeCallback(env->oncomplete_string(), 1, &arg);
   }
   delete req_wrap;
 }
@@ -405,25 +404,21 @@ void UDPWrap::OnRecv(uv_udp_t* handle,
   Local<Value> argv[] = {
     Integer::New(nread, node_isolate),
     wrap_obj,
-    Undefined(),
-    Undefined()
+    Undefined(env->isolate()),
+    Undefined(env->isolate())
   };
 
   if (nread < 0) {
     if (buf->base != NULL)
       free(buf->base);
-    MakeCallback(env,
-                 wrap_obj,
-                 env->onmessage_string(),
-                 ARRAY_SIZE(argv),
-                 argv);
+    wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv);
     return;
   }
 
   char* base = static_cast<char*>(realloc(buf->base, nread));
   argv[2] = Buffer::Use(env, base, nread);
   argv[3] = AddressToJS(env, addr);
-  MakeCallback(env, wrap_obj, env->onmessage_string(), ARRAY_SIZE(argv), argv);
+  wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv);
 }
 
 
diff --git a/test/simple/test-asynclistener-error.js b/test/simple/test-asynclistener-error.js
new file mode 100644 (file)
index 0000000..c1525e4
--- /dev/null
@@ -0,0 +1,258 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var dns = require('dns');
+var fs = require('fs');
+var net = require('net');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var errorMsgs = [];
+var currentMsg = '';
+var caught = 0;
+var expectCaught = 0;
+var exitCbRan = false;
+
+function asyncL() { }
+
+var callbacksObj = {
+  error: function(value, er) {
+    var idx = errorMsgs.indexOf(er.message);
+
+    caught++;
+
+    if (-1 < idx)
+      errorMsgs.splice(idx, 1);
+
+    return currentMsg === er.message;
+  }
+};
+
+var listener = process.createAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+  removeListener(listener);
+
+  // Something else went wrong, no need to further check.
+  if (code > 0)
+    return;
+
+  // Make sure the exit callback only runs once.
+  assert.ok(!exitCbRan);
+  exitCbRan = true;
+
+  // Check if any error messages weren't removed from the msg queue.
+  if (errorMsgs.length > 0)
+    throw new Error('Errors not fired: ' + errorMsgs);
+
+  assert.equal(caught, expectCaught, 'caught all expected errors');
+  process._rawDebug('ok');
+});
+
+
+// Catch synchronous throws
+errorMsgs.push('sync throw');
+process.nextTick(function() {
+  addListener(listener);
+
+  expectCaught++;
+  currentMsg = 'sync throw';
+  throw new Error(currentMsg);
+
+  removeListener(listener);
+});
+
+
+// Simple cases
+errorMsgs.push('setTimeout - simple');
+errorMsgs.push('setImmediate - simple');
+errorMsgs.push('setInterval - simple');
+errorMsgs.push('process.nextTick - simple');
+process.nextTick(function() {
+  addListener(listener);
+
+  setTimeout(function() {
+    currentMsg = 'setTimeout - simple';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  setImmediate(function() {
+    currentMsg = 'setImmediate - simple';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  var b = setInterval(function() {
+    clearInterval(b);
+    currentMsg = 'setInterval - simple';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  process.nextTick(function() {
+    currentMsg = 'process.nextTick - simple';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  removeListener(listener);
+});
+
+
+// Deeply nested
+errorMsgs.push('setInterval - nested');
+errorMsgs.push('setImmediate - nested');
+errorMsgs.push('process.nextTick - nested');
+errorMsgs.push('setTimeout2 - nested');
+errorMsgs.push('setTimeout - nested');
+process.nextTick(function() {
+  addListener(listener);
+
+  setTimeout(function() {
+    process.nextTick(function() {
+      setImmediate(function() {
+        var b = setInterval(function() {
+          clearInterval(b);
+          currentMsg = 'setInterval - nested';
+          throw new Error(currentMsg);
+        });
+        expectCaught++;
+        currentMsg = 'setImmediate - nested';
+        throw new Error(currentMsg);
+      });
+      expectCaught++;
+      currentMsg = 'process.nextTick - nested';
+      throw new Error(currentMsg);
+    });
+    expectCaught++;
+    setTimeout(function() {
+      currentMsg = 'setTimeout2 - nested';
+      throw new Error(currentMsg);
+    });
+    expectCaught++;
+    currentMsg = 'setTimeout - nested';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  removeListener(listener);
+});
+
+
+// FS
+errorMsgs.push('fs - file does not exist');
+errorMsgs.push('fs - exists');
+errorMsgs.push('fs - realpath');
+process.nextTick(function() {
+  addListener(listener);
+
+  fs.stat('does not exist', function(err, stats) {
+    currentMsg = 'fs - file does not exist';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  fs.exists('hi all', function(exists) {
+    currentMsg = 'fs - exists';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  fs.realpath('/some/path', function(err, resolved) {
+    currentMsg = 'fs - realpath';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  removeListener(listener);
+});
+
+
+// Nested FS
+errorMsgs.push('fs - nested file does not exist');
+process.nextTick(function() {
+  addListener(listener);
+
+  setTimeout(function() {
+    setImmediate(function() {
+      var b = setInterval(function() {
+        clearInterval(b);
+        process.nextTick(function() {
+          fs.stat('does not exist', function(err, stats) {
+            currentMsg = 'fs - nested file does not exist';
+            throw new Error(currentMsg);
+          });
+          expectCaught++;
+        });
+      });
+    });
+  });
+
+  removeListener(listener);
+});
+
+
+// Net
+errorMsgs.push('net - connection listener');
+errorMsgs.push('net - client connect');
+errorMsgs.push('net - server listening');
+process.nextTick(function() {
+  addListener(listener);
+
+  var server = net.createServer(function(c) {
+    server.close();
+    currentMsg = 'net - connection listener';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  server.listen(common.PORT, function() {
+    var client = net.connect(common.PORT, function() {
+      client.end();
+      currentMsg = 'net - client connect';
+      throw new Error(currentMsg);
+    });
+    expectCaught++;
+    currentMsg = 'net - server listening';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  removeListener(listener);
+});
+
+
+// DNS
+errorMsgs.push('dns - lookup');
+process.nextTick(function() {
+  addListener(listener);
+
+  dns.lookup('localhost', function() {
+    currentMsg = 'dns - lookup';
+    throw new Error(currentMsg);
+  });
+  expectCaught++;
+
+  removeListener(listener);
+});
diff --git a/test/simple/test-asynclistener-multi-timeout.js b/test/simple/test-asynclistener-multi-timeout.js
new file mode 100644 (file)
index 0000000..30ec6dd
--- /dev/null
@@ -0,0 +1,71 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var caught = [];
+var expect = [];
+
+function asyncL(a) {}
+
+var callbacksObj = {
+  error: function(value, er) {
+    process._rawDebug('caught', er.message);
+    caught.push(er.message);
+    return (expect.indexOf(er.message) !== -1);
+  }
+};
+
+var listener = process.createAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+  removeListener(listener);
+
+  if (code > 0)
+    return;
+
+  expect = expect.sort();
+  caught = caught.sort();
+
+  process._rawDebug('expect', expect);
+  process._rawDebug('caught', caught);
+  assert.deepEqual(caught, expect, 'caught all expected errors');
+  process._rawDebug('ok');
+});
+
+
+expect.push('immediate simple a');
+expect.push('immediate simple b');
+process.nextTick(function() {
+  addListener(listener);
+  // Tests for a setImmediate specific bug encountered while implementing
+  // AsyncListeners.
+  setImmediate(function() {
+    throw new Error('immediate simple a');
+  });
+  setImmediate(function() {
+    throw new Error('immediate simple b');
+  });
+  removeListener(listener);
+});
diff --git a/test/simple/test-asynclistener-throw-before-infinite-recursion.js b/test/simple/test-asynclistener-throw-before-infinite-recursion.js
new file mode 100644 (file)
index 0000000..fcf0fd6
--- /dev/null
@@ -0,0 +1,47 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+// If there is an uncaughtException listener then the error thrown from
+// "before" will be considered handled, thus calling setImmediate to
+// finish execution of the nextTickQueue. This in turn will cause "before"
+// to fire again, entering into an infinite loop.
+// So the asyncQueue is cleared from the returned setImmediate in
+// _fatalException to prevent this from happening.
+var cntr = 0;
+
+
+process.addAsyncListener(function() { }, {
+  before: function() {
+    if (++cntr > 1) {
+      // Can't throw since uncaughtException will also catch that.
+      process._rawDebug('Error: Multiple before callbacks called');
+      process.exit(1);
+    }
+    throw new Error('before');
+  }
+});
+
+process.on('uncaughtException', function() { });
+
+process.nextTick();
diff --git a/test/simple/test-asynclistener.js b/test/simple/test-asynclistener.js
new file mode 100644 (file)
index 0000000..c5110de
--- /dev/null
@@ -0,0 +1,185 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var net = require('net');
+var fs = require('fs');
+var dgram = require('dgram');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var actualAsync = 0;
+var expectAsync = 0;
+
+function onAsync() {
+  actualAsync++;
+}
+
+var listener = process.createAsyncListener(onAsync);
+
+process.on('exit', function() {
+  process._rawDebug('expected', expectAsync);
+  process._rawDebug('actual  ', actualAsync);
+  // TODO(trevnorris): Not a great test. If one was missed, but others
+  // overflowed then the test would still pass.
+  assert.ok(actualAsync >= expectAsync);
+});
+
+
+// Test listeners side-by-side
+process.nextTick(function() {
+  addListener(listener);
+
+  var b = setInterval(function() {
+    clearInterval(b);
+  });
+  expectAsync++;
+
+  var c = setInterval(function() {
+    clearInterval(c);
+  });
+  expectAsync++;
+
+  setTimeout(function() { });
+  expectAsync++;
+
+  setTimeout(function() { });
+  expectAsync++;
+
+  process.nextTick(function() { });
+  expectAsync++;
+
+  process.nextTick(function() { });
+  expectAsync++;
+
+  setImmediate(function() { });
+  expectAsync++;
+
+  setImmediate(function() { });
+  expectAsync++;
+
+  setTimeout(function() { }, 10);
+  expectAsync++;
+
+  setTimeout(function() { }, 10);
+  expectAsync++;
+
+  removeListener(listener);
+});
+
+
+// Async listeners should propagate with nested callbacks
+process.nextTick(function() {
+  addListener(listener);
+  var interval = 3;
+
+  process.nextTick(function() {
+    setTimeout(function() {
+      setImmediate(function() {
+        var i = setInterval(function() {
+          if (--interval <= 0)
+            clearInterval(i);
+        });
+        expectAsync++;
+      });
+      expectAsync++;
+      process.nextTick(function() {
+        setImmediate(function() {
+          setTimeout(function() { }, 20);
+          expectAsync++;
+        });
+        expectAsync++;
+      });
+      expectAsync++;
+    });
+    expectAsync++;
+  });
+  expectAsync++;
+
+  removeListener(listener);
+});
+
+
+// Test triggers with two async listeners
+process.nextTick(function() {
+  addListener(listener);
+  addListener(listener);
+
+  setTimeout(function() {
+    process.nextTick(function() { });
+    expectAsync += 2;
+  });
+  expectAsync += 2;
+
+  removeListener(listener);
+  removeListener(listener);
+});
+
+
+// Test callbacks from fs I/O
+process.nextTick(function() {
+  addListener(listener);
+
+  fs.stat('something random', function(err, stat) { });
+  expectAsync++;
+
+  setImmediate(function() {
+    fs.stat('random again', function(err, stat) { });
+    expectAsync++;
+  });
+  expectAsync++;
+
+  removeListener(listener);
+});
+
+
+// Test net I/O
+process.nextTick(function() {
+  addListener(listener);
+
+  var server = net.createServer(function(c) { });
+  expectAsync++;
+
+  server.listen(common.PORT, function() {
+    server.close();
+    expectAsync++;
+  });
+  expectAsync++;
+
+  removeListener(listener);
+});
+
+
+// Test UDP
+process.nextTick(function() {
+  addListener(listener);
+
+  var server = dgram.createSocket('udp4');
+  expectAsync++;
+
+  server.bind(common.PORT);
+
+  server.close();
+  expectAsync++;
+
+  removeListener(listener);
+});
index 666f5d1..57e8ac4 100644 (file)
@@ -45,11 +45,10 @@ var server = http.createServer(function(req, res) {
     res.end(er.stack || er.message || 'Unknown error');
   });
 
-  var data;
   dom.run(function() {
     // Now, an action that has the potential to fail!
     // if you request 'baz', then it'll throw a JSON circular ref error.
-    data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]);
+    var data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]);
 
     // this line will throw if you pick an unknown key
     assert(data !== undefined, 'Data should not be undefined');
@@ -64,8 +63,6 @@ server.listen(common.PORT, next);
 function next() {
   console.log('listening on localhost:%d', common.PORT);
 
-  // now hit it a few times
-  var dom = domain.create();
   var requests = 0;
   var responses = 0;