Skip to content

Commit

Permalink
Merge pull request #372 from CartoDB/batch-queries-limits
Browse files Browse the repository at this point in the history
Batch queries limits
  • Loading branch information
Raul Ochoa authored Oct 10, 2016
2 parents 1004131 + d8c3181 commit fd6eb49
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 61 deletions.
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
-------------------

Announcements:
* limited batch queries to 12 hours
* Allow setting the statement timeout per query in multi-query batch queries.
* Batch queries default statement timeout set to 12 hours.
* Multiple queries jobs pushed as first job between queries.


Expand Down
14 changes: 2 additions & 12 deletions batch/job_backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
var REDIS_PREFIX = 'batch:jobs:';
var REDIS_DB = 5;
var FINISHED_JOBS_TTL_IN_SECONDS = global.settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
var jobStatus = require('./job_status');
var finalStatus = [
jobStatus.CANCELLED,
jobStatus.DONE,
jobStatus.FAILED,
jobStatus.UNKNOWN
];
var JobStatus = require('./job_status');

function JobBackend(metadataBackend, jobQueueProducer) {
this.metadataBackend = metadataBackend;
Expand Down Expand Up @@ -156,15 +150,11 @@ JobBackend.prototype.save = function (job, callback) {
});
};

function isFinalStatus(status) {
return finalStatus.indexOf(status) !== -1;
}

JobBackend.prototype.setTTL = function (job, callback) {
var self = this;
var redisKey = REDIS_PREFIX + job.job_id;

if (!isFinalStatus(job.status)) {
if (!JobStatus.isFinal(job.status)) {
return callback();
}

Expand Down
23 changes: 19 additions & 4 deletions batch/job_runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
var Profiler = require('step-profiler');
var _ = require('underscore');

function JobRunner(jobService, jobQueue, queryRunner, statsdClient) {
this.jobService = jobService;
Expand All @@ -23,6 +24,16 @@ JobRunner.prototype.run = function (job_id, callback) {
}

var query = job.getNextQuery();
var timeout = 12 * 3600 * 1000;
if (Number.isFinite(global.settings.batch_query_timeout)) {
timeout = global.settings.batch_query_timeout;
}
if (_.isObject(query)) {
if (Number.isFinite(query.timeout) && query.timeout > 0) {
timeout = Math.min(timeout, query.timeout);
}
query = query.query;
}

try {
job.setStatus(jobStatus.RUNNING);
Expand All @@ -37,22 +48,22 @@ JobRunner.prototype.run = function (job_id, callback) {

profiler.done('running');

self._run(job, query, profiler, callback);
self._run(job, query, timeout, profiler, callback);
});
});
};

JobRunner.prototype._run = function (job, query, profiler, callback) {
JobRunner.prototype._run = function (job, query, timeout, profiler, callback) {
var self = this;

self.queryRunner.run(job.data.job_id, query, job.data.user, function (err /*, result */) {
self.queryRunner.run(job.data.job_id, query, job.data.user, timeout, function (err /*, result */) {
if (err) {
if (!err.code) {
return callback(err);
}
// if query has been cancelled then it's going to get the current
// job status saved by query_canceller
if (errorCodes[err.code.toString()] === 'query_canceled') {
if (cancelledByUser(err)) {
return self.jobService.get(job.data.job_id, callback);
}
}
Expand Down Expand Up @@ -93,4 +104,8 @@ JobRunner.prototype._run = function (job, query, profiler, callback) {
});
};

/**
 * Tells whether a query error is a cancellation that was requested by the user.
 *
 * The error must map to the 'query_canceled' condition AND its message must
 * match /user.*request/. Presumably the message check is what separates a user
 * cancellation from other cancellations sharing the same condition (e.g. a
 * statement timeout) -- confirm against the database error messages.
 *
 * @param {Object} err - query error; `err.code` must be set (the caller
 *        returns early when it is missing)
 * @returns {Boolean} true only for user-requested cancellations
 */
function cancelledByUser(err) {
    if (errorCodes[err.code.toString()] !== 'query_canceled') {
        return false;
    }

    // A missing or non-string message cannot be matched: treat it as "not
    // cancelled by the user" instead of throwing a TypeError.
    return typeof err.message === 'string' && /user.*request/.test(err.message);
}

module.exports = JobRunner;
10 changes: 10 additions & 0 deletions batch/job_status.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,13 @@ var JOB_STATUS_ENUM = {
};

module.exports = JOB_STATUS_ENUM;

// Statuses that isFinal() reports as final.
var finalStatus = [
    JOB_STATUS_ENUM.CANCELLED,
    JOB_STATUS_ENUM.DONE,
    JOB_STATUS_ENUM.FAILED,
    JOB_STATUS_ENUM.UNKNOWN
];

/**
 * Checks whether a status is one of the final statuses listed above.
 *
 * @param {*} status - status value to look up
 * @returns {Boolean} true when status is CANCELLED, DONE, FAILED or UNKNOWN
 */
module.exports.isFinal = function (status) {
    return finalStatus.some(function (candidate) {
        return candidate === status;
    });
};
31 changes: 14 additions & 17 deletions batch/models/job_fallback.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
var JobStatus = require('../job_status');
var QueryFallback = require('./query/query_fallback');
var MainFallback = require('./query/main_fallback');
var QueryFactory = require('./query/query_factory');

var JobUtils = require('./job_state_machine');
var jobUtils = new JobUtils();

function JobFallback(jobDefinition) {
JobBase.call(this, jobDefinition);

Expand Down Expand Up @@ -73,19 +70,19 @@ JobFallback.is = function (query) {
JobFallback.prototype.init = function () {
for (var i = 0; i < this.data.query.query.length; i++) {
if (shouldInitStatus(this.data.query.query[i])){
this.data.query.query[i].status = jobStatus.PENDING;
this.data.query.query[i].status = JobStatus.PENDING;
}
if (shouldInitQueryFallbackStatus(this.data.query.query[i])) {
this.data.query.query[i].fallback_status = jobStatus.PENDING;
this.data.query.query[i].fallback_status = JobStatus.PENDING;
}
}

if (shouldInitStatus(this.data)) {
this.data.status = jobStatus.PENDING;
this.data.status = JobStatus.PENDING;
}

if (shouldInitFallbackStatus(this.data)) {
this.data.fallback_status = jobStatus.PENDING;
this.data.fallback_status = JobStatus.PENDING;
}
};

Expand Down Expand Up @@ -170,7 +167,7 @@ JobFallback.prototype.setJobStatus = function (status, job, hasChanged, errorMes
result.isValid = this.isValidTransition(job.status, status);
if (result.isValid) {
job.status = status;
if (status === jobStatus.FAILED && errorMesssage && !hasChanged.appliedToFallback) {
if (status === JobStatus.FAILED && errorMesssage && !hasChanged.appliedToFallback) {
job.failed_reason = errorMesssage;
}
}
Expand All @@ -191,13 +188,13 @@ JobFallback.prototype.setFallbackStatus = function (status, job, hasChanged) {
JobFallback.prototype.shiftStatus = function (status, hasChanged) {
// jshint maxcomplexity: 7
if (hasChanged.appliedToFallback) {
if (!this.hasNextQueryFromQueries() && (status === jobStatus.DONE || status === jobStatus.FAILED)) {
if (!this.hasNextQueryFromQueries() && (status === JobStatus.DONE || status === JobStatus.FAILED)) {
status = this.getLastFinishedStatus();
} else if (status === jobStatus.DONE || status === jobStatus.FAILED){
status = jobStatus.PENDING;
} else if (status === JobStatus.DONE || status === JobStatus.FAILED){
status = JobStatus.PENDING;
}
} else if (this.hasNextQueryFromQueries() && status !== jobStatus.RUNNING) {
status = jobStatus.PENDING;
} else if (this.hasNextQueryFromQueries() && status !== JobStatus.RUNNING) {
status = JobStatus.PENDING;
}

return status;
Expand All @@ -207,7 +204,7 @@ JobFallback.prototype.getLastFinishedStatus = function () {
return this.queries.reduce(function (lastFinished, query) {
var status = query.getStatus(this.data);
return this.isFinalStatus(status) ? status : lastFinished;
}.bind(this), jobStatus.DONE);
}.bind(this), JobStatus.DONE);
};

JobFallback.prototype.log = function(logger) {
Expand Down Expand Up @@ -251,8 +248,8 @@ JobFallback.prototype.log = function(logger) {
};

function isFinished (job) {
return jobUtils.isFinalStatus(job.data.status) &&
(!job.data.fallback_status || jobUtils.isFinalStatus(job.data.fallback_status));
return JobStatus.isFinal(job.data.status) &&
(!job.data.fallback_status || JobStatus.isFinal(job.data.fallback_status));
}

function parseQueryId (queryId) {
Expand Down
29 changes: 11 additions & 18 deletions batch/models/job_state_machine.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
'use strict';

var assert = require('assert');
var jobStatus = require('../job_status');
var finalStatus = [
jobStatus.CANCELLED,
jobStatus.DONE,
jobStatus.FAILED,
jobStatus.UNKNOWN
];

var JobStatus = require('../job_status');
var validStatusTransitions = [
[jobStatus.PENDING, jobStatus.RUNNING],
[jobStatus.PENDING, jobStatus.CANCELLED],
[jobStatus.PENDING, jobStatus.UNKNOWN],
[jobStatus.PENDING, jobStatus.SKIPPED],
[jobStatus.RUNNING, jobStatus.DONE],
[jobStatus.RUNNING, jobStatus.FAILED],
[jobStatus.RUNNING, jobStatus.CANCELLED],
[jobStatus.RUNNING, jobStatus.PENDING],
[jobStatus.RUNNING, jobStatus.UNKNOWN]
[JobStatus.PENDING, JobStatus.RUNNING],
[JobStatus.PENDING, JobStatus.CANCELLED],
[JobStatus.PENDING, JobStatus.UNKNOWN],
[JobStatus.PENDING, JobStatus.SKIPPED],
[JobStatus.RUNNING, JobStatus.DONE],
[JobStatus.RUNNING, JobStatus.FAILED],
[JobStatus.RUNNING, JobStatus.CANCELLED],
[JobStatus.RUNNING, JobStatus.PENDING],
[JobStatus.RUNNING, JobStatus.UNKNOWN]
];

function JobStateMachine () {
Expand All @@ -42,5 +35,5 @@ JobStateMachine.prototype.isValidTransition = function (initialStatus, finalStat
};

JobStateMachine.prototype.isFinalStatus = function (status) {
return finalStatus.indexOf(status) !== -1;
return JobStatus.isFinal(status);
};
8 changes: 7 additions & 1 deletion batch/models/query/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ Query.is = function (query) {

Query.prototype.getNextQuery = function (job) {
if (job.query.query[this.index].status === jobStatus.PENDING) {
return job.query.query[this.index].query;
var query = {
query: job.query.query[this.index].query
};
if (Number.isFinite(job.query.query[this.index].timeout)) {
query.timeout = job.query.query[this.index].timeout;
}
return query;
}
};

Expand Down
7 changes: 3 additions & 4 deletions batch/query_runner.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

var PSQL = require('cartodb-psql');
var BATCH_QUERY_TIMEOUT = global.settings.batch_query_timeout || 12 * 3600 * 1000; // 12 hours in millisecond
var debug = require('./util/debug')('query-runner');

function QueryRunner(userDatabaseMetadataService) {
Expand All @@ -10,23 +9,23 @@ function QueryRunner(userDatabaseMetadataService) {

module.exports = QueryRunner;

QueryRunner.prototype.run = function (job_id, sql, user, callback) {
QueryRunner.prototype.run = function (job_id, sql, user, timeout, callback) {
this.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}

var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });

pg.query('SET statement_timeout=' + BATCH_QUERY_TIMEOUT, function (err) {
pg.query('SET statement_timeout=' + timeout, function (err) {
if(err) {
return callback(err);
}

// mark query to allow users to cancel their queries
sql = '/* ' + job_id + ' */ ' + sql;

debug('Running query %s', sql);
debug('Running query [timeout=%d] %s', timeout, sql);
pg.eventedQuery(sql, function (err, query) {
if (err) {
return callback(err);
Expand Down
2 changes: 1 addition & 1 deletion config/environments/development.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '5432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_query_timeout || 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
Expand Down
2 changes: 1 addition & 1 deletion config/environments/production.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '6432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_query_timeout || 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
Expand Down
2 changes: 1 addition & 1 deletion config/environments/staging.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '6432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_query_timeout || 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
Expand Down
2 changes: 1 addition & 1 deletion config/environments/test.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '5432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_query_timeout || 12 * 3600 * 1000; // 12 hours in milliseconds
module.exports.batch_query_timeout = 5 * 1000; // 5 seconds in milliseconds
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
Expand Down
Loading

0 comments on commit fd6eb49

Please sign in to comment.