Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add exec jobs api #1528

Merged
merged 3 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/api-server/api/rest-api/routes/v1/exec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ const routes = (options) => {
const response = await Execution.getRunningPipelines();
res.json(response);
});
router.all('/jobs', methods(['GET']), async (req, res,) => {
const { status, raw } = req.query;
const response = await Execution.getActivePipelines({ status, raw });
res.json(response);
});
router.all('/status/:jobId?', methods(['GET']), async (req, res) => {
const { jobId } = req.params;
const response = await Execution.getJobStatus({ jobId });
Expand Down
98 changes: 91 additions & 7 deletions core/api-server/api/rest-api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"info": {
"title": "HKube API",
"description": "HKube RESTful API",
"version": "2.2.10",
"version": "2.2.11",
"contact": {
"email": "hkube.dev@gmail.com"
},
Expand Down Expand Up @@ -3072,6 +3072,82 @@
}
}
},
"/exec/jobs": {
"get": {
"tags": [
"Execution"
],
"summary": "Get jobs (active and pending)",
"description": "Returns a list of all jobs (active and pending)",
"parameters": [
{
"name": "status",
"in": "query",
"description": "status of the jobs to return",
"required": false,
"schema": {
"type": "string",
"enum": [
"active",
"pending"
]
}
},
{
"name": "raw",
"in": "query",
"description": "if true returns just the jobId",
"required": false,
"schema": {
"type": "boolean"
}
}
],
"responses": {
"200": {
"description": "jobs list",
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"type": "object",
"properties": {
"jobId": {
"type": "string",
"description": "Unique identifier representing pipeline execution"
},
"status": {
"type": "string"
}
}
}
}
}
}
},
"default": {
"description": "Unexpected error",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"code": {
"type": "integer",
"format": "int32"
},
"message": {
"type": "string"
}
}
}
}
}
}
}
}
},
"/exec/status/{jobId}": {
"get": {
"tags": [
Expand Down Expand Up @@ -18973,6 +19049,18 @@
"jobId"
]
},
"jobListItem": {
"type": "object",
"properties": {
"jobId": {
"type": "string",
"description": "Unique identifier representing pipeline execution"
},
"status": {
"type": "string"
}
}
},
"tag": {
"type": "string",
"description": "Unique identifier representing a specific tag",
Expand All @@ -18983,8 +19071,7 @@
"type": "object",
"properties": {
"name": {
"type": "string",
"default": ""
"type": "string"
},
"experimentName": {
"type": "string",
Expand All @@ -19008,10 +19095,7 @@
"minimum": 1,
"maximum": 100000
}
},
"required": [
"name"
]
}
},
"queryRange": {
"type": "object",
Expand Down
6 changes: 6 additions & 0 deletions core/api-server/lib/service/execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ class ExecutionService {
return list.map(l => ({ jobId: l.jobId, ...l.pipeline }));
}

async getActivePipelines({ status, raw } = {}) {
const active = await stateManager.getRunningJobs({ status });

return raw === 'true' ? active.map(f => (f.jobId)) : active;
}

async stopJob(options) {
validator.executions.validateStopPipeline(options);
const { jobId, reason } = options;
Expand Down
7 changes: 6 additions & 1 deletion core/api-server/lib/state/state-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const storageManager = require('@hkube/storage-manager');
const { tracer } = require('@hkube/metrics');
const dbConnect = require('@hkube/db');
const Logger = require('@hkube/logger');
const { buildStatuses } = require('@hkube/consts');
const { buildStatuses, pipelineStatuses } = require('@hkube/consts');
const component = require('../consts/componentNames').DB;

class StateManager {
Expand Down Expand Up @@ -274,6 +274,11 @@ class StateManager {
return this._db.jobs.fetch({ jobId, fields });
}

async getRunningJobs({ status } = {}) {
const statuses = status ? [status] : [pipelineStatuses.ACTIVE, pipelineStatuses.PENDING];
return this._db.jobs.search({ pipelineStatus: { $in: statuses }, fields: { jobId: true, status: 'status.status' } });
}

async getStatus(status) {
return this._db.jobs.fetchStatus(status);
}
Expand Down
95 changes: 95 additions & 0 deletions core/api-server/tests/exec-jobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
const { expect } = require('chai');
const HttpStatus = require('http-status-codes');
const stateManager = require('../lib/state/state-manager');
const { pipelines } = require('./mocks');
const { request } = require('./utils');
let restUrl;

describe('Executions', () => {
before(() => {
restUrl = global.testParams.restUrl;
});
describe('/exec/jobs', () => {
let restPath = null;
before(() => {
restPath = `${restUrl}/exec/jobs`;
});
beforeEach(async () => {
const list = await stateManager.getRunningJobs();
for (const job of list) {
await stateManager._db.jobs.delete({jobId: job.jobId});
}
});

it('should return empty list', async () => {
const options = {
method: 'GET',
uri: restPath,
};
const response = await request(options);
expect(response.body).to.be.empty
});

it('should return active list', async () => {
const options = {
method: 'GET',
uri: `${restPath}?status=active`,
};
for (let i=0;i<2;i++) {

await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'active'}, type: 'stored'});
}
for (let i=2;i<6;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'pending'}, type: 'stored'});
}
const response = await request(options);
expect(response.body).to.have.length(2)
});

it('should return pending list', async () => {
const options = {
method: 'GET',
uri: `${restPath}?status=pending`,
};
for (let i=0;i<2;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'active'}, type: 'stored'});
}
for (let i=2;i<6;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'pending'}, type: 'stored'});
}
const response = await request(options);
expect(response.body).to.have.length(4)
});

it('should return all list', async () => {
const options = {
method: 'GET',
uri: `${restPath}`,
};
for (let i=0;i<111;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'active'}, type: 'stored'});
}
for (let i=111;i<300;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'pending'}, type: 'stored'});
}
const response = await request(options);
expect(response.body).to.have.length(300)
});

it('should return all list just jobIds', async () => {
const options = {
method: 'GET',
uri: `${restPath}?raw=true`,
};
for (let i=0;i<111;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'active'}, type: 'stored'});
}
for (let i=111;i<300;i++) {
await stateManager._db.jobs.create({jobId: `job_${i}`, status: {status:'pending'}, type: 'stored'});
}
const response = await request(options);
expect(response.body).to.have.length(300);
expect(response.body[0]).to.eql('job_0');
});
});
});
Loading