Skip to content

Commit

Permalink
Merge pull request #6 from kube-HPC/Storage
Browse files Browse the repository at this point in the history
update input parser, add support for crashed worker
  • Loading branch information
nassiharel committed Mar 4, 2018
1 parent 2ff99ef commit 3cb1ac7
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 223 deletions.
4 changes: 3 additions & 1 deletion core/pipeline-driver/lib/consts/Events.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ module.exports = {
STOP: 'job-stop',
WAITING: 'job-waiting',
ACTIVE: 'job-active',
FAILED: 'job-failed',
STALLED: 'job-stalled',
FAILED: 'job-failed'
CRASHED: 'job-crashed'
},
TASKS: {
WAITING: 'task-waiting',
ACTIVE: 'task-active',
SUCCEED: 'task-succeed',
FAILED: 'task-failed',
STALLED: 'task-stalled',
CRASHED: 'task-crashed'
}
};
2 changes: 1 addition & 1 deletion core/pipeline-driver/lib/graph/virtual-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class VirtualLink {
constructor(options) {
this.source = options.source;
this.target = options.target;
this.edges = Object.entries(options.edge).filter(([k, v]) => v).map(e => ({ type: e[0] }));
this.edges = options.edges;
}
}

Expand Down
6 changes: 0 additions & 6 deletions core/pipeline-driver/lib/nodes/edge.js

This file was deleted.

1 change: 1 addition & 0 deletions core/pipeline-driver/lib/nodes/node-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class NodeBase extends Task {
this.algorithmName = options.algorithmName;
this.extraData = options.extraData;
this.input = options.input;
this.storage = options.storage;
this.status = options.status || States.CREATING;
this.error = options.error;
this.result = options.result;
Expand Down
7 changes: 2 additions & 5 deletions core/pipeline-driver/lib/nodes/nodes-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const VirtualNode = require('../graph/virtual-node');
const VirtualLink = require('../graph/virtual-link');
const ActualGraph = require('../graph/graph-actual');
const VirtualGraph = require('../graph/graph-virtual');
const createEdge = require('./edge');
const NodeResult = require('./node-result');
const States = require('../state/States');
const { parser, consts } = require('@hkube/parsers');
Expand Down Expand Up @@ -41,14 +40,12 @@ class NodesMap extends EventEmitter {
results.forEach(r => {
let node = nodes.find(f => f.source === r.nodeName && f.target === n.nodeName);
if (!node) {
node = { source: r.nodeName, target: n.nodeName, edge: createEdge(r) }
node = { source: r.nodeName, target: n.nodeName, edges: [{ type: r.type }] }
nodes.push(node);
this._graph.setEdge(node.source, node.target);
}
else {
node.edge.waitNode = r.isWaitNode || node.edge.waitNode;
node.edge.waitBatch = r.isWaitBatch || node.edge.waitBatch;
node.edge.waitAny = r.isWaitAny || node.edge.waitAny;
node.edges.push({ type: r.type });
}
})
})
Expand Down
4 changes: 2 additions & 2 deletions core/pipeline-driver/lib/producer/jobs-producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class JobProducer extends EventEmitter {
this.emit(Events.TASKS.ACTIVE, data.jobID);
}).on(Events.JOBS.STALLED, (data) => {
this.emit(Events.TASKS.STALLED, data.jobID);
}).on(Events.JOBS.FAILED, (data) => {
this.emit(Events.TASKS.FAILED, data);
}).on(Events.JOBS.CRASHED, (data) => {
this.emit(Events.TASKS.CRASHED, { taskId: data.jobID, error: data.error });
});
}

Expand Down
25 changes: 16 additions & 9 deletions core/pipeline-driver/lib/tasks/task-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ class TaskRunner {
producer.on(Events.TASKS.STALLED, (taskId) => {
this._setTaskState(taskId, { status: States.STALLED });
});
producer.on(Events.TASKS.FAILED, (data) => {
producer.on(Events.TASKS.CRASHED, (data) => {
this._setTaskState(data.taskId, { status: States.FAILED, error: data.error });
this._taskComplete(data.taskId);
});
stateManager.on(Events.TASKS.FAILED, (data) => {
this._setTaskState(data.taskId, { status: States.FAILED, error: data.error });
this._taskComplete(data.taskId);
});
Expand Down Expand Up @@ -267,37 +271,39 @@ class TaskRunner {
};
const result = parser.parse(options);
if (index) {
this._runWaitAnyBatch(node, result.input, index);
this._runWaitAnyBatch(node, result.input, index, result.storage);
}
else if (result.batch) {
this._runNodeBatch(node, result.input);
this._runNodeBatch(node, result.input, result.storage);
}
else {
this._runNodeSimple(node, result.input);
this._runNodeSimple(node, result.input, result.storage);
}
}

_runWaitAnyBatch(node, input, index) {
_runWaitAnyBatch(node, input, index, storage) {
const waitNode = this._nodes.getWaitAny(node.nodeName, index);
waitNode.input = input;
waitNode.storage = storage;
this._setTaskState(waitNode.taskId, waitNode);
this._createJob(waitNode);
}

_runNodeSimple(node, input) {
this._nodes.setNode(new Node({ ...node, input }));
_runNodeSimple(node, input, storage) {
this._nodes.setNode(new Node({ ...node, input, storage }));
this._setTaskState(node.taskId, node);
this._createJob(node);
}

_runNodeBatch(node, input) {
_runNodeBatch(node, input, storage) {
input.forEach((inp, ind) => {
const batch = new Batch({
nodeName: node.nodeName,
batchIndex: (ind + 1),
algorithmName: node.algorithmName,
extraData: node.extraData,
input: inp
input: inp,
storage
});
this._nodes.addBatch(batch);
this._setTaskState(batch.taskId, batch);
Expand Down Expand Up @@ -368,6 +374,7 @@ class TaskRunner {
jobID: this._jobId,
taskID: node.taskId,
input: node.input,
storage: node.storage,
node: node.nodeName,
batchIndex: node.batchIndex,
pipelineName: this._pipelineName,
Expand Down
Loading

0 comments on commit 3cb1ac7

Please sign in to comment.