See `kill(2)`
-### child.send(message[, sendHandle])
+### child.send(message[, sendHandle][, callback])
* `message` {Object}
* `sendHandle` {Handle object}
+* `callback` {Function}
+* Return: Boolean
When using `child_process.fork()` you can write to the child using
-`child.send(message, [sendHandle])` and messages are received by
+`child.send(message[, sendHandle][, callback])` and messages are received by
a `'message'` event on the child.
For example:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.
-Please note that the `send()` method on both the parent and child are
-synchronous - sending large chunks of data is not advised (pipes can be used
-instead, see
-[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).
-
There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
containing a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since they are internal messages used by Node.js core.
socket object to another process. The child will receive the object as its
second argument to the `message` event.
-Emits an `'error'` event if the message cannot be sent, for example because
-the child process has already exited.
+The `callback` option is a function that is invoked after the message is
+sent but before the target may have received it. It is called with a single
+argument: `null` on success, or an `Error` object on failure.
+
+`child.send()` emits an `'error'` event if no callback was given and the message
+cannot be sent, for example because the child process has already exited.
+
+Returns `true` under normal circumstances or `false` when the backlog of
+unsent messages exceeds a threshold that makes it unwise to send more.
+Use the callback mechanism to implement flow control.
#### Example: sending server object
target._channel = channel;
target._handleQueue = null;
+ const control = new class extends EventEmitter {
+ constructor() {
+ super();
+ this.channel = channel;
+ this.refs = 0;
+ }
+ ref() {
+ if (++this.refs === 1) {
+ this.channel.ref();
+ }
+ }
+ unref() {
+ if (--this.refs === 0) {
+ this.channel.unref();
+ this.emit('unref');
+ }
+ }
+ };
+
var decoder = new StringDecoder('utf8');
var jsonBuffer = '';
channel.buffering = false;
target._handleQueue = null;
queue.forEach(function(args) {
- target._send(args.message, args.handle, false);
+ target._send(args.message, args.handle, false, args.callback);
});
// Process a pending disconnect (if any).
});
});
- target.send = function(message, handle) {
- if (!this.connected)
- this.emit('error', new Error('channel closed'));
- else
- this._send(message, handle, false);
+ target.send = function(message, handle, callback) {
+ if (typeof handle === 'function') {
+ callback = handle;
+ handle = undefined;
+ }
+ if (this.connected) {
+ this._send(message, handle, false, callback);
+ return;
+ }
+ const ex = new Error('channel closed');
+ if (typeof callback === 'function') {
+ process.nextTick(callback, ex);
+ } else {
+ this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
+ }
};
- target._send = function(message, handle, swallowErrors) {
+ target._send = function(message, handle, swallowErrors, callback) {
assert(this.connected || this._channel);
if (message === undefined)
// Queue-up message and handle if we haven't received ACK yet.
if (this._handleQueue) {
- this._handleQueue.push({ message: message.msg, handle: handle });
+ this._handleQueue.push({
+ callback: callback,
+ handle: handle,
+ message: message.msg,
+ });
return;
}
} else if (this._handleQueue &&
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
// Queue request anyway to avoid out-of-order messages.
- this._handleQueue.push({ message: message, handle: null });
+ this._handleQueue.push({
+ callback: callback,
+ handle: null,
+ message: message,
+ });
return;
}
var req = new WriteWrap();
- req.oncomplete = nop;
+ req.async = false;
+
var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);
- if (err) {
- if (!swallowErrors)
- this.emit('error', errnoException(err, 'write'));
- } else if (handle && !this._handleQueue) {
- this._handleQueue = [];
- }
-
- if (obj && obj.postSend) {
- req.oncomplete = obj.postSend.bind(null, handle);
+ if (err === 0) {
+ if (handle && !this._handleQueue)
+ this._handleQueue = [];
+ req.oncomplete = function() {
+ if (this.async === true)
+ control.unref();
+ if (obj && obj.postSend)
+ obj.postSend(handle);
+ if (typeof callback === 'function')
+ callback(null);
+ };
+ if (req.async === true) {
+ control.ref();
+ } else {
+ process.nextTick(function() { req.oncomplete(); });
+ }
+ } else if (!swallowErrors) {
+ const ex = errnoException(err, 'write');
+ if (typeof callback === 'function') {
+ process.nextTick(callback, ex);
+ } else {
+ this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
+ }
}
/* If the master is > 2 read() calls behind, please stop sending. */
};
channel.readStart();
+ return control;
}
--- /dev/null
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const fork = require('child_process').fork;
+
+if (process.argv[2] === 'child') {
+ process.send('ok', common.mustCall(function(err) {
+ assert.strictEqual(err, null);
+ }));
+} else {
+ const child = fork(process.argv[1], ['child']);
+ child.on('message', common.mustCall(function(message) {
+ assert.strictEqual(message, 'ok');
+ }));
+ child.on('exit', common.mustCall(function(exitCode, signalCode) {
+ assert.strictEqual(exitCode, 0);
+ assert.strictEqual(signalCode, null);
+ }));
+}