var loadAsyncQueue = process._loadAsyncQueue;
var unloadAsyncQueue = process._unloadAsyncQueue;
+// Same as in AsyncListener in env.h
+var kHasListener = 0;
+
// Do a little housekeeping.
delete process._asyncFlags;
delete process._runAsyncQueue;
// Make Timer as monomorphic as possible.
Timer.prototype._asyncQueue = undefined;
+Timer.prototype._asyncData = undefined;
+Timer.prototype._asyncFlags = 0;
// the main function - creates lists on demand and the watchers associated
// with them.
// Whether or not a new TimerWrap needed to be created, this should run
// for each item. This way each "item" (i.e. timer) can properly have
// their own domain assigned.
- if (asyncFlags[0] > 0)
+ if (asyncFlags[kHasListener] > 0)
runAsyncQueue(item);
};
-function timerAddAsyncListener(obj) {
+// new Array() is used here because it is more efficient for sparse
+// arrays. Please *do not* change these to simple bracket notation.
+function timerAddAsyncListener(obj, data) {
+ obj = process.createAsyncListener(obj, data);
+
if (!this._asyncQueue)
- this._asyncQueue = [];
+ this._asyncQueue = new Array();
+ if (!this._asyncData)
+ this._asyncData = new Array();
+
+ var inQueue = false;
var queue = this._asyncQueue;
// This queue will be small. Probably always <= 3 items.
for (var i = 0; i < queue.length; i++) {
- if (queue[i].uid === obj.uid)
- return;
+ if (queue[i].uid === obj.uid) {
+ inQueue = true;
+ break;
+ }
}
- this._asyncQueue.push(obj);
+
+ if (!inQueue) {
+ queue.push(obj);
+ this._asyncData[obj.uid] = obj.data;
+ this._asyncFlags |= obj.flags;
+ }
+
+ return obj;
}
if (!this._asyncQueue)
return;
var queue = this._asyncQueue;
+ var i;
// This queue will be small. Probably always <= 3 items.
- for (var i = 0; i < queue.length; i++) {
+ for (i = 0; i < queue.length; i++) {
if (queue[i].uid === obj.uid) {
queue.splice(i, 1);
+ this._asyncData[obj.uid] = undefined;
return;
}
}
+ // Rebuild flags
+ this._asyncFlags = 0;
+ for (i = 0; i < queue.length; i++) {
+ this._asyncFlags |= queue[i].flags;
+ }
}
Immediate.prototype.domain = undefined;
Immediate.prototype._onImmediate = undefined;
Immediate.prototype._asyncQueue = undefined;
+Immediate.prototype._asyncData = undefined;
Immediate.prototype._idleNext = undefined;
Immediate.prototype._idlePrev = undefined;
+Immediate.prototype._asyncFlags = 0;
exports.setImmediate = function(callback) {
}
// setImmediates are handled more like nextTicks.
- if (asyncFlags[0] > 0)
+ if (asyncFlags[kHasListener] > 0)
runAsyncQueue(immediate);
if (process.domain)
immediate.domain = process.domain;
var diff, domain, first, hasQueue, threw;
while (first = L.peek(unrefList)) {
diff = now - first._idleStart;
- hasQueue = !!first._asyncQueue;
if (diff < first._idleTimeout) {
diff = first._idleTimeout - diff;
if (!first._onTimeout) continue;
if (domain && domain._disposed) continue;
+ hasQueue = !!first._asyncQueue;
try {
if (hasQueue)
};
startup.processAsyncListener = function() {
- var asyncStack = [];
- var asyncQueue = [];
- var uid = 0;
+ // new Array() is used here because it is more efficient for sparse
+ // arrays. Please *do not* change these to simple bracket notation.
+
+ // Track the active queue of AsyncListeners that have been added.
+ var asyncStack = new Array();
+ var asyncQueue = undefined;
+
+ // Keep the stack of all contexts that have been loaded in the
+ // execution chain of asynchronous events.
+ var contextStack = new Array();
+ var currentContext = undefined;
+
+ // Incremental uid for new AsyncListener instances.
+ var alUid = 0;
// Stateful flags shared with Environment for quick JS/C++
// communication.
var inErrorTick = false;
// Needs to be the same as src/env.h
- var kCount = 0;
+ var kHasListener = 0;
+
+ // Flags to determine what async listeners are available.
+ var HAS_CREATE_AL = 1 << 0;
+ var HAS_BEFORE_AL = 1 << 1;
+ var HAS_AFTER_AL = 1 << 2;
+ var HAS_ERROR_AL = 1 << 3;
// _errorHandler is scoped so it's also accessible by _fatalException.
_errorHandler = errorHandler;
pushListener,
stripListener);
- function popQueue() {
- if (asyncStack.length > 0)
- asyncQueue = asyncStack.pop();
- else
- asyncQueue = [];
+ // Load the currently executing context as the current context, and
+ // create a new asyncQueue that can receive any added queue items
+ // during the executing of the callback.
+ function loadContext(ctx) {
+ contextStack.push(currentContext);
+ currentContext = ctx;
+
+ asyncStack.push(asyncQueue);
+ asyncQueue = new Array();
+
+ asyncFlags[kHasListener] = 1;
+ }
+
+ function unloadContext() {
+ currentContext = contextStack.pop();
+ asyncQueue = asyncStack.pop();
+
+ if (typeof currentContext === 'undefined' &&
+ typeof asyncQueue === 'undefined')
+ asyncFlags[kHasListener] = 0;
}
// Run all the async listeners attached when an asynchronous event is
// instantiated.
function runAsyncQueue(context) {
- var queue = [];
- var queueItem, item, i, value;
+ var queue = new Array();
+ var data = new Array();
+ var ccQueue, i, item, queueItem, value;
+
+ context._asyncQueue = queue;
+ context._asyncData = data;
+ context._asyncFlags = 0;
inAsyncTick = true;
- for (i = 0; i < asyncQueue.length; i++) {
- queueItem = asyncQueue[i];
- if (!queueItem.callbacks.create) {
- queue[i] = queueItem;
- continue;
+
+ // First run through all callbacks in the currentContext. These may
+ // add new AsyncListeners to the asyncQueue during execution. Hence
+ // why they need to be evaluated first.
+ if (currentContext) {
+ ccQueue = currentContext._asyncQueue;
+ context._asyncFlags |= currentContext._asyncFlags;
+ for (i = 0; i < ccQueue.length; i++) {
+ queueItem = ccQueue[i];
+ queue[queue.length] = queueItem;
+ if ((queueItem.flags & HAS_CREATE_AL) === 0) {
+ data[queueItem.uid] = queueItem.data;
+ continue;
+ }
+ value = queueItem.create(queueItem.data);
+ data[queueItem.uid] = (value === undefined) ? queueItem.data : value;
}
- // Not passing "this" context because it hasn't actually been
- // instantiated yet, so accessing some of the object properties
- // can cause a segfault.
- // Passing the original value will allow users to manipulate the
- // original value object, while also allowing them to return a
- // new value for current async call tracking.
- value = queueItem.callbacks.create(queueItem.value);
- if (typeof value !== 'undefined') {
- item = {
- callbacks: queueItem.callbacks,
- value: value,
- uid: queueItem.uid
- };
- } else {
- item = queueItem;
+ }
+
+
+ // Then run through all items in the asyncQueue
+ if (asyncQueue) {
+ for (i = 0; i < asyncQueue.length; i++) {
+ queueItem = asyncQueue[i];
+ queue[queue.length] = queueItem;
+ context._asyncFlags |= queueItem.flags;
+ if ((queueItem.flags & HAS_CREATE_AL) === 0) {
+ data[queueItem.uid] = queueItem.data;
+ continue;
+ }
+ value = queueItem.create(queueItem.data);
+ data[queueItem.uid] = (value === undefined) ? queueItem.data : value;
}
- queue[i] = item;
}
- inAsyncTick = false;
- context._asyncQueue = queue;
+ inAsyncTick = false;
}
- // Uses the _asyncQueue object attached by runAsyncQueue.
+ // Load the AsyncListener queue attached to context and run all
+ // "before" callbacks, if they exist.
function loadAsyncQueue(context) {
- var queue = context._asyncQueue;
- var item, before, i;
+ loadContext(context);
- asyncStack.push(asyncQueue);
- asyncQueue = queue.slice();
- // Since the async listener callback is required, the number of
- // objects in the asyncQueue implies the number of async listeners
- // there are to be processed.
- asyncFlags[kCount] = queue.length;
+ if ((context._asyncFlags & HAS_BEFORE_AL) === 0)
+ return;
+
+ var queue = context._asyncQueue;
+ var data = context._asyncData;
+ var i, queueItem;
- // Run "before" callbacks.
inAsyncTick = true;
for (i = 0; i < queue.length; i++) {
- item = queue[i];
- if (!item.callbacks)
- continue;
- before = item.callbacks.before;
- if (typeof before === 'function')
- before(context, item.value);
+ queueItem = queue[i];
+ if ((queueItem.flags & HAS_BEFORE_AL) > 0)
+ queueItem.before(context, data[queueItem.uid]);
}
inAsyncTick = false;
}
- // Unload one level of the async stack. Returns true if there are
- // still listeners somewhere in the stack.
+ // Unload the AsyncListener queue attached to context and run all
+ // "after" callbacks, if they exist.
function unloadAsyncQueue(context) {
+ if ((context._asyncFlags & HAS_AFTER_AL) === 0) {
+ unloadContext();
+ return;
+ }
+
var queue = context._asyncQueue;
- var item, after, i;
+ var data = context._asyncData;
+ var i, queueItem;
- // Run "after" callbacks.
inAsyncTick = true;
for (i = 0; i < queue.length; i++) {
- item = queue[i];
- if (!item.callbacks)
- continue;
- after = item.callbacks.after;
- if (typeof after === 'function')
- after(context, item.value);
+ queueItem = queue[i];
+ if ((queueItem.flags & HAS_AFTER_AL) > 0)
+ queueItem.after(context, data[queueItem.uid]);
}
inAsyncTick = false;
- // Unload the current queue from the stack.
- popQueue();
+ unloadContext();
+ }
+
+ // Handle errors that are thrown while in the context of an
+ // AsyncListener. If an error is thrown from an AsyncListener
+ // callback error handlers will be called once more to report
+ // the error, then the application will die forcefully.
+ function errorHandler(er) {
+ if (inErrorTick)
+ return false;
+
+ var handled = false;
+ var i, queueItem, threw;
+
+ inErrorTick = true;
+
+ // First process error callbacks from the current context.
+ if (currentContext && (currentContext._asyncFlags & HAS_ERROR_AL) > 0) {
+ var queue = currentContext._asyncQueue;
+ var data = currentContext._asyncData;
+ for (i = 0; i < queue.length; i++) {
+ queueItem = queue[i];
+ if ((queueItem.flags & HAS_ERROR_AL) === 0)
+ continue;
+ try {
+ threw = true;
+ // While it would be possible to pass in currentContext, if
+ // the error is thrown from the "create" callback then there's
+ // a chance the object hasn't been fully constructed.
+ handled = queueItem.error(data[queueItem.uid], er) || handled;
+ threw = false;
+ } finally {
+ // If the error callback thew then die quickly. Only allow the
+ // exit events to be processed.
+ if (threw) {
+ process._exiting = true;
+ process.emit('exit', 1);
+ }
+ }
+ }
+ }
+
+ // Now process callbacks from any existing queue.
+ if (asyncQueue) {
+ for (i = 0; i < asyncQueue.length; i++) {
+ queueItem = asyncQueue[i];
+ if ((queueItem.flags & HAS_ERROR_AL) === 0)
+ continue;
+ try {
+ threw = true;
+ handled = queueItem.error(queueItem.data, er) || handled;
+ threw = false;
+ } finally {
+ // If the error callback thew then die quickly. Only allow the
+ // exit events to be processed.
+ if (threw) {
+ process._exiting = true;
+ process.emit('exit', 1);
+ }
+ }
+ }
+ }
+
+ inErrorTick = false;
+
+ unloadContext();
+
+ // TODO(trevnorris): If the error was handled, should the after callbacks
+ // be fired anyways?
- asyncFlags[kCount] = asyncQueue.length;
+ return handled && !inAsyncTick;
+ }
+
+ // Instance function of an AsyncListener object.
+ function AsyncListenerInst(callbacks, data) {
+ if (typeof callbacks.create === 'function') {
+ this.create = callbacks.create;
+ this.flags |= HAS_CREATE_AL;
+ }
+ if (typeof callbacks.before === 'function') {
+ this.before = callbacks.before;
+ this.flags |= HAS_BEFORE_AL;
+ }
+ if (typeof callbacks.after === 'function') {
+ this.after = callbacks.after;
+ this.flags |= HAS_AFTER_AL;
+ }
+ if (typeof callbacks.error === 'function') {
+ this.error = callbacks.error;
+ this.flags |= HAS_ERROR_AL;
+ }
- return asyncQueue.length > 0 || asyncStack.length > 0;
+ this.uid = ++alUid;
+ this.data = data;
}
+ AsyncListenerInst.prototype.create = undefined;
+ AsyncListenerInst.prototype.before = undefined;
+ AsyncListenerInst.prototype.after = undefined;
+ AsyncListenerInst.prototype.error = undefined;
+ AsyncListenerInst.prototype.uid = 0;
+ AsyncListenerInst.prototype.flags = 0;
// Create new async listener object. Useful when instantiating a new
// object and want the listener instance, but not add it to the stack.
- function createAsyncListener(callbacks, value) {
- return {
- callbacks: callbacks,
- value: value,
- uid: uid++
- };
+ // If an existing AsyncListenerInst is passed then any new "data" is
+ // ignored.
+ function createAsyncListener(callbacks, data) {
+ if (typeof callbacks !== 'object' || callbacks == null)
+ throw new TypeError('callbacks argument must be an object');
+
+ if (callbacks instanceof AsyncListenerInst)
+ return callbacks;
+ else
+ return new AsyncListenerInst(callbacks, data);
}
// Add a listener to the current queue.
- function addAsyncListener(callbacks, value) {
- // Accept new listeners or previous created listeners.
- if (typeof callbacks.uid !== 'number')
- callbacks = createAsyncListener(callbacks, value);
+ function addAsyncListener(callbacks, data) {
+ if (!asyncQueue) {
+ asyncStack.push(asyncQueue);
+ asyncQueue = new Array();
+ }
+
+ // Fast track if a new AsyncListenerInst has to be created.
+ if (!(callbacks instanceof AsyncListenerInst)) {
+ callbacks = createAsyncListener(callbacks, data);
+ asyncQueue.push(callbacks);
+ asyncFlags[kHasListener] = 1;
+ return callbacks;
+ }
var inQueue = false;
// The asyncQueue will be small. Probably always <= 3 items.
}
// Make sure the callback doesn't already exist in the queue.
- if (!inQueue)
+ if (!inQueue) {
asyncQueue.push(callbacks);
+ asyncFlags[kHasListener] = 1;
+ }
- asyncFlags[kCount] = asyncQueue.length;
return callbacks;
}
function removeAsyncListener(obj) {
var i, j;
- for (i = 0; i < asyncQueue.length; i++) {
- if (obj.uid === asyncQueue[i].uid) {
- asyncQueue.splice(i, 1);
- break;
+ if (asyncQueue) {
+ for (i = 0; i < asyncQueue.length; i++) {
+ if (obj.uid === asyncQueue[i].uid) {
+ asyncQueue.splice(i, 1);
+ break;
+ }
}
}
+ // TODO(trevnorris): Why remove the AL from the entire stack?
for (i = 0; i < asyncStack.length; i++) {
+ if (asyncStack[i] === undefined)
+ continue;
for (j = 0; j < asyncStack[i].length; j++) {
if (obj.uid === asyncStack[i][j].uid) {
asyncStack[i].splice(j, 1);
}
}
- asyncFlags[kCount] = asyncQueue.length;
- }
-
- // Error handler used by _fatalException to run through all error
- // callbacks in the current asyncQueue.
- function errorHandler(er) {
- var handled = false;
- var error, item, i;
-
- if (inErrorTick)
- return false;
-
- inErrorTick = true;
- for (i = 0; i < asyncQueue.length; i++) {
- item = asyncQueue[i];
- if (!item.callbacks)
- continue;
- error = item.callbacks.error;
- if (typeof error === 'function') {
- try {
- var threw = true;
- handled = error(item.value, er) || handled;
- threw = false;
- } finally {
- // If the error callback throws then we're going to die
- // quickly with no chance of recovery. Only thing we're going
- // to allow is execution of process exit event callbacks.
- if (threw) {
- process._exiting = true;
- process.emit('exit', 1);
- }
- }
- }
- }
- inErrorTick = false;
-
- // Unload the current queue from the stack.
- popQueue();
-
- return handled && !inAsyncTick;
+ if ((asyncQueue && asyncQueue.length > 0) ||
+ (currentContext && currentContext._asyncQueue.length))
+ asyncFlags[kHasListener] = 1;
}
// Used by AsyncWrap::AddAsyncListener() to add an individual listener
// to the async queue. It will check the uid of the listener and only
// allow it to be added once.
function pushListener(obj) {
- if (!this._asyncQueue)
- this._asyncQueue = [];
+ if (!this._asyncQueue) {
+ this._asyncQueue = [obj];
+ this._asyncData = new Array();
+ this._asyncData[obj.uid] = obj.data;
+ this._asyncFlags = obj.flags;
+ return;
+ }
+
+ if (!this._asyncData)
+ this._asyncData = new Array();
var queue = this._asyncQueue;
var inQueue = false;
}
}
- if (!inQueue)
+ // Not in the queue so push it on and set the default storage.
+ if (!inQueue) {
queue.push(obj);
+ this._asyncData[obj.uid] = obj.data;
+ this._asyncFlags |= obj.flags;
+ }
}
// Used by AsyncWrap::RemoveAsyncListener() to remove an individual
// listener from the async queue, and return whether there are still
// listeners in the queue.
function stripListener(obj) {
- if (!this._asyncQueue || this._asyncQueue.length === 0)
+ // No queue exists, so nothing to do.
+ if (!this._asyncQueue)
return false;
+ var queue = this._asyncQueue;
+
// The asyncQueue will be small. Probably always <= 3 items.
- for (var i = 0; i < this._asyncQueue.length; i++) {
- if (obj.uid === this._asyncQueue[i].uid) {
- this._asyncQueue.splice(i, 1);
- break;
+ for (var i = 0; i < queue.length; i++) {
+ if (obj.uid === queue[i].uid) {
+ this._asyncData[queue[i].uid] = undefined;
+ queue.splice(i, 1);
+ return queue.length > 0;
}
}
-
- return this._asyncQueue.length > 0;
}
};