Skip to content

Commit

Permalink
if stop was received while the worker was in the initializing state, an exception wa…
Browse files Browse the repository at this point in the history
…s thrown on a null reference - kube-HPC/hkube#40
  • Loading branch information
bahalool authored and yehiyam committed May 29, 2019
1 parent 7b4caed commit e2d3135
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 39 deletions.
43 changes: 23 additions & 20 deletions core/worker/lib/consumer/JobConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,34 +161,37 @@ class JobConsumer extends EventEmitter {
};
}

async initJob() {
let error = null;
let span;
try {
if (this._job != null) {
span = tracer.startSpan({
name: 'storage-get',
id: this._taskID,
tags: {
jobID: this._jobID,
taskID: this._taskID,
}
});
const input = await dataExtractor.extract(this._job.data.input, this._job.data.storage, this._storageAdapter);
this._job.data.input = input;
async extractData(jobInfo) {
const span = tracer.startSpan({
name: 'storage-get',
id: this._taskID,
tags: {
jobID: this._jobID,
taskID: this._taskID,
}
}
catch (err) {
log.error(`failed to extract data input ${err.message}`, { component }, err);
error = err;
});
const { error, data } = await this._tryExtractDataFromStorage(jobInfo);
if (error) {
log.error(`failed to extract data input: ${error.message}`, { component }, error);
stateManager.done({ error });
}
if (span) {
span.finish(error);
}
return error;
return { error, data };
}

async _tryExtractDataFromStorage(jobInfo) {
try {
const input = await dataExtractor.extract(jobInfo.input, jobInfo.storage, this._storageAdapter);
return { data: { ...jobInfo, input } };
}
catch (error) {
return { error };
}
}


async finishJob(data = {}) {
if (!this._job) {
return;
Expand Down
3 changes: 2 additions & 1 deletion core/worker/lib/consumer/data-extractor.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const deep = require('deep-get-set');
const flatten = require('flat');
const clone = require('clone');

class DataExtractor {
async extract(input, storage, dataProvider) {
const result = input.slice();
const result = clone(input);
const flatObj = flatten(input);

const promiseDataExtractors = Object.entries(flatObj).map(async ([objectPath, value]) => {
Expand Down
12 changes: 4 additions & 8 deletions core/worker/lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ class Worker {

_registerToStateEvents() {
stateManager.on(stateEvents.stateEntered, async ({ job, state, results }) => {
const { data } = (job || {});
const result = { state, results };

switch (state) {
case workerStates.results:
await jobConsumer.finishJob(result);
Expand All @@ -84,8 +82,8 @@ class Worker {
case workerStates.ready:
break;
case workerStates.init: {
const err = await jobConsumer.initJob();
if (!err) {
const { error, data } = await jobConsumer.extractData(job.data);
if (!error) {
algoRunnerCommunication.send({
command: messages.outgoing.initialize,
data
Expand All @@ -95,8 +93,7 @@ class Worker {
}
case workerStates.working:
algoRunnerCommunication.send({
command: messages.outgoing.start,
data
command: messages.outgoing.start
});
break;
case workerStates.shutdown:
Expand All @@ -109,8 +106,7 @@ class Worker {
stateManager.done('Timeout exceeded trying to stop algorithm');
}, this._stopTimeoutMs);
algoRunnerCommunication.send({
command: messages.outgoing.stop,
data
command: messages.outgoing.stop
});
break;
default:
Expand Down
32 changes: 22 additions & 10 deletions core/worker/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"@hkube/rest-server": "^1.0.6",
"@hkube/s3-adapter": "^1.0.8",
"chalk": "^2.3.2",
"clone": "^2.1.1",
"deep-get-set": "^1.1.0",
"djsv": "^1.4.2",
"express": "^4.16.2",
Expand Down

0 comments on commit e2d3135

Please sign in to comment.