child_process: add callback parameter to .send()
authorBen Noordhuis <info@bnoordhuis.nl>
Sun, 30 Aug 2015 22:49:34 +0000 (00:49 +0200)
committerRod Vagg <rod@vagg.org>
Sun, 6 Sep 2015 11:38:58 +0000 (21:38 +1000)
Add an optional callback parameter to `ChildProcess.prototype.send()`
that is invoked when the message has been sent.

Juggle the control channel's reference count so that in-flight messages
keep the event loop (and therefore the process) alive until they have
been sent.

`ChildProcess.prototype.send()` and `process.send()` used to operate
synchronously but became asynchronous in commit libuv/libuv@393c1c5
("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed
in io.js in commit 07bd05b ("deps: update libuv to 1.2.1").

Fixes: https://github.com/nodejs/node/issues/760
PR-URL: https://github.com/nodejs/node/pull/2620
Reviewed-By: trevnorris - Trevor Norris <trev.norris@gmail.com>
Reviewed-By: jasnell - James M Snell <jasnell@gmail.com>
doc/api/child_process.markdown
doc/api/cluster.markdown
lib/child_process.js
lib/internal/child_process.js
test/parallel/test-child-process-send-cb.js [new file with mode: 0644]

index 0530806..31ac724 100644 (file)
@@ -214,13 +214,15 @@ to a process.
 
 See `kill(2)`
 
-### child.send(message[, sendHandle])
+### child.send(message[, sendHandle][, callback])
 
 * `message` {Object}
 * `sendHandle` {Handle object}
+* `callback` {Function}
+* Return: Boolean
 
 When using `child_process.fork()` you can write to the child using
-`child.send(message, [sendHandle])` and messages are received by
+`child.send(message[, sendHandle][, callback])` and messages are received by
 a `'message'` event on the child.
 
 For example:
@@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
 In the child the `process` object will have a `send()` method, and `process`
 will emit objects each time it receives a message on its channel.
 
-Please note that the `send()` method on both the parent and child are
-synchronous - sending large chunks of data is not advised (pipes can be used
-instead, see
-[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).
-
 There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
 containing a `NODE_` prefix in its `cmd` property will not be emitted in
 the `message` event, since they are internal messages used by Node.js core.
@@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
 socket object to another process. The child will receive the object as its
 second argument to the `message` event.
 
-Emits an `'error'` event if the message cannot be sent, for example because
-the child process has already exited.
+The `callback` option is a function that is invoked after the message is
+sent but before the target may have received it.  It is called with a single
+argument: `null` on success, or an `Error` object on failure.
+
+`child.send()` emits an `'error'` event if no callback was given and the message
+cannot be sent, for example because the child process has already exited.
+
+Returns `true` under normal circumstances or `false` when the backlog of
+unsent messages exceeds a threshold that makes it unwise to send more.
+Use the callback mechanism to implement flow control.
 
 #### Example: sending server object
 
index 53f6e0a..d29d597 100644 (file)
@@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
     // kill worker
     worker.kill();
 
-### worker.send(message[, sendHandle])
+### worker.send(message[, sendHandle][, callback])
 
 * `message` {Object}
 * `sendHandle` {Handle object}
+* `callback` {Function}
+* Return: Boolean
 
 Send a message to a worker or master, optionally with a handle.
 
index 57a8601..a923477 100644 (file)
@@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
   var p = new Pipe(true);
   p.open(fd);
   p.unref();
-  setupChannel(process, p);
-
-  var refs = 0;
+  const control = setupChannel(process, p);
   process.on('newListener', function(name) {
-    if (name !== 'message' && name !== 'disconnect') return;
-    if (++refs === 1) p.ref();
+    if (name === 'message' || name === 'disconnect') control.ref();
   });
   process.on('removeListener', function(name) {
-    if (name !== 'message' && name !== 'disconnect') return;
-    if (--refs === 0) p.unref();
+    if (name === 'message' || name === 'disconnect') control.unref();
   });
 };
 
index c6bb41f..6d1c22d 100644 (file)
@@ -397,6 +397,25 @@ function setupChannel(target, channel) {
   target._channel = channel;
   target._handleQueue = null;
 
+  const control = new class extends EventEmitter {
+    constructor() {
+      super();
+      this.channel = channel;
+      this.refs = 0;
+    }
+    ref() {
+      if (++this.refs === 1) {
+        this.channel.ref();
+      }
+    }
+    unref() {
+      if (--this.refs === 0) {
+        this.channel.unref();
+        this.emit('unref');
+      }
+    }
+  };
+
   var decoder = new StringDecoder('utf8');
   var jsonBuffer = '';
   channel.buffering = false;
@@ -446,7 +465,7 @@ function setupChannel(target, channel) {
       target._handleQueue = null;
 
       queue.forEach(function(args) {
-        target._send(args.message, args.handle, false);
+        target._send(args.message, args.handle, false, args.callback);
       });
 
       // Process a pending disconnect (if any).
@@ -478,14 +497,24 @@ function setupChannel(target, channel) {
     });
   });
 
-  target.send = function(message, handle) {
-    if (!this.connected)
-      this.emit('error', new Error('channel closed'));
-    else
-      this._send(message, handle, false);
+  target.send = function(message, handle, callback) {
+    if (typeof handle === 'function') {
+      callback = handle;
+      handle = undefined;
+    }
+    if (this.connected) {
+      this._send(message, handle, false, callback);
+      return;
+    }
+    const ex = new Error('channel closed');
+    if (typeof callback === 'function') {
+      process.nextTick(callback, ex);
+    } else {
+      this.emit('error', ex);  // FIXME(bnoordhuis) Defer to next tick.
+    }
   };
 
-  target._send = function(message, handle, swallowErrors) {
+  target._send = function(message, handle, swallowErrors, callback) {
     assert(this.connected || this._channel);
 
     if (message === undefined)
@@ -516,7 +545,11 @@ function setupChannel(target, channel) {
 
       // Queue-up message and handle if we haven't received ACK yet.
       if (this._handleQueue) {
-        this._handleQueue.push({ message: message.msg, handle: handle });
+        this._handleQueue.push({
+          callback: callback,
+          handle: handle,
+          message: message.msg,
+        });
         return;
       }
 
@@ -538,24 +571,43 @@ function setupChannel(target, channel) {
     } else if (this._handleQueue &&
                !(message && message.cmd === 'NODE_HANDLE_ACK')) {
       // Queue request anyway to avoid out-of-order messages.
-      this._handleQueue.push({ message: message, handle: null });
+      this._handleQueue.push({
+        callback: callback,
+        handle: null,
+        message: message,
+      });
       return;
     }
 
     var req = new WriteWrap();
-    req.oncomplete = nop;
+    req.async = false;
+
     var string = JSON.stringify(message) + '\n';
     var err = channel.writeUtf8String(req, string, handle);
 
-    if (err) {
-      if (!swallowErrors)
-        this.emit('error', errnoException(err, 'write'));
-    } else if (handle && !this._handleQueue) {
-      this._handleQueue = [];
-    }
-
-    if (obj && obj.postSend) {
-      req.oncomplete = obj.postSend.bind(null, handle);
+    if (err === 0) {
+      if (handle && !this._handleQueue)
+        this._handleQueue = [];
+      req.oncomplete = function() {
+        if (this.async === true)
+          control.unref();
+        if (obj && obj.postSend)
+          obj.postSend(handle);
+        if (typeof callback === 'function')
+          callback(null);
+      };
+      if (req.async === true) {
+        control.ref();
+      } else {
+        process.nextTick(function() { req.oncomplete(); });
+      }
+    } else if (!swallowErrors) {
+      const ex = errnoException(err, 'write');
+      if (typeof callback === 'function') {
+        process.nextTick(callback, ex);
+      } else {
+        this.emit('error', ex);  // FIXME(bnoordhuis) Defer to next tick.
+      }
     }
 
     /* If the master is > 2 read() calls behind, please stop sending. */
@@ -616,6 +668,7 @@ function setupChannel(target, channel) {
   };
 
   channel.readStart();
+  return control;
 }
 
 
diff --git a/test/parallel/test-child-process-send-cb.js b/test/parallel/test-child-process-send-cb.js
new file mode 100644 (file)
index 0000000..d65a1ab
--- /dev/null
@@ -0,0 +1,19 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const fork = require('child_process').fork;
+
+if (process.argv[2] === 'child') {
+  process.send('ok', common.mustCall(function(err) {
+    assert.strictEqual(err, null);
+  }));
+} else {
+  const child = fork(process.argv[1], ['child']);
+  child.on('message', common.mustCall(function(message) {
+    assert.strictEqual(message, 'ok');
+  }));
+  child.on('exit', common.mustCall(function(exitCode, signalCode) {
+    assert.strictEqual(exitCode, 0);
+    assert.strictEqual(signalCode, null);
+  }));
+}