Skip to content

Commit

Permalink
Merge pull request #7 from kube-HPC/V2_0
Browse files Browse the repository at this point in the history
V2 0
  • Loading branch information
maty21 committed Apr 11, 2018
1 parent 24f6066 commit 8591671
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 148 deletions.
23 changes: 12 additions & 11 deletions core/pipeline-driver/lib/producer/jobs-producer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const EventEmitter = require('events');
const validate = require('djsv');
const uuidv4 = require('uuid/v4');
const { Producer } = require('@hkube/producer-consumer');
const { Producer, Events } = require('@hkube/producer-consumer');
const schema = require('./schema');
const Events = require('../consts/Events');
const { TASKS } = require('../consts/Events');
const States = require('../state/States');
const stateManager = require('../state/state-manager');
const log = require('@hkube/logger').GetLogFromContainer();
Expand All @@ -27,21 +27,22 @@ class JobProducer extends EventEmitter {
}
setting.tracer = tracer;
this._producer = new Producer({ setting });
this._producer.on(Events.JOBS.WAITING, (data) => {
this.emit(Events.TASKS.WAITING, data.jobID);
}).on(Events.JOBS.ACTIVE, (data) => {
this.emit(Events.TASKS.ACTIVE, data.jobID);
}).on(Events.JOBS.STALLED, (data) => {
this.emit(Events.TASKS.STALLED, data.jobID);
}).on(Events.JOBS.CRASHED, (data) => {
this.emit(Events.TASKS.CRASHED, { taskId: data.jobID, error: data.error });
this._producer.on(Events.WAITING, (data) => {
this.emit(TASKS.WAITING, data.jobID);
}).on(Events.COMPLETED, (data) => {
this.emit(TASKS.SUCCEED, data.jobID);
}).on(Events.ACTIVE, (data) => {
this.emit(TASKS.ACTIVE, data.jobID);
}).on(Events.STALLED, (data) => {
this.emit(TASKS.STALLED, data.jobID);
}).on(Events.CRASHED, (data) => {
this.emit(TASKS.CRASHED, { taskId: data.jobID, error: data.error });
});
}

async createJob(options) {
const opt = {
job: {
id: options.data && options.data.taskID,
type: options.type,
data: options.data,
}
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline-driver/lib/producer/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module.exports = {
"properties": {
"prefix": {
"type": "string",
"default": "jobs-workers",
"default": "pipeline-queue",
"description": "prefix for all queue keys"
}
},
Expand Down
40 changes: 23 additions & 17 deletions core/pipeline-driver/lib/tasks/task-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class TaskRunner {
this._nodes = null;
this._active = false;
this._pipelineName = null;
this._pipelinePriority = null;
}

init(options) {
Expand All @@ -39,13 +40,7 @@ class TaskRunner {
stateManager.on(Events.JOBS.STOP, (data) => {
this._tryStopPipeline(null, data.reason);
});
producer.on(Events.TASKS.WAITING, (taskId) => {
this._setTaskState(taskId, { status: States.PENDING });
});
producer.on(Events.TASKS.STALLED, (taskId) => {
this._setTaskState(taskId, { status: States.STALLED });
});
producer.on(Events.TASKS.CRASHED, (data) => {
stateManager.on(Events.TASKS.FAILED, (data) => {
this._setTaskState(data.taskId, { status: States.FAILED, error: data.error });
this._taskComplete(data.taskId);
});
Expand Down Expand Up @@ -91,6 +86,7 @@ class TaskRunner {
throw new Error(`unable to find pipeline ${this._jobId}`);
}
this._pipelineName = this._pipeline.name;
this._pipelinePriority = this._pipeline.priority;
this._nodes = new NodesMap(this._pipeline);
this._nodes.on('node-ready', (node) => {
log.debug(`new node ready to run: ${node.nodeName}`, { component: components.TASK_RUNNER });
Expand Down Expand Up @@ -316,13 +312,13 @@ class TaskRunner {
nodeName: node.nodeName,
batchIndex: (ind + 1),
algorithmName: node.algorithmName,
extraData: node.extraData,
input: inp
input: inp,
extraData: node.extraData
});
this._nodes.addBatch(batch);
this._setTaskState(batch.taskId, batch);
this._createJob(batch, storage);
})
});
this._createJob(node, storage);
}

_taskComplete(taskId) {
Expand Down Expand Up @@ -376,25 +372,35 @@ class TaskRunner {
else {
log.debug(`task ${options.status} ${taskId}`, { component: components.TASK_RUNNER, jobId: this._jobId, pipelineName: this._pipelineName, taskId, algorithmName: task.algorithmName });
}

progress.debug({ jobId: this._jobId, pipeline: this._pipelineName, status: States.ACTIVE });
stateManager.setTaskState({ jobId: this._jobId, taskId, data: task });
}


async _createJob(node, storage) {
let tasks = [];
if (node.batch.length > 0) {
tasks = node.batch.map(b => ({ taskID: b.taskId, input: b.input, batchIndex: b.batchIndex }));
}
else {
tasks.push({ taskID: node.taskId, input: node.input });
}
const options = {
type: node.algorithmName,
data: {
jobID: this._jobId,
tasks,
taskID: node.taskId,
input: node.input,
storage: storage,
node: node.nodeName,
batchIndex: node.batchIndex,
pipelineName: this._pipelineName,
extraData: node.extraData
priority: this._pipelinePriority,
algorithmName: node.algorithmName,
info: {
storage: storage,
extraData: node.extraData
}
}
}
};
await producer.createJob(options);
}
}
Expand Down
Loading

0 comments on commit 8591671

Please sign in to comment.