From 7c94b57f6901d37e8b4ecd0eeaf57a5634075c12 Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 5 Jun 2024 14:56:12 -0400 Subject: [PATCH 1/2] fix several issues with search job cancellation --- .../job_orchestration/scheduler/search/search_scheduler.py | 6 +++--- .../webui/imports/api/search/server/SearchJobsDbManager.js | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py index 50ae2d3b0..b4221ff95 100644 --- a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py @@ -163,8 +163,8 @@ async def handle_cancelling_search_jobs(db_conn_pool) -> None: with contextlib.closing(db_conn_pool.connect()) as db_conn: cancelling_jobs = fetch_cancelling_search_jobs(db_conn) - for job in cancelling_jobs: - job_id = job["job_id"] + for cancelling_job in cancelling_jobs: + job_id = str(cancelling_job["job_id"]) if job_id in active_jobs: job = active_jobs.pop(job_id) cancel_job_except_reducer(job) @@ -459,7 +459,7 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): error_msg = f"Unexpected msg_type: {msg.msg_type.name}" raise NotImplementedError(error_msg) - if set_job_status(db_conn, job_id, new_job_status, SearchJobStatus.RUNNING): + if set_job_status(db_conn, job_id, new_job_status): if new_job_status == SearchJobStatus.SUCCEEDED: logger.info(f"Completed job {job_id}.") elif reducer_failed: diff --git a/components/webui/imports/api/search/server/SearchJobsDbManager.js b/components/webui/imports/api/search/server/SearchJobsDbManager.js index 005fbf0a9..8f5f950e7 100644 --- a/components/webui/imports/api/search/server/SearchJobsDbManager.js +++ b/components/webui/imports/api/search/server/SearchJobsDbManager.js @@ -90,7 +90,9 @@ class SearchJobsDbManager { await this.#sqlDbConnPool.query( `UPDATE ${this.#searchJobsTableName} SET ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${SEARCH_JOB_STATUS.CANCELLING} - WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ?`, + WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ? + AND ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} + IN (${SEARCH_JOB_STATUS.PENDING}, ${SEARCH_JOB_STATUS.RUNNING})`, jobId, ); } From 256c0a047579b1425a6bd3dec9156a91bba7365f Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 5 Jun 2024 16:28:19 -0400 Subject: [PATCH 2/2] apply suggestions from code review --- .../job_orchestration/scheduler/search/search_scheduler.py | 2 ++ .../webui/imports/api/search/server/SearchJobsDbManager.js | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py index b4221ff95..7f4107d22 100644 --- a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py @@ -459,6 +459,8 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): error_msg = f"Unexpected msg_type: {msg.msg_type.name}" raise NotImplementedError(error_msg) + # We set the status regardless of the job's previous status to handle the case where the + # job is cancelled (status = CANCELLING) while we're in this method. if set_job_status(db_conn, job_id, new_job_status): if new_job_status == SearchJobStatus.SUCCEEDED: logger.info(f"Completed job {job_id}.") diff --git a/components/webui/imports/api/search/server/SearchJobsDbManager.js b/components/webui/imports/api/search/server/SearchJobsDbManager.js index 8f5f950e7..df1dc27fa 100644 --- a/components/webui/imports/api/search/server/SearchJobsDbManager.js +++ b/components/webui/imports/api/search/server/SearchJobsDbManager.js @@ -90,8 +90,8 @@ class SearchJobsDbManager { await this.#sqlDbConnPool.query( `UPDATE ${this.#searchJobsTableName} SET ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${SEARCH_JOB_STATUS.CANCELLING} - WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ? - AND ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} + WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ? + AND ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} IN (${SEARCH_JOB_STATUS.PENDING}, ${SEARCH_JOB_STATUS.RUNNING})`, jobId, );