diff --git a/lib/child_process.js b/lib/child_process.js index 34ce359f5b0ec5..d8d2ec6aacff56 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -1,508 +1,18 @@ 'use strict'; -const StringDecoder = require('string_decoder').StringDecoder; -const EventEmitter = require('events').EventEmitter; -const net = require('net'); -const dgram = require('dgram'); -const assert = require('assert'); const util = require('util'); const debug = util.debuglog('child_process'); const constants = require('constants'); -const Process = process.binding('process_wrap').Process; -const WriteWrap = process.binding('stream_wrap').WriteWrap; const uv = process.binding('uv'); const spawn_sync = process.binding('spawn_sync'); const Pipe = process.binding('pipe_wrap').Pipe; -const TTY = process.binding('tty_wrap').TTY; -const TCP = process.binding('tcp_wrap').TCP; -const UDP = process.binding('udp_wrap').UDP; +const child_process = require('internal/child_process'); const errnoException = util._errnoException; - -function createSocket(pipe, readable) { - var s = new net.Socket({ handle: pipe }); - - if (readable) { - s.writable = false; - s.readable = true; - } else { - s.writable = true; - s.readable = false; - } - - return s; -} - - -// this object contain function to convert TCP objects to native handle objects -// and back again. -const handleConversion = { - 'net.Native': { - simultaneousAccepts: true, - - send: function(message, handle) { - return handle; - }, - - got: function(message, handle, emit) { - emit(handle); - } - }, - - 'net.Server': { - simultaneousAccepts: true, - - send: function(message, server) { - return server._handle; - }, - - got: function(message, handle, emit) { - var server = new net.Server(); - server.listen(handle, function() { - emit(server); - }); - } - }, - - 'net.Socket': { - send: function(message, socket) { - if (!socket._handle) - return; - - // if the socket was created by net.Server - if (socket.server) { - // the slave should keep track of the socket - message.key = socket.server._connectionKey; - - var firstTime = !this._channel.sockets.send[message.key]; - var socketList = getSocketList('send', this, message.key); - - // the server should no longer expose a .connection property - // and when asked to close it should query the socket status from - // the slaves - if (firstTime) socket.server._setupSlave(socketList); - - // Act like socket is detached - socket.server._connections--; - } - - // remove handle from socket object, it will be closed when the socket - // will be sent - var handle = socket._handle; - handle.onread = function() {}; - socket._handle = null; - - return handle; - }, - - postSend: function(handle) { - // Close the Socket handle after sending it - if (handle) - handle.close(); - }, - - got: function(message, handle, emit) { - var socket = new net.Socket({handle: handle}); - socket.readable = socket.writable = true; - - // if the socket was created by net.Server we will track the socket - if (message.key) { - - // add socket to connections list - var socketList = getSocketList('got', this, message.key); - socketList.add({ - socket: socket - }); - } - - emit(socket); - } - }, - - 'dgram.Native': { - simultaneousAccepts: false, - - send: function(message, handle) { - return handle; - }, - - got: function(message, handle, emit) { - emit(handle); - } - }, - - 'dgram.Socket': { - simultaneousAccepts: false, - - send: function(message, socket) { - message.dgramType = socket.type; - - return socket._handle; - }, - - got: function(message, handle, emit) { - var socket = new dgram.Socket(message.dgramType); - - socket.bind(handle, function() { - emit(socket); - }); - } - } -}; - -// This object keep track of the socket there are sended -function SocketListSend(slave, key) { - EventEmitter.call(this); - - this.key = key; - this.slave = slave; -} -util.inherits(SocketListSend, EventEmitter); - -SocketListSend.prototype._request = function(msg, cmd, callback) { - var self = this; - - if (!this.slave.connected) return onclose(); - this.slave.send(msg); - - function onclose() { - self.slave.removeListener('internalMessage', onreply); - callback(new Error('Slave closed before reply')); - } - - function onreply(msg) { - if (!(msg.cmd === cmd && msg.key === self.key)) return; - self.slave.removeListener('disconnect', onclose); - self.slave.removeListener('internalMessage', onreply); - - callback(null, msg); - } - - this.slave.once('disconnect', onclose); - this.slave.on('internalMessage', onreply); -}; - -SocketListSend.prototype.close = function close(callback) { - this._request({ - cmd: 'NODE_SOCKET_NOTIFY_CLOSE', - key: this.key - }, 'NODE_SOCKET_ALL_CLOSED', callback); -}; - -SocketListSend.prototype.getConnections = function getConnections(callback) { - this._request({ - cmd: 'NODE_SOCKET_GET_COUNT', - key: this.key - }, 'NODE_SOCKET_COUNT', function(err, msg) { - if (err) return callback(err); - callback(null, msg.count); - }); -}; - -// This object keep track of the socket there are received -function SocketListReceive(slave, key) { - EventEmitter.call(this); - - var self = this; - - this.connections = 0; - this.key = key; - this.slave = slave; - - function onempty() { - if (!self.slave.connected) return; - - self.slave.send({ - cmd: 'NODE_SOCKET_ALL_CLOSED', - key: self.key - }); - } - - this.slave.on('internalMessage', function(msg) { - if (msg.key !== self.key) return; - - if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') { - // Already empty - if (self.connections === 0) return onempty(); - - // Wait for sockets to get closed - self.once('empty', onempty); - } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') { - if (!self.slave.connected) return; - self.slave.send({ - cmd: 'NODE_SOCKET_COUNT', - key: self.key, - count: self.connections - }); - } - }); -} -util.inherits(SocketListReceive, EventEmitter); - -SocketListReceive.prototype.add = function(obj) { - var self = this; - - this.connections++; - - // Notify previous owner of socket about its state change - obj.socket.once('close', function() { - self.connections--; - - if (self.connections === 0) self.emit('empty'); - }); -}; - -function getSocketList(type, slave, key) { - var sockets = slave._channel.sockets[type]; - var socketList = sockets[key]; - if (!socketList) { - var Construct = type === 'send' ? SocketListSend : SocketListReceive; - socketList = sockets[key] = new Construct(slave, key); - } - return socketList; -} - -const INTERNAL_PREFIX = 'NODE_'; -function handleMessage(target, message, handle) { - var eventName = 'message'; - if (message !== null && - typeof message === 'object' && - typeof message.cmd === 'string' && - message.cmd.length > INTERNAL_PREFIX.length && - message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) { - eventName = 'internalMessage'; - } - target.emit(eventName, message, handle); -} - -function setupChannel(target, channel) { - target._channel = channel; - target._handleQueue = null; - - var decoder = new StringDecoder('utf8'); - var jsonBuffer = ''; - channel.buffering = false; - channel.onread = function(nread, pool, recvHandle) { - // TODO(bnoordhuis) Check that nread > 0. - if (pool) { - jsonBuffer += decoder.write(pool); - - var i, start = 0; - - //Linebreak is used as a message end sign - while ((i = jsonBuffer.indexOf('\n', start)) >= 0) { - var json = jsonBuffer.slice(start, i); - var message = JSON.parse(json); - - // There will be at most one NODE_HANDLE message in every chunk we - // read because SCM_RIGHTS messages don't get coalesced. Make sure - // that we deliver the handle with the right message however. - if (message && message.cmd === 'NODE_HANDLE') - handleMessage(target, message, recvHandle); - else - handleMessage(target, message, undefined); - - start = i + 1; - } - jsonBuffer = jsonBuffer.slice(start); - this.buffering = jsonBuffer.length !== 0; - - } else { - this.buffering = false; - target.disconnect(); - channel.onread = nop; - channel.close(); - maybeClose(target); - } - }; - - // object where socket lists will live - channel.sockets = { got: {}, send: {} }; - - // handlers will go through this - target.on('internalMessage', function(message, handle) { - // Once acknowledged - continue sending handles. - if (message.cmd === 'NODE_HANDLE_ACK') { - assert(Array.isArray(target._handleQueue)); - var queue = target._handleQueue; - target._handleQueue = null; - - queue.forEach(function(args) { - target._send(args.message, args.handle, false); - }); - - // Process a pending disconnect (if any). - if (!target.connected && target._channel && !target._handleQueue) - target._disconnect(); - - return; - } - - if (message.cmd !== 'NODE_HANDLE') return; - - // Acknowledge handle receival. Don't emit error events (for example if - // the other side has disconnected) because this call to send() is not - // initiated by the user and it shouldn't be fatal to be unable to ACK - // a message. - target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); - - var obj = handleConversion[message.type]; - - // Update simultaneous accepts on Windows - if (process.platform === 'win32') { - handle._simultaneousAccepts = false; - net._setSimultaneousAccepts(handle); - } - - // Convert handle object - obj.got.call(this, message, handle, function(handle) { - handleMessage(target, message.msg, handle); - }); - }); - - target.send = function(message, handle) { - if (!this.connected) - this.emit('error', new Error('channel closed')); - else - this._send(message, handle, false); - }; - - target._send = function(message, handle, swallowErrors) { - assert(this.connected || this._channel); - - if (message === undefined) - throw new TypeError('message cannot be undefined'); - - // package messages with a handle object - if (handle) { - // this message will be handled by an internalMessage event handler - message = { - cmd: 'NODE_HANDLE', - type: null, - msg: message - }; - - if (handle instanceof net.Socket) { - message.type = 'net.Socket'; - } else if (handle instanceof net.Server) { - message.type = 'net.Server'; - } else if (handle instanceof TCP || handle instanceof Pipe) { - message.type = 'net.Native'; - } else if (handle instanceof dgram.Socket) { - message.type = 'dgram.Socket'; - } else if (handle instanceof UDP) { - message.type = 'dgram.Native'; - } else { - throw new TypeError("This handle type can't be sent"); - } - - // Queue-up message and handle if we haven't received ACK yet. - if (this._handleQueue) { - this._handleQueue.push({ message: message.msg, handle: handle }); - return; - } - - var obj = handleConversion[message.type]; - - // convert TCP object to native handle object - handle = - handleConversion[message.type].send.call(target, message, handle); - - // If handle was sent twice, or it is impossible to get native handle - // out of it - just send a text without the handle. - if (!handle) - message = message.msg; - - // Update simultaneous accepts on Windows - if (obj.simultaneousAccepts) { - net._setSimultaneousAccepts(handle); - } - } else if (this._handleQueue && - !(message && message.cmd === 'NODE_HANDLE_ACK')) { - // Queue request anyway to avoid out-of-order messages. - this._handleQueue.push({ message: message, handle: null }); - return; - } - - var req = new WriteWrap(); - req.oncomplete = nop; - var string = JSON.stringify(message) + '\n'; - var err = channel.writeUtf8String(req, string, handle); - - if (err) { - if (!swallowErrors) - this.emit('error', errnoException(err, 'write')); - } else if (handle && !this._handleQueue) { - this._handleQueue = []; - } - - if (obj && obj.postSend) { - req.oncomplete = obj.postSend.bind(null, handle); - } - - /* If the master is > 2 read() calls behind, please stop sending. */ - return channel.writeQueueSize < (65536 * 2); - }; - - // connected will be set to false immediately when a disconnect() is - // requested, even though the channel might still be alive internally to - // process queued messages. The three states are distinguished as follows: - // - disconnect() never requested: _channel is not null and connected - // is true - // - disconnect() requested, messages in the queue: _channel is not null - // and connected is false - // - disconnect() requested, channel actually disconnected: _channel is - // null and connected is false - target.connected = true; - - target.disconnect = function() { - if (!this.connected) { - this.emit('error', new Error('IPC channel is already disconnected')); - return; - } - - // Do not allow any new messages to be written. - this.connected = false; - - // If there are no queued messages, disconnect immediately. Otherwise, - // postpone the disconnect so that it happens internally after the - // queue is flushed. - if (!this._handleQueue) - this._disconnect(); - }; - - target._disconnect = function() { - assert(this._channel); - - // This marks the fact that the channel is actually disconnected. - this._channel = null; - - var fired = false; - function finish() { - if (fired) return; - fired = true; - - channel.close(); - target.emit('disconnect'); - } - - // If a message is being read, then wait for it to complete. - if (channel.buffering) { - this.once('message', finish); - this.once('internalMessage', finish); - - return; - } - - process.nextTick(finish); - }; - - channel.readStart(); -} - - -function nop() { } +const _validateStdio = child_process._validateStdio; +const setupChannel = child_process.setupChannel; +const ChildProcess = exports.ChildProcess = child_process.ChildProcess; exports.fork = function(modulePath /*, args, options*/) { @@ -770,119 +280,6 @@ function _convertCustomFds(options) { } } - -function _validateStdio(stdio, sync) { - var ipc, - ipcFd; - - // Replace shortcut with an array - if (typeof stdio === 'string') { - switch (stdio) { - case 'ignore': stdio = ['ignore', 'ignore', 'ignore']; break; - case 'pipe': stdio = ['pipe', 'pipe', 'pipe']; break; - case 'inherit': stdio = [0, 1, 2]; break; - default: throw new TypeError('Incorrect value of stdio option: ' + stdio); - } - } else if (!Array.isArray(stdio)) { - throw new TypeError('Incorrect value of stdio option: ' + - util.inspect(stdio)); - } - - // At least 3 stdio will be created - // Don't concat() a new Array() because it would be sparse, and - // stdio.reduce() would skip the sparse elements of stdio. - // See http://stackoverflow.com/a/5501711/3561 - while (stdio.length < 3) stdio.push(undefined); - - // Translate stdio into C++-readable form - // (i.e. PipeWraps or fds) - stdio = stdio.reduce(function(acc, stdio, i) { - function cleanup() { - acc.filter(function(stdio) { - return stdio.type === 'pipe' || stdio.type === 'ipc'; - }).forEach(function(stdio) { - if (stdio.handle) - stdio.handle.close(); - }); - } - - // Defaults - if (stdio === null || stdio === undefined) { - stdio = i < 3 ? 'pipe' : 'ignore'; - } - - if (stdio === null || stdio === 'ignore') { - acc.push({type: 'ignore'}); - } else if (stdio === 'pipe' || typeof stdio === 'number' && stdio < 0) { - var a = { - type: 'pipe', - readable: i === 0, - writable: i !== 0 - }; - - if (!sync) - a.handle = new Pipe(); - - acc.push(a); - } else if (stdio === 'ipc') { - if (sync || ipc !== undefined) { - // Cleanup previously created pipes - cleanup(); - if (!sync) - throw new Error('Child process can have only one IPC pipe'); - else - throw new Error('You cannot use IPC with synchronous forks'); - } - - ipc = new Pipe(true); - ipcFd = i; - - acc.push({ - type: 'pipe', - handle: ipc, - ipc: true - }); - } else if (stdio === 'inherit') { - acc.push({ - type: 'inherit', - fd: i - }); - } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { - acc.push({ - type: 'fd', - fd: stdio.fd || stdio - }); - } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || - getHandleWrapType(stdio._handle)) { - var handle = getHandleWrapType(stdio) ? - stdio : - getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; - - acc.push({ - type: 'wrap', - wrapType: getHandleWrapType(handle), - handle: handle - }); - } else if (stdio instanceof Buffer || typeof stdio === 'string') { - if (!sync) { - cleanup(); - throw new TypeError('Asynchronous forks do not support Buffer input: ' + - util.inspect(stdio)); - } - } else { - // Cleanup - cleanup(); - throw new TypeError('Incorrect value for stdio stream: ' + - util.inspect(stdio)); - } - - return acc; - }, []); - - return {stdio: stdio, ipc: ipc, ipcFd: ipcFd}; -} - - function normalizeSpawnArguments(file /*, args, options*/) { var args, options; @@ -946,247 +343,6 @@ var spawn = exports.spawn = function(/*file, args, options*/) { }; -function maybeClose(subprocess) { - subprocess._closesGot++; - - if (subprocess._closesGot == subprocess._closesNeeded) { - subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); - } -} - - -function ChildProcess() { - EventEmitter.call(this); - - var self = this; - - this._closesNeeded = 1; - this._closesGot = 0; - this.connected = false; - - this.signalCode = null; - this.exitCode = null; - this.killed = false; - this.spawnfile = null; - - this._handle = new Process(); - this._handle.owner = this; - - this._handle.onexit = function(exitCode, signalCode) { - // - // follow 0.4.x behaviour: - // - // - normally terminated processes don't touch this.signalCode - // - signaled processes don't touch this.exitCode - // - // new in 0.9.x: - // - // - spawn failures are reported with exitCode < 0 - // - var syscall = self.spawnfile ? 'spawn ' + self.spawnfile : 'spawn'; - var err = (exitCode < 0) ? errnoException(exitCode, syscall) : null; - - if (signalCode) { - self.signalCode = signalCode; - } else { - self.exitCode = exitCode; - } - - if (self.stdin) { - self.stdin.destroy(); - } - - self._handle.close(); - self._handle = null; - - if (exitCode < 0) { - if (self.spawnfile) - err.path = self.spawnfile; - - err.spawnargs = self.spawnargs.slice(1); - self.emit('error', err); - } else { - self.emit('exit', self.exitCode, self.signalCode); - } - - // if any of the stdio streams have not been touched, - // then pull all the data through so that it can get the - // eof and emit a 'close' event. - // Do it on nextTick so that the user has one last chance - // to consume the output, if for example they only want to - // start reading the data once the process exits. - process.nextTick(flushStdio, self); - - maybeClose(self); - }; -} -util.inherits(ChildProcess, EventEmitter); - - -function flushStdio(subprocess) { - if (subprocess.stdio == null) return; - subprocess.stdio.forEach(function(stream, fd, stdio) { - if (!stream || !stream.readable || stream._consuming) - return; - stream.resume(); - }); -} - - - -function getHandleWrapType(stream) { - if (stream instanceof Pipe) return 'pipe'; - if (stream instanceof TTY) return 'tty'; - if (stream instanceof TCP) return 'tcp'; - if (stream instanceof UDP) return 'udp'; - - return false; -} - - -ChildProcess.prototype.spawn = function(options) { - var self = this, - ipc, - ipcFd, - // If no `stdio` option was given - use default - stdio = options.stdio || 'pipe'; - - stdio = _validateStdio(stdio, false); - - ipc = stdio.ipc; - ipcFd = stdio.ipcFd; - stdio = options.stdio = stdio.stdio; - - if (ipc !== undefined) { - // Let child process know about opened IPC channel - options.envPairs = options.envPairs || []; - options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd); - } - - this.spawnfile = options.file; - this.spawnargs = options.args; - - var err = this._handle.spawn(options); - - // Run-time errors should emit an error, not throw an exception. - if (err === uv.UV_EAGAIN || - err === uv.UV_EMFILE || - err === uv.UV_ENFILE || - err === uv.UV_ENOENT) { - process.nextTick(onErrorNT, self, err); - // There is no point in continuing when we've hit EMFILE or ENFILE - // because we won't be able to set up the stdio file descriptors. - // It's kind of silly that the de facto spec for ENOENT (the test suite) - // mandates that stdio _is_ set up, even if there is no process on the - // receiving end, but it is what it is. - if (err !== uv.UV_ENOENT) return err; - } else if (err) { - // Close all opened fds on error - stdio.forEach(function(stdio) { - if (stdio.type === 'pipe') { - stdio.handle.close(); - } - }); - - this._handle.close(); - this._handle = null; - throw errnoException(err, 'spawn'); - } - - this.pid = this._handle.pid; - - stdio.forEach(function(stdio, i) { - if (stdio.type === 'ignore') return; - - if (stdio.ipc) { - self._closesNeeded++; - return; - } - - if (stdio.handle) { - // when i === 0 - we're dealing with stdin - // (which is the only one writable pipe) - stdio.socket = createSocket(self.pid !== 0 ? stdio.handle : null, i > 0); - - if (i > 0 && self.pid !== 0) { - self._closesNeeded++; - stdio.socket.on('close', function() { - maybeClose(self); - }); - } - } - }); - - this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? - stdio[0].socket : null; - this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? - stdio[1].socket : null; - this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? - stdio[2].socket : null; - - this.stdio = stdio.map(function(stdio) { - return stdio.socket === undefined ? null : stdio.socket; - }); - - // Add .send() method and start listening for IPC data - if (ipc !== undefined) setupChannel(this, ipc); - - return err; -}; - -function onErrorNT(self, err) { - self._handle.onexit(err); -} - - -ChildProcess.prototype.kill = function(sig) { - var signal; - - if (sig === 0) { - signal = 0; - } else if (!sig) { - signal = constants['SIGTERM']; - } else { - signal = constants[sig]; - } - - if (signal === undefined) { - throw new Error('Unknown signal: ' + sig); - } - - if (this._handle) { - var err = this._handle.kill(signal); - if (err === 0) { - /* Success. */ - this.killed = true; - return true; - } - if (err === uv.UV_ESRCH) { - /* Already dead. */ - } else if (err === uv.UV_EINVAL || err === uv.UV_ENOSYS) { - /* The underlying platform doesn't support this signal. */ - throw errnoException(err, 'kill'); - } else { - /* Other error, almost certainly EPERM. */ - this.emit('error', errnoException(err, 'kill')); - } - } - - /* Kill didn't succeed. */ - return false; -}; - - -ChildProcess.prototype.ref = function() { - if (this._handle) this._handle.ref(); -}; - - -ChildProcess.prototype.unref = function() { - if (this._handle) this._handle.unref(); -}; - - function lookupSignal(signal) { if (typeof signal === 'number') return signal; diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js new file mode 100644 index 00000000000000..428250cb9ee20a --- /dev/null +++ b/lib/internal/child_process.js @@ -0,0 +1,765 @@ +'use strict'; + +const StringDecoder = require('string_decoder').StringDecoder; +const EventEmitter = require('events').EventEmitter; +const net = require('net'); +const dgram = require('dgram'); +const util = require('util'); +const constants = require('constants'); +const assert = require('assert'); + +const Process = process.binding('process_wrap').Process; +const WriteWrap = process.binding('stream_wrap').WriteWrap; +const uv = process.binding('uv'); +const Pipe = process.binding('pipe_wrap').Pipe; +const TTY = process.binding('tty_wrap').TTY; +const TCP = process.binding('tcp_wrap').TCP; +const UDP = process.binding('udp_wrap').UDP; +const SocketList = require('internal/socket_list'); + +const errnoException = util._errnoException; +const SocketListSend = SocketList.SocketListSend; +const SocketListReceive = SocketList.SocketListReceive; + +module.exports = { + ChildProcess, + setupChannel, + _validateStdio, + getSocketList +}; + +// this object contain function to convert TCP objects to native handle objects +// and back again. +const handleConversion = { + 'net.Native': { + simultaneousAccepts: true, + + send: function(message, handle) { + return handle; + }, + + got: function(message, handle, emit) { + emit(handle); + } + }, + + 'net.Server': { + simultaneousAccepts: true, + + send: function(message, server) { + return server._handle; + }, + + got: function(message, handle, emit) { + var server = new net.Server(); + server.listen(handle, function() { + emit(server); + }); + } + }, + + 'net.Socket': { + send: function(message, socket) { + if (!socket._handle) + return; + + // if the socket was created by net.Server + if (socket.server) { + // the slave should keep track of the socket + message.key = socket.server._connectionKey; + + var firstTime = !this._channel.sockets.send[message.key]; + var socketList = getSocketList('send', this, message.key); + + // the server should no longer expose a .connection property + // and when asked to close it should query the socket status from + // the slaves + if (firstTime) socket.server._setupSlave(socketList); + + // Act like socket is detached + socket.server._connections--; + } + + // remove handle from socket object, it will be closed when the socket + // will be sent + var handle = socket._handle; + handle.onread = function() {}; + socket._handle = null; + + return handle; + }, + + postSend: function(handle) { + // Close the Socket handle after sending it + if (handle) + handle.close(); + }, + + got: function(message, handle, emit) { + var socket = new net.Socket({handle: handle}); + socket.readable = socket.writable = true; + + // if the socket was created by net.Server we will track the socket + if (message.key) { + + // add socket to connections list + var socketList = getSocketList('got', this, message.key); + socketList.add({ + socket: socket + }); + } + + emit(socket); + } + }, + + 'dgram.Native': { + simultaneousAccepts: false, + + send: function(message, handle) { + return handle; + }, + + got: function(message, handle, emit) { + emit(handle); + } + }, + + 'dgram.Socket': { + simultaneousAccepts: false, + + send: function(message, socket) { + message.dgramType = socket.type; + + return socket._handle; + }, + + got: function(message, handle, emit) { + var socket = new dgram.Socket(message.dgramType); + + socket.bind(handle, function() { + emit(socket); + }); + } + } +}; + + +function ChildProcess() { + EventEmitter.call(this); + + var self = this; + + this._closesNeeded = 1; + this._closesGot = 0; + this.connected = false; + + this.signalCode = null; + this.exitCode = null; + this.killed = false; + this.spawnfile = null; + + this._handle = new Process(); + this._handle.owner = this; + + this._handle.onexit = function(exitCode, signalCode) { + // + // follow 0.4.x behaviour: + // + // - normally terminated processes don't touch this.signalCode + // - signaled processes don't touch this.exitCode + // + // new in 0.9.x: + // + // - spawn failures are reported with exitCode < 0 + // + var syscall = self.spawnfile ? 'spawn ' + self.spawnfile : 'spawn'; + var err = (exitCode < 0) ? errnoException(exitCode, syscall) : null; + + if (signalCode) { + self.signalCode = signalCode; + } else { + self.exitCode = exitCode; + } + + if (self.stdin) { + self.stdin.destroy(); + } + + self._handle.close(); + self._handle = null; + + if (exitCode < 0) { + if (self.spawnfile) + err.path = self.spawnfile; + + err.spawnargs = self.spawnargs.slice(1); + self.emit('error', err); + } else { + self.emit('exit', self.exitCode, self.signalCode); + } + + // if any of the stdio streams have not been touched, + // then pull all the data through so that it can get the + // eof and emit a 'close' event. + // Do it on nextTick so that the user has one last chance + // to consume the output, if for example they only want to + // start reading the data once the process exits. + process.nextTick(flushStdio, self); + + maybeClose(self); + }; +} +util.inherits(ChildProcess, EventEmitter); + + +function flushStdio(subprocess) { + if (subprocess.stdio == null) return; + subprocess.stdio.forEach(function(stream, fd, stdio) { + if (!stream || !stream.readable || stream._consuming) + return; + stream.resume(); + }); +} + + +function createSocket(pipe, readable) { + var s = new net.Socket({ handle: pipe }); + + if (readable) { + s.writable = false; + s.readable = true; + } else { + s.writable = true; + s.readable = false; + } + + return s; +} + + +function getHandleWrapType(stream) { + if (stream instanceof Pipe) return 'pipe'; + if (stream instanceof TTY) return 'tty'; + if (stream instanceof TCP) return 'tcp'; + if (stream instanceof UDP) return 'udp'; + + return false; +} + + +ChildProcess.prototype.spawn = function(options) { + var self = this, + ipc, + ipcFd, + // If no `stdio` option was given - use default + stdio = options.stdio || 'pipe'; + + stdio = _validateStdio(stdio, false); + + ipc = stdio.ipc; + ipcFd = stdio.ipcFd; + stdio = options.stdio = stdio.stdio; + + if (ipc !== undefined) { + // Let child process know about opened IPC channel + options.envPairs = options.envPairs || []; + options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd); + } + + this.spawnfile = options.file; + this.spawnargs = options.args; + + var err = this._handle.spawn(options); + + // Run-time errors should emit an error, not throw an exception. + if (err === uv.UV_EAGAIN || + err === uv.UV_EMFILE || + err === uv.UV_ENFILE || + err === uv.UV_ENOENT) { + process.nextTick(onErrorNT, self, err); + // There is no point in continuing when we've hit EMFILE or ENFILE + // because we won't be able to set up the stdio file descriptors. + // It's kind of silly that the de facto spec for ENOENT (the test suite) + // mandates that stdio _is_ set up, even if there is no process on the + // receiving end, but it is what it is. + if (err !== uv.UV_ENOENT) return err; + } else if (err) { + // Close all opened fds on error + stdio.forEach(function(stdio) { + if (stdio.type === 'pipe') { + stdio.handle.close(); + } + }); + + this._handle.close(); + this._handle = null; + throw errnoException(err, 'spawn'); + } + + this.pid = this._handle.pid; + + stdio.forEach(function(stdio, i) { + if (stdio.type === 'ignore') return; + + if (stdio.ipc) { + self._closesNeeded++; + return; + } + + if (stdio.handle) { + // when i === 0 - we're dealing with stdin + // (which is the only one writable pipe) + stdio.socket = createSocket(self.pid !== 0 ? stdio.handle : null, i > 0); + + if (i > 0 && self.pid !== 0) { + self._closesNeeded++; + stdio.socket.on('close', function() { + maybeClose(self); + }); + } + } + }); + + this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? + stdio[0].socket : null; + this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? + stdio[1].socket : null; + this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? + stdio[2].socket : null; + + this.stdio = stdio.map(function(stdio) { + return stdio.socket === undefined ? null : stdio.socket; + }); + + // Add .send() method and start listening for IPC data + if (ipc !== undefined) setupChannel(this, ipc); + + return err; +}; + + +function onErrorNT(self, err) { + self._handle.onexit(err); +} + + +ChildProcess.prototype.kill = function(sig) { + var signal; + + if (sig === 0) { + signal = 0; + } else if (!sig) { + signal = constants['SIGTERM']; + } else { + signal = constants[sig]; + } + + if (signal === undefined) { + throw new Error('Unknown signal: ' + sig); + } + + if (this._handle) { + var err = this._handle.kill(signal); + if (err === 0) { + /* Success. */ + this.killed = true; + return true; + } + if (err === uv.UV_ESRCH) { + /* Already dead. */ + } else if (err === uv.UV_EINVAL || err === uv.UV_ENOSYS) { + /* The underlying platform doesn't support this signal. */ + throw errnoException(err, 'kill'); + } else { + /* Other error, almost certainly EPERM. */ + this.emit('error', errnoException(err, 'kill')); + } + } + + /* Kill didn't succeed. */ + return false; +}; + + +ChildProcess.prototype.ref = function() { + if (this._handle) this._handle.ref(); +}; + + +ChildProcess.prototype.unref = function() { + if (this._handle) this._handle.unref(); +}; + + +function setupChannel(target, channel) { + target._channel = channel; + target._handleQueue = null; + + var decoder = new StringDecoder('utf8'); + var jsonBuffer = ''; + channel.buffering = false; + channel.onread = function(nread, pool, recvHandle) { + // TODO(bnoordhuis) Check that nread > 0. + if (pool) { + jsonBuffer += decoder.write(pool); + + var i, start = 0; + + //Linebreak is used as a message end sign + while ((i = jsonBuffer.indexOf('\n', start)) >= 0) { + var json = jsonBuffer.slice(start, i); + var message = JSON.parse(json); + + // There will be at most one NODE_HANDLE message in every chunk we + // read because SCM_RIGHTS messages don't get coalesced. Make sure + // that we deliver the handle with the right message however. + if (message && message.cmd === 'NODE_HANDLE') + handleMessage(target, message, recvHandle); + else + handleMessage(target, message, undefined); + + start = i + 1; + } + jsonBuffer = jsonBuffer.slice(start); + this.buffering = jsonBuffer.length !== 0; + + } else { + this.buffering = false; + target.disconnect(); + channel.onread = nop; + channel.close(); + maybeClose(target); + } + }; + + // object where socket lists will live + channel.sockets = { got: {}, send: {} }; + + // handlers will go through this + target.on('internalMessage', function(message, handle) { + // Once acknowledged - continue sending handles. + if (message.cmd === 'NODE_HANDLE_ACK') { + assert(Array.isArray(target._handleQueue)); + var queue = target._handleQueue; + target._handleQueue = null; + + queue.forEach(function(args) { + target._send(args.message, args.handle, false); + }); + + // Process a pending disconnect (if any). + if (!target.connected && target._channel && !target._handleQueue) + target._disconnect(); + + return; + } + + if (message.cmd !== 'NODE_HANDLE') return; + + // Acknowledge handle receival. Don't emit error events (for example if + // the other side has disconnected) because this call to send() is not + // initiated by the user and it shouldn't be fatal to be unable to ACK + // a message. + target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); + + var obj = handleConversion[message.type]; + + // Update simultaneous accepts on Windows + if (process.platform === 'win32') { + handle._simultaneousAccepts = false; + net._setSimultaneousAccepts(handle); + } + + // Convert handle object + obj.got.call(this, message, handle, function(handle) { + handleMessage(target, message.msg, handle); + }); + }); + + target.send = function(message, handle) { + if (!this.connected) + this.emit('error', new Error('channel closed')); + else + this._send(message, handle, false); + }; + + target._send = function(message, handle, swallowErrors) { + assert(this.connected || this._channel); + + if (message === undefined) + throw new TypeError('message cannot be undefined'); + + // package messages with a handle object + if (handle) { + // this message will be handled by an internalMessage event handler + message = { + cmd: 'NODE_HANDLE', + type: null, + msg: message + }; + + if (handle instanceof net.Socket) { + message.type = 'net.Socket'; + } else if (handle instanceof net.Server) { + message.type = 'net.Server'; + } else if (handle instanceof TCP || handle instanceof Pipe) { + message.type = 'net.Native'; + } else if (handle instanceof dgram.Socket) { + message.type = 'dgram.Socket'; + } else if (handle instanceof UDP) { + message.type = 'dgram.Native'; + } else { + throw new TypeError("This handle type can't be sent"); + } + + // Queue-up message and handle if we haven't received ACK yet. + if (this._handleQueue) { + this._handleQueue.push({ message: message.msg, handle: handle }); + return; + } + + var obj = handleConversion[message.type]; + + // convert TCP object to native handle object + handle = + handleConversion[message.type].send.call(target, message, handle); + + // If handle was sent twice, or it is impossible to get native handle + // out of it - just send a text without the handle. + if (!handle) + message = message.msg; + + // Update simultaneous accepts on Windows + if (obj.simultaneousAccepts) { + net._setSimultaneousAccepts(handle); + } + } else if (this._handleQueue && + !(message && message.cmd === 'NODE_HANDLE_ACK')) { + // Queue request anyway to avoid out-of-order messages. + this._handleQueue.push({ message: message, handle: null }); + return; + } + + var req = new WriteWrap(); + req.oncomplete = nop; + var string = JSON.stringify(message) + '\n'; + var err = channel.writeUtf8String(req, string, handle); + + if (err) { + if (!swallowErrors) + this.emit('error', errnoException(err, 'write')); + } else if (handle && !this._handleQueue) { + this._handleQueue = []; + } + + if (obj && obj.postSend) { + req.oncomplete = obj.postSend.bind(null, handle); + } + + /* If the master is > 2 read() calls behind, please stop sending. */ + return channel.writeQueueSize < (65536 * 2); + }; + + // connected will be set to false immediately when a disconnect() is + // requested, even though the channel might still be alive internally to + // process queued messages. The three states are distinguished as follows: + // - disconnect() never requested: _channel is not null and connected + // is true + // - disconnect() requested, messages in the queue: _channel is not null + // and connected is false + // - disconnect() requested, channel actually disconnected: _channel is + // null and connected is false + target.connected = true; + + target.disconnect = function() { + if (!this.connected) { + this.emit('error', new Error('IPC channel is already disconnected')); + return; + } + + // Do not allow any new messages to be written. + this.connected = false; + + // If there are no queued messages, disconnect immediately. Otherwise, + // postpone the disconnect so that it happens internally after the + // queue is flushed. + if (!this._handleQueue) + this._disconnect(); + }; + + target._disconnect = function() { + assert(this._channel); + + // This marks the fact that the channel is actually disconnected. + this._channel = null; + + var fired = false; + function finish() { + if (fired) return; + fired = true; + + channel.close(); + target.emit('disconnect'); + } + + // If a message is being read, then wait for it to complete. + if (channel.buffering) { + this.once('message', finish); + this.once('internalMessage', finish); + + return; + } + + process.nextTick(finish); + }; + + channel.readStart(); +} + + +const INTERNAL_PREFIX = 'NODE_'; +function handleMessage(target, message, handle) { + var eventName = 'message'; + if (message !== null && + typeof message === 'object' && + typeof message.cmd === 'string' && + message.cmd.length > INTERNAL_PREFIX.length && + message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) { + eventName = 'internalMessage'; + } + target.emit(eventName, message, handle); +} + +function nop() { } + +function _validateStdio(stdio, sync) { + var ipc, + ipcFd; + + // Replace shortcut with an array + if (typeof stdio === 'string') { + switch (stdio) { + case 'ignore': stdio = ['ignore', 'ignore', 'ignore']; break; + case 'pipe': stdio = ['pipe', 'pipe', 'pipe']; break; + case 'inherit': stdio = [0, 1, 2]; break; + default: throw new TypeError('Incorrect value of stdio option: ' + stdio); + } + } else if (!Array.isArray(stdio)) { + throw new TypeError('Incorrect value of stdio option: ' + + util.inspect(stdio)); + } + + // At least 3 stdio will be created + // Don't concat() a new Array() because it would be sparse, and + // stdio.reduce() would skip the sparse elements of stdio. + // See http://stackoverflow.com/a/5501711/3561 + while (stdio.length < 3) stdio.push(undefined); + + // Translate stdio into C++-readable form + // (i.e. PipeWraps or fds) + stdio = stdio.reduce(function(acc, stdio, i) { + function cleanup() { + acc.filter(function(stdio) { + return stdio.type === 'pipe' || stdio.type === 'ipc'; + }).forEach(function(stdio) { + if (stdio.handle) + stdio.handle.close(); + }); + } + + // Defaults + if (stdio === null || stdio === undefined) { + stdio = i < 3 ? 'pipe' : 'ignore'; + } + + if (stdio === null || stdio === 'ignore') { + acc.push({type: 'ignore'}); + } else if (stdio === 'pipe' || typeof stdio === 'number' && stdio < 0) { + var a = { + type: 'pipe', + readable: i === 0, + writable: i !== 0 + }; + + if (!sync) + a.handle = new Pipe(); + + acc.push(a); + } else if (stdio === 'ipc') { + if (sync || ipc !== undefined) { + // Cleanup previously created pipes + cleanup(); + if (!sync) + throw new Error('Child process can have only one IPC pipe'); + else + throw new Error('You cannot use IPC with synchronous forks'); + } + + ipc = new Pipe(true); + ipcFd = i; + + acc.push({ + type: 'pipe', + handle: ipc, + ipc: true + }); + } else if (stdio === 'inherit') { + acc.push({ + type: 'inherit', + fd: i + }); + } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { + acc.push({ + type: 'fd', + fd: stdio.fd || stdio + }); + } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || + getHandleWrapType(stdio._handle)) { + var handle = getHandleWrapType(stdio) ? + stdio : + getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; + + acc.push({ + type: 'wrap', + wrapType: getHandleWrapType(handle), + handle: handle + }); + } else if (stdio instanceof Buffer || typeof stdio === 'string') { + if (!sync) { + cleanup(); + throw new TypeError('Asynchronous forks do not support Buffer input: ' + + util.inspect(stdio)); + } + } else { + // Cleanup + cleanup(); + throw new TypeError('Incorrect value for stdio stream: ' + + util.inspect(stdio)); + } + + return acc; + }, []); + + return {stdio: stdio, ipc: ipc, ipcFd: ipcFd}; +} + + +function getSocketList(type, slave, key) { + var sockets = slave._channel.sockets[type]; + var socketList = sockets[key]; + if (!socketList) { + var Construct = type === 'send' ? SocketListSend : SocketListReceive; + socketList = sockets[key] = new Construct(slave, key); + } + return socketList; +} + + +function maybeClose(subprocess) { + subprocess._closesGot++; + + if (subprocess._closesGot == subprocess._closesNeeded) { + subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); + } +} diff --git a/lib/internal/socket_list.js b/lib/internal/socket_list.js new file mode 100644 index 00000000000000..e9b388d420c29a --- /dev/null +++ b/lib/internal/socket_list.js @@ -0,0 +1,108 @@ +'use strict'; + +module.exports = {SocketListSend, SocketListReceive}; + +const EventEmitter = require('events').EventEmitter; +const util = require('util'); + +// This object keep track of the socket there are sended +function SocketListSend(slave, key) { + EventEmitter.call(this); + + this.key = key; + this.slave = slave; +} +util.inherits(SocketListSend, EventEmitter); + +SocketListSend.prototype._request = function(msg, cmd, callback) { + var self = this; + + if (!this.slave.connected) return onclose(); + this.slave.send(msg); + + function onclose() { + self.slave.removeListener('internalMessage', onreply); + callback(new Error('Slave closed before reply')); + } + + function onreply(msg) { + if (!(msg.cmd === cmd && msg.key === self.key)) return; + self.slave.removeListener('disconnect', onclose); + self.slave.removeListener('internalMessage', onreply); + + callback(null, msg); + } + + this.slave.once('disconnect', onclose); + this.slave.on('internalMessage', onreply); +}; + +SocketListSend.prototype.close = function close(callback) { + this._request({ + cmd: 'NODE_SOCKET_NOTIFY_CLOSE', + key: this.key + }, 'NODE_SOCKET_ALL_CLOSED', callback); +}; + +SocketListSend.prototype.getConnections = function getConnections(callback) { + this._request({ + cmd: 'NODE_SOCKET_GET_COUNT', + key: this.key + }, 'NODE_SOCKET_COUNT', function(err, msg) { + if (err) return callback(err); + callback(null, msg.count); + }); +}; + +// This object keep track of the socket there are received +function SocketListReceive(slave, key) { + EventEmitter.call(this); + + var self = this; + + this.connections = 0; + this.key = key; + this.slave = slave; + + function onempty() { + if (!self.slave.connected) return; + + self.slave.send({ + cmd: 'NODE_SOCKET_ALL_CLOSED', + key: self.key + }); + } + + this.slave.on('internalMessage', function(msg) { + if (msg.key !== self.key) return; + + if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') { + // Already empty + if (self.connections === 0) return onempty(); + + // Wait for sockets to get closed + self.once('empty', onempty); + } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') { + if (!self.slave.connected) return; + self.slave.send({ + cmd: 'NODE_SOCKET_COUNT', + key: self.key, + count: self.connections + }); + } + }); +} +util.inherits(SocketListReceive, EventEmitter); + +SocketListReceive.prototype.add = function(obj) { + var self = this; + + this.connections++; + + // Notify previous owner of socket about its state change + obj.socket.once('close', function() { + self.connections--; + + if (self.connections === 0) self.emit('empty'); + }); +}; diff --git a/node.gyp b/node.gyp index 70c9841a89a176..ad7e271531f80d 100644 --- a/node.gyp +++ b/node.gyp @@ -70,8 +70,10 @@ 'lib/vm.js', 'lib/zlib.js', + 'lib/internal/child_process.js', 'lib/internal/freelist.js', 'lib/internal/smalloc.js', + 'lib/internal/socket_list.js', 'lib/internal/repl.js', ], }, diff --git a/test/parallel/test-child-process-constructor.js b/test/parallel/test-child-process-constructor.js new file mode 100644 index 00000000000000..89db9cf81fddc7 --- /dev/null +++ b/test/parallel/test-child-process-constructor.js @@ -0,0 +1,25 @@ +'use strict'; + +var assert = require('assert'); +var common = require('../common'); +var child_process = require('child_process'); +var ChildProcess = child_process.ChildProcess; +assert.equal(typeof ChildProcess, 'function'); + +// test that we can call spawn +var child = new ChildProcess(); +child.spawn({ + file: process.execPath, + args: ['--interactive'], + cwd: process.cwd(), + stdio: 'pipe' +}); + +assert.equal(child.hasOwnProperty('pid'), true); + +// try killing with invalid signal +assert.throws(function() { + child.kill('foo'); +}, /Unknown signal: foo/); + +assert.equal(child.kill(), true); diff --git a/test/parallel/test-child-process-validate-stdio.js b/test/parallel/test-child-process-validate-stdio.js new file mode 100644 index 00000000000000..aba43551e82b44 --- /dev/null +++ b/test/parallel/test-child-process-validate-stdio.js @@ -0,0 +1,30 @@ +'use strict'; +// Flags: --expose_internals + +var assert = require('assert'); +var common = require('../common'); +var _validateStdio = require('internal/child_process')._validateStdio; + +// should throw if string and not ignore, pipe, or inherit +assert.throws(function() { + _validateStdio('foo'); +}, /Incorrect value of stdio option/); + +// should throw if not a string or array +assert.throws(function() { + _validateStdio(600); +}, /Incorrect value of stdio option/); + +// should populate stdio with undefined if len < 3 +var stdio1 = []; +var result = _validateStdio(stdio1, false); +assert.equal(stdio1.length, 3); +assert.equal(result.hasOwnProperty('stdio'), true); +assert.equal(result.hasOwnProperty('ipc'), true); +assert.equal(result.hasOwnProperty('ipcFd'), true); + +// should throw if stdio has ipc and sync is true +var stdio2 = ['ipc', 'ipc', 'ipc']; +assert.throws(function() { + _validateStdio(stdio2, true); +}, /You cannot use IPC with synchronous forks/);