Skip to content

Commit

Permalink
Merge pull request #1554 from trendscenter/bugfix/unregisted-crash
Browse files Browse the repository at this point in the history
Fix various pipeline bugs
  • Loading branch information
rssk authored Nov 3, 2022
2 parents 10d5862 + b048798 commit b2ea308
Show file tree
Hide file tree
Showing 13 changed files with 77,371 additions and 35,166 deletions.
1 change: 0 additions & 1 deletion packages/coinstac-manager/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ const getStats = async (runId, userId) => {
}
await services[`${runId}-${userId}`].service;
return services[`${runId}-${userId}`].container.stats({ stream: false });
// return services[`${runId}-${userId}`].getStats();
};

/**
Expand Down
29 changes: 18 additions & 11 deletions packages/coinstac-pipeline/src/express-file-server-setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const { extractTar } = require('./pipeline-manager-helpers');
const {
unlink,
} = fs.promises;

async function expressFileServerSetup({
activePipelines,
remoteClients,
Expand All @@ -24,17 +23,25 @@ async function expressFileServerSetup({
}) {
const storage = multer.diskStorage({
destination: (req, file, cb) => {
const fp = path.join(activePipelines[req.body.runId].baseDirectory, req.body.clientId);
mkdirp(fp)
.then(() => {
cb(
null,
fp
);
});
try {
const fp = path.join(activePipelines[req.body.runId].baseDirectory, req.body.clientId);
mkdirp(fp)
.then(() => {
cb(
null,
fp
);
});
} catch (e) {
cb(e);
}
},
filename: (req, file, cb) => {
cb(null, req.body.filename);
try {
cb(null, req.body.filename);
} catch (e) {
cb(e);
}
},
});

Expand All @@ -49,7 +56,6 @@ async function expressFileServerSetup({
const { clientId, runId } = req.body;
const client = remoteClients[clientId][runId];


Promise.resolve().then(() => {
// is this file the last?
if (client.files.expected.length !== 0
Expand Down Expand Up @@ -94,6 +100,7 @@ async function expressFileServerSetup({
});
}
}).catch((error) => {
if (!activePipelines[runId]) return logger.error(`File Transmission attempt on invalid runId ${runId} error: ${error} `);
const runError = Object.assign(
error,
{
Expand Down
10 changes: 7 additions & 3 deletions packages/coinstac-pipeline/src/setup-central.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ async function setupCentral({
const message = store.getAndRemove(pipeline.id, clientId);
if (message instanceof Error) {
const runError = Object.assign(
message,
{
error: `Pipeline error from central node\n Error details: ${message.error}`,
message: `Pipeline error from central node\n Error details: ${message.message}`,
Expand Down Expand Up @@ -187,10 +186,11 @@ async function setupCentral({
);
}).catch((e) => {
logger.error(e);

clientPublish('run', {
id: clientId,
runId: pipeline.id,
error: `Error from central node ${e}`,
error: { message: `Error from central node ${e}`, error: `Error from central node ${e}`, stack: e.stack },
debug: { sent: Date.now() },
});
});
Expand Down Expand Up @@ -320,8 +320,12 @@ async function setupCentral({
remoteClients[id]
);
} else {
mqttServer.publish(`${mqttSubChannel}${id}-register`, JSON.stringify({ runId }));
if (!remoteClients[id]) {
logger.silly(`Supposed clientID: ${id} for client list: ${JSON.stringify(remoteClients, null, 2)}`);
return mqttServer.publish(`${mqttSubChannel}${id}-register`, JSON.stringify({ error: { message: 'no such run' } }));
}
remoteClients[id].state = 'registered';
mqttServer.publish(`${mqttSubChannel}${id}-register`, JSON.stringify({ runId }));
logger.silly(`MQTT registered: ${id}`);
}
break;
Expand Down
11 changes: 6 additions & 5 deletions packages/coinstac-pipeline/src/setup-outer.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async function setupOuter({
publishData('run', {
id: clientId,
runId: pipeline.id,
error: { message: message.message, error: message.error, stack: message.stack },
error: { message: e.message, error: e.error, stack: e.stack },
debug: { sent: Date.now() },
}, 1);
});
Expand Down Expand Up @@ -302,7 +302,7 @@ async function setupOuter({
publishData('run', {
id: clientId,
runId: pipeline.id,
error: e,
error: { error: e.error, message: e.message, stack: e.stack },
debug: { sent: Date.now() },
}, 1);
throw e;
Expand All @@ -311,7 +311,7 @@ async function setupOuter({
}


async function mqqttSetup() {
async function mqttSetup() {
let clientInit = false;
logger.silly('Starting local pipeline manager');
const getMqttConn = () => {
Expand Down Expand Up @@ -419,7 +419,7 @@ async function setupOuter({
publishData('run', {
id: clientId,
runId: data.runId,
error: { stack: error.stack, message: error.message },
error: { error: error.error, stack: error.stack, message: error.message },
}, 1);
activePipelines[data.runId].remote.reject(error);
} else {
Expand All @@ -442,6 +442,7 @@ async function setupOuter({

break;
case `${mqttSubChannel}${clientId}-register`:
if (data.error) throw new Error(data.error.message);
if (activePipelines[data.runId]) {
if (activePipelines[data.runId].registered) break;
activePipelines[data.runId].registered = true;
Expand All @@ -462,7 +463,7 @@ async function setupOuter({
}


mqttClient = await mqqttSetup();
mqttClient = await mqttSetup();

return {
mqttClient,
Expand Down
56 changes: 29 additions & 27 deletions packages/coinstac-simulator/bin/coinstac-simulator
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,36 @@ async function runPipeline(pipelineSpec, operatingDirectory) {
'memPercent',
'cpuPercent',
];
const document = term.createDocument({
palette: new termkit.Palette(),
});

const textTable = new termkit.TextTable({
parent: document,
cellContents: [
rowKeys,
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
],
contentHasMarkup: true,
fit: true,
borderAttr: { color: 'blue' },
selectable: 'cell',
autoWidth: 1,
autoHeight: 1,
});

let textTable;
if (!DEBUG_MODE) {
const document = term.createDocument({
palette: new termkit.Palette(),
});
textTable = new termkit.TextTable({
parent: document,
cellContents: [
rowKeys,
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
['', '', '', '', '', '', '', '', '', '', ''],
],
contentHasMarkup: true,
fit: true,
borderAttr: { color: 'blue' },
selectable: 'cell',
autoWidth: 1,
autoHeight: 1,
});
}
const writeValuesToRow = (rowNumber, valuesToWrite) => {
if (DEBUG_MODE) return;
Object.keys(valuesToWrite).forEach((rowKey) => {
textTable.setCellContent(rowKeys.indexOf(rowKey), rowNumber, valuesToWrite[rowKey]);
});
Expand Down
Loading

0 comments on commit b2ea308

Please sign in to comment.