Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/ww/avoid-spawnSync #4

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 165 additions & 30 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,86 @@ const spawn = require('child_process').spawn;
const spawnSync = require('child_process').spawnSync;
const JSON = require('./json-buffer');
const isElectron = require('is-electron');
const fs = require('fs');
const os = require('os');
const crypto = require('crypto');
const index = [0];
const timeStamp = new Date().toISOString();
const net = require('net');
// to hold the keep-alive connection to worker thread.
let client;
// set to true after keep-alive connection is successfully created.
let clientConnected = false;
const debugMode = false;

/**
* Update:
* 1. Instead of constantly relying on spawnSync for handling every msg, create a keep-alive connection that can be
* used to send msg to worker thread.
* 2. For sending response back from worker thread, worker thread confirms the response file doesn't exist, and then
* create the file and writes the response. Upon finishing writing, change the file permission to read-only.
* 3. For loading the response from main thread, check the existance of the response file and confirm the permission
* is read-only, then read the file and delete it.
* Step 2 & 3 serve as file locking mechanism so that the 2 processes can communicate through it.
*/

const responseFolder = path.join(os.tmpdir(), 'sync_rpc_responses');
if (!fs.existsSync(responseFolder)) {
fs.mkdirSync(responseFolder, {recursive: true});
}

function responseFilePath() {
return path.join(responseFolder, `responseFile_${crypto.randomInt(0, 1000000).toString().padStart(6, '0')}.txt`);
}

function responseFileReadOnly(filePath) {
const mode = fs.statSync(filePath).mode;
return (mode & parseInt('777', 8)).toString(8) == '444';
}

/**
* @returns {boolean} True if the file is existing and permission is read only.
*/
function responseFileReadyForReading(filePath) {
return fs.existsSync(filePath) && responseFileReadOnly(filePath);
}

/**
* Read the response file for the response content that is written by worker thread.
* @returns {Buffer}
*/
function readResponseFile(filePath) {
const startTime = Date.now();
while (!responseFileReadyForReading(filePath)) {
}
const contentPath = filePath.split(".")[0];
const content = fs.readFileSync(contentPath);
const res = JSON.parse(fs.readFileSync(filePath));
if (res.s == true) {
res.v.b = content.toString('utf-8');
} else {
res.v = JSON.parse(content.toString('utf-8'));
}
fs.unlinkSync(contentPath);
fs.unlinkSync(filePath);
return res;
}

const outFolder = path.join(os.tmpdir(), 'sync_rpc_log');
if (debugMode && !fs.existsSync(outFolder)) {
fs.mkdirSync(outFolder, {recursive: true});
}

const localClientLog = path.join(outFolder, `client_logs_${timeStamp}.txt`);
const localServerLog = path.join(outFolder, `server_logs_${timeStamp}.txt`);
const localServerErr = path.join(outFolder, `server_error_${timeStamp}.txt`);
function logToFile(msg, logPath) {
const logMessage = `[${new Date().toISOString()}] ${msg}\n`;
fs.appendFileSync(logPath, logMessage);
}

const outputFile = debugMode && fs.openSync(localServerLog, 'a');
const errorFile = debugMode && fs.openSync(localServerErr, 'a');
const host = '127.0.0.1';
function nodeNetCatSrc(port, input) {
return (
Expand All @@ -24,22 +103,28 @@ const FLAGS = isElectron() ? ['--ms-enable-electron-run-as-node'] : [];

let started = false;
const configuration = {port: null, fastestFunction: null};

function start() {
if (!spawnSync) {
throw new Error(
'Sync-request requires node version 0.12 or later. If you need to use it with an older version of node\n' +
'you can `npm install sync-request@2.2.0`, which was the last version to support older versions of node.'
'you can `npm install sync-request@2.2.0`, which was the last version to support older versions of node.'
);
}
const port = findPort();
// '--inspect-brk',
const p = spawn(process.execPath, [require.resolve('./worker'), port, ...FLAGS], {
stdio: 'inherit',
stdio: debugMode ? ['ignore', outputFile, errorFile] : 'inherit',
windowsHide: true,
});
p.unref();
process.on('exit', () => {
p.kill();
});
client = net.connect(port, '127.0.0.1', () => {
debugMode && console.log('client connected.');
clientConnected = true;
});
waitForAlive(port, 1);
const fastestFunction = getFastestFunction(port);
configuration.port = port;
Expand All @@ -64,7 +149,7 @@ function findPort() {
if (findPortResult.status !== 0) {
throw new Error(
findPortResult.stderr.toString() ||
'find port exited with code ' + findPortResult.status
'find port exited with code ' + findPortResult.status
);
}
const portString = findPortResult.stdout.toString('utf8').trim();
Expand All @@ -89,34 +174,52 @@ function waitForAlive(port, delay) {
if (response !== 'pong') {
throw new Error(
'Timed out waiting for sync-rpc server to start (it should respond with "pong" when sent "ping"):\n\n' +
err +
'\n' +
response
err +
'\n' +
response
);
}
}

function nativeNC(port, input) {
return spawnSync('nc', [host, port], {
input: input,
windowsHide: true,
maxBuffer: Infinity,
});
index[0] = index[0] + 1;
const channel = 'nativeNC';
debugMode && logToFile(`${channel} received msg #${index[0]} at client: ${input}`, localClientLog);
let res;
try {
res = spawnSync(`nc`, [host, port], {
input: input,
maxBuffer: Infinity,
windowsHide: true,
});
} catch (e) {
debugMode && logToFile(`Received error: ${e.message}`);
}
debugMode && logToFile(`${channel} received response #${index[0]} at client. stderr: ${res.stderr}, status: ${res.status}, signal: ${res.signal}, error: ${res.error}`, localClientLog);
return res;
}

function nodeNC(port, input) {
index[0] = index[0] + 1;
const channel = 'nodeNC';
const src = nodeNetCatSrc(port, input);
if (src.length < 1000) {
return spawnSync(process.execPath, ['-e', src, ...FLAGS], {
debugMode && logToFile(`${channel} received msg #${index[0]} at client: ${input}`, localClientLog);
const res = spawnSync(process.execPath, ['-e', src, ...FLAGS], {
windowsHide: true,
maxBuffer: Infinity,
});
debugMode && logToFile(`${channel} received response #${index[0]} at client.`, localClientLog);
return res;
} else {
return spawnSync(process.execPath, [...FLAGS], {
debugMode && logToFile(`${channel} received msg #${index[0]} at client: ${input}`, localClientLog);
const res = spawnSync(process.execPath, [...FLAGS], {
input: src,
windowsHide: true,
maxBuffer: Infinity,
});
debugMode && logToFile(`${channel} received response #${index[0]} at client.`, localClientLog);
return res;
}
}

Expand All @@ -134,35 +237,67 @@ function getFastestFunction(port) {
}
}

function _shouldUseSpawnSync(input) {
// use spawnSync (original sync-rpc method for sending request) for:
// 1. setup module (.t == 1)
// 2. normal request (.t == 0) and the request is an api call. (some server api calls fail with writing to keep-alive channel, like Pkg.writeContents.)
return input.t === 1 || input.t === 0 && input.a.u.indexOf('/typesys/8/') === -1;
Copy link
Collaborator Author

@c3wenjiaowang c3wenjiaowang Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit the keep-alive channel to only serve type loading requests (with /typesys/8/ path), as it can't handle large amount of msg written to the channel. In Bundler, we are writing 18 Mb data for Pkg.writeContents call, and it hangs forever.

}

function sendMessage(input) {
if (!started) start();
const res = configuration.fastestFunction(
configuration.port,
JSON.stringify(input) + '\r\n'
);
let res;
let spawnSyncCall = false;
// when keep-alive connection is not created, or the input type is to setup the request handler in worker thread (input.t === 1), rely on the original logic for handling request.
if (!clientConnected || _shouldUseSpawnSync(input)) {
spawnSyncCall = true;
res = configuration.fastestFunction(
configuration.port,
JSON.stringify(input) + '\r\n'
);
} else {
// when keep-alive connection is ready, rely on that for handling normal requests.
input.client = 'ready';
input.responseFilePath = responseFilePath();
index[0] = index[0] + 1;
const channel = 'keepalive channel';
const request = JSON.stringify(input).replace(/\u2028/g, '\\u2028')
.replace(/\u2029/g, '\\u2029') + '\r\n';
debugMode && logToFile(`${channel} received msg #${index[0]} at client: ${request}`, localClientLog);
client.write(request);
res = readResponseFile(input.responseFilePath);
debugMode && logToFile(`${channel} received response #${index[0]} at client.`, localClientLog);
}
try {
return JSON.parse(res.stdout.toString('utf8'));
return spawnSyncCall ? JSON.parse(res.stdout.toString('utf8')) : res;
} catch (ex) {
if (res.error) {
if (typeof res.error === 'string') res.error = new Error(res.error);
throw res.error;
}
if (res.status !== 0) {
throw new Error(
configuration.fastestFunction.name +
if (spawnSyncCall) {
if (res.error) {
if (typeof res.error === 'string') res.error = new Error(res.error);
throw res.error;
}
if (res.status !== 0) {
throw new Error(
configuration.fastestFunction.name +
' failed:\n' +
(res.stdout && res.stdout.toString()) +
'\n' +
(res.stderr && res.stderr.toString())
);
}
throw new Error(
configuration.fastestFunction.name +
);
}
throw new Error(
configuration.fastestFunction.name +
' failed:\n' +
(res.stdout && res.stdout).toString() +
'\n' +
(res.stderr && res.stderr).toString()
);
);
} else {
throw new Error(
'keep-alive channel call failed:\n' +
(res && res.toString())
);
}
}
}
function extractValue(msg) {
Expand Down
53 changes: 50 additions & 3 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,37 @@

const net = require('net');
const JSON = require('./json-buffer');
const fs = require('fs');

const debugMode = false;

function responseFileReadyForWriting(filePath) {
return !fs.existsSync(filePath);
}

function writeResponseToFile(filePath, response) {
while (!responseFileReadyForWriting(filePath)) {
}
fs.writeFileSync(filePath, response);
debugMode && console.log(`Written response to file.`);
fs.chmodSync(filePath, '444');
debugMode && console.log(`Changed response file to readonly.`);
}

function writeResponseContent(filePath, content) {
const startTime = Date.now();
while (!responseFileReadyForWriting(filePath)) {
}
fs.writeFileSync(filePath, content, {encoding: 'binary'});
debugMode && console.log(`Written content to file.`);
fs.chmodSync(filePath, '444');
debugMode && console.log(`Changed content file to readonly.`);
}

const INIT = 1;
const CALL = 0;
const modules = [];
const index = [0];

const NULL_PROMISE = Promise.resolve(null);
const server = net.createServer({allowHalfOpen: true}, c => {
Expand All @@ -24,25 +51,45 @@ const server = net.createServer({allowHalfOpen: true}, c => {
buffer += data.toString('utf8');
if (/\r\n/.test(buffer)) {
onMessage(buffer.trim());
buffer = '';
}
});
function onMessage(str) {
index[0] = index[0] + 1;
debugMode && console.log(`received msg #${index[0]} at server: ${str}`);
if (str === 'ping') {
c.end('pong');
return;
}
const req = JSON.parse(str);
NULL_PROMISE.then(function() {
const req = JSON.parse(str);
if (req.t === INIT) {
return init(req.f, req.a);
}
debugMode && console.log(`sending msg #${index[0]} to c3server.`);
return modules[req.i](req.a);
}).then(
function(response) {
respond({s: true, v: response});
debugMode && console.log(`sending response #${index[0]} to local client.`);
if (req.client != 'ready') {
respond({s: true, v: response});
} else {
const responseContentPath = req.responseFilePath.split('.')[0];
writeResponseContent(responseContentPath, response.b);
response.b = responseContentPath;
writeResponseToFile(req.responseFilePath, JSON.stringify({s: true, v: response}));
}
debugMode && console.log(`sent response #${index[0]} to local client.`);
},
function(err) {
respond({s: false, v: {code: err.code, message: err.message}});
debugMode && console.error(`error: ${err.message} in local server.`);
if (req.client != 'ready') {
respond({s: false, v: {code: err.code, message: err.message}});
} else {
const responseContentPath = req.responseFilePath.split('.')[0];
writeResponseContent(responseContentPath, JSON.stringify({code: err.code, message: err.message}));
writeResponseToFile(req.responseFilePath, JSON.stringify({s: false, v: responseContentPath}));
}
}
);
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@
"url": "http://github.com/ForbesLindesay"
},
"license": "MIT"
}
}