src: revert domain using AsyncListeners
authorTrevor Norris <trev.norris@gmail.com>
Thu, 9 Jan 2014 19:11:40 +0000 (11:11 -0800)
committerTrevor Norris <trev.norris@gmail.com>
Thu, 9 Jan 2014 21:25:20 +0000 (13:25 -0800)
This is a slightly modified revert of bc39bdd.

Getting domains to use AsyncListeners became too much of a challenge
with many edge cases. While this is still a goal, it will have to be
deferred for now until more test coverage can be provided.

13 files changed:
lib/_http_client.js
lib/domain.js
lib/events.js
lib/net.js
lib/timers.js
src/async-wrap-inl.h
src/async-wrap.h
src/env-inl.h
src/env.h
src/node.cc
src/node.js
src/node_crypto.cc
src/req_wrap.h

index ef15ae1..69dd62e 100644 (file)
@@ -457,12 +457,8 @@ ClientRequest.prototype.onSocket = function(socket) {
   var req = this;
 
   process.nextTick(function() {
-    // If a domain was added to the request, attach it to the socket.
-    if (req.domain)
-      socket._handle.addAsyncListener(req.domain._listener);
     tickOnSocket(req, socket);
   });
-
 };
 
 ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) {
index 0fa4c58..30da685 100644 (file)
@@ -28,6 +28,26 @@ var inherits = util.inherits;
 // a few side effects.
 EventEmitter.usingDomains = true;
 
+// overwrite process.domain with a getter/setter that will allow for more
+// effective optimizations
+var _domain = [null];
+Object.defineProperty(process, 'domain', {
+  enumerable: true,
+  get: function() {
+    return _domain[0];
+  },
+  set: function(arg) {
+    return _domain[0] = arg;
+  }
+});
+
+// objects with external array data are excellent ways to communicate state
+// between js and c++ w/o much overhead
+var _domain_flag = {};
+
+// let the process know we're using domains
+process._setupDomainUse(_domain, _domain_flag);
+
 exports.Domain = Domain;
 
 exports.create = exports.createDomain = function() {
@@ -42,71 +62,66 @@ exports._stack = stack;
 exports.active = null;
 
 
-var listenerObj = {
-  error: function errorHandler(domain, er) {
-    var caught = false;
-    // ignore errors on disposed domains.
-    //
-    // XXX This is a bit stupid.  We should probably get rid of
-    // domain.dispose() altogether.  It's almost always a terrible
-    // idea.  --isaacs
-    if (domain._disposed)
-      return true;
-
-    er.domain = domain;
-    er.domainThrown = true;
-    // wrap this in a try/catch so we don't get infinite throwing
-    try {
-      // One of three things will happen here.
-      //
-      // 1. There is a handler, caught = true
-      // 2. There is no handler, caught = false
-      // 3. It throws, caught = false
-      //
-      // If caught is false after this, then there's no need to exit()
-      // the domain, because we're going to crash the process anyway.
-      caught = domain.emit('error', er);
-
-      if (stack.length === 0)
-        process.removeAsyncListener(domain._listener);
-
-      // Exit all domains on the stack.  Uncaught exceptions end the
-      // current tick and no domains should be left on the stack
-      // between ticks.
-      stack.length = 0;
-      exports.active = process.domain = null;
-    } catch (er2) {
-      // The domain error handler threw!  oh no!
-      // See if another domain can catch THIS error,
-      // or else crash on the original one.
-      // If the user already exited it, then don't double-exit.
-      if (domain === exports.active) {
-        stack.pop();
-      }
-      if (stack.length) {
-        exports.active = process.domain = stack[stack.length - 1];
-        caught = process._fatalException(er2);
-      } else {
-        caught = false;
-      }
-      return caught;
-    }
-    return caught;
-  }
-};
-
-
 inherits(Domain, EventEmitter);
 
 function Domain() {
   EventEmitter.call(this);
+
   this.members = [];
-  this._listener = process.createAsyncListener(listenerObj, this);
 }
 
 Domain.prototype.members = undefined;
 Domain.prototype._disposed = undefined;
-Domain.prototype._listener = undefined;
+
+
+// Called by process._fatalException in case an error was thrown.
+Domain.prototype._errorHandler = function errorHandler(er) {
+  var caught = false;
+  // ignore errors on disposed domains.
+  //
+  // XXX This is a bit stupid.  We should probably get rid of
+  // domain.dispose() altogether.  It's almost always a terrible
+  // idea.  --isaacs
+  if (this._disposed)
+    return true;
+
+  er.domain = this;
+  er.domainThrown = true;
+  // wrap this in a try/catch so we don't get infinite throwing
+  try {
+    // One of three things will happen here.
+    //
+    // 1. There is a handler, caught = true
+    // 2. There is no handler, caught = false
+    // 3. It throws, caught = false
+    //
+    // If caught is false after this, then there's no need to exit()
+    // the domain, because we're going to crash the process anyway.
+    caught = this.emit('error', er);
+
+    // Exit all domains on the stack.  Uncaught exceptions end the
+    // current tick and no domains should be left on the stack
+    // between ticks.
+    stack.length = 0;
+    exports.active = process.domain = null;
+  } catch (er2) {
+    // The domain error handler threw!  oh no!
+    // See if another domain can catch THIS error,
+    // or else crash on the original one.
+    // If the user already exited it, then don't double-exit.
+    if (this === exports.active) {
+      stack.pop();
+    }
+    if (stack.length) {
+      exports.active = process.domain = stack[stack.length - 1];
+      caught = process._fatalException(er2);
+    } else {
+      caught = false;
+    }
+    return caught;
+  }
+  return caught;
+};
 
 
 Domain.prototype.enter = function() {
@@ -116,22 +131,20 @@ Domain.prototype.enter = function() {
   // to push it onto the stack so that we can pop it later.
   exports.active = process.domain = this;
   stack.push(this);
-
-  process.addAsyncListener(this._listener);
+  _domain_flag[0] = stack.length;
 };
 
 
 Domain.prototype.exit = function() {
   if (this._disposed) return;
 
-  process.removeAsyncListener(this._listener);
-
   // exit all domains until this one.
   var index = stack.lastIndexOf(this);
   if (index !== -1)
     stack.splice(index + 1);
   else
     stack.length = 0;
+  _domain_flag[0] = stack.length;
 
   exports.active = stack[stack.length - 1];
   process.domain = exports.active;
@@ -165,13 +178,6 @@ Domain.prototype.add = function(ee) {
 
   ee.domain = this;
   this.members.push(ee);
-
-  // Adding the domain._listener to the Wrap associated with the event
-  // emitter instance will be done automatically either on class
-  // instantiation or manually, like in cases of net listen().
-  // The reason it cannot be done here is because in specific cases the
-  // _handle is not created on EE instantiation, so there's no place to
-  // add the listener.
 };
 
 
@@ -180,24 +186,6 @@ Domain.prototype.remove = function(ee) {
   var index = this.members.indexOf(ee);
   if (index !== -1)
     this.members.splice(index, 1);
-
-  // First check if the ee is a handle itself.
-  if (ee.removeAsyncListener)
-    ee.removeAsyncListener(this._listener);
-
-  // Manually remove the asyncListener from the handle, if possible.
-  if (ee._handle && ee._handle.removeAsyncListener)
-    ee._handle.removeAsyncListener(this._listener);
-
-  // TODO(trevnorris): Are there cases where the handle doesn't live on
-  // the ee or the _handle.
-
-  // TODO(trevnorris): For debugging that we've missed adding AsyncWrap's
-  // methods to a handle somewhere on the native side.
-  if (ee._handle && !ee._handle.removeAsyncListener) {
-    process._rawDebug('Wrap handle is missing AsyncWrap methods');
-    process.abort();
-  }
 };
 
 
index eff953f..f854bde 100644 (file)
@@ -91,6 +91,9 @@ EventEmitter.prototype.emit = function(type) {
   if (util.isUndefined(handler))
     return false;
 
+  if (this.domain && this !== process)
+    this.domain.enter();
+
   if (util.isFunction(handler)) {
     switch (arguments.length) {
       // fast cases
@@ -123,6 +126,9 @@ EventEmitter.prototype.emit = function(type) {
       listeners[i].apply(this, args);
   }
 
+  if (this.domain && this !== process)
+    this.domain.exit();
+
   return true;
 };
 
index afb2d0d..3804d62 100644 (file)
@@ -1089,20 +1089,9 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
   // generate connection key, this should be unique to the connection
   this._connectionKey = addressType + ':' + address + ':' + port;
 
-  // If a domain is attached to the event emitter then we need to add
-  // the listener to the handle.
-  if (this.domain) {
-    this._handle.addAsyncListener(this.domain._listener);
-    process.addAsyncListener(this.domain._listener);
-  }
-
   process.nextTick(function() {
     self.emit('listening');
   });
-
-  if (this.domain) {
-    process.removeAsyncListener(this.domain._listener);
-  }
 };
 
 
index 5612184..ff58430 100644 (file)
@@ -112,7 +112,8 @@ function listOnTimeout() {
       // other timers that expire on this tick should still run.
       //
       // https://github.com/joyent/node/issues/2631
-      if (first.domain && first.domain._disposed)
+      var domain = first.domain;
+      if (domain && domain._disposed)
         continue;
 
       hasQueue = !!first._asyncQueue;
@@ -120,8 +121,12 @@ function listOnTimeout() {
       try {
         if (hasQueue)
           loadAsyncQueue(first);
+        if (domain)
+          domain.enter();
         threw = true;
         first._onTimeout();
+        if (domain)
+          domain.exit();
         if (hasQueue)
           unloadAsyncQueue(first);
         threw = false;
@@ -368,7 +373,7 @@ L.init(immediateQueue);
 
 function processImmediate() {
   var queue = immediateQueue;
-  var hasQueue, immediate;
+  var domain, hasQueue, immediate;
 
   immediateQueue = {};
   L.init(immediateQueue);
@@ -376,9 +381,12 @@ function processImmediate() {
   while (L.isEmpty(queue) === false) {
     immediate = L.shift(queue);
     hasQueue = !!immediate._asyncQueue;
+    domain = immediate.domain;
 
     if (hasQueue)
       loadAsyncQueue(immediate);
+    if (domain)
+      domain.enter();
 
     var threw = true;
     try {
@@ -398,6 +406,8 @@ function processImmediate() {
       }
     }
 
+    if (domain)
+      domain.exit();
     if (hasQueue)
       unloadAsyncQueue(immediate);
   }
@@ -481,7 +491,7 @@ function unrefTimeout() {
 
   debug('unrefTimer fired');
 
-  var diff, first, hasQueue, threw;
+  var diff, domain, first, hasQueue, threw;
   while (first = L.peek(unrefList)) {
     diff = now - first._idleStart;
     hasQueue = !!first._asyncQueue;
@@ -496,16 +506,21 @@ function unrefTimeout() {
 
     L.remove(first);
 
+    domain = first.domain;
+
     if (!first._onTimeout) continue;
-    if (first.domain && first.domain._disposed) continue;
+    if (domain && domain._disposed) continue;
 
     try {
       if (hasQueue)
         loadAsyncQueue(first);
+      if (domain) domain.enter();
       threw = true;
       debug('unreftimer firing timeout');
       first._onTimeout();
       threw = false;
+      if (domain)
+        domain.exit();
       if (hasQueue)
         unloadAsyncQueue(first);
     } finally {
index 58c2d78..b32ead8 100644 (file)
@@ -88,10 +88,102 @@ inline bool AsyncWrap::has_async_queue() {
 }
 
 
+// I hate you domains.
+inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
+    const v8::Handle<v8::Function> cb,
+    int argc,
+    v8::Handle<v8::Value>* argv) {
+  assert(env()->context() == env()->isolate()->GetCurrentContext());
+
+  v8::Local<v8::Object> context = object();
+  v8::Local<v8::Object> process = env()->process_object();
+  v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
+  v8::Local<v8::Object> domain;
+
+  v8::TryCatch try_catch;
+  try_catch.SetVerbose(true);
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_load_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return v8::Undefined(env()->isolate());
+  }
+
+  bool has_domain = domain_v->IsObject();
+  if (has_domain) {
+    domain = domain_v.As<v8::Object>();
+
+    if (domain->Get(env()->disposed_string())->IsTrue())
+      return Undefined(env()->isolate());
+
+    v8::Local<v8::Function> enter =
+      domain->Get(env()->enter_string()).As<v8::Function>();
+    assert(enter->IsFunction());
+    enter->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught())
+      return Undefined(env()->isolate());
+  }
+
+  v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
+
+  if (try_catch.HasCaught()) {
+    return Undefined(env()->isolate());
+  }
+
+  if (has_domain) {
+    v8::Local<v8::Function> exit =
+      domain->Get(env()->exit_string()).As<v8::Function>();
+    assert(exit->IsFunction());
+    exit->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught())
+      return Undefined(env()->isolate());
+  }
+
+  if (has_async_queue()) {
+    v8::Local<v8::Value> val = context.As<v8::Value>();
+    env()->async_listener_unload_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return Undefined(env()->isolate());
+  }
+
+  Environment::TickInfo* tick_info = env()->tick_info();
+
+  if (tick_info->in_tick()) {
+    return ret;
+  }
+
+  if (tick_info->length() == 0) {
+    tick_info->set_index(0);
+    return ret;
+  }
+
+  tick_info->set_in_tick(true);
+
+  env()->tick_callback_function()->Call(process, 0, NULL);
+
+  tick_info->set_in_tick(false);
+
+  if (try_catch.HasCaught()) {
+    tick_info->set_last_threw(true);
+    return Undefined(env()->isolate());
+  }
+
+  return ret;
+}
+
+
 inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
     const v8::Handle<v8::Function> cb,
     int argc,
     v8::Handle<v8::Value>* argv) {
+  if (env()->using_domains())
+    return MakeDomainCallback(cb, argc, argv);
+
   assert(env()->context() == env()->isolate()->GetCurrentContext());
 
   v8::Local<v8::Object> context = object();
index b978ae3..25ee2ec 100644 (file)
@@ -62,6 +62,13 @@ class AsyncWrap : public BaseObject {
                                             v8::Handle<v8::Value>* argv);
 
  private:
+  // TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
+  // replacement is committed.
+  inline v8::Handle<v8::Value> MakeDomainCallback(
+      const v8::Handle<v8::Function> cb,
+      int argc,
+      v8::Handle<v8::Value>* argv);
+
   // Add an async listener to an existing handle.
   template <typename Type>
   static inline void AddAsyncListener(
index 4578f6e..c7baff4 100644 (file)
@@ -86,6 +86,22 @@ inline uint32_t Environment::AsyncListener::count() const {
   return fields_[kCount];
 }
 
+inline Environment::DomainFlag::DomainFlag() {
+  for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
+}
+
+inline uint32_t* Environment::DomainFlag::fields() {
+  return fields_;
+}
+
+inline int Environment::DomainFlag::fields_count() const {
+  return kFieldsCount;
+}
+
+inline uint32_t Environment::DomainFlag::count() const {
+  return fields_[kCount];
+}
+
 inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
   for (int i = 0; i < kFieldsCount; ++i)
     fields_[i] = 0;
@@ -163,6 +179,7 @@ inline Environment::Environment(v8::Local<v8::Context> context)
     : isolate_(context->GetIsolate()),
       isolate_data_(IsolateData::GetOrCreate(context->GetIsolate())),
       using_smalloc_alloc_cb_(false),
+      using_domains_(false),
       context_(context->GetIsolate(), context) {
   // We'll be creating new objects so make sure we've entered the context.
   v8::HandleScope handle_scope(isolate());
@@ -193,6 +210,12 @@ inline bool Environment::has_async_listeners() const {
   return const_cast<Environment*>(this)->async_listener()->count() > 0;
 }
 
+inline bool Environment::in_domain() const {
+  // The const_cast is okay, it doesn't violate conceptual const-ness.
+  return using_domains() &&
+         const_cast<Environment*>(this)->domain_flag()->count() > 0;
+}
+
 inline Environment* Environment::from_immediate_check_handle(
     uv_check_t* handle) {
   return CONTAINER_OF(handle, Environment, immediate_check_handle_);
@@ -231,6 +254,10 @@ inline Environment::AsyncListener* Environment::async_listener() {
   return &async_listener_count_;
 }
 
+inline Environment::DomainFlag* Environment::domain_flag() {
+  return &domain_flag_;
+}
+
 inline Environment::TickInfo* Environment::tick_info() {
   return &tick_info_;
 }
@@ -243,6 +270,14 @@ inline void Environment::set_using_smalloc_alloc_cb(bool value) {
   using_smalloc_alloc_cb_ = value;
 }
 
+inline bool Environment::using_domains() const {
+  return using_domains_;
+}
+
+inline void Environment::set_using_domains(bool value) {
+  using_domains_ = value;
+}
+
 inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
   return CONTAINER_OF(handle, Environment, cares_timer_handle_);
 }
index 6db4a08..71936dd 100644 (file)
--- a/src/env.h
+++ b/src/env.h
@@ -66,6 +66,7 @@ namespace node {
   V(ctime_string, "ctime")                                                    \
   V(dev_string, "dev")                                                        \
   V(disposed_string, "_disposed")                                             \
+  V(domain_string, "domain")                                                  \
   V(enter_string, "enter")                                                    \
   V(errno_string, "errno")                                                    \
   V(exit_string, "exit")                                                      \
@@ -143,6 +144,7 @@ namespace node {
   V(binding_cache_object, v8::Object)                                         \
   V(buffer_constructor_function, v8::Function)                                \
   V(context, v8::Context)                                                     \
+  V(domain_array, v8::Array)                                                  \
   V(module_load_list_array, v8::Array)                                        \
   V(pipe_constructor_template, v8::FunctionTemplate)                          \
   V(process_object, v8::Object)                                               \
@@ -191,6 +193,26 @@ class Environment {
     DISALLOW_COPY_AND_ASSIGN(AsyncListener);
   };
 
+  class DomainFlag {
+   public:
+    inline uint32_t* fields();
+    inline int fields_count() const;
+    inline uint32_t count() const;
+
+   private:
+    friend class Environment;  // So we can call the constructor.
+    inline DomainFlag();
+
+    enum Fields {
+      kCount,
+      kFieldsCount
+    };
+
+    uint32_t fields_[kFieldsCount];
+
+    DISALLOW_COPY_AND_ASSIGN(DomainFlag);
+  };
+
   class TickInfo {
    public:
     inline uint32_t* fields();
@@ -232,6 +254,7 @@ class Environment {
   inline v8::Isolate* isolate() const;
   inline uv_loop_t* event_loop() const;
   inline bool has_async_listeners() const;
+  inline bool in_domain() const;
 
   static inline Environment* from_immediate_check_handle(uv_check_t* handle);
   inline uv_check_t* immediate_check_handle();
@@ -244,6 +267,7 @@ class Environment {
   inline uv_check_t* idle_check_handle();
 
   inline AsyncListener* async_listener();
+  inline DomainFlag* domain_flag();
   inline TickInfo* tick_info();
 
   static inline Environment* from_cares_timer_handle(uv_timer_t* handle);
@@ -255,6 +279,9 @@ class Environment {
   inline bool using_smalloc_alloc_cb() const;
   inline void set_using_smalloc_alloc_cb(bool value);
 
+  inline bool using_domains() const;
+  inline void set_using_domains(bool value);
+
   // Strings are shared across shared contexts. The getters simply proxy to
   // the per-isolate primitive.
 #define V(PropertyName, StringValue)                                          \
@@ -285,11 +312,13 @@ class Environment {
   uv_prepare_t idle_prepare_handle_;
   uv_check_t idle_check_handle_;
   AsyncListener async_listener_count_;
+  DomainFlag domain_flag_;
   TickInfo tick_info_;
   uv_timer_t cares_timer_handle_;
   ares_channel cares_channel_;
   ares_task_list cares_task_list_;
   bool using_smalloc_alloc_cb_;
+  bool using_domains_;
 
 #define V(PropertyName, TypeName)                                             \
   v8::Persistent<TypeName> PropertyName ## _;
index dd370ad..a39459c 100644 (file)
@@ -876,11 +876,54 @@ void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
 }
 
 
+void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
+  Environment* env = Environment::GetCurrent(args.GetIsolate());
+
+  if (env->using_domains())
+    return;
+  env->set_using_domains(true);
+
+  HandleScope scope(node_isolate);
+  Local<Object> process_object = env->process_object();
+
+  Local<String> tick_callback_function_key =
+      FIXED_ONE_BYTE_STRING(node_isolate, "_tickDomainCallback");
+  Local<Function> tick_callback_function =
+      process_object->Get(tick_callback_function_key).As<Function>();
+
+  if (!tick_callback_function->IsFunction()) {
+    fprintf(stderr, "process._tickDomainCallback assigned to non-function\n");
+    abort();
+  }
+
+  process_object->Set(FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback"),
+                      tick_callback_function);
+  env->set_tick_callback_function(tick_callback_function);
+
+  assert(args[0]->IsArray());
+  assert(args[1]->IsObject());
+
+  env->set_domain_array(args[0].As<Array>());
+
+  Local<Object> domain_flag_obj = args[1].As<Object>();
+  Environment::DomainFlag* domain_flag = env->domain_flag();
+  domain_flag_obj->SetIndexedPropertiesToExternalArrayData(
+      domain_flag->fields(),
+      kExternalUnsignedIntArray,
+      domain_flag->fields_count());
+
+  // Do a little housekeeping.
+  env->process_object()->Delete(
+      FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
+}
+
+
 void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
   HandleScope handle_scope(args.GetIsolate());
   Environment* env = Environment::GetCurrent(args.GetIsolate());
 
-  assert(args[0]->IsObject() && args[1]->IsFunction());
+  assert(args[0]->IsObject());
+  assert(args[1]->IsFunction());
 
   // Values use to cross communicate with processNextTick.
   Local<Object> tick_info_obj = args[0].As<Object>();
@@ -897,11 +940,114 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
 }
 
 
+Handle<Value> MakeDomainCallback(Environment* env,
+                                 Handle<Object> object,
+                                 const Handle<Function> callback,
+                                 int argc,
+                                 Handle<Value> argv[]) {
+  // If you hit this assertion, you forgot to enter the v8::Context first.
+  assert(env->context() == env->isolate()->GetCurrentContext());
+
+  Local<Object> process = env->process_object();
+  Local<Value> domain_v = object->Get(env->domain_string());
+  Local<Object> domain;
+
+  TryCatch try_catch;
+  try_catch.SetVerbose(true);
+
+  // TODO(trevnorris): This is sucky for performance. Fix it.
+  bool has_async_queue = object->Has(env->async_queue_string());
+  if (has_async_queue) {
+    Local<Value> argv[] = { object };
+    env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
+  bool has_domain = domain_v->IsObject();
+  if (has_domain) {
+    domain = domain_v.As<Object>();
+
+    if (domain->Get(env->disposed_string())->IsTrue()) {
+      // domain has been disposed of.
+      return Undefined(node_isolate);
+    }
+
+    Local<Function> enter =
+        domain->Get(env->enter_string()).As<Function>();
+    assert(enter->IsFunction());
+    enter->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught()) {
+      return Undefined(node_isolate);
+    }
+  }
+
+  Local<Value> ret = callback->Call(object, argc, argv);
+
+  if (try_catch.HasCaught()) {
+    return Undefined(node_isolate);
+  }
+
+  if (has_domain) {
+    Local<Function> exit =
+        domain->Get(env->exit_string()).As<Function>();
+    assert(exit->IsFunction());
+    exit->Call(domain, 0, NULL);
+
+    if (try_catch.HasCaught()) {
+      return Undefined(node_isolate);
+    }
+  }
+
+  if (has_async_queue) {
+    Local<Value> val = object.As<Value>();
+    env->async_listener_unload_function()->Call(process, 1, &val);
+
+    if (try_catch.HasCaught())
+      return Undefined(node_isolate);
+  }
+
+  Environment::TickInfo* tick_info = env->tick_info();
+
+  if (tick_info->last_threw() == 1) {
+    tick_info->set_last_threw(0);
+    return ret;
+  }
+
+  if (tick_info->in_tick()) {
+    return ret;
+  }
+
+  if (tick_info->length() == 0) {
+    tick_info->set_index(0);
+    return ret;
+  }
+
+  tick_info->set_in_tick(true);
+
+  env->tick_callback_function()->Call(process, 0, NULL);
+
+  tick_info->set_in_tick(false);
+
+  if (try_catch.HasCaught()) {
+    tick_info->set_last_threw(true);
+    return Undefined(node_isolate);
+  }
+
+  return ret;
+}
+
+
 Handle<Value> MakeCallback(Environment* env,
                            Handle<Object> object,
                            const Handle<Function> callback,
                            int argc,
                            Handle<Value> argv[]) {
+  if (env->using_domains())
+    return MakeDomainCallback(env, object, callback, argc, argv);
+
   // If you hit this assertion, you forgot to enter the v8::Context first.
   assert(env->context() == env->isolate()->GetCurrentContext());
 
@@ -1031,6 +1177,19 @@ Handle<Value> MakeCallback(const Handle<Object> object,
 }
 
 
+Handle<Value> MakeDomainCallback(const Handle<Object> object,
+                                 const Handle<Function> callback,
+                                 int argc,
+                                 Handle<Value> argv[]) {
+  Local<Context> context = object->CreationContext();
+  Environment* env = Environment::GetCurrent(context);
+  Context::Scope context_scope(context);
+  HandleScope handle_scope(env->isolate());
+  return handle_scope.Close(
+      MakeDomainCallback(env, object, callback, argc, argv));
+}
+
+
 enum encoding ParseEncoding(Handle<Value> encoding_v, enum encoding _default) {
   HandleScope scope(node_isolate);
 
@@ -2482,6 +2641,7 @@ void SetupProcessObject(Environment* env,
 
   NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
   NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
+  NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
 
   // values use to cross communicate with processNextTick
   Local<Object> tick_info_obj = Object::New();
index 1c0fc8b..2c61592 100644 (file)
       // First run through error handlers from asyncListener.
       var caught = _errorHandler(er);
 
+      if (process.domain && process.domain._errorHandler)
+        caught = process.domain._errorHandler(er) || caught;
+
       if (!caught)
         caught = process.emit('uncaughtException', er);
 
-      // If someone handled it, then great. Otherwise die in C++ since
-      // that means we'll exit the process, emit the 'exit' event.
+      // If someone handled it, then great.  otherwise, die in C++ land
+      // since that means that we'll exit the process, emit the 'exit' event
       if (!caught) {
         try {
           if (!process._exiting) {
     process.nextTick = nextTick;
     // Needs to be accessible from beyond this scope.
     process._tickCallback = _tickCallback;
+    process._tickDomainCallback = _tickDomainCallback;
 
     process._setupNextTick(tickInfo, _tickCallback);
 
     }
 
     // Run callbacks that have no domain.
+    // Using domains will cause this to be overridden.
     function _tickCallback() {
       var callback, hasQueue, threw, tock;
 
       tickDone();
     }
 
+    function _tickDomainCallback() {
+      var callback, domain, hasQueue, threw, tock;
+
+      while (tickInfo[kIndex] < tickInfo[kLength]) {
+        tock = nextTickQueue[tickInfo[kIndex]++];
+        callback = tock.callback;
+        domain = tock.domain;
+        hasQueue = !!tock._asyncQueue;
+        if (hasQueue)
+          _loadAsyncQueue(tock);
+        if (domain)
+          domain.enter();
+        threw = true;
+        try {
+          callback();
+          threw = false;
+        } finally {
+          if (threw)
+            tickDone();
+        }
+        if (hasQueue)
+          _unloadAsyncQueue(tock);
+        if (domain)
+          domain.exit();
+      }
+
+      tickDone();
+    }
+
     function nextTick(callback) {
       // on the way out, don't bother. it won't get fired anyway.
       if (process._exiting)
 
       var obj = {
         callback: callback,
+        domain: process.domain || null,
         _asyncQueue: undefined
       };
 
index e1eae7f..6b6beb4 100644 (file)
@@ -3628,6 +3628,9 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
 
   if (args[4]->IsFunction()) {
     obj->Set(env->ondone_string(), args[4]);
+    // XXX(trevnorris): This will need to go with the rest of domains.
+    if (env->in_domain())
+      obj->Set(env->domain_string(), env->domain_array()->Get(0));
     uv_queue_work(env->event_loop(),
                   req->work_req(),
                   EIO_PBKDF2,
@@ -3789,6 +3792,9 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
 
   if (args[1]->IsFunction()) {
     obj->Set(FIXED_ONE_BYTE_STRING(node_isolate, "ondone"), args[1]);
+    // XXX(trevnorris): This will need to go with the rest of domains.
+    if (env->in_domain())
+      obj->Set(env->domain_string(), env->domain_array()->Get(0));
     uv_queue_work(env->event_loop(),
                   req->work_req(),
                   RandomBytesWork<pseudoRandom>,
index 2e08ef7..043edb2 100644 (file)
@@ -39,6 +39,9 @@ class ReqWrap : public AsyncWrap {
  public:
   ReqWrap(Environment* env, v8::Handle<v8::Object> object)
       : AsyncWrap(env, object) {
+    if (env->in_domain())
+      object->Set(env->domain_string(), env->domain_array()->Get(0));
+
     QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
   }