1 // Copyright Joyent, Inc. and other Node contributors.
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
22 var StringDecoder = require('string_decoder').StringDecoder;
23 var EventEmitter = require('events').EventEmitter;
24 var net = require('net');
25 var dgram = require('dgram');
26 var assert = require('assert');
27 var util = require('util');
29 var Process = process.binding('process_wrap').Process;
30 var uv = process.binding('uv');
32 var constants; // Lazy-loaded process.binding('constants')
34 var errnoException = util._errnoException;
37 function handleWrapGetter(name, callback) {
40 Object.defineProperty(handleWraps, name, {
42 if (!util.isUndefined(cons)) return cons;
43 return cons = callback();
48 handleWrapGetter('Pipe', function() {
49 return process.binding('pipe_wrap').Pipe;
52 handleWrapGetter('TTY', function() {
53 return process.binding('tty_wrap').TTY;
56 handleWrapGetter('TCP', function() {
57 return process.binding('tcp_wrap').TCP;
60 handleWrapGetter('UDP', function() {
61 return process.binding('udp_wrap').UDP;
64 // constructors for lazy loading
65 function createPipe(ipc) {
66 return new handleWraps.Pipe(ipc);
69 function createSocket(pipe, readable) {
70 var s = new net.Socket({ handle: pipe });
84 // this object contain function to convert TCP objects to native handle objects
86 var handleConversion = {
88 simultaneousAccepts: true,
90 send: function(message, handle) {
94 got: function(message, handle, emit) {
100 simultaneousAccepts: true,
102 send: function(message, server) {
103 return server._handle;
106 got: function(message, handle, emit) {
107 var server = new net.Server();
108 server.listen(handle, function() {
115 send: function(message, socket) {
116 // if the socket was created by net.Server
118 // the slave should keep track of the socket
119 message.key = socket.server._connectionKey;
121 var firstTime = !this._channel.sockets.send[message.key];
122 var socketList = getSocketList('send', this, message.key);
124 // the server should no longer expose a .connection property
125 // and when asked to close it should query the socket status from
127 if (firstTime) socket.server._setupSlave(socketList);
129 // Act like socket is detached
130 socket.server._connections--;
133 // remove handle from socket object, it will be closed when the socket
135 var handle = socket._handle;
136 handle.onread = function() {};
137 socket._handle = null;
142 postSend: function(handle) {
143 // Close the Socket handle after sending it
147 got: function(message, handle, emit) {
148 var socket = new net.Socket({handle: handle});
149 socket.readable = socket.writable = true;
151 // if the socket was created by net.Server we will track the socket
154 // add socket to connections list
155 var socketList = getSocketList('got', this, message.key);
166 simultaneousAccepts: false,
168 send: function(message, handle) {
172 got: function(message, handle, emit) {
178 simultaneousAccepts: false,
180 send: function(message, socket) {
181 message.dgramType = socket.type;
183 return socket._handle;
186 got: function(message, handle, emit) {
187 var socket = new dgram.Socket(message.dgramType);
189 socket.bind(handle, function() {
196 // This object keep track of the socket there are sended
197 function SocketListSend(slave, key) {
198 EventEmitter.call(this);
203 util.inherits(SocketListSend, EventEmitter);
205 SocketListSend.prototype._request = function(msg, cmd, callback) {
208 if (!this.slave.connected) return onclose();
209 this.slave.send(msg);
212 self.slave.removeListener('internalMessage', onreply);
213 callback(new Error('Slave closed before reply'));
216 function onreply(msg) {
217 if (!(msg.cmd === cmd && msg.key === self.key)) return;
218 self.slave.removeListener('disconnect', onclose);
219 self.slave.removeListener('internalMessage', onreply);
224 this.slave.once('disconnect', onclose);
225 this.slave.on('internalMessage', onreply);
228 SocketListSend.prototype.close = function close(callback) {
230 cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
232 }, 'NODE_SOCKET_ALL_CLOSED', callback);
235 SocketListSend.prototype.getConnections = function getConnections(callback) {
237 cmd: 'NODE_SOCKET_GET_COUNT',
239 }, 'NODE_SOCKET_COUNT', function(err, msg) {
240 if (err) return callback(err);
241 callback(null, msg.count);
245 // This object keep track of the socket there are received
246 function SocketListReceive(slave, key) {
247 EventEmitter.call(this);
251 this.connections = 0;
256 if (!self.slave.connected) return;
259 cmd: 'NODE_SOCKET_ALL_CLOSED',
264 this.slave.on('internalMessage', function(msg) {
265 if (msg.key !== self.key) return;
267 if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
269 if (self.connections === 0) return onempty();
271 // Wait for sockets to get closed
272 self.once('empty', onempty);
273 } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
274 if (!self.slave.connected) return;
276 cmd: 'NODE_SOCKET_COUNT',
278 count: self.connections
283 util.inherits(SocketListReceive, EventEmitter);
285 SocketListReceive.prototype.add = function(obj) {
290 // Notify previous owner of socket about its state change
291 obj.socket.once('close', function() {
294 if (self.connections === 0) self.emit('empty');
298 function getSocketList(type, slave, key) {
299 var sockets = slave._channel.sockets[type];
300 var socketList = sockets[key];
302 var Construct = type === 'send' ? SocketListSend : SocketListReceive;
303 socketList = sockets[key] = new Construct(slave, key);
308 var INTERNAL_PREFIX = 'NODE_';
309 function handleMessage(target, message, handle) {
310 var eventName = 'message';
311 if (!util.isNull(message) &&
312 util.isObject(message) &&
313 util.isString(message.cmd) &&
314 message.cmd.length > INTERNAL_PREFIX.length &&
315 message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) {
316 eventName = 'internalMessage';
318 target.emit(eventName, message, handle);
321 function setupChannel(target, channel) {
322 target._channel = channel;
323 target._handleQueue = null;
325 var decoder = new StringDecoder('utf8');
327 channel.buffering = false;
328 channel.onread = function(nread, pool, recvHandle) {
329 // TODO(bnoordhuis) Check that nread > 0.
331 jsonBuffer += decoder.write(pool);
335 //Linebreak is used as a message end sign
336 while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {
337 var json = jsonBuffer.slice(start, i);
338 var message = JSON.parse(json);
340 // There will be at most one NODE_HANDLE message in every chunk we
341 // read because SCM_RIGHTS messages don't get coalesced. Make sure
342 // that we deliver the handle with the right message however.
343 if (message && message.cmd === 'NODE_HANDLE')
344 handleMessage(target, message, recvHandle);
346 handleMessage(target, message, undefined);
350 jsonBuffer = jsonBuffer.slice(start);
351 this.buffering = jsonBuffer.length !== 0;
354 this.buffering = false;
356 channel.onread = nop;
362 // object where socket lists will live
363 channel.sockets = { got: {}, send: {} };
365 // handlers will go through this
366 target.on('internalMessage', function(message, handle) {
367 // Once acknowledged - continue sending handles.
368 if (message.cmd === 'NODE_HANDLE_ACK') {
369 assert(util.isArray(target._handleQueue));
370 var queue = target._handleQueue;
371 target._handleQueue = null;
372 queue.forEach(function(args) {
373 target.send(args.message, args.handle);
378 if (message.cmd !== 'NODE_HANDLE') return;
380 // Acknowledge handle receival.
381 target.send({ cmd: 'NODE_HANDLE_ACK' });
383 var obj = handleConversion[message.type];
385 // Update simultaneous accepts on Windows
386 if (process.platform === 'win32') {
387 handle._simultaneousAccepts = false;
388 net._setSimultaneousAccepts(handle);
391 // Convert handle object
392 obj.got.call(this, message, handle, function(handle) {
393 handleMessage(target, message.msg, handle);
397 target.send = function(message, handle) {
398 if (util.isUndefined(message)) {
399 throw new TypeError('message cannot be undefined');
402 if (!this.connected) {
403 this.emit('error', new Error('channel closed'));
407 // package messages with a handle object
409 // this message will be handled by an internalMessage event handler
416 if (handle instanceof net.Socket) {
417 message.type = 'net.Socket';
418 } else if (handle instanceof net.Server) {
419 message.type = 'net.Server';
420 } else if (handle instanceof process.binding('tcp_wrap').TCP ||
421 handle instanceof process.binding('pipe_wrap').Pipe) {
422 message.type = 'net.Native';
423 } else if (handle instanceof dgram.Socket) {
424 message.type = 'dgram.Socket';
425 } else if (handle instanceof process.binding('udp_wrap').UDP) {
426 message.type = 'dgram.Native';
428 throw new TypeError("This handle type can't be sent");
431 // Queue-up message and handle if we haven't received ACK yet.
432 if (this._handleQueue) {
433 this._handleQueue.push({ message: message.msg, handle: handle });
437 var obj = handleConversion[message.type];
439 // convert TCP object to native handle object
440 handle = handleConversion[message.type].send.apply(target, arguments);
442 // Update simultaneous accepts on Windows
443 if (obj.simultaneousAccepts) {
444 net._setSimultaneousAccepts(handle);
446 } else if (this._handleQueue) {
447 // Queue request anyway to avoid out-of-order messages.
448 this._handleQueue.push({ message: message, handle: null });
452 var req = { oncomplete: nop };
453 var string = JSON.stringify(message) + '\n';
454 var err = channel.writeUtf8String(req, string, handle);
457 this.emit('error', errnoException(err, 'write'));
458 } else if (handle && !this._handleQueue) {
459 this._handleQueue = [];
462 if (obj && obj.postSend) {
463 req.oncomplete = obj.postSend.bind(null, handle);
466 /* If the master is > 2 read() calls behind, please stop sending. */
467 return channel.writeQueueSize < (65536 * 2);
470 target.connected = true;
471 target.disconnect = function() {
472 if (!this.connected) {
473 this.emit('error', new Error('IPC channel is already disconnected'));
477 // do not allow messages to be written
478 this.connected = false;
479 this._channel = null;
487 target.emit('disconnect');
490 // If a message is being read, then wait for it to complete.
491 if (channel.buffering) {
492 this.once('message', finish);
493 this.once('internalMessage', finish);
498 process.nextTick(finish);
507 exports.fork = function(modulePath /*, args, options*/) {
509 // Get options and args arguments.
510 var options, args, execArgv;
511 if (util.isArray(arguments[1])) {
513 options = util._extend({}, arguments[2]);
516 options = util._extend({}, arguments[1]);
519 // Prepare arguments for fork:
520 execArgv = options.execArgv || process.execArgv;
521 args = execArgv.concat([modulePath], args);
523 // Leave stdin open for the IPC channel. stdout and stderr should be the
524 // same as the parent's if silent isn't set.
525 options.stdio = options.silent ? ['pipe', 'pipe', 'pipe', 'ipc'] :
528 options.execPath = options.execPath || process.execPath;
530 return spawn(options.execPath, args, options);
534 exports._forkChild = function(fd) {
535 // set process.send()
536 var p = createPipe(true);
539 setupChannel(process, p);
542 process.on('newListener', function(name) {
543 if (name !== 'message' && name !== 'disconnect') return;
544 if (++refs === 1) p.ref();
546 process.on('removeListener', function(name) {
547 if (name !== 'message' && name !== 'disconnect') return;
548 if (--refs === 0) p.unref();
553 exports.exec = function(command /*, options, callback */) {
554 var file, args, options, callback;
556 if (util.isFunction(arguments[1])) {
558 callback = arguments[1];
560 options = arguments[1];
561 callback = arguments[2];
564 if (process.platform === 'win32') {
566 args = ['/s', '/c', '"' + command + '"'];
567 // Make a shallow copy before patching so we don't clobber the user's
569 options = util._extend({}, options);
570 options.windowsVerbatimArguments = true;
573 args = ['-c', command];
576 if (options && options.shell)
577 file = options.shell;
579 return exports.execFile(file, args, options, callback);
583 exports.execFile = function(file /* args, options, callback */) {
588 maxBuffer: 200 * 1024,
589 killSignal: 'SIGTERM',
594 // Parse the parameters.
596 if (util.isFunction(arguments[arguments.length - 1])) {
597 callback = arguments[arguments.length - 1];
600 if (util.isArray(arguments[1])) {
602 options = util._extend(options, arguments[2]);
605 options = util._extend(options, arguments[1]);
608 var child = spawn(file, args, {
611 windowsVerbatimArguments: !!options.windowsVerbatimArguments
617 if (options.encoding !== 'buffer' && Buffer.isEncoding(options.encoding)) {
618 encoding = options.encoding;
634 function exithandler(code, signal) {
639 clearTimeout(timeoutId);
643 if (!callback) return;
649 stdout = Buffer.concat(_stdout);
650 stderr = Buffer.concat(_stderr);
657 callback(ex, stdout, stderr);
658 } else if (code === 0 && signal === null) {
659 callback(null, stdout, stderr);
661 ex = new Error('Command failed: ' + stderr);
662 ex.killed = child.killed || killed;
663 ex.code = code < 0 ? uv.errname(code) : code;
665 callback(ex, stdout, stderr);
669 function errorhandler(e) {
671 child.stdout.destroy();
672 child.stderr.destroy();
677 child.stdout.destroy();
678 child.stderr.destroy();
682 child.kill(options.killSignal);
689 if (options.timeout > 0) {
690 timeoutId = setTimeout(function() {
696 child.stdout.addListener('data', function(chunk) {
697 stdoutLen += chunk.length;
699 if (stdoutLen > options.maxBuffer) {
700 ex = new Error('stdout maxBuffer exceeded.');
710 child.stderr.addListener('data', function(chunk) {
711 stderrLen += chunk.length;
713 if (stderrLen > options.maxBuffer) {
714 ex = new Error('stderr maxBuffer exceeded.');
725 child.stderr.setEncoding(encoding);
726 child.stdout.setEncoding(encoding);
729 child.addListener('close', exithandler);
730 child.addListener('error', errorhandler);
736 var spawn = exports.spawn = function(file, args, options) {
737 args = args ? args.slice(0) : [];
740 var env = (options ? options.env : null) || process.env;
742 for (var key in env) {
743 envPairs.push(key + '=' + env[key]);
746 var child = new ChildProcess();
747 if (options && options.customFds && !options.stdio) {
748 options.stdio = options.customFds.map(function(fd) {
749 return fd === -1 ? 'pipe' : fd;
756 cwd: options ? options.cwd : null,
757 windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
758 detached: !!(options && options.detached),
760 stdio: options ? options.stdio : null,
761 uid: options ? options.uid : null,
762 gid: options ? options.gid : null
769 function maybeClose(subprocess) {
770 subprocess._closesGot++;
772 if (subprocess._closesGot == subprocess._closesNeeded) {
773 subprocess.emit('close', subprocess.exitCode, subprocess.signalCode);
778 function ChildProcess() {
779 EventEmitter.call(this);
781 // Initialize TCPWrap and PipeWrap
782 process.binding('tcp_wrap');
783 process.binding('pipe_wrap');
787 this._closesNeeded = 1;
789 this.connected = false;
791 this.signalCode = null;
792 this.exitCode = null;
795 this._handle = new Process();
796 this._handle.owner = this;
798 this._handle.onexit = function(exitCode, signalCode) {
800 // follow 0.4.x behaviour:
802 // - normally terminated processes don't touch this.signalCode
803 // - signaled processes don't touch this.exitCode
807 // - spawn failures are reported with exitCode < 0
809 var err = (exitCode < 0) ? errnoException(exitCode, 'spawn') : null;
812 self.signalCode = signalCode;
814 self.exitCode = exitCode;
818 self.stdin.destroy();
821 self._handle.close();
825 self.emit('error', err);
827 self.emit('exit', self.exitCode, self.signalCode);
830 // if any of the stdio streams have not been touched,
831 // then pull all the data through so that it can get the
832 // eof and emit a 'close' event.
833 // Do it on nextTick so that the user has one last chance
834 // to consume the output, if for example they only want to
835 // start reading the data once the process exits.
836 process.nextTick(function() {
843 util.inherits(ChildProcess, EventEmitter);
846 function flushStdio(subprocess) {
847 subprocess.stdio.forEach(function(stream, fd, stdio) {
848 if (!stream || !stream.readable || stream._consuming ||
849 stream._readableState.flowing)
857 function getHandleWrapType(stream) {
858 if (stream instanceof handleWraps.Pipe) return 'pipe';
859 if (stream instanceof handleWraps.TTY) return 'tty';
860 if (stream instanceof handleWraps.TCP) return 'tcp';
861 if (stream instanceof handleWraps.UDP) return 'udp';
867 ChildProcess.prototype.spawn = function(options) {
871 // If no `stdio` option was given - use default
872 stdio = options.stdio || 'pipe';
874 // Replace shortcut with an array
875 if (util.isString(stdio)) {
877 case 'ignore': stdio = ['ignore', 'ignore', 'ignore']; break;
878 case 'pipe': stdio = ['pipe', 'pipe', 'pipe']; break;
879 case 'inherit': stdio = [0, 1, 2]; break;
880 default: throw new TypeError('Incorrect value of stdio option: ' + stdio);
882 } else if (!util.isArray(stdio)) {
883 throw new TypeError('Incorrect value of stdio option: ' + stdio);
886 // At least 3 stdio will be created
887 // Don't concat() a new Array() because it would be sparse, and
888 // stdio.reduce() would skip the sparse elements of stdio.
889 // See http://stackoverflow.com/a/5501711/3561
890 while (stdio.length < 3) stdio.push(undefined);
892 // Translate stdio into C++-readable form
893 // (i.e. PipeWraps or fds)
894 stdio = stdio.reduce(function(acc, stdio, i) {
896 acc.filter(function(stdio) {
897 return stdio.type === 'pipe' || stdio.type === 'ipc';
898 }).forEach(function(stdio) {
899 stdio.handle.close();
904 if (util.isNullOrUndefined(stdio)) {
905 stdio = i < 3 ? 'pipe' : 'ignore';
908 if (stdio === 'ignore') {
909 acc.push({type: 'ignore'});
910 } else if (stdio === 'pipe' || util.isNumber(stdio) && stdio < 0) {
911 acc.push({type: 'pipe', handle: createPipe()});
912 } else if (stdio === 'ipc') {
913 if (!util.isUndefined(ipc)) {
914 // Cleanup previously created pipes
916 throw Error('Child process can have only one IPC pipe');
919 ipc = createPipe(true);
922 acc.push({ type: 'pipe', handle: ipc, ipc: true });
923 } else if (util.isNumber(stdio) || util.isNumber(stdio.fd)) {
924 acc.push({ type: 'fd', fd: stdio.fd || stdio });
925 } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
926 getHandleWrapType(stdio._handle)) {
927 var handle = getHandleWrapType(stdio) ?
929 getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;
933 wrapType: getHandleWrapType(handle),
939 throw new TypeError('Incorrect value for stdio stream: ' + stdio);
945 options.stdio = stdio;
947 if (!util.isUndefined(ipc)) {
948 // Let child process know about opened IPC channel
949 options.envPairs = options.envPairs || [];
950 options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
953 var err = this._handle.spawn(options);
956 // Close all opened fds on error
957 stdio.forEach(function(stdio) {
958 if (stdio.type === 'pipe') {
959 stdio.handle.close();
963 this._handle.close();
965 throw errnoException(err, 'spawn');
968 this.pid = this._handle.pid;
970 stdio.forEach(function(stdio, i) {
971 if (stdio.type === 'ignore') return;
974 self._closesNeeded++;
979 // when i === 0 - we're dealing with stdin
980 // (which is the only one writable pipe)
981 stdio.socket = createSocket(self.pid !== 0 ? stdio.handle : null, i > 0);
983 if (i > 0 && self.pid !== 0) {
984 self._closesNeeded++;
985 stdio.socket.on('close', function() {
992 this.stdin = stdio.length >= 1 && !util.isUndefined(stdio[0].socket) ?
993 stdio[0].socket : null;
994 this.stdout = stdio.length >= 2 && !util.isUndefined(stdio[1].socket) ?
995 stdio[1].socket : null;
996 this.stderr = stdio.length >= 3 && !util.isUndefined(stdio[2].socket) ?
997 stdio[2].socket : null;
999 this.stdio = stdio.map(function(stdio) {
1000 return util.isUndefined(stdio.socket) ? null : stdio.socket;
1003 // Add .send() method and start listening for IPC data
1004 if (!util.isUndefined(ipc)) setupChannel(this, ipc);
1010 ChildProcess.prototype.kill = function(sig) {
1014 constants = process.binding('constants');
1020 signal = constants['SIGTERM'];
1022 signal = constants[sig];
1025 if (util.isUndefined(signal)) {
1026 throw new Error('Unknown signal: ' + sig);
1030 var err = this._handle.kill(signal);
1036 if (err === uv.UV_ESRCH) {
1038 } else if (err === uv.UV_EINVAL || err === uv.UV_ENOSYS) {
1039 /* The underlying platform doesn't support this signal. */
1040 throw errnoException(err, 'kill');
1042 /* Other error, almost certainly EPERM. */
1043 this.emit('error', errnoException(err, 'kill'));
1047 /* Kill didn't succeed. */
1052 ChildProcess.prototype.ref = function() {
1053 if (this._handle) this._handle.ref();
1057 ChildProcess.prototype.unref = function() {
1058 if (this._handle) this._handle.unref();