var debug = require('util').debuglog('timer');
+var asyncFlags = process._asyncFlags;
+var runAsyncQueue = process._runAsyncQueue;
+var loadAsyncQueue = process._loadAsyncQueue;
+var unloadAsyncQueue = process._unloadAsyncQueue;
+
+// Do a little housekeeping.
+delete process._asyncFlags;
+delete process._runAsyncQueue;
+delete process._loadAsyncQueue;
+delete process._unloadAsyncQueue;
+
// IDLE TIMEOUTS
//
// value = list
var lists = {};
+// Make Timer as monomorphic as possible.
+Timer.prototype._asyncQueue = undefined;
+
// the main function - creates lists on demand and the watchers associated
// with them.
function insert(item, msecs) {
var now = Timer.now();
debug('now: %s', now);
- var first;
+ var diff, first, hasQueue, threw;
while (first = L.peek(list)) {
- var diff = now - first._idleStart;
+ diff = now - first._idleStart;
if (diff < msecs) {
list.start(msecs - diff, 0);
debug('%d list wait because diff is %d', msecs, diff);
//
// https://github.com/joyent/node/issues/2631
var domain = first.domain;
- if (domain && domain._disposed) continue;
+ if (domain && domain._disposed)
+ continue;
+
+ hasQueue = !!first._asyncQueue;
+
try {
+ if (hasQueue)
+ loadAsyncQueue(first);
if (domain)
domain.enter();
- var threw = true;
+ threw = true;
first._onTimeout();
+ if (hasQueue)
+ unloadAsyncQueue(first);
if (domain)
domain.exit();
threw = false;
exports.active = function(item) {
var msecs = item._idleTimeout;
if (msecs >= 0) {
-
var list = lists[msecs];
if (!list || L.isEmpty(list)) {
insert(item, msecs);
L.append(list, item);
}
}
+ // Whether or not a new TimerWrap needed to be created, this should run
+ // for each item. This way each "item" (i.e. timer) can properly have
+ // their own domain assigned.
+ if (asyncFlags[0] > 0)
+ runAsyncQueue(item);
};
function processImmediate() {
var queue = immediateQueue;
+ var domain, hasQueue, immediate;
immediateQueue = {};
L.init(immediateQueue);
while (L.isEmpty(queue) === false) {
- var immediate = L.shift(queue);
- var domain = immediate.domain;
- if (domain) domain.enter();
- immediate._onImmediate();
- if (domain) domain.exit();
+ immediate = L.shift(queue);
+ hasQueue = !!immediate._asyncQueue;
+ domain = immediate.domain;
+
+ if (hasQueue)
+ loadAsyncQueue(immediate);
+ if (domain)
+ domain.enter();
+
+ var threw = true;
+ try {
+ immediate._onImmediate();
+ threw = false;
+ } finally {
+ if (threw) {
+ if (!L.isEmpty(queue)) {
+ // Handle any remaining on next tick, assuming we're still
+ // alive to do so.
+ while (!L.isEmpty(immediateQueue)) {
+ L.append(queue, L.shift(immediateQueue));
+ }
+ immediateQueue = queue;
+ process.nextTick(processImmediate);
+ }
+ }
+ }
+
+ if (hasQueue)
+ unloadAsyncQueue(immediate);
+ if (domain)
+ domain.exit();
}
// Only round-trip to C++ land if we have to. Calling clearImmediate() on an
process._immediateCallback = processImmediate;
}
- if (process.domain) immediate.domain = process.domain;
+ // setImmediates are handled more like nextTicks.
+ if (asyncFlags[0] > 0)
+ runAsyncQueue(immediate);
+ if (process.domain)
+ immediate.domain = process.domain;
L.append(immediateQueue, immediate);
debug('unrefTimer fired');
- var first;
+ var diff, domain, first, hasQueue, threw;
while (first = L.peek(unrefList)) {
- var diff = now - first._idleStart;
+ diff = now - first._idleStart;
+ hasQueue = !!first._asyncQueue;
if (diff < first._idleTimeout) {
diff = first._idleTimeout - diff;
L.remove(first);
- var domain = first.domain;
+ domain = first.domain;
if (!first._onTimeout) continue;
if (domain && domain._disposed) continue;
try {
+ if (hasQueue)
+ loadAsyncQueue(first);
if (domain) domain.enter();
- var threw = true;
+ threw = true;
debug('unreftimer firing timeout');
first._onTimeout();
threw = false;
+ if (hasQueue)
+ unloadAsyncQueue(first);
if (domain) domain.exit();
} finally {
if (threw) process.nextTick(unrefTimeout);
'src/udp_wrap.cc',
'src/uv.cc',
# headers to make for a more pleasant IDE experience
+ 'src/async-wrap.h',
+ 'src/async-wrap-inl.h',
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#ifndef SRC_ASYNC_WRAP_INL_H_
+#define SRC_ASYNC_WRAP_INL_H_
+
+#include "async-wrap.h"
+#include "env.h"
+#include "env-inl.h"
+#include "util.h"
+#include "util-inl.h"
+#include "v8.h"
+#include <assert.h>
+
+namespace node {
+
+inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle<v8::Object> object)
+ : object_(env->isolate(), object),
+ env_(env),
+ async_flags_(NO_OPTIONS) {
+ assert(!object.IsEmpty());
+
+ if (!env->has_async_listeners())
+ return;
+
+ // TODO(trevnorris): Do we really need to TryCatch this call?
+ v8::TryCatch try_catch;
+ try_catch.SetVerbose(true);
+
+ v8::Local<v8::Value> val = object.As<v8::Value>();
+ env->async_listener_run_function()->Call(env->process_object(), 1, &val);
+
+ if (!try_catch.HasCaught())
+ async_flags_ |= ASYNC_LISTENERS;
+}
+
+
+inline AsyncWrap::~AsyncWrap() {
+ assert(persistent().IsEmpty());
+}
+
+
+template<typename TYPE>
+inline void AsyncWrap::AddMethods(v8::Handle<v8::FunctionTemplate> t) {
+ NODE_SET_PROTOTYPE_METHOD(t,
+ "addAsyncListener",
+ AddAsyncListener<TYPE>);
+ NODE_SET_PROTOTYPE_METHOD(t,
+ "removeAsyncListener",
+ RemoveAsyncListener<TYPE>);
+}
+
+
+inline uint32_t AsyncWrap::async_flags() const {
+ return async_flags_;
+}
+
+
+inline void AsyncWrap::set_flag(unsigned int flag) {
+ async_flags_ |= flag;
+}
+
+
+inline void AsyncWrap::remove_flag(unsigned int flag) {
+ async_flags_ &= ~flag;
+}
+
+
+inline bool AsyncWrap::has_async_queue() {
+ return async_flags() & ASYNC_LISTENERS;
+}
+
+
+inline Environment* AsyncWrap::env() const {
+ return env_;
+}
+
+
+inline v8::Local<v8::Object> AsyncWrap::object() {
+ return PersistentToLocal(env()->isolate(), persistent());
+}
+
+
+inline v8::Persistent<v8::Object>& AsyncWrap::persistent() {
+ return object_;
+}
+
+
+// I hate you domains.
+inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
+ const v8::Handle<v8::Function> cb,
+ int argc,
+ v8::Handle<v8::Value>* argv) {
+ assert(env()->context() == env()->isolate()->GetCurrentContext());
+
+ v8::Local<v8::Object> context = object();
+ v8::Local<v8::Object> process = env()->process_object();
+ v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
+ v8::Local<v8::Object> domain;
+
+ v8::TryCatch try_catch;
+ try_catch.SetVerbose(true);
+
+ if (has_async_queue()) {
+ v8::Local<v8::Value> val = context.As<v8::Value>();
+ env()->async_listener_load_function()->Call(process, 1, &val);
+
+ if (try_catch.HasCaught())
+ return v8::Undefined(env()->isolate());
+ }
+
+ bool has_domain = domain_v->IsObject();
+ if (has_domain) {
+ domain = domain_v.As<v8::Object>();
+
+ if (domain->Get(env()->disposed_string())->IsTrue())
+ return Undefined(env()->isolate());
+
+ v8::Local<v8::Function> enter =
+ domain->Get(env()->enter_string()).As<v8::Function>();
+ assert(enter->IsFunction());
+ enter->Call(domain, 0, NULL);
+
+ if (try_catch.HasCaught())
+ return Undefined(env()->isolate());
+ }
+
+ v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
+
+ if (try_catch.HasCaught()) {
+ return Undefined(env()->isolate());
+ }
+
+ if (has_async_queue()) {
+ v8::Local<v8::Value> val = context.As<v8::Value>();
+ env()->async_listener_unload_function()->Call(process, 1, &val);
+ }
+
+ if (has_domain) {
+ v8::Local<v8::Function> exit =
+ domain->Get(env()->exit_string()).As<v8::Function>();
+ assert(exit->IsFunction());
+ exit->Call(domain, 0, NULL);
+
+ if (try_catch.HasCaught())
+ return Undefined(env()->isolate());
+ }
+
+ Environment::TickInfo* tick_info = env()->tick_info();
+
+ if (tick_info->in_tick()) {
+ return ret;
+ }
+
+ if (tick_info->length() == 0) {
+ tick_info->set_index(0);
+ return ret;
+ }
+
+ tick_info->set_in_tick(true);
+
+ env()->tick_callback_function()->Call(process, 0, NULL);
+
+ tick_info->set_in_tick(false);
+
+ if (try_catch.HasCaught()) {
+ tick_info->set_last_threw(true);
+ return Undefined(env()->isolate());
+ }
+
+ return ret;
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+ const v8::Handle<v8::Function> cb,
+ int argc,
+ v8::Handle<v8::Value>* argv) {
+ if (env()->using_domains())
+ return MakeDomainCallback(cb, argc, argv);
+
+ assert(env()->context() == env()->isolate()->GetCurrentContext());
+
+ v8::Local<v8::Object> context = object();
+ v8::Local<v8::Object> process = env()->process_object();
+
+ v8::TryCatch try_catch;
+ try_catch.SetVerbose(true);
+
+ if (has_async_queue()) {
+ v8::Local<v8::Value> val = context.As<v8::Value>();
+ env()->async_listener_load_function()->Call(process, 1, &val);
+
+ if (try_catch.HasCaught())
+ return v8::Undefined(env()->isolate());
+ }
+
+ v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
+
+ if (try_catch.HasCaught()) {
+ return Undefined(env()->isolate());
+ }
+
+ if (has_async_queue()) {
+ v8::Local<v8::Value> val = context.As<v8::Value>();
+ env()->async_listener_unload_function()->Call(process, 1, &val);
+ }
+
+ Environment::TickInfo* tick_info = env()->tick_info();
+
+ if (tick_info->in_tick()) {
+ return ret;
+ }
+
+ if (tick_info->length() == 0) {
+ tick_info->set_index(0);
+ return ret;
+ }
+
+ tick_info->set_in_tick(true);
+
+ // TODO(trevnorris): Consider passing "context" to _tickCallback so it
+ // can then be passed as the first argument to the nextTick callback.
+ // That should greatly help needing to create closures.
+ env()->tick_callback_function()->Call(process, 0, NULL);
+
+ tick_info->set_in_tick(false);
+
+ if (try_catch.HasCaught()) {
+ tick_info->set_last_threw(true);
+ return Undefined(env()->isolate());
+ }
+
+ return ret;
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+ const v8::Handle<v8::String> symbol,
+ int argc,
+ v8::Handle<v8::Value>* argv) {
+ v8::Local<v8::Value> cb_v = object()->Get(symbol);
+ v8::Local<v8::Function> cb = cb_v.As<v8::Function>();
+ assert(cb->IsFunction());
+
+ return MakeCallback(cb, argc, argv);
+}
+
+
+inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
+ uint32_t index,
+ int argc,
+ v8::Handle<v8::Value>* argv) {
+ v8::Local<v8::Value> cb_v = object()->Get(index);
+ v8::Local<v8::Function> cb = cb_v.As<v8::Function>();
+ assert(cb->IsFunction());
+
+ return MakeCallback(cb, argc, argv);
+}
+
+
+template <typename TYPE>
+inline void AsyncWrap::AddAsyncListener(
+ const v8::FunctionCallbackInfo<v8::Value>& args) {
+ Environment* env = Environment::GetCurrent(args.GetIsolate());
+ v8::HandleScope handle_scope(args.GetIsolate());
+
+ v8::Local<v8::Object> handle = args.This();
+ v8::Local<v8::Value> listener = args[0];
+ assert(listener->IsObject());
+ assert(handle->InternalFieldCount() > 0);
+
+ env->async_listener_push_function()->Call(handle, 1, &listener);
+
+ TYPE* wrap = static_cast<TYPE*>(
+ handle->GetAlignedPointerFromInternalField(0));
+ assert(wrap != NULL);
+ wrap->set_flag(ASYNC_LISTENERS);
+}
+
+
+template <typename TYPE>
+inline void AsyncWrap::RemoveAsyncListener(
+ const v8::FunctionCallbackInfo<v8::Value>& args) {
+ Environment* env = Environment::GetCurrent(args.GetIsolate());
+ v8::HandleScope handle_scope(args.GetIsolate());
+
+ v8::Local<v8::Object> handle = args.This();
+ v8::Local<v8::Value> listener = args[0];
+ assert(listener->IsObject());
+ assert(handle->InternalFieldCount() > 0);
+
+ v8::Local<v8::Value> ret =
+ env->async_listener_strip_function()->Call(handle, 1, &listener);
+
+ if (ret->IsFalse()) {
+ TYPE* wrap = static_cast<TYPE*>(
+ handle->GetAlignedPointerFromInternalField(0));
+ assert(wrap != NULL);
+ wrap->remove_flag(ASYNC_LISTENERS);
+ }
+}
+
+} // namespace node
+
+#endif // SRC_ASYNC_WRAP_INL_H_
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#ifndef SRC_ASYNC_WRAP_H_
+#define SRC_ASYNC_WRAP_H_
+
+#include "env.h"
+#include "v8.h"
+
+namespace node {
+
+class AsyncWrap {
+ public:
+ enum AsyncFlags {
+ NO_OPTIONS = 0,
+ ASYNC_LISTENERS = 1
+ };
+
+ inline AsyncWrap(Environment* env, v8::Handle<v8::Object> object);
+
+ inline ~AsyncWrap();
+
+ template <typename Type>
+ static inline void AddMethods(v8::Handle<v8::FunctionTemplate> t);
+
+ inline uint32_t async_flags() const;
+
+ inline void set_flag(unsigned int flag);
+
+ inline void remove_flag(unsigned int flag);
+
+ inline bool has_async_queue();
+
+ inline Environment* env() const;
+
+ // Returns the wrapped object. Illegal to call in your destructor.
+ inline v8::Local<v8::Object> object();
+
+ // Parent class is responsible to Dispose.
+ inline v8::Persistent<v8::Object>& persistent();
+
+ // Only call these within a valid HandleScope.
+ inline v8::Handle<v8::Value> MakeCallback(const v8::Handle<v8::Function> cb,
+ int argc,
+ v8::Handle<v8::Value>* argv);
+ inline v8::Handle<v8::Value> MakeCallback(const v8::Handle<v8::String> symbol,
+ int argc,
+ v8::Handle<v8::Value>* argv);
+ inline v8::Handle<v8::Value> MakeCallback(uint32_t index,
+ int argc,
+ v8::Handle<v8::Value>* argv);
+
+ private:
+ // TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
+ // replacement is committed.
+ inline v8::Handle<v8::Value> MakeDomainCallback(
+ const v8::Handle<v8::Function> cb,
+ int argc,
+ v8::Handle<v8::Value>* argv);
+
+ // Add an async listener to an existing handle.
+ template <typename Type>
+ static inline void AddAsyncListener(
+ const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ // Remove an async listener to an existing handle.
+ template <typename Type>
+ static inline void RemoveAsyncListener(
+ const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ v8::Persistent<v8::Object> object_;
+ Environment* const env_;
+ uint32_t async_flags_;
+};
+
+} // namespace node
+
+
+#endif // SRC_ASYNC_WRAP_H_
return isolate_;
}
+inline Environment::AsyncListener::AsyncListener() {
+ for (int i = 0; i < kFieldsCount; ++i)
+ fields_[i] = 0;
+}
+
+inline uint32_t* Environment::AsyncListener::fields() {
+ return fields_;
+}
+
+inline int Environment::AsyncListener::fields_count() const {
+ return kFieldsCount;
+}
+
+inline uint32_t Environment::AsyncListener::count() const {
+ return fields_[kCount];
+}
+
inline Environment::DomainFlag::DomainFlag() {
for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
}
}
inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
- for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
+ for (int i = 0; i < kFieldsCount; ++i)
+ fields_[i] = 0;
}
inline uint32_t* Environment::TickInfo::fields() {
return isolate_;
}
+inline bool Environment::has_async_listeners() const {
+ // The const_cast is okay, it doesn't violate conceptual const-ness.
+ return const_cast<Environment*>(this)->async_listener()->count() > 0;
+}
+
inline bool Environment::in_domain() const {
// The const_cast is okay, it doesn't violate conceptual const-ness.
return using_domains() &&
return isolate_data()->event_loop();
}
+inline Environment::AsyncListener* Environment::async_listener() {
+ return &async_listener_count_;
+}
+
inline Environment::DomainFlag* Environment::domain_flag() {
return &domain_flag_;
}
#define PER_ISOLATE_STRING_PROPERTIES(V) \
V(address_string, "address") \
V(atime_string, "atime") \
+ V(async_queue_string, "_asyncQueue") \
V(birthtime_string, "birthtime") \
V(blksize_string, "blksize") \
V(blocks_string, "blocks") \
V(write_queue_size_string, "writeQueueSize") \
#define ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V) \
+ V(async_listener_load_function, v8::Function) \
+ V(async_listener_push_function, v8::Function) \
+ V(async_listener_run_function, v8::Function) \
+ V(async_listener_strip_function, v8::Function) \
+ V(async_listener_unload_function, v8::Function) \
V(binding_cache_object, v8::Object) \
V(buffer_constructor_function, v8::Function) \
V(context, v8::Context) \
class Environment {
public:
+ class AsyncListener {
+ public:
+ inline uint32_t* fields();
+ inline int fields_count() const;
+ inline uint32_t count() const;
+
+ private:
+ friend class Environment; // So we can call the constructor.
+ inline AsyncListener();
+
+ enum Fields {
+ kCount,
+ kFieldsCount
+ };
+
+ uint32_t fields_[kFieldsCount];
+
+ DISALLOW_COPY_AND_ASSIGN(AsyncListener);
+ };
+
class DomainFlag {
public:
inline uint32_t* fields();
inline v8::Isolate* isolate() const;
inline uv_loop_t* event_loop() const;
+ inline bool has_async_listeners() const;
inline bool in_domain() const;
static inline Environment* from_immediate_check_handle(uv_check_t* handle);
static inline Environment* from_idle_check_handle(uv_check_t* handle);
inline uv_check_t* idle_check_handle();
+ inline AsyncListener* async_listener();
inline DomainFlag* domain_flag();
inline TickInfo* tick_info();
uv_idle_t immediate_idle_handle_;
uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_;
+ AsyncListener async_listener_count_;
DomainFlag domain_flag_;
TickInfo tick_info_;
uv_timer_t cares_timer_handle_;
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "util.h"
argv[2] = OneByteString(node_isolate, filename);
}
- MakeCallback(env,
- wrap->object(),
- env->onchange_string(),
- ARRAY_SIZE(argv),
- argv);
+ wrap->MakeCallback(env->onchange_string(), ARRAY_SIZE(argv), argv);
}
// USE OR OTHER DEALINGS IN THE SOFTWARE.
#include "handle_wrap.h"
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "util.h"
HandleWrap::HandleWrap(Environment* env,
Handle<Object> object,
uv_handle_t* handle)
- : env_(env),
+ : AsyncWrap(env, object),
flags_(0),
handle__(handle) {
handle__->data = this;
HandleScope scope(node_isolate);
- persistent().Reset(node_isolate, object);
Wrap<HandleWrap>(object, this);
QUEUE_INSERT_TAIL(&handle_wrap_queue, &handle_wrap_queue_);
}
Local<Object> object = wrap->object();
if (wrap->flags_ & kCloseCallback) {
- MakeCallback(env, object, env->close_string());
+ wrap->MakeCallback(env->close_string(), 0, NULL);
}
object->SetAlignedPointerInInternalField(0, NULL);
#ifndef SRC_HANDLE_WRAP_H_
#define SRC_HANDLE_WRAP_H_
+#include "async-wrap.h"
#include "env.h"
#include "node.h"
#include "queue.h"
// js/c++ boundary crossing. At the javascript layer that should all be
// taken care of.
-class HandleWrap {
+class HandleWrap : public AsyncWrap {
public:
static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args);
uv_handle_t* handle);
virtual ~HandleWrap();
- inline Environment* env() const {
- return env_;
- }
-
- inline v8::Local<v8::Object> object() {
- return PersistentToLocal(env()->isolate(), persistent());
- }
-
- inline v8::Persistent<v8::Object>& persistent() {
- return object_;
- }
-
private:
friend void GetActiveHandles(const v8::FunctionCallbackInfo<v8::Value>&);
static void OnClose(uv_handle_t* handle);
- v8::Persistent<v8::Object> object_;
QUEUE handle_wrap_queue_;
- Environment* const env_;
unsigned int flags_;
// Using double underscore due to handle_ member in tcp_wrap. Probably
// tcp_wrap should rename it's member to 'handle'.
#endif
#include "ares.h"
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "handle_wrap.h"
#endif
+void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args.GetIsolate());
+ HandleScope handle_scope(args.GetIsolate());
+
+ assert(args[0]->IsObject() &&
+ args[1]->IsFunction() &&
+ args[2]->IsFunction() &&
+ args[3]->IsFunction() &&
+ args[4]->IsFunction() &&
+ args[5]->IsFunction());
+
+ env->set_async_listener_run_function(args[1].As<Function>());
+ env->set_async_listener_load_function(args[2].As<Function>());
+ env->set_async_listener_unload_function(args[3].As<Function>());
+ env->set_async_listener_push_function(args[4].As<Function>());
+ env->set_async_listener_strip_function(args[5].As<Function>());
+
+ Local<Object> async_listener_flag_obj = args[0].As<Object>();
+ Environment::AsyncListener* async_listener = env->async_listener();
+ async_listener_flag_obj->SetIndexedPropertiesToExternalArrayData(
+ async_listener->fields(),
+ kExternalUnsignedIntArray,
+ async_listener->fields_count());
+
+ // Do a little housekeeping.
+ env->process_object()->Delete(
+ FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupAsyncListener"));
+}
+
+
void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args.GetIsolate());
domain_flag->fields(),
kExternalUnsignedIntArray,
domain_flag->fields_count());
+
+ // Do a little housekeeping.
+ env->process_object()->Delete(
+ FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
+}
+
+
+void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args.GetIsolate());
+ HandleScope handle_scope(args.GetIsolate());
+
+ if (!args[0]->IsObject() || !args[1]->IsFunction())
+ abort();
+
+ // Values use to cross communicate with processNextTick.
+ Local<Object> tick_info_obj = args[0].As<Object>();
+ tick_info_obj->SetIndexedPropertiesToExternalArrayData(
+ env->tick_info()->fields(),
+ kExternalUnsignedIntArray,
+ env->tick_info()->fields_count());
+
+ env->set_tick_callback_function(args[1].As<Function>());
+
+ // Do a little housekeeping.
+ env->process_object()->Delete(
+ FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupNextTick"));
}
Handle<Value> MakeDomainCallback(Environment* env,
- const Handle<Object> object,
+ Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
// If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext());
- // TODO(trevnorris) Hook for long stack traces to be made here.
-
+ Local<Object> process = env->process_object();
Local<Value> domain_v = object->Get(env->domain_string());
Local<Object> domain;
TryCatch try_catch;
try_catch.SetVerbose(true);
+ // TODO(trevnorris): This is sucky for performance. Fix it.
+ bool has_async_queue = object->Has(env->async_queue_string());
+ if (has_async_queue) {
+ Local<Value> argv[] = { object };
+ env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
+
+ if (try_catch.HasCaught())
+ return Undefined(node_isolate);
+ }
+
bool has_domain = domain_v->IsObject();
if (has_domain) {
domain = domain_v.As<Object>();
return Undefined(node_isolate);
}
+ if (has_async_queue) {
+ Local<Value> val = object.As<Value>();
+ env->async_listener_unload_function()->Call(process, 1, &val);
+
+ if (try_catch.HasCaught())
+ return Undefined(node_isolate);
+ }
+
if (has_domain) {
Local<Function> exit =
domain->Get(env->exit_string()).As<Function>();
tick_info->set_in_tick(true);
- // process nextTicks after call
- Local<Object> process_object = env->process_object();
- Local<Function> tick_callback_function = env->tick_callback_function();
- tick_callback_function->Call(process_object, 0, NULL);
+ env->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
Handle<Value> MakeCallback(Environment* env,
- const Handle<Object> object,
+ Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
+ if (env->using_domains())
+ return MakeDomainCallback(env, object, callback, argc, argv);
+
// If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext());
- // TODO(trevnorris) Hook for long stack traces to be made here.
- Local<Object> process_object = env->process_object();
-
- if (env->using_domains())
- return MakeDomainCallback(env, object, callback, argc, argv);
+ Local<Object> process = env->process_object();
TryCatch try_catch;
try_catch.SetVerbose(true);
+ // TODO(trevnorris): This is sucky for performance. Fix it.
+ bool has_async_queue = object->Has(env->async_queue_string());
+ if (has_async_queue) {
+ Local<Value> argv[] = { object };
+ env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
+
+ if (try_catch.HasCaught())
+ return Undefined(node_isolate);
+ }
+
Local<Value> ret = callback->Call(object, argc, argv);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
+ if (has_async_queue) {
+ Local<Value> val = object.As<Value>();
+ env->async_listener_unload_function()->Call(process, 1, &val);
+
+ if (try_catch.HasCaught())
+ return Undefined(node_isolate);
+ }
+
Environment::TickInfo* tick_info = env->tick_info();
- if (tick_info->in_tick() == 1) {
+ if (tick_info->in_tick()) {
return ret;
}
tick_info->set_in_tick(true);
- // lazy load no domain next tick callbacks
- Local<Function> tick_callback_function = env->tick_callback_function();
- if (tick_callback_function.IsEmpty()) {
- Local<String> tick_callback_function_key =
- FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback");
- tick_callback_function =
- process_object->Get(tick_callback_function_key).As<Function>();
- if (!tick_callback_function->IsFunction()) {
- fprintf(stderr, "process._tickCallback assigned to non-function\n");
- abort();
- }
- env->set_tick_callback_function(tick_callback_function);
- }
-
// process nextTicks after call
- tick_callback_function->Call(process_object, 0, NULL);
+ env->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
uint32_t index,
int argc,
Handle<Value> argv[]) {
- // If you hit this assertion, you forgot to enter the v8::Context first.
- assert(env->context() == env->isolate()->GetCurrentContext());
-
Local<Function> callback = object->Get(index).As<Function>();
assert(callback->IsFunction());
- if (env->using_domains()) {
- return MakeDomainCallback(env, object, callback, argc, argv);
- }
-
return MakeCallback(env, object, callback, argc, argv);
}
const Handle<String> symbol,
int argc,
Handle<Value> argv[]) {
- // If you hit this assertion, you forgot to enter the v8::Context first.
- assert(env->context() == env->isolate()->GetCurrentContext());
-
Local<Function> callback = object->Get(symbol).As<Function>();
assert(callback->IsFunction());
-
- if (env->using_domains()) {
- return MakeDomainCallback(env, object, callback, argc, argv);
- }
-
return MakeCallback(env, object, callback, argc, argv);
}
const char* method,
int argc,
Handle<Value> argv[]) {
- // If you hit this assertion, you forgot to enter the v8::Context first.
- assert(env->context() == env->isolate()->GetCurrentContext());
Local<String> method_string = OneByteString(node_isolate, method);
return MakeCallback(env, object, method_string, argc, argv);
}
NODE_SET_METHOD(process, "binding", Binding);
+ NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
+ NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
// values use to cross communicate with processNextTick
// of the startup process, so many dependencies are invoked lazily.
(function(process) {
this.global = this;
+ var _errorHandler;
function startup() {
var EventEmitter = NativeModule.require('events').EventEmitter;
startup.globalTimeouts();
startup.globalConsole();
+ startup.processAsyncListener();
startup.processAssert();
startup.processConfig();
startup.processNextTick();
startup.processFatal = function() {
process._fatalException = function(er) {
- var caught = false;
+ // First run through error handlers from asyncListener.
+ var caught = _errorHandler(er);
- if (process.domain && process.domain._errorHandler) {
- caught = process.domain._errorHandler(er);
- } else {
+ if (process.domain && process.domain._errorHandler)
+ caught = process.domain._errorHandler(er) || caught;
+
+ if (!caught)
caught = process.emit('uncaughtException', er);
- }
- // if someone handled it, then great. otherwise, die in C++ land
+ // If someone handled it, then great. otherwise, die in C++ land
// since that means that we'll exit the process, emit the 'exit' event
if (!caught) {
try {
// if we handled an error, then make sure any ticks get processed
} else {
- setImmediate(process._tickCallback);
+ var t = setImmediate(process._tickCallback);
+ // Complete hack to make sure any errors thrown from async
+ // listeners don't cause an infinite loop.
+ if (t._asyncQueue)
+ t._asyncQueue = [];
}
return caught;
};
};
+ startup.processAsyncListener = function() {
+ var asyncStack = [];
+ var asyncQueue = [];
+ var uid = 0;
+
+ // Stateful flags shared with Environment for quick JS/C++
+ // communication.
+ var asyncFlags = {};
+
+ // Prevent accidentally suppressed thrown errors from before/after.
+ var inAsyncTick = false;
+
+ // To prevent infinite recursion when an error handler also throws
+ // flag when an error is currenly being handled.
+ var inErrorTick = false;
+
+ // Needs to be the same as src/env.h
+ var kCount = 0;
+
+ // _errorHandler is scoped so it's also accessible by _fatalException.
+ _errorHandler = errorHandler;
+
+ // Needs to be accessible from lib/timers.js so they know when async
+ // listeners are currently in queue. They'll be cleaned up once
+ // references there are made.
+ process._asyncFlags = asyncFlags;
+ process._runAsyncQueue = runAsyncQueue;
+ process._loadAsyncQueue = loadAsyncQueue;
+ process._unloadAsyncQueue = unloadAsyncQueue;
+
+ // Public API.
+ process.createAsyncListener = createAsyncListener;
+ process.addAsyncListener = addAsyncListener;
+ process.removeAsyncListener = removeAsyncListener;
+
+ // Setup shared objects/callbacks with native layer.
+ process._setupAsyncListener(asyncFlags,
+ runAsyncQueue,
+ loadAsyncQueue,
+ unloadAsyncQueue,
+ pushListener,
+ stripListener);
+
+ function popQueue() {
+ if (asyncStack.length > 0)
+ asyncQueue = asyncStack.pop();
+ else
+ asyncQueue = [];
+ }
+
+ // Run all the async listeners attached when an asynchronous event is
+ // instantiated.
+ function runAsyncQueue(context) {
+ var queue = [];
+ var queueItem, item, i, value;
+
+ inAsyncTick = true;
+ for (i = 0; i < asyncQueue.length; i++) {
+ queueItem = asyncQueue[i];
+ // Not passing "this" context because it hasn't actually been
+ // instantiated yet, so accessing some of the object properties
+ // can cause a segfault.
+ // Passing the original value will allow users to manipulate the
+ // original value object, while also allowing them to return a
+ // new value for current async call tracking.
+ value = queueItem.listener(queueItem.value);
+ if (typeof value !== 'undefined') {
+ item = {
+ callbacks: queueItem.callbacks,
+ value: value,
+ listener: queueItem.listener,
+ uid: queueItem.uid
+ };
+ } else {
+ item = queueItem;
+ }
+ queue[i] = item;
+ }
+ inAsyncTick = false;
+
+ context._asyncQueue = queue;
+ }
+
+ // Uses the _asyncQueue object attached by runAsyncQueue.
+ function loadAsyncQueue(context) {
+ var queue = context._asyncQueue;
+ var item, before, i;
+
+ asyncStack.push(asyncQueue);
+ asyncQueue = queue;
+ // Since the async listener callback is required, the number of
+ // objects in the asyncQueue implies the number of async listeners
+ // there are to be processed.
+ asyncFlags[kCount] = queue.length;
+
+ // Run "before" callbacks.
+ inAsyncTick = true;
+ for (i = 0; i < queue.length; i++) {
+ item = queue[i];
+ if (!item.callbacks)
+ continue;
+ before = item.callbacks.before;
+ if (typeof before === 'function')
+ before(context, item.value);
+ }
+ inAsyncTick = false;
+ }
+
+ // Unload one level of the async stack. Returns true if there are
+ // still listeners somewhere in the stack.
+ function unloadAsyncQueue(context) {
+ var item, after, i;
+
+ // Run "after" callbacks.
+ inAsyncTick = true;
+ for (i = 0; i < asyncQueue.length; i++) {
+ item = asyncQueue[i];
+ if (!item.callbacks)
+ continue;
+ after = item.callbacks.after;
+ if (typeof after === 'function')
+ after(context, item.value);
+ }
+ inAsyncTick = false;
+
+ // Unload the current queue from the stack.
+ popQueue();
+
+ asyncFlags[kCount] = asyncQueue.length;
+
+ return asyncQueue.length > 0 || asyncStack.length > 0;
+ }
+
+ // Create new async listener object. Useful when instantiating a new
+ // object and want the listener instance, but not add it to the stack.
+ function createAsyncListener(listener, callbacks, value) {
+ return {
+ callbacks: callbacks,
+ value: value,
+ listener: listener,
+ uid: uid++
+ };
+ }
+
+ // Add a listener to the current queue.
+ function addAsyncListener(listener, callbacks, value) {
+ // Accept new listeners or previous created listeners.
+ if (typeof listener === 'function')
+ callbacks = createAsyncListener(listener, callbacks, value);
+ else
+ callbacks = listener;
+
+ var inQueue = false;
+ // The asyncQueue will be small. Probably always <= 3 items.
+ for (var i = 0; i < asyncQueue.length; i++) {
+ if (callbacks.uid === asyncQueue[i].uid) {
+ inQueue = true;
+ break;
+ }
+ }
+
+ // Make sure the callback doesn't already exist in the queue.
+ if (!inQueue)
+ asyncQueue.push(callbacks);
+
+ asyncFlags[kCount] = asyncQueue.length;
+ return callbacks;
+ }
+
+ // Remove listener from the current queue and the entire stack.
+ function removeAsyncListener(obj) {
+ var i, j;
+
+ for (i = 0; i < asyncQueue.length; i++) {
+ if (obj.uid === asyncQueue[i].uid) {
+ asyncQueue.splice(i, 1);
+ break;
+ }
+ }
+
+ for (i = 0; i < asyncStack.length; i++) {
+ for (j = 0; j < asyncStack[i].length; j++) {
+ if (obj.uid === asyncStack[i][j].uid) {
+ asyncStack[i].splice(j, 1);
+ break;
+ }
+ }
+ }
+
+ asyncFlags[kCount] = asyncQueue.length;
+ }
+
+ // Error handler used by _fatalException to run through all error
+ // callbacks in the current asyncQueue.
+ function errorHandler(er) {
+ var handled = false;
+ var error, item, i;
+
+ if (inErrorTick)
+ return false;
+
+ inErrorTick = true;
+ for (i = 0; i < asyncQueue.length; i++) {
+ item = asyncQueue[i];
+ if (!item.callbacks)
+ continue;
+ error = item.callbacks.error;
+ if (typeof error === 'function') {
+ try {
+ var threw = true;
+ handled = error(item.value, er) || handled;
+ threw = false;
+ } finally {
+ // If the error callback throws then we're going to die
+ // quickly with no chance of recovery. Only thing we're going
+ // to allow is execution of process exit event callbacks.
+ if (threw) {
+ process._exiting = true;
+ process.emit('exit', 1);
+ }
+ }
+ }
+ }
+ inErrorTick = false;
+
+ // Unload the current queue from the stack.
+ popQueue();
+
+ return handled && !inAsyncTick;
+ }
+
+ // Used by AsyncWrap::AddAsyncListener() to add an individual listener
+ // to the async queue. It will check the uid of the listener and only
+ // allow it to be added once.
+ function pushListener(obj) {
+ if (!this._asyncQueue)
+ this._asyncQueue = [];
+
+ var queue = this._asyncQueue;
+ var inQueue = false;
+ // The asyncQueue will be small. Probably always <= 3 items.
+ for (var i = 0; i < queue.length; i++) {
+ if (obj.uid === queue.uid) {
+ inQueue = true;
+ break;
+ }
+ }
+
+ if (!inQueue)
+ queue.push(obj);
+ }
+
+ // Used by AsyncWrap::RemoveAsyncListener() to remove an individual
+ // listener from the async queue, and return whether there are still
+ // listeners in the queue.
+ function stripListener(obj) {
+ if (!this._asyncQueue || this._asyncQueue.length === 0)
+ return false;
+
+ // The asyncQueue will be small. Probably always <= 3 items.
+ for (var i = 0; i < this._asyncQueue.length; i++) {
+ if (obj.uid === this._asyncQueue[i].uid) {
+ this._asyncQueue.splice(i, 1);
+ break;
+ }
+ }
+
+ return this._asyncQueue.length > 0;
+ }
+ };
+
var assert;
startup.processAssert = function() {
assert = process.assert = function(x, msg) {
startup.processNextTick = function() {
var nextTickQueue = [];
+ var asyncFlags = process._asyncFlags;
+ var _runAsyncQueue = process._runAsyncQueue;
+ var _loadAsyncQueue = process._loadAsyncQueue;
+ var _unloadAsyncQueue = process._unloadAsyncQueue;
// This tickInfo thing is used so that the C++ code in src/node.cc
// can have easy accesss to our nextTick state, and avoid unnecessary
- var tickInfo = process._tickInfo;
+ var tickInfo = {};
// *Must* match Environment::TickInfo::Fields in src/env.h.
var kIndex = 0;
var kLength = 1;
+ // For asyncFlags.
+ // *Must* match Environment::AsyncListeners::Fields in src/env.h
+ var kCount = 0;
+
process.nextTick = nextTick;
- // needs to be accessible from cc land
+ // Needs to be accessible from beyond this scope.
process._tickCallback = _tickCallback;
process._tickDomainCallback = _tickDomainCallback;
+ process._setupNextTick(tickInfo, _tickCallback);
+
function tickDone() {
if (tickInfo[kLength] !== 0) {
if (tickInfo[kLength] <= tickInfo[kIndex]) {
tickInfo[kIndex] = 0;
}
- // run callbacks that have no domain
- // using domains will cause this to be overridden
+ // Run callbacks that have no domain.
+ // Using domains will cause this to be overridden.
function _tickCallback() {
- var callback, threw;
+ var callback, hasQueue, threw, tock;
while (tickInfo[kIndex] < tickInfo[kLength]) {
- callback = nextTickQueue[tickInfo[kIndex]++].callback;
+ tock = nextTickQueue[tickInfo[kIndex]++];
+ callback = tock.callback;
threw = true;
+ hasQueue = !!tock._asyncQueue;
+ if (hasQueue)
+ _loadAsyncQueue(tock);
try {
callback();
threw = false;
} finally {
- if (threw) tickDone();
+ if (threw)
+ tickDone();
}
+ if (hasQueue)
+ _unloadAsyncQueue(tock);
}
tickDone();
}
function _tickDomainCallback() {
- var tock, callback, threw, domain;
+ var callback, domain, hasQueue, threw, tock;
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;
domain = tock.domain;
- if (domain) {
- if (domain._disposed) continue;
+ hasQueue = !!tock._asyncQueue;
+ if (hasQueue)
+ _loadAsyncQueue(tock);
+ if (domain)
domain.enter();
- }
threw = true;
try {
callback();
threw = false;
} finally {
- if (threw) tickDone();
+ if (threw)
+ tickDone();
}
+ if (hasQueue)
+ _unloadAsyncQueue(tock);
if (domain)
domain.exit();
}
if (process._exiting)
return;
- nextTickQueue.push({
+ var obj = {
callback: callback,
- domain: process.domain || null
- });
+ domain: process.domain || null,
+ _asyncQueue: undefined
+ };
+
+ if (asyncFlags[kCount] > 0)
+ _runAsyncQueue(obj);
+
+ nextTickQueue.push(obj);
tickInfo[kLength]++;
}
};
class FSReqWrap: public ReqWrap<uv_fs_t> {
public:
FSReqWrap(Environment* env, const char* syscall, char* data = NULL)
- : ReqWrap<uv_fs_t>(env),
+ : ReqWrap<uv_fs_t>(env, Object::New()),
syscall_(syscall),
data_(data) {
}
}
}
- MakeCallback(env, req_wrap->object(), env->oncomplete_string(), argc, argv);
+ req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
uv_fs_req_cleanup(&req_wrap->req_);
delete req_wrap;
// Call with valid HandleScope and while inside Context scope.
v8::Handle<v8::Value> MakeCallback(Environment* env,
- const v8::Handle<v8::Object> object,
+ v8::Handle<v8::Object> object,
const char* method,
int argc = 0,
v8::Handle<v8::Value>* argv = NULL);
};
if (status != 0) {
- MakeCallback(env,
- pipe_wrap->object(),
- env->onconnection_string(),
- ARRAY_SIZE(argv),
- argv);
+ pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
return;
}
// Successful accept. Call the onconnection callback in JavaScript land.
argv[1] = client_obj;
- MakeCallback(env,
- pipe_wrap->object(),
- env->onconnection_string(),
- ARRAY_SIZE(argv),
- argv);
+ pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
}
// TODO(bnoordhuis) Maybe share this with TCPWrap?
Boolean::New(writable)
};
- MakeCallback(env,
- req_wrap_obj,
- env->oncomplete_string(),
- ARRAY_SIZE(argv),
- argv);
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
delete req_wrap;
}
OneByteString(node_isolate, signo_string(term_signal))
};
- MakeCallback(env,
- wrap->object(),
- env->onexit_string(),
- ARRAY_SIZE(argv),
- argv);
+ wrap->MakeCallback(env->onexit_string(), ARRAY_SIZE(argv), argv);
}
uv_process_t process_;
#ifndef SRC_REQ_WRAP_H_
#define SRC_REQ_WRAP_H_
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "queue.h"
extern QUEUE req_wrap_queue;
template <typename T>
-class ReqWrap {
+class ReqWrap : public AsyncWrap {
public:
- ReqWrap(Environment* env,
- v8::Handle<v8::Object> object = v8::Handle<v8::Object>())
- : env_(env) {
- v8::HandleScope handle_scope(env->isolate());
+ ReqWrap(Environment* env, v8::Handle<v8::Object> object)
+ : AsyncWrap(env, object) {
+ assert(!object.IsEmpty());
- if (object.IsEmpty()) {
- object = v8::Object::New();
- }
- persistent().Reset(env->isolate(), object);
-
- if (env->in_domain()) {
+ if (env->in_domain())
object->Set(env->domain_string(), env->domain_array()->Get(0));
- }
QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
}
req_.data = this;
}
- inline Environment* env() const {
- return env_;
- }
-
- inline v8::Local<v8::Object> object() {
- return PersistentToLocal(env()->isolate(), persistent());
- }
-
- inline v8::Persistent<v8::Object>& persistent() {
- return object_;
- }
-
// TODO(bnoordhuis) Make these private.
QUEUE req_wrap_queue_;
T req_; // *must* be last, GetActiveRequests() in node.cc depends on it
-
- private:
- v8::Persistent<v8::Object> object_;
- Environment* const env_;
};
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "handle_wrap.h"
Environment* env = wrap->env();
Context::Scope context_scope(env->context());
HandleScope handle_scope(env->isolate());
+
Local<Value> arg = Integer::New(signum, env->isolate());
- MakeCallback(env, wrap->object(), env->onsignal_string(), 1, &arg);
+ wrap->MakeCallback(env->onsignal_string(), 1, &arg);
}
uv_signal_t handle_;
req_wrap_obj
};
- MakeCallback(env,
- req_wrap_obj,
- env->oncomplete_string(),
- ARRAY_SIZE(argv),
- argv);
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
req_wrap->~WriteWrap();
delete[] reinterpret_cast<char*>(req_wrap);
req_wrap_obj
};
- MakeCallback(env,
- req_wrap_obj,
- env->oncomplete_string(),
- ARRAY_SIZE(argv),
- argv);
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
delete req_wrap;
}
if (nread < 0) {
if (buf->base != NULL)
free(buf->base);
- MakeCallback(env, Self(), env->onread_string(), ARRAY_SIZE(argv), argv);
+ wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
return;
}
argv[2] = pending_obj;
}
- MakeCallback(env,
- wrap()->object(),
- env->onread_string(),
- ARRAY_SIZE(argv),
- argv);
+ wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
}
argv[1] = client_obj;
}
- MakeCallback(env,
- tcp_wrap->object(),
- env->onconnection_string(),
- ARRAY_SIZE(argv),
- argv);
+ tcp_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv);
}
v8::True(node_isolate),
v8::True(node_isolate)
};
- MakeCallback(env,
- req_wrap_obj,
- env->oncomplete_string(),
- ARRAY_SIZE(argv),
- argv);
+
+ req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
delete req_wrap;
}
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+#include "async-wrap.h"
+#include "async-wrap-inl.h"
#include "env.h"
#include "env-inl.h"
#include "handle_wrap.h"
Context::Scope context_scope(env->context());
HandleScope handle_scope(env->isolate());
Local<Value> argv[1] = { Integer::New(status, node_isolate) };
- MakeCallback(env, wrap->object(), kOnTimeout, ARRAY_SIZE(argv), argv);
+ wrap->MakeCallback(kOnTimeout, ARRAY_SIZE(argv), argv);
}
static void Now(const FunctionCallbackInfo<Value>& args) {
Environment* env = req_wrap->env();
Context::Scope context_scope(env->context());
HandleScope handle_scope(env->isolate());
- Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> arg = Integer::New(status, node_isolate);
- MakeCallback(env, req_wrap_obj, env->oncomplete_string(), 1, &arg);
+ req_wrap->MakeCallback(env->oncomplete_string(), 1, &arg);
}
delete req_wrap;
}
Local<Value> argv[] = {
Integer::New(nread, node_isolate),
wrap_obj,
- Undefined(),
- Undefined()
+ Undefined(env->isolate()),
+ Undefined(env->isolate())
};
if (nread < 0) {
if (buf->base != NULL)
free(buf->base);
- MakeCallback(env,
- wrap_obj,
- env->onmessage_string(),
- ARRAY_SIZE(argv),
- argv);
+ wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv);
return;
}
char* base = static_cast<char*>(realloc(buf->base, nread));
argv[2] = Buffer::Use(env, base, nread);
argv[3] = AddressToJS(env, addr);
- MakeCallback(env, wrap_obj, env->onmessage_string(), ARRAY_SIZE(argv), argv);
+ wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv);
}
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var dns = require('dns');
+var fs = require('fs');
+var net = require('net');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var errorMsgs = [];
+var currentMsg = '';
+var caught = 0;
+var expectCaught = 0;
+var exitCbRan = false;
+
+function asyncL() { }
+
+var callbacksObj = {
+ error: function(value, er) {
+ var idx = errorMsgs.indexOf(er.message);
+
+ caught++;
+
+ if (-1 < idx)
+ errorMsgs.splice(idx, 1);
+
+ return currentMsg === er.message;
+ }
+};
+
+var listener = process.createAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+ removeListener(listener);
+
+ // Something else went wrong, no need to further check.
+ if (code > 0)
+ return;
+
+ // Make sure the exit callback only runs once.
+ assert.ok(!exitCbRan);
+ exitCbRan = true;
+
+ // Check if any error messages weren't removed from the msg queue.
+ if (errorMsgs.length > 0)
+ throw new Error('Errors not fired: ' + errorMsgs);
+
+ assert.equal(caught, expectCaught, 'caught all expected errors');
+ process._rawDebug('ok');
+});
+
+
+// Catch synchronous throws
+errorMsgs.push('sync throw');
+process.nextTick(function() {
+ addListener(listener);
+
+ expectCaught++;
+ currentMsg = 'sync throw';
+ throw new Error(currentMsg);
+
+ removeListener(listener);
+});
+
+
+// Simple cases
+errorMsgs.push('setTimeout - simple');
+errorMsgs.push('setImmediate - simple');
+errorMsgs.push('setInterval - simple');
+errorMsgs.push('process.nextTick - simple');
+process.nextTick(function() {
+ addListener(listener);
+
+ setTimeout(function() {
+ currentMsg = 'setTimeout - simple';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ setImmediate(function() {
+ currentMsg = 'setImmediate - simple';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ var b = setInterval(function() {
+ clearInterval(b);
+ currentMsg = 'setInterval - simple';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ process.nextTick(function() {
+ currentMsg = 'process.nextTick - simple';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ removeListener(listener);
+});
+
+
+// Deeply nested
+errorMsgs.push('setInterval - nested');
+errorMsgs.push('setImmediate - nested');
+errorMsgs.push('process.nextTick - nested');
+errorMsgs.push('setTimeout2 - nested');
+errorMsgs.push('setTimeout - nested');
+process.nextTick(function() {
+ addListener(listener);
+
+ setTimeout(function() {
+ process.nextTick(function() {
+ setImmediate(function() {
+ var b = setInterval(function() {
+ clearInterval(b);
+ currentMsg = 'setInterval - nested';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ currentMsg = 'setImmediate - nested';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ currentMsg = 'process.nextTick - nested';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ setTimeout(function() {
+ currentMsg = 'setTimeout2 - nested';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ currentMsg = 'setTimeout - nested';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ removeListener(listener);
+});
+
+
+// FS
+errorMsgs.push('fs - file does not exist');
+errorMsgs.push('fs - exists');
+errorMsgs.push('fs - realpath');
+process.nextTick(function() {
+ addListener(listener);
+
+ fs.stat('does not exist', function(err, stats) {
+ currentMsg = 'fs - file does not exist';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ fs.exists('hi all', function(exists) {
+ currentMsg = 'fs - exists';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ fs.realpath('/some/path', function(err, resolved) {
+ currentMsg = 'fs - realpath';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ removeListener(listener);
+});
+
+
+// Nested FS
+errorMsgs.push('fs - nested file does not exist');
+process.nextTick(function() {
+ addListener(listener);
+
+ setTimeout(function() {
+ setImmediate(function() {
+ var b = setInterval(function() {
+ clearInterval(b);
+ process.nextTick(function() {
+ fs.stat('does not exist', function(err, stats) {
+ currentMsg = 'fs - nested file does not exist';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ });
+ });
+ });
+ });
+
+ removeListener(listener);
+});
+
+
+// Net
+errorMsgs.push('net - connection listener');
+errorMsgs.push('net - client connect');
+errorMsgs.push('net - server listening');
+process.nextTick(function() {
+ addListener(listener);
+
+ var server = net.createServer(function(c) {
+ server.close();
+ currentMsg = 'net - connection listener';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ server.listen(common.PORT, function() {
+ var client = net.connect(common.PORT, function() {
+ client.end();
+ currentMsg = 'net - client connect';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+ currentMsg = 'net - server listening';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ removeListener(listener);
+});
+
+
+// DNS
+errorMsgs.push('dns - lookup');
+process.nextTick(function() {
+ addListener(listener);
+
+ dns.lookup('localhost', function() {
+ currentMsg = 'dns - lookup';
+ throw new Error(currentMsg);
+ });
+ expectCaught++;
+
+ removeListener(listener);
+});
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var caught = [];
+var expect = [];
+
+function asyncL(a) {}
+
+var callbacksObj = {
+ error: function(value, er) {
+ process._rawDebug('caught', er.message);
+ caught.push(er.message);
+ return (expect.indexOf(er.message) !== -1);
+ }
+};
+
+var listener = process.createAsyncListener(asyncL, callbacksObj);
+
+process.on('exit', function(code) {
+ removeListener(listener);
+
+ if (code > 0)
+ return;
+
+ expect = expect.sort();
+ caught = caught.sort();
+
+ process._rawDebug('expect', expect);
+ process._rawDebug('caught', caught);
+ assert.deepEqual(caught, expect, 'caught all expected errors');
+ process._rawDebug('ok');
+});
+
+
+expect.push('immediate simple a');
+expect.push('immediate simple b');
+process.nextTick(function() {
+ addListener(listener);
+ // Tests for a setImmediate specific bug encountered while implementing
+ // AsyncListeners.
+ setImmediate(function() {
+ throw new Error('immediate simple a');
+ });
+ setImmediate(function() {
+ throw new Error('immediate simple b');
+ });
+ removeListener(listener);
+});
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+// If there is an uncaughtException listener then the error thrown from
+// "before" will be considered handled, thus calling setImmediate to
+// finish execution of the nextTickQueue. This in turn will cause "before"
+// to fire again, entering into an infinite loop.
+// So the asyncQueue is cleared from the returned setImmediate in
+// _fatalException to prevent this from happening.
+var cntr = 0;
+
+
+process.addAsyncListener(function() { }, {
+ before: function() {
+ if (++cntr > 1) {
+ // Can't throw since uncaughtException will also catch that.
+ process._rawDebug('Error: Multiple before callbacks called');
+ process.exit(1);
+ }
+ throw new Error('before');
+ }
+});
+
+process.on('uncaughtException', function() { });
+
+process.nextTick();
--- /dev/null
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var net = require('net');
+var fs = require('fs');
+var dgram = require('dgram');
+
+var addListener = process.addAsyncListener;
+var removeListener = process.removeAsyncListener;
+var actualAsync = 0;
+var expectAsync = 0;
+
+function onAsync() {
+ actualAsync++;
+}
+
+var listener = process.createAsyncListener(onAsync);
+
+process.on('exit', function() {
+ process._rawDebug('expected', expectAsync);
+ process._rawDebug('actual ', actualAsync);
+ // TODO(trevnorris): Not a great test. If one was missed, but others
+ // overflowed then the test would still pass.
+ assert.ok(actualAsync >= expectAsync);
+});
+
+
+// Test listeners side-by-side
+process.nextTick(function() {
+ addListener(listener);
+
+ var b = setInterval(function() {
+ clearInterval(b);
+ });
+ expectAsync++;
+
+ var c = setInterval(function() {
+ clearInterval(c);
+ });
+ expectAsync++;
+
+ setTimeout(function() { });
+ expectAsync++;
+
+ setTimeout(function() { });
+ expectAsync++;
+
+ process.nextTick(function() { });
+ expectAsync++;
+
+ process.nextTick(function() { });
+ expectAsync++;
+
+ setImmediate(function() { });
+ expectAsync++;
+
+ setImmediate(function() { });
+ expectAsync++;
+
+ setTimeout(function() { }, 10);
+ expectAsync++;
+
+ setTimeout(function() { }, 10);
+ expectAsync++;
+
+ removeListener(listener);
+});
+
+
+// Async listeners should propagate with nested callbacks
+process.nextTick(function() {
+ addListener(listener);
+ var interval = 3;
+
+ process.nextTick(function() {
+ setTimeout(function() {
+ setImmediate(function() {
+ var i = setInterval(function() {
+ if (--interval <= 0)
+ clearInterval(i);
+ });
+ expectAsync++;
+ });
+ expectAsync++;
+ process.nextTick(function() {
+ setImmediate(function() {
+ setTimeout(function() { }, 20);
+ expectAsync++;
+ });
+ expectAsync++;
+ });
+ expectAsync++;
+ });
+ expectAsync++;
+ });
+ expectAsync++;
+
+ removeListener(listener);
+});
+
+
+// Test triggers with two async listeners
+process.nextTick(function() {
+ addListener(listener);
+ addListener(listener);
+
+ setTimeout(function() {
+ process.nextTick(function() { });
+ expectAsync += 2;
+ });
+ expectAsync += 2;
+
+ removeListener(listener);
+ removeListener(listener);
+});
+
+
+// Test callbacks from fs I/O
+process.nextTick(function() {
+ addListener(listener);
+
+ fs.stat('something random', function(err, stat) { });
+ expectAsync++;
+
+ setImmediate(function() {
+ fs.stat('random again', function(err, stat) { });
+ expectAsync++;
+ });
+ expectAsync++;
+
+ removeListener(listener);
+});
+
+
+// Test net I/O
+process.nextTick(function() {
+ addListener(listener);
+
+ var server = net.createServer(function(c) { });
+ expectAsync++;
+
+ server.listen(common.PORT, function() {
+ server.close();
+ expectAsync++;
+ });
+ expectAsync++;
+
+ removeListener(listener);
+});
+
+
+// Test UDP
+process.nextTick(function() {
+ addListener(listener);
+
+ var server = dgram.createSocket('udp4');
+ expectAsync++;
+
+ server.bind(common.PORT);
+
+ server.close();
+ expectAsync++;
+
+ removeListener(listener);
+});
res.end(er.stack || er.message || 'Unknown error');
});
- var data;
dom.run(function() {
// Now, an action that has the potential to fail!
// if you request 'baz', then it'll throw a JSON circular ref error.
- data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]);
+ var data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]);
// this line will throw if you pick an unknown key
assert(data !== undefined, 'Data should not be undefined');
function next() {
console.log('listening on localhost:%d', common.PORT);
- // now hit it a few times
- var dom = domain.create();
var requests = 0;
var responses = 0;