Skip to content

Commit

Permalink
feat: validate flowInput (#732)
Browse files Browse the repository at this point in the history
* feat: validate flowInput
  • Loading branch information
nassiharel authored Feb 20, 2020
1 parent fc24cac commit 111737a
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 267 deletions.
11 changes: 7 additions & 4 deletions core/api-server/lib/service/execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,23 @@ class ExecutionService {
}

async _runStored(options) {
const { pipeline, jobId, types } = options;
const { pipeline, jobId, rootJobId, types } = options;
const storedPipeline = await stateManager.pipelines.get({ name: pipeline.name });
if (!storedPipeline) {
throw new ResourceNotFoundError('pipeline', pipeline.name);
}
const newPipeline = mergeWith(storedPipeline, pipeline, (obj, src, key) => (key === 'flowInput' ? src || obj : undefined));
return this._run({ pipeline: newPipeline, jobId, options: { parentSpan: pipeline.spanId }, types });
return this._run({ pipeline: newPipeline, jobId, rootJobId, options: { parentSpan: pipeline.spanId }, types });
}

async _run(payload) {
let { pipeline, jobId } = payload;
const { types } = payload;
const { types, rootJobId } = payload;
const { alreadyExecuted, parentSpan } = payload.options || {};

validator.addPipelineDefaults(pipeline);
validator.validatePipeline(pipeline);

if (!jobId) {
jobId = this._createJobID({ name: pipeline.name, experimentName: pipeline.experimentName });
}
Expand All @@ -85,7 +88,7 @@ class ExecutionService {
};
}
const lastRunResult = await this._getLastPipeline(jobId);
const pipelineObject = { ...pipeline, jobId, startTime: Date.now(), lastRunResult, types };
const pipelineObject = { ...pipeline, jobId, rootJobId, startTime: Date.now(), lastRunResult, types };
await storageManager.hkubeIndex.put({ jobId }, tracer.startSpan.bind(tracer, { name: 'storage-put-index', parent: span.context() }));
await storageManager.hkubeExecutions.put({ jobId, data: pipelineObject }, tracer.startSpan.bind(tracer, { name: 'storage-put-executions', parent: span.context() }));
await stateManager.executions.stored.set(pipelineObject);
Expand Down
11 changes: 5 additions & 6 deletions core/api-server/lib/service/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ class InternalService {

async runStoredSubPipeline(options) {
validator.validateStoredSubPipeline(options);
const pipeline = await this._createPipeline(options);
const { pipeline, rootJobId } = await this._createPipeline(options);
const parentSpan = options.spanId;
return execution._runStored({ pipeline, options: { parentSpan }, types: [pipelineTypes.INTERNAL, pipelineTypes.STORED, pipelineTypes.SUB_PIPELINE] });
return execution._runStored({ pipeline, rootJobId, options: { parentSpan }, types: [pipelineTypes.INTERNAL, pipelineTypes.STORED, pipelineTypes.SUB_PIPELINE] });
}

async runRawSubPipeline(options) {
validator.validateRawSubPipeline(options);
const pipeline = await this._createPipeline(options);
const { pipeline, rootJobId } = await this._createPipeline(options);
const parentSpan = options.spanId;
return execution._run({ pipeline, options: { parentSpan }, types: [pipelineTypes.INTERNAL, pipelineTypes.RAW, pipelineTypes.SUB_PIPELINE] });
return execution._run({ pipeline, rootJobId, options: { parentSpan }, types: [pipelineTypes.INTERNAL, pipelineTypes.RAW, pipelineTypes.SUB_PIPELINE] });
}

async _createPipeline(options) {
const { jobId, taskId, rootJobId, ...pipeline } = options;
const experimentName = await this._getExperimentName({ jobId });
pipeline.rootJobId = rootJobId || jobId;
pipeline.experimentName = experimentName;
return pipeline;
return { pipeline, rootJobId: rootJobId || jobId };
}

async _getExperimentName(options) {
Expand Down
15 changes: 11 additions & 4 deletions core/api-server/lib/validation/api-validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ class ApiValidator {
this._validate(this._definitionsInternal.storedSubPipeline, pipeline, false);
}

validatePipeline(pipeline) {
this._validate(this._definitions.pipeline, pipeline, false, { checkFlowInput: true });
}

validateRunRawPipeline(pipeline) {
this._validate(this._definitions.pipeline, pipeline, false, { checkFlowInput: true });
}

validateRunStoredPipeline(pipeline) {
this._validate(this._definitions.storedPipelineRequest, pipeline, false, { checkFlowInput: true });
this._validate(this._definitions.storedPipelineRequest, pipeline, false);
}

validateCaching(request) {
Expand Down Expand Up @@ -205,10 +209,13 @@ class ApiValidator {
const valid = validatorInstance.validate(schema, object);
if (!valid) {
const { errors } = validatorInstance;
let error = validatorInstance.errorsText(errors);
let error = validatorInstance.errorsText(errors, { extraInfo: true });
if (errors[0].params && errors[0].params.allowedValues) {
error += ` (${errors[0].params.allowedValues.join(',')})`;
}
else if (errors[0].params && errors[0].params.additionalProperty) {
error += ` (${errors[0].params.additionalProperty})`;
}
throw new InvalidDataError(error);
}
if (object.nodes) {
Expand Down Expand Up @@ -352,12 +359,12 @@ class ApiValidator {
}

wrapErrorMessageFn(wrappedFn) {
const errorsTextWapper = (errors) => {
const errorsTextWapper = (errors, options) => {
let message;
if (errors) {
message = this.getCustomMessage(errors[0]);
}
return message || wrappedFn(errors);
return message || wrappedFn(errors, options);
};
return errorsTextWapper;
}
Expand Down
66 changes: 48 additions & 18 deletions core/api-server/tests/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,57 @@ const swagger = require('../api/rest-api/swagger.json')
const { request } = require('./utils');
const httpMethods = ['GET', 'POST', 'PUT', 'DELETE'];

describe('Method Not Allowed', () => {
describe('Common', () => {
before(() => {
restUrl = global.testParams.restUrl;
});
Object.entries(swagger.paths).forEach(([k, v]) => {
it(`${k} - should throw Method Not Allowed`, async () => {
const keys = Object.keys(v).map(m => m.toUpperCase());
const methods = httpMethods.filter(h => !keys.includes(h))
const method = methods[0];
const uri = `${restUrl}${k}`;
const options = {
method,
uri,
body: {}
};
const response = await request(options);
if (response.body.error && response.body.error.code === HttpStatus.METHOD_NOT_ALLOWED) {
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.METHOD_NOT_ALLOWED);
expect(response.body.error.message).to.equal('Method Not Allowed');
}
describe('Method Not Allowed', () => {
Object.entries(swagger.paths).forEach(([k, v]) => {
it(`${k} - should throw Method Not Allowed`, async () => {
const keys = Object.keys(v).map(m => m.toUpperCase());
const methods = httpMethods.filter(h => !keys.includes(h))
const method = methods[0];
const uri = `${restUrl}${k}`;
const options = {
method,
uri,
body: {}
};
const response = await request(options);
if (response.body.error && response.body.error.code === HttpStatus.METHOD_NOT_ALLOWED) {
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.METHOD_NOT_ALLOWED);
expect(response.body.error.message).to.equal('Method Not Allowed');
}
});
});
});
describe.skip('should NOT have additional properties', () => {
Object.entries(swagger.paths).forEach(([k, v]) => {
it(`${k} - should NOT have additional properties`, async () => {
const method = Object.keys(v).map(m => m.toUpperCase()).find(m => m === 'POST');
if (!method) {
return;
}
const content = v.post.requestBody && v.post.requestBody.content['application/json'];
if (!content || content.additionalProperties || (content.schema && !content.schema.hasOwnProperty('additionalProperties'))) {
return;
}
const uri = `${restUrl}${k}`;
const options = {
method,
uri,
body: {
no_such_prop: 'bla'
}
};
const response = await request(options);
if (response.body.error && response.body.error.code === HttpStatus.BAD_REQUEST) {
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('Method Not Allowed');
}
});
});
})
});
Expand Down
23 changes: 0 additions & 23 deletions core/api-server/tests/exec-algorithm.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,6 @@ describe('Executions', () => {
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
});
it('should throw validation error of data should NOT have additional properties', async () => {
const options = {
uri: restPath,
body: {
name: 'string',
nodes: [
{
nodeName: 'string',
algorithmName: 'green-alg',
input: []
}
],
additionalProps: {
bla: 60,
blabla: 'info'
}
}
};
const response = await request(options);
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('data should NOT have additional properties');
});
it('should throw validation error of algorithm not found', async () => {
const options = {
uri: restPath,
Expand Down
4 changes: 2 additions & 2 deletions core/api-server/tests/exec-raw.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('Executions', () => {
const response = await request(options);
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('data should NOT have additional properties');
expect(response.body.error.message).to.equal('data should NOT have additional properties (stam)');
});
it('should fail on no such node or job', async () => {
const options = {
Expand Down Expand Up @@ -231,7 +231,7 @@ describe('Executions', () => {
const response = await request(options);
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('data should NOT have additional properties');
expect(response.body.error.message).to.equal('data should NOT have additional properties (additionalProps)');
});
it('should throw validation error of duplicate node', async () => {
const options = {
Expand Down
16 changes: 14 additions & 2 deletions core/api-server/tests/exec-stored.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe('Executions', () => {
const response = await request(options);
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('data should NOT have additional properties');
expect(response.body.error.message).to.equal('data should NOT have additional properties (nodes)');
});
it('should throw pipeline not found', async () => {
const options = {
Expand All @@ -91,6 +91,18 @@ describe('Executions', () => {
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal(validationMessages.PIPELINE_NAME_FORMAT);
});
it('should throw unable to find flowInput', async () => {
const options = {
uri: restPath,
body: {
name: 'flowInput'
}
};
const response = await request(options);
expect(response.body).to.have.property('error');
expect(response.body.error.code).to.equal(HttpStatus.BAD_REQUEST);
expect(response.body.error.message).to.equal('unable to find flowInput.files.links');
});
it('should succeed and return job id', async () => {
const options = {
uri: restPath,
Expand Down Expand Up @@ -120,7 +132,7 @@ describe('Executions', () => {
const options = {
uri: restPath,
body: {
name: 'flow4',
name: 'override-flowInput',
flowInput: {
inp: [
[],
Expand Down
44 changes: 16 additions & 28 deletions core/api-server/tests/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,7 @@ describe('Internal', () => {
body: {
name: pipeline.name,
jobId: response1.body.jobId,
taskId: `taskId:${uuid()} `,
flowInput: {
bla: 'bla'
}
taskId: `taskId:${uuid()} `
}
};
const response2 = await request(options2);
Expand All @@ -232,10 +229,7 @@ describe('Internal', () => {
body: {
name: pipeline.name,
jobId: `jobId - ${uuid()} `,
taskId: `taskId - ${uuid()} `,
flowInput: {
bla: 'bla'
}
taskId: `taskId - ${uuid()} `
}
};
const res1 = await request(options);
Expand All @@ -254,18 +248,15 @@ describe('Internal', () => {
name: pipeline.name,
nodes: [
{
"nodeName": "green",
"algorithmName": "green-alg",
"input": [
"@flowInput"
nodeName: "green",
algorithmName: "green-alg",
input: [
"data"
]
}
],
jobId: `jobId - ${uuid()} `,
taskId: `taskId - ${uuid()} `,
flowInput: {
bla: 'bla'
}
jobId: `jobId-${uuid()}`,
taskId: `taskId-${uuid()}`
}
};
const response = await request(options);
Expand All @@ -279,18 +270,15 @@ describe('Internal', () => {
name: pipeline.name,
nodes: [
{
"nodeName": "green",
"algorithmName": "green-alg",
"input": [
"@flowInput"
nodeName: "green",
algorithmName: "green-alg",
input: [
"data"
]
}
],
jobId: `jobId - ${uuid()} `,
taskId: `taskId - ${uuid()} `,
flowInput: {
bla: 'bla'
}
jobId: `jobId-${uuid()}`,
taskId: `taskId-${uuid()}`
}
};
const res1 = await request(options);
Expand Down Expand Up @@ -321,8 +309,8 @@ describe('Internal', () => {
}
],
flowInput: {
"files": {
"link": "links-1"
files: {
link: "links-1"
}
},
triggers: {
Expand Down
Loading

0 comments on commit 111737a

Please sign in to comment.