Skip to content

Commit

Permalink
trial to make single client connection working
Browse files Browse the repository at this point in the history
  • Loading branch information
c3wenjiaowang committed Jun 16, 2023
1 parent 9968e4b commit d49656f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
75 changes: 59 additions & 16 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,34 @@ const fs = require('fs');
const os = require('os');
const index = [0];
const timeStamp = new Date().toISOString();
const net = require('net');
let clientConnected = false;
let client;

const outFolder = path.join(os.tmpdir(), 'sync_rpc_log');
if (!fs.existsSync(outFolder)) {
fs.mkdirSync(outFolder, {recursive: true});
}

const responseFolder = path.join(os.tmpdir(), 'sync_rpc_responses');
if (!fs.existsSync(responseFolder)) {
fs.mkdirSync(responseFolder, {recursive: true});
}

const responseFile = path.join(responseFolder, `responseFile.txt`);

function readFromFile() {
const startTime = Date.now();
while (!fs.existsSync(responseFile) && startTime + 5000 < Date.now()) {
}
if (fs.existsSync(responseFile)) {
const res = fs.readFileSync(responseFile);
fs.unlinkSync(responseFile);
return res;
}
return null;
}

const localClientLog = path.join(outFolder, `client_logs_${timeStamp}.txt`);
const localServerLog = path.join(outFolder, `server_logs_${timeStamp}.txt`);
const localServerErr = path.join(outFolder, `server_error_${timeStamp}.txt`);
Expand Down Expand Up @@ -61,10 +83,19 @@ function start() {
process.on('exit', () => {
p.kill();
});
client = net.connect(port, '127.0.0.1', () => {
// fs.appendFileSync(path.join(responseFolder, 'clientConnection.txt'), 'connected');
console.log('client connected.');
clientConnected = true;
});
// while (!fs.existsSync(path.join(responseFolder, 'clientConnection.txt'))) {
//
// }
// fs.unlinkSync(path.join(responseFolder, 'clientConnection.txt'));
waitForAlive(port, 1);
const fastestFunction = getFastestFunction(port);
// const fastestFunction = getFastestFunction(port);
configuration.port = port;
configuration.fastestFunction = fastestFunction;
// configuration.fastestFunction = fastestFunction;
started = true;
}

Expand Down Expand Up @@ -101,16 +132,18 @@ function waitForAlive(port, delay) {
const start = Date.now() + delay * 1000;
const timeout = start + 10000;
while (response !== 'pong' && Date.now() < timeout) {
if (Date.now() > start) {
const result = nodeNC(port, 'ping\r\n');
response = result.stdout && result.stdout.toString();
err = result.stderr && result.stderr.toString();
if (Date.now() > start && clientConnected) {
client.write('ping\r\n');
// const result = nativeNC(port, 'ping\r\n');
// response = result && result.toString();
// err = result.stderr && result.stderr.toString();
response = readFromFile();
}
}
if (response !== 'pong') {
if (response == null || response.toString() !== 'pong') {
throw new Error(
'Timed out waiting for sync-rpc server to start (it should respond with "pong" when sent "ping"):\n\n' +
err +
// err +
'\n' +
response
);
Expand All @@ -123,15 +156,16 @@ function nativeNC(port, input) {
logToFile(`${channel} received msg #${index[0]} at client: ${input}`, localClientLog);
let res;
try {
res = execSync(`nc ${host} ${port}`, {
spawn(`nc`, [host, port], {
input: input,
maxBuffer: 1024*1024*10,
shell: true
});
res = readFromFile();
} catch (e) {
logToFile(`Received error: ${e.message}`);
logToFile(`Received error: ${e.message}`, localClientLog);
}
logToFile(`${channel} received response #${index[0]} at client. stderr: ${res.stderr}, status: ${res.status}, signal: ${res.signal}, error: ${res.error}`, localClientLog);
logToFile(`${channel} received response #${index[0]} at client.`, localClientLog);
return res;
}

Expand Down Expand Up @@ -174,16 +208,22 @@ function getFastestFunction(port) {
}

function sendMessage(input) {
if (!started) start();
if (!started) {
start();
}
const delay = 25; // delay for sending each request in ms
const startTime = Date.now();
while (Date.now() < startTime + delay) {
}
const res = configuration.fastestFunction(
configuration.port,
JSON.stringify(input) + '\r\n'
);
let res;
// const res = configuration.fastestFunction(
// configuration.port,
// JSON.stringify(input) + '\r\n'
// );
client.write(JSON.stringify(input).replace(/\u2028/g, '\\u2028')
.replace(/\u2029/g, '\\u2029') + '\r\n');
try {
res = readFromFile();
return JSON.parse(res.toString('utf8'));
} catch (ex) {
if (res.error) {
Expand Down Expand Up @@ -221,6 +261,9 @@ function createClient(filename, args) {
// logToFile(`Segmentation Fault occurred.`, localClientLog);
// process.exit(1);
// });
if (!started) {
start();
}
const id = extractValue(sendMessage({t: 1, f: filename, a: args}));
return function(args) {
return extractValue(sendMessage({t: 0, i: id, a: args}));
Expand Down
28 changes: 25 additions & 3 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,37 @@

const net = require('net');
const JSON = require('./json-buffer');
const fs = require('fs');
const os = require('os');
const path = require("path");

const responseFolder = path.join(os.tmpdir(), 'sync_rpc_responses');
if (!fs.existsSync(responseFolder)) {
fs.mkdirSync(responseFolder, {recursive: true});
}

const responseFile = path.join(responseFolder, `responseFile.txt`);

function writeToFile(response) {
if (fs.existsSync(responseFile)) {
fs.unlinkSync(responseFile);
}
fs.appendFileSync(responseFile, response);
}

const INIT = 1;
const CALL = 0;
const modules = [];
const index = [0];

const NULL_PROMISE = Promise.resolve(null);
const server = net.createServer({allowHalfOpen: true}, c => {
const server = net.createServer(c => {
let responded = false;
function respond(data) {
if (responded) return;
responded = true;
c.end(JSON.stringify(data));
writeToFile(JSON.stringify(data));
// c.end(null);
}

let buffer = '';
Expand All @@ -25,13 +43,17 @@ const server = net.createServer({allowHalfOpen: true}, c => {
buffer += data.toString('utf8');
if (/\r\n/.test(buffer)) {
onMessage(buffer.trim());
buffer = '';
}
});
function onMessage(str) {
index[0] = index[0] + 1;
console.log(`received msg #${index[0]} at server: ${str}`);
if (str === 'ping') {
c.end('pong');
console.log(`writing pong.`);
writeToFile('pong');
console.log(`writing pong finished.`);
// c.end(null);
return;
}
NULL_PROMISE.then(function() {
Expand Down

0 comments on commit d49656f

Please sign in to comment.