Skip to content

Commit

Permalink
Add ability to cancel build jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Salim Alam <salam@chef.io>
  • Loading branch information
chefsalim committed Oct 31, 2017
1 parent d7ee7d2 commit cc079b6
Show file tree
Hide file tree
Showing 27 changed files with 1,549 additions and 524 deletions.
19 changes: 19 additions & 0 deletions components/builder-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ impl Client {
}
}

/// Cancel a job group
///
/// # Failures
///
/// * Remote API Server is not available
pub fn job_group_cancel(&self, group_id: u64, token: &str) -> Result<()> {
let url = format!("jobs/group/{}/cancel", group_id);
let res = self.add_authz(self.0.post(&url), token).send().map_err(
Error::HyperError,
)?;

if res.status != StatusCode::NoContent {
debug!("Failed to cancel group, status: {:?}", res.status);
return Err(err_from_response(res));
}

Ok(())
}

fn add_authz<'a>(&'a self, rb: RequestBuilder<'a>, token: &str) -> RequestBuilder {
rb.header(Authorization(Bearer { token: token.to_string() }))
}
Expand Down
44 changes: 41 additions & 3 deletions components/builder-api/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use hyper::status::StatusCode;
use iron::status;
use params::{FromValue, Params};
use persistent;
use protocol::jobsrv::{Job, JobGet, JobLog, JobLogGet, JobState, ProjectJobsGet,
ProjectJobsGetResponse};
use protocol::jobsrv::{JobGraphPackageReverseDependencies, JobGraphPackageReverseDependenciesGet};
use protocol::jobsrv::{Job, JobGet, JobLogGet, JobLog, JobState, ProjectJobsGet,
ProjectJobsGetResponse, JobGroupCancel, JobGroupGet, JobGroup};
use protocol::jobsrv::{JobGraphPackageReverseDependenciesGet, JobGraphPackageReverseDependencies};
use protocol::originsrv::*;
use protocol::sessionsrv::{Account, AccountGetId, AccountInvitationListRequest,
AccountInvitationListResponse, AccountOriginListRequest,
Expand Down Expand Up @@ -181,6 +181,44 @@ pub fn job_group_promote(req: &mut Request) -> IronResult<Response> {
}
}

pub fn job_group_cancel(req: &mut Request) -> IronResult<Response> {
let group_id = match get_param(req, "id") {
Some(id) => {
match id.parse::<u64>() {
Ok(g) => g,
Err(e) => {
debug!("Error finding group. e = {:?}", e);
return Ok(Response::with(status::BadRequest));
}
}
}
None => return Ok(Response::with(status::BadRequest)),
};

let mut jgg = JobGroupGet::new();
jgg.set_group_id(group_id);

let group = match route_message::<JobGroupGet, JobGroup>(req, &jgg) {
Ok(group) => group,
Err(err) => return Ok(render_net_error(&err)),
};

let name_split: Vec<&str> = group.get_project_name().split("/").collect();
assert!(name_split.len() == 2);

if !check_origin_access(req, &name_split[0]).unwrap_or(false) {
return Ok(Response::with(status::Forbidden));
}

let mut jgc = JobGroupCancel::new();
jgc.set_group_id(group_id);

match route_message::<JobGroupCancel, NetOk>(req, &jgc) {
Ok(_) => Ok(Response::with(status::NoContent)),
Err(err) => Ok(render_net_error(&err)),
}
}

pub fn validate_registry_credentials(req: &mut Request) -> IronResult<Response> {
let json_body = req.get::<bodyparser::Json>();

Expand Down
3 changes: 3 additions & 0 deletions components/builder-api/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ impl HttpGateway for ApiSrv {
job_group_promote: post "/jobs/group/:id/promote/:channel" => {
XHandler::new(job_group_promote).before(basic.clone())
},
job_group_cancel: post "/jobs/group/:id/cancel" => {
XHandler::new(job_group_cancel).before(basic.clone())
},
rdeps: get "/rdeps/:origin/:name" => rdeps_show,

user_invitations: get "/user/invitations" => {
Expand Down
1 change: 1 addition & 0 deletions components/builder-core/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::{Deref, DerefMut};
use protocol::jobsrv;
use protocol::originsrv;

#[derive(Clone)]
pub struct Job(jobsrv::Job);

impl Job {
Expand Down
34 changes: 34 additions & 0 deletions components/builder-jobsrv/src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,25 @@ impl DataStore {
Ok(jobs)
}

/// Get a list of cancel pending jobs
///
/// # Errors
///
/// * If a connection cannot be gotten from the pool
/// * If the cancel pending jobs cannot be selected from the database
/// * If the row returned cannot be translated into a Job
pub fn get_cancel_pending_jobs(&self) -> Result<Vec<jobsrv::Job>> {
let mut jobs = Vec::new();
let conn = self.pool.get_shard(0)?;
let rows = &conn.query("SELECT * FROM get_cancel_pending_jobs_v1()", &[])
.map_err(Error::JobPending)?;
for row in rows {
let job = row_to_job(&row)?;
jobs.push(job);
}
Ok(jobs)
}

/// Reset any Dispatched jobs back to Pending state
/// This is used for recovery scenario
///
Expand Down Expand Up @@ -521,6 +540,14 @@ impl DataStore {
Ok(group)
}

pub fn cancel_job_group(&self, group_id: u64) -> Result<()> {
let conn = self.pool.get_shard(0)?;
conn.query("SELECT cancel_group_v1($1)", &[&(group_id as i64)])
.map_err(Error::JobGroupCancel)?;

Ok(())
}

pub fn get_job_group(&self, msg: &jobsrv::JobGroupGet) -> Result<Option<jobsrv::JobGroup>> {
let group_id = msg.get_group_id();
let conn = self.pool.get_shard(0)?;
Expand Down Expand Up @@ -596,6 +623,7 @@ impl DataStore {
"Complete" => jobsrv::JobGroupState::GroupComplete,
"Failed" => jobsrv::JobGroupState::GroupFailed,
"Queued" => jobsrv::JobGroupState::GroupQueued,
"Canceled" => jobsrv::JobGroupState::GroupCanceled,
_ => return Err(Error::UnknownJobGroupState),
};
group.set_state(group_state);
Expand Down Expand Up @@ -626,6 +654,7 @@ impl DataStore {
"Success" => jobsrv::JobGroupProjectState::Success,
"Failure" => jobsrv::JobGroupProjectState::Failure,
"Skipped" => jobsrv::JobGroupProjectState::Skipped,
"Canceled" => jobsrv::JobGroupProjectState::Canceled,
_ => return Err(Error::UnknownJobGroupProjectState),
};

Expand Down Expand Up @@ -663,6 +692,7 @@ impl DataStore {
jobsrv::JobGroupState::GroupComplete => "Complete",
jobsrv::JobGroupState::GroupFailed => "Failed",
jobsrv::JobGroupState::GroupQueued => "Queued",
jobsrv::JobGroupState::GroupCanceled => "Canceled",
};
conn.execute(
"SELECT set_group_state_v1($1, $2)",
Expand All @@ -684,6 +714,7 @@ impl DataStore {
jobsrv::JobGroupProjectState::Success => "Success",
jobsrv::JobGroupProjectState::Failure => "Failure",
jobsrv::JobGroupProjectState::Skipped => "Skipped",
jobsrv::JobGroupProjectState::Canceled => "Canceled",
};
conn.execute(
"SELECT set_group_project_name_state_v1($1, $2, $3)",
Expand Down Expand Up @@ -715,6 +746,9 @@ impl DataStore {
jobsrv::JobState::Pending |
jobsrv::JobState::Processing |
jobsrv::JobState::Dispatched => "InProgress",
jobsrv::JobState::CancelPending |
jobsrv::JobState::CancelProcessing |
jobsrv::JobState::CancelComplete => "Canceled",
};

if job.get_state() == jobsrv::JobState::Complete {
Expand Down
3 changes: 3 additions & 0 deletions components/builder-jobsrv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Error {
InvalidUrl,
IO(io::Error),
JobGroupCreate(postgres::error::Error),
JobGroupCancel(postgres::error::Error),
JobGroupGet(postgres::error::Error),
JobGroupPending(postgres::error::Error),
JobGroupSetState(postgres::error::Error),
Expand Down Expand Up @@ -115,6 +116,7 @@ impl fmt::Display for Error {
Error::InvalidUrl => format!("Bad URL!"),
Error::IO(ref e) => format!("{}", e),
Error::JobGroupCreate(ref e) => format!("Database error creating a new group, {}", e),
Error::JobGroupCancel(ref e) => format!("Database error canceling a job group, {}", e),
Error::JobGroupGet(ref e) => format!("Database error getting group data, {}", e),
Error::JobGroupPending(ref e) => format!("Database error getting pending group, {}", e),
Error::JobGroupSetState(ref e) => format!("Database error setting group state, {}", e),
Expand Down Expand Up @@ -194,6 +196,7 @@ impl error::Error for Error {
Error::IO(ref err) => err.description(),
Error::InvalidUrl => "Bad Url!",
Error::JobGroupCreate(ref err) => err.description(),
Error::JobGroupCancel(ref err) => err.description(),
Error::JobGroupGet(ref err) => err.description(),
Error::JobGroupPending(ref err) => err.description(),
Error::JobGroupSetState(ref err) => err.description(),
Expand Down
12 changes: 12 additions & 0 deletions components/builder-jobsrv/src/migrations/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,5 +354,17 @@ pub fn migrate(migrator: &mut Migrator) -> Result<()> {
"#,
)?;

// Get a list of all the cancel pending jobs
migrator.migrate(
"jobsrv",
r#"CREATE OR REPLACE FUNCTION get_cancel_pending_jobs_v1()
RETURNS SETOF jobs
LANGUAGE SQL VOLATILE AS $$
SELECT *
FROM jobs
WHERE job_state = 'CancelPending'
$$"#,
)?;

Ok(())
}
12 changes: 12 additions & 0 deletions components/builder-jobsrv/src/migrations/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,5 +315,17 @@ pub fn migrate(migrator: &mut Migrator) -> Result<()> {
$$ LANGUAGE SQL VOLATILE"#,
)?;

// Cancel a job group
migrator.migrate(
"jobsrv",
r#"CREATE OR REPLACE FUNCTION cancel_group_v1(in_gid bigint) RETURNS void AS $$
UPDATE group_projects SET project_state='Canceled'
WHERE owner_id = in_gid
AND (project_state = 'NotStarted');
UPDATE groups SET group_state='Canceled' where id = in_gid;
$$ LANGUAGE SQL VOLATILE
"#,
)?;

Ok(())
}
Loading

0 comments on commit cc079b6

Please sign in to comment.