From 9eefd207655a962d3c945967a8f86e419cdcf5a4 Mon Sep 17 00:00:00 2001 From: Jorge Ejarque Date: Fri, 13 Dec 2024 17:27:49 +0100 Subject: [PATCH] Ensure job is killed when exception in task status check (#5561) Signed-off-by: jorgee Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso --- .../executor/CachedTaskHandler.groovy | 2 +- .../nextflow/executor/GridTaskHandler.groovy | 2 +- .../nextflow/executor/NopeExecutor.groovy | 2 +- .../executor/StoredTaskHandler.groovy | 2 +- .../executor/local/LocalTaskHandler.groovy | 2 +- .../executor/local/NativeTaskHandler.groovy | 2 +- .../groovy/nextflow/k8s/K8sTaskHandler.groovy | 2 +- .../nextflow/processor/TaskHandler.groovy | 20 +++++++++++++++++-- .../processor/TaskPollingMonitor.groovy | 14 ++++++++++--- .../nextflow/k8s/K8sTaskHandlerTest.groovy | 4 ++-- .../nextflow/processor/TaskHandlerTest.groovy | 14 +++++++++++++ .../processor/TaskPollingMonitorTest.groovy | 2 +- .../groovy/test/MockHelpers.groovy | 2 +- .../aws/batch/AwsBatchTaskHandler.groovy | 2 +- .../aws/batch/AwsBatchTaskHandlerTest.groovy | 6 +++--- .../azure/batch/AzBatchTaskHandler.groovy | 2 +- .../batch/GoogleBatchTaskHandler.groovy | 2 +- .../GoogleLifeSciencesTaskHandler.groovy | 2 +- .../batch/GoogleBatchTaskHandlerTest.groovy | 6 +++--- 19 files changed, 64 insertions(+), 26 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/CachedTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/CachedTaskHandler.groovy index 841db3d876..93d0f50ecc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/CachedTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/CachedTaskHandler.groovy @@ -51,7 +51,7 @@ class CachedTaskHandler extends TaskHandler { } @Override - void kill() { + protected void killTask() { throw new UnsupportedOperationException() } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy index 11d310b600..1d03bf21ba 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy @@ -503,7 +503,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask { } @Override - void kill() { + protected void killTask() { if( batch ) { batch.collect(executor, jobId) } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/NopeExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/NopeExecutor.groovy index ce7dcf4ad4..9b9fbddbf9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/NopeExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/NopeExecutor.groovy @@ -86,7 +86,7 @@ class NopeTaskHandler extends TaskHandler { } @Override - void kill() { } + protected void killTask() { } } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/StoredTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/StoredTaskHandler.groovy index ac32fd2fb6..3c262b11fc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/StoredTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/StoredTaskHandler.groovy @@ -29,7 +29,7 @@ class StoredTaskHandler extends TaskHandler { } @Override - void kill() { + protected void killTask() { throw new UnsupportedOperationException() } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy index 052b10b708..0bad8bb9c7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy @@ -236,7 +236,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { * Force the submitted job to quit */ @Override - void kill() { + protected void killTask() { if( !process ) return final pid = ProcessHelper.pid(process) log.trace("Killing process with pid: ${pid}") diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/local/NativeTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/local/NativeTaskHandler.groovy index 910a4248e0..e90d72ae05 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/local/NativeTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/local/NativeTaskHandler.groovy @@ -108,7 +108,7 @@ class NativeTaskHandler extends TaskHandler { } @Override - void kill() { + protected void killTask() { if( result ) result.cancel(true) } diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy index a021a085c4..a0c8d09022 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy @@ -472,7 +472,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { * Terminates the current task execution */ @Override - void kill() { + protected void killTask() { if( cleanupDisabled() ) return diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 1c0c4dc438..cdf90cdaca 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -19,6 +19,7 @@ package nextflow.processor import static nextflow.processor.TaskStatus.* import java.nio.file.NoSuchFileException +import java.util.concurrent.atomic.AtomicBoolean import groovy.transform.CompileDynamic import groovy.transform.CompileStatic @@ -37,6 +38,8 @@ import nextflow.trace.TraceRecord @CompileStatic abstract class TaskHandler { + private AtomicBoolean killed = new AtomicBoolean() + protected TaskHandler(TaskRun task) { this.task = task } @@ -77,10 +80,22 @@ abstract class TaskHandler { abstract boolean checkIfCompleted() /** - * Force the submitted job to quit + * Template method implementing the termination of a task execution. + * This is not mean to be invoked directly. See also {@link #kill()} */ - abstract void kill() + abstract protected void killTask() + /** + * Kill a job execution. + * + * @see #killTask() + */ + void kill() { + if (!killed.getAndSet(true)) { + killTask() + } + } + /** * Submit the task for execution. * @@ -300,4 +315,5 @@ abstract class TaskHandler { final workflowId = env.get("TOWER_WORKFLOW_ID") return workflowId ? "tw-${workflowId}-${name}" : name } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy index 4e97543d2e..e4301c1285 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy @@ -16,6 +16,12 @@ package nextflow.processor +import nextflow.cloud.CloudSpotTerminationException +import nextflow.exception.FailedGuardException +import nextflow.exception.ProcessEvalException +import nextflow.exception.ProcessException +import nextflow.exception.ProcessRetryableException + import static nextflow.processor.TaskProcessor.* import java.util.concurrent.ExecutorService @@ -34,10 +40,9 @@ import nextflow.exception.ProcessSubmitTimeoutException import nextflow.executor.BatchCleanup import nextflow.executor.GridTaskHandler import nextflow.util.Duration +import nextflow.util.SysHelper import nextflow.util.Threads import nextflow.util.Throttle -import static nextflow.util.SysHelper.dumpThreads - /** * Monitors the queued tasks waiting for their termination * @@ -465,7 +470,7 @@ class TaskPollingMonitor implements TaskMonitor { } protected dumpCurrentThreads() { - log.trace "Current running threads:\n${dumpThreads()}" + log.trace "Current running threads:\n${SysHelper.dumpThreads()}" } protected void dumpRunningQueue() { @@ -573,6 +578,9 @@ class TaskPollingMonitor implements TaskMonitor { checkTaskStatus(handler) } catch (Throwable error) { + // At this point NF assumes job is not running, but there could be errors at monitoring that could leave a job running (#5516). + // In this case, NF needs to ensure the job is killed. + handler.kill() handleException(handler, error) } } diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy index 1b866dac79..b3c29cd86d 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -515,13 +515,13 @@ class K8sTaskHandlerTest extends Specification { def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME)) when: - handler.kill() + handler.killTask() then: 1 * handler.cleanupDisabled() >> false 1 * client.podDelete(POD_NAME) >> null when: - handler.kill() + handler.killTask() then: 1 * handler.cleanupDisabled() >> true 0 * client.podDelete(POD_NAME) >> null diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index 6ec29faf44..d5014baaf4 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -263,4 +263,18 @@ class TaskHandlerTest extends Specification { [:] | "job_1" [TOWER_WORKFLOW_ID: '1234'] | "tw-1234-job_1" } + + def 'should not kill task twice'() { + given: + def handler = Spy(TaskHandler) + when: + handler.kill() + then: + 1 * handler.killTask() >> {} + + when: + handler.kill() + then: + 0 * handler.killTask() + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy index 23f5098131..41d4c2e812 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy @@ -120,7 +120,7 @@ class TaskPollingMonitorTest extends Specification { then: 1 * session.disableJobsCancellation >> true and: - 0 * handler.kill() >> null + 0 * handler.killTask() >> null 0 * session.notifyTaskComplete(handler) >> null } diff --git a/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy b/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy index 51a298dbd6..d4f5065c0d 100644 --- a/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy +++ b/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy @@ -189,6 +189,6 @@ class MockTaskHandler extends TaskHandler { } @Override - void kill() { } + protected void killTask() { } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy index 29a2261e25..d5883bb86c 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy @@ -298,7 +298,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler> true and: @@ -889,7 +889,7 @@ class AwsBatchTaskHandlerTest extends Specification { when: handler.@jobId = 'job1:task2' - handler.kill() + handler.killTask() then: 1 * executor.shouldDeleteJob('job1') >> true and: @@ -897,7 +897,7 @@ class AwsBatchTaskHandlerTest extends Specification { when: handler.@jobId = 'job1:task2' - handler.kill() + handler.killTask() then: 1 * executor.shouldDeleteJob('job1') >> false and: diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy index 581bed4ec7..0071e99bfa 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy @@ -177,7 +177,7 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask { } @Override - void kill() { + protected void killTask() { if( !taskKey ) return batchService.terminate(taskKey) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index c651e4fa4e..4bf66f9860 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -563,7 +563,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { } @Override - void kill() { + protected void killTask() { if( isActive() ) { log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId" if( executor.shouldDeleteJob(jobId) ) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesTaskHandler.groovy index 4a0882ab28..d3a08a6132 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/lifesciences/GoogleLifeSciencesTaskHandler.groovy @@ -286,7 +286,7 @@ class GoogleLifeSciencesTaskHandler extends TaskHandler { } @Override - void kill() { + protected void killTask() { if( !operation ) return log.debug "[GLS] Killing task > $task.name - Pipeline Id: $pipelineId" helper.cancelOperation(operation) diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 43150d17fd..4c8f08297b 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -559,7 +559,7 @@ class GoogleBatchTaskHandlerTest extends Specification { when: handler.@jobId = 'job1' - handler.kill() + handler.killTask() then: handler.isActive() >> false 0 * executor.shouldDeleteJob('job1') >> true @@ -568,7 +568,7 @@ class GoogleBatchTaskHandlerTest extends Specification { when: handler.@jobId = 'job1' - handler.kill() + handler.killTask() then: handler.isActive() >> true 1 * executor.shouldDeleteJob('job1') >> true @@ -577,7 +577,7 @@ class GoogleBatchTaskHandlerTest extends Specification { when: handler.@jobId = 'job1' - handler.kill() + handler.killTask() then: handler.isActive() >> true 1 * executor.shouldDeleteJob('job1') >> false