diff --git a/core/worker/lib/consumer/JobConsumer.js b/core/worker/lib/consumer/JobConsumer.js index 5aef0f8f2..7cb93ce2a 100644 --- a/core/worker/lib/consumer/JobConsumer.js +++ b/core/worker/lib/consumer/JobConsumer.js @@ -161,34 +161,37 @@ class JobConsumer extends EventEmitter { }; } - async initJob() { - let error = null; - let span; - try { - if (this._job != null) { - span = tracer.startSpan({ - name: 'storage-get', - id: this._taskID, - tags: { - jobID: this._jobID, - taskID: this._taskID, - } - }); - const input = await dataExtractor.extract(this._job.data.input, this._job.data.storage, this._storageAdapter); - this._job.data.input = input; + async extractData(jobInfo) { + const span = tracer.startSpan({ + name: 'storage-get', + id: this._taskID, + tags: { + jobID: this._jobID, + taskID: this._taskID, } - } - catch (err) { - log.error(`failed to extract data input ${err.message}`, { component }, err); - error = err; + }); + const { error, data } = await this._tryExtractDataFromStorage(jobInfo); + if (error) { + log.error(`failed to extract data input: ${error.message}`, { component }, error); stateManager.done({ error }); } if (span) { span.finish(error); } - return error; + return { error, data }; } + async _tryExtractDataFromStorage(jobInfo) { + try { + const input = await dataExtractor.extract(jobInfo.input, jobInfo.storage, this._storageAdapter); + return { data: { ...jobInfo, input } }; + } + catch (error) { + return { error }; + } + } + + async finishJob(data = {}) { if (!this._job) { return; diff --git a/core/worker/lib/consumer/data-extractor.js b/core/worker/lib/consumer/data-extractor.js index c8928717a..fb4f765eb 100644 --- a/core/worker/lib/consumer/data-extractor.js +++ b/core/worker/lib/consumer/data-extractor.js @@ -1,9 +1,10 @@ const deep = require('deep-get-set'); const flatten = require('flat'); +const clone = require('clone'); class DataExtractor { async extract(input, storage, dataProvider) { - const result = input.slice(); + const result = clone(input); const flatObj = flatten(input); const promiseDataExtractors = Object.entries(flatObj).map(async ([objectPath, value]) => { diff --git a/core/worker/lib/worker.js b/core/worker/lib/worker.js index 3c5a8f8d7..cc41d1403 100644 --- a/core/worker/lib/worker.js +++ b/core/worker/lib/worker.js @@ -73,9 +73,7 @@ class Worker { _registerToStateEvents() { stateManager.on(stateEvents.stateEntered, async ({ job, state, results }) => { - const { data } = (job || {}); const result = { state, results }; - switch (state) { case workerStates.results: await jobConsumer.finishJob(result); @@ -84,8 +82,8 @@ class Worker { case workerStates.ready: break; case workerStates.init: { - const err = await jobConsumer.initJob(); - if (!err) { + const { error, data } = await jobConsumer.extractData(job.data); + if (!error) { algoRunnerCommunication.send({ command: messages.outgoing.initialize, data @@ -95,8 +93,7 @@ class Worker { } case workerStates.working: algoRunnerCommunication.send({ - command: messages.outgoing.start, - data + command: messages.outgoing.start }); break; case workerStates.shutdown: @@ -109,8 +106,7 @@ class Worker { stateManager.done('Timeout exceeded trying to stop algorithm'); }, this._stopTimeoutMs); algoRunnerCommunication.send({ - command: messages.outgoing.stop, - data + command: messages.outgoing.stop }); break; default: diff --git a/core/worker/package-lock.json b/core/worker/package-lock.json index c05b70bdf..0c6fbd813 100644 --- a/core/worker/package-lock.json +++ b/core/worker/package-lock.json @@ -2460,7 +2460,7 @@ }, "bindings": { "version": "1.2.1", - "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.2.1.tgz", + "resolved": "http://greenape.ddns.net/npm/bindings/-/bindings-1.2.1.tgz", "integrity": "sha1-FK1hE4EtLTfXLme0ystLtyZQXxE=" }, "bintrees": { @@ -2889,9 +2889,9 @@ } }, "clone": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/clone/-/clone-1.0.3.tgz", - "integrity": "sha1-KY1+IjFmD0DAA8LtMUDezz9TCF8=" + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/clone/-/clone-2.1.1.tgz", + "integrity": "sha1-0hfR6WERjjrJpLi7oyhVU79kfNs=" }, "clone-buffer": { "version": "1.0.0", @@ -3424,7 +3424,7 @@ }, "deasync": { "version": "0.1.9", - "resolved": "https://registry.npmjs.org/deasync/-/deasync-0.1.9.tgz", + "resolved": "http://greenape.ddns.net/npm/deasync/-/deasync-0.1.9.tgz", "integrity": "sha1-9Y3Un6YxEMdL6oQFqQqCi+JtOiQ=", "requires": { "bindings": "1.2.1", @@ -8087,7 +8087,7 @@ }, "node-etcd": { "version": "5.1.0", - "resolved": "https://registry.npmjs.org/node-etcd/-/node-etcd-5.1.0.tgz", + "resolved": "http://greenape.ddns.net/npm/node-etcd/-/node-etcd-5.1.0.tgz", "integrity": "sha1-0CqlfBJ0OHTfFjuLD+/xHMd/B7A=", "requires": { "deasync": "0.1.9", @@ -8927,7 +8927,7 @@ }, "querystringify": { "version": "0.0.4", - "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-0.0.4.tgz", + "resolved": "http://greenape.ddns.net/npm/querystringify/-/querystringify-0.0.4.tgz", "integrity": "sha1-DPf4T5Rj/wrlHExLFC2VvjdyTZw=" }, "question-cache": { @@ -9275,7 +9275,7 @@ }, "requires-port": { "version": "1.0.0", - "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "resolved": "http://greenape.ddns.net/npm/requires-port/-/requires-port-1.0.0.tgz", "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" }, "resolve": { @@ -10639,7 +10639,7 @@ }, "underscore": { "version": "1.8.2", - "resolved": "https://registry.npmjs.org/underscore/-/underscore-1.8.2.tgz", + "resolved": "http://greenape.ddns.net/npm/underscore/-/underscore-1.8.2.tgz", "integrity": "sha1-ZN8utZCJnelQeC83NRkLpC6/MR0=" }, "union-value": { @@ -10765,7 +10765,7 @@ }, "url-parse": { "version": "1.0.5", - "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.0.5.tgz", + "resolved": "http://greenape.ddns.net/npm/url-parse/-/url-parse-1.0.5.tgz", "integrity": "sha1-CFSGBCKv3P7+tsllxmLUgAFpkns=", "requires": { "querystringify": "0.0.4", @@ -10834,6 +10834,13 @@ "clone": "1.0.3", "clone-stats": "0.0.1", "replace-ext": "0.0.1" + }, + "dependencies": { + "clone": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/clone/-/clone-1.0.3.tgz", + "integrity": "sha1-KY1+IjFmD0DAA8LtMUDezz9TCF8=" + } } }, "vinyl-fs": { @@ -10923,6 +10930,11 @@ } } }, + "clone": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/clone/-/clone-1.0.3.tgz", + "integrity": "sha1-KY1+IjFmD0DAA8LtMUDezz9TCF8=" + }, "clone-stats": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/clone-stats/-/clone-stats-1.0.0.tgz", diff --git a/core/worker/package.json b/core/worker/package.json index 722803aa9..1e126208e 100644 --- a/core/worker/package.json +++ b/core/worker/package.json @@ -26,6 +26,7 @@ "@hkube/rest-server": "^1.0.6", "@hkube/s3-adapter": "^1.0.8", "chalk": "^2.3.2", + "clone": "^2.1.1", "deep-get-set": "^1.1.0", "djsv": "^1.4.2", "express": "^4.16.2",