From cf90507a6c325b88265622e7d7aaa668b09ecb4d Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Tue, 17 Jan 2023 17:26:51 -0500 Subject: [PATCH 1/7] Allow repeating attempts at initialization --- .../engine/workflow/WorkflowActor.scala | 71 ++++++++++++------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index ba82bab3e96..b51495a6559 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -135,6 +135,12 @@ object WorkflowActor { case class StateCheckpoint(state: WorkflowActorState, failures: Option[List[Throwable]] = None) + /** + * Message sent from WorkflowActor to itself to trigger the initialization to start. Allows us to idle + * in the initializing state without an assigned actor for a few seconds if the first attempt fails. + */ + case object StartInitializing + /** * @param currentLifecycleStateActor The current lifecycle stage, represented by an ActorRef. */ @@ -145,7 +151,8 @@ object WorkflowActor { effectiveStartableState: StartableState, workflowFinalOutputs: Set[WomValue] = Set.empty, workflowAllOutputs: Set[WomValue] = Set.empty, - rootAndSubworkflowIds: Set[WorkflowId] = Set.empty) + rootAndSubworkflowIds: Set[WorkflowId] = Set.empty, + failedInitializationAttempts: Int = 0) object WorkflowActorData { def apply(startableState: StartableState): WorkflowActorData = WorkflowActorData( currentLifecycleStateActor = None, @@ -299,6 +306,19 @@ class WorkflowActor(workflowToStart: WorkflowToStart, when(MaterializingWorkflowDescriptorState) { case Event(MaterializeWorkflowDescriptorSuccessResponse(workflowDescriptor), data) => + self ! StartInitializing + goto(InitializingWorkflowState) using data.copy(workflowDescriptor = Option(workflowDescriptor)) + case Event(MaterializeWorkflowDescriptorFailureResponse(reason: Throwable), data) => + goto(WorkflowFailedState) using data.copy(lastStateReached = StateCheckpoint(MaterializingWorkflowDescriptorState, Option(List(reason)))) + // If the workflow is not being restarted then we can abort it immediately as nothing happened yet + case Event(AbortWorkflowCommand, _) if !restarting => goto(WorkflowAbortedState) + } + + /* ************************** */ + /* ****** Initializing ****** */ + /* ************************** */ + when(InitializingWorkflowState) { + case Event(StartInitializing, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => val initializerActor = context.actorOf( WorkflowInitializationActor.props( workflowIdForLogging, @@ -308,21 +328,10 @@ class WorkflowActor(workflowToStart: WorkflowToStart, serviceRegistryActor, restarting ), - name = s"WorkflowInitializationActor-$workflowId") + name = s"WorkflowInitializationActor-$workflowId-${data.failedInitializationAttempts + 1}") initializerActor ! StartInitializationCommand goto(InitializingWorkflowState) using data.copy(currentLifecycleStateActor = Option(initializerActor), workflowDescriptor = Option(workflowDescriptor)) - case Event(MaterializeWorkflowDescriptorFailureResponse(reason: Throwable), data) => - goto(WorkflowFailedState) using data.copy(lastStateReached = StateCheckpoint(MaterializingWorkflowDescriptorState, Option(List(reason)))) - // If the workflow is not being restarted then we can abort it immediately as nothing happened yet - case Event(AbortWorkflowCommand, _) if !restarting => goto(WorkflowAbortedState) - } - - /* ************************** */ - /* ****** Initializing ****** */ - /* ************************** */ - - when(InitializingWorkflowState) { - case Event(WorkflowInitializationSucceededResponse(initializationData), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + case Event(WorkflowInitializationSucceededResponse(initializationData), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => val executionActor = context.actorOf(WorkflowExecutionActor.props( workflowDescriptor, ioActor = ioActor, @@ -349,11 +358,19 @@ class WorkflowActor(workflowToStart: WorkflowToStart, case _ => ExecutingWorkflowState } goto(nextState) using data.copy(currentLifecycleStateActor = Option(executionActor), initializationData = initializationData) - case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => - finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList)) + case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => + if (data.failedInitializationAttempts < 10) { + workflowLogger.info(s"Initialization failed on attempt ${data.failedInitializationAttempts + 1}. Will retry in 30 seconds", CromwellAggregatedException(reason, "Initialization Failure")) + context.system.scheduler.scheduleOnce(30.seconds) { self ! StartInitializing} + // Make sure we don't leave the old actor lying around: + data.currentLifecycleStateActor.foreach(context.stop) + stay() using data.copy(currentLifecycleStateActor = None, failedInitializationAttempts = data.failedInitializationAttempts + 1) + } else { + finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList)) + } // If the workflow is not restarting, handle the Abort command normally and send an abort message to the init actor - case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) if !restarting => + case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) if !restarting => handleAbortCommand(data, workflowDescriptor) } @@ -365,20 +382,20 @@ class WorkflowActor(workflowToStart: WorkflowToStart, val executionResponseHandler: StateFunction = { // Workflow responses case Event(WorkflowExecutionSucceededResponse(jobKeys, rootAndSubworklowIds, finalOutputs, allOutputs), - data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, jobKeys, finalOutputs, None, allOutputs, rootAndSubworklowIds) case Event(WorkflowExecutionFailedResponse(jobKeys, failures), - data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, jobKeys, CallOutputs.empty, Option(List(failures))) case Event(WorkflowExecutionAbortedResponse(jobKeys), - data @ WorkflowActorData(_, Some(workflowDescriptor), _, StateCheckpoint(_, failures), _, _, _, _)) => + data @ WorkflowActorData(_, Some(workflowDescriptor), _, StateCheckpoint(_, failures), _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, jobKeys, CallOutputs.empty, failures) // Whether we're running or aborting, restarting or not, pass along the abort command. // Note that aborting a workflow multiple times will result in as many abort commands sent to the execution actor - case Event(AbortWorkflowWithExceptionCommand(ex), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + case Event(AbortWorkflowWithExceptionCommand(ex), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => handleAbortCommand(data, workflowDescriptor, Option(ex)) - case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => handleAbortCommand(data, workflowDescriptor) } @@ -391,11 +408,11 @@ class WorkflowActor(workflowToStart: WorkflowToStart, // Handles initialization responses we can get if the abort came in when we were initializing the workflow val abortHandler: StateFunction = { // If the initialization failed, record the failure in the data and finalize the workflow - case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList)) // Otherwise (success or abort), finalize the workflow without failures - case Event(_: WorkflowInitializationResponse, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) => + case Event(_: WorkflowInitializationResponse, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, failures = None) } @@ -436,7 +453,11 @@ class WorkflowActor(workflowToStart: WorkflowToStart, currentActor ! EngineLifecycleActorAbortCommand goto(WorkflowAbortingState) using data.copy(lastStateReached = StateCheckpoint(stateName, exceptionCausedAbortOpt.map(List(_)))) case None => - workflowLogger.warn(s"Received an abort command in state $stateName but there's no lifecycle actor associated. This is an abnormal state, finalizing the workflow anyway.") + if (stateName == InitializingWorkflowState) { + workflowLogger.info(s"Received an abort command in state $stateName (while awaiting an initialization retry). Finalizing the workflow.") + } else { + workflowLogger.warn(s"Received an abort command in state $stateName but there's no lifecycle actor associated. This is an abnormal state, finalizing the workflow anyway.") + } finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, None) } } From 93d8c62d8d9ab1318be5d5e1e56b266cb8af30ac Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Wed, 18 Jan 2023 14:13:33 -0500 Subject: [PATCH 2/7] WIP --- .../engine/workflow/WorkflowActor.scala | 81 ++++++++++--------- .../engine/workflow/WorkflowActorSpec.scala | 57 ++++++++++--- 2 files changed, 93 insertions(+), 45 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index b51495a6559..92ec3c4d44b 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -262,6 +262,9 @@ class WorkflowActor(workflowToStart: WorkflowToStart, protected val pathBuilderFactories: List[PathBuilderFactory] = EngineFilesystems.configuredPathBuilderFactories + protected val maxInitializationAttempts: Int = 10 + protected val initializationRetryInterval: FiniteDuration = 30.seconds + startWith(WorkflowUnstartedState, WorkflowActorData(initialStartableState)) pushCurrentStateToMetadataService(workflowId, WorkflowUnstartedState.workflowState) @@ -317,54 +320,39 @@ class WorkflowActor(workflowToStart: WorkflowToStart, /* ************************** */ /* ****** Initializing ****** */ /* ************************** */ + protected def createInitializationActor(workflowDescriptor: EngineWorkflowDescriptor, name: String): ActorRef = { + context.actorOf( + WorkflowInitializationActor.props( + workflowIdForLogging, + rootWorkflowIdForLogging, + workflowDescriptor, + ioActor, + serviceRegistryActor, + restarting + ), + name) + } + when(InitializingWorkflowState) { case Event(StartInitializing, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => - val initializerActor = context.actorOf( - WorkflowInitializationActor.props( - workflowIdForLogging, - rootWorkflowIdForLogging, - workflowDescriptor, - ioActor, - serviceRegistryActor, - restarting - ), - name = s"WorkflowInitializationActor-$workflowId-${data.failedInitializationAttempts + 1}") + val initializerActor = createInitializationActor(workflowDescriptor, s"WorkflowInitializationActor-$workflowId-${data.failedInitializationAttempts + 1}") initializerActor ! StartInitializationCommand goto(InitializingWorkflowState) using data.copy(currentLifecycleStateActor = Option(initializerActor), workflowDescriptor = Option(workflowDescriptor)) case Event(WorkflowInitializationSucceededResponse(initializationData), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => - val executionActor = context.actorOf(WorkflowExecutionActor.props( - workflowDescriptor, - ioActor = ioActor, - serviceRegistryActor = serviceRegistryActor, - jobStoreActor = jobStoreActor, - subWorkflowStoreActor = subWorkflowStoreActor, - callCacheReadActor = callCacheReadActor, - callCacheWriteActor = callCacheWriteActor, - workflowDockerLookupActor = workflowDockerLookupActor, - jobRestartCheckTokenDispenserActor = jobRestartCheckTokenDispenserActor, - jobExecutionTokenDispenserActor = jobExecutionTokenDispenserActor, - backendSingletonCollection, - initializationData, - startState = data.effectiveStartableState, - rootConfig = conf, - totalJobsByRootWf = totalJobsByRootWf, - fileHashCacheActor = fileHashCacheActorProps map context.system.actorOf, - blacklistCache = blacklistCache), name = s"WorkflowExecutionActor-$workflowId") - + val dataWithInitializationData = data.copy(initializationData = initializationData) + val executionActor = createWorkflowExecutionActor(workflowDescriptor, dataWithInitializationData) executionActor ! ExecuteWorkflowCommand - val nextState = data.effectiveStartableState match { case RestartableAborting => WorkflowAbortingState case _ => ExecutingWorkflowState } - goto(nextState) using data.copy(currentLifecycleStateActor = Option(executionActor), initializationData = initializationData) + goto(nextState) using dataWithInitializationData.copy(currentLifecycleStateActor = Option(executionActor)) case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => - if (data.failedInitializationAttempts < 10) { + val failedInitializationAttempts = data.failedInitializationAttempts + 1 + if (failedInitializationAttempts < maxInitializationAttempts) { workflowLogger.info(s"Initialization failed on attempt ${data.failedInitializationAttempts + 1}. Will retry in 30 seconds", CromwellAggregatedException(reason, "Initialization Failure")) - context.system.scheduler.scheduleOnce(30.seconds) { self ! StartInitializing} - // Make sure we don't leave the old actor lying around: - data.currentLifecycleStateActor.foreach(context.stop) - stay() using data.copy(currentLifecycleStateActor = None, failedInitializationAttempts = data.failedInitializationAttempts + 1) + context.system.scheduler.scheduleOnce(initializationRetryInterval) { self ! StartInitializing} + stay() using data.copy(currentLifecycleStateActor = None, failedInitializationAttempts = failedInitializationAttempts) } else { finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList)) } @@ -378,6 +366,27 @@ class WorkflowActor(workflowToStart: WorkflowToStart, /* ****** Running ****** */ /* ********************* */ + def createWorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor, data: WorkflowActorData): ActorRef = { + context.actorOf(WorkflowExecutionActor.props( + workflowDescriptor, + ioActor = ioActor, + serviceRegistryActor = serviceRegistryActor, + jobStoreActor = jobStoreActor, + subWorkflowStoreActor = subWorkflowStoreActor, + callCacheReadActor = callCacheReadActor, + callCacheWriteActor = callCacheWriteActor, + workflowDockerLookupActor = workflowDockerLookupActor, + jobRestartCheckTokenDispenserActor = jobRestartCheckTokenDispenserActor, + jobExecutionTokenDispenserActor = jobExecutionTokenDispenserActor, + backendSingletonCollection, + data.initializationData, + startState = data.effectiveStartableState, + rootConfig = conf, + totalJobsByRootWf = totalJobsByRootWf, + fileHashCacheActor = fileHashCacheActorProps map context.system.actorOf, + blacklistCache = blacklistCache), name = s"WorkflowExecutionActor-$workflowId") + } + // Handles workflow completion events from the WEA and abort command val executionResponseHandler: StateFunction = { // Workflow responses diff --git a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala index ab47dedeb5c..6259ce9972b 100644 --- a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala @@ -2,7 +2,6 @@ package cromwell.engine.workflow import java.time.OffsetDateTime import java.util.concurrent.atomic.AtomicInteger - import akka.actor.{Actor, ActorRef, ActorSystem, Kill, Props} import akka.testkit.{EventFilter, TestActorRef, TestFSMRef, TestProbe} import com.typesafe.config.{Config, ConfigFactory} @@ -15,10 +14,10 @@ import cromwell.engine.backend.BackendSingletonCollection import cromwell.engine.workflow.WorkflowActor._ import cromwell.engine.workflow.WorkflowManagerActor.WorkflowActorWorkComplete import cromwell.engine.workflow.lifecycle.EngineLifecycleActorAbortCommand -import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor.{WorkflowExecutionAbortedResponse, WorkflowExecutionFailedResponse, WorkflowExecutionSucceededResponse} +import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor.{ExecuteWorkflowCommand, WorkflowExecutionAbortedResponse, WorkflowExecutionFailedResponse, WorkflowExecutionSucceededResponse} import cromwell.engine.workflow.lifecycle.finalization.CopyWorkflowLogsActor import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationSucceededResponse} -import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor.{WorkflowInitializationAbortedResponse, WorkflowInitializationFailedResponse} +import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor.{StartInitializationCommand, WorkflowInitializationAbortedResponse, WorkflowInitializationFailedResponse, WorkflowInitializationSucceededResponse} import cromwell.engine.workflow.lifecycle.materialization.MaterializeWorkflowDescriptorActor.MaterializeWorkflowDescriptorFailureResponse import cromwell.engine.workflow.workflowstore.{StartableState, Submitted, WorkflowHeartbeatConfig, WorkflowToStart} import cromwell.engine.{EngineFilesystems, EngineWorkflowDescriptor} @@ -60,6 +59,8 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB createMaterializedEngineWorkflowDescriptor(WorkflowId.randomId(), workflowSources = workflowSources) val supervisorProbe: TestProbe = TestProbe("supervisorProbe") val deathwatch: TestProbe = TestProbe("deathwatch") + val initializationProbe: TestProbe = TestProbe("initializationProbe") + val executionProbe: TestProbe = TestProbe("executionProbe") val finalizationProbe: TestProbe = TestProbe("finalizationProbe") var copyWorkflowLogsProbe: TestProbe = _ val AwaitAlmostNothing: FiniteDuration = 100.milliseconds @@ -78,7 +79,7 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB private val workflowHeartbeatConfig = WorkflowHeartbeatConfig(ConfigFactory.load()) - private def createWorkflowActor(state: WorkflowActorState, extraPathBuilderFactory: Option[PathBuilderFactory] = None) = { + private def createWorkflowActor(state: WorkflowActorState, extraPathBuilderFactory: Option[PathBuilderFactory] = None, initializationMaxRetries: Int = 3) = { val actor = TestFSMRef( factory = new WorkflowActorWithTestAddons( finalizationProbe = finalizationProbe, @@ -102,7 +103,11 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB workflowStoreActor = system.actorOf(Props.empty, s"workflowStoreActor-$currentWorkflowId"), workflowHeartbeatConfig = workflowHeartbeatConfig, totalJobsByRootWf = initialJobCtByRootWf, - extraPathBuilderFactory = extraPathBuilderFactory + extraPathBuilderFactory = extraPathBuilderFactory, + initializationMaxRetries = initializationMaxRetries, + initializationInterval = 10.millis, + workflowInitializationActorProbe = initializationProbe, + workflowExecutionActorProbe = executionProbe ), supervisor = supervisorProbe.ref, name = s"workflowActor-$currentWorkflowId", @@ -124,10 +129,17 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB "WorkflowActor" should { - "run Finalization actor if Initialization fails" in { - val actor = createWorkflowActor(InitializingWorkflowState) + "run Finalization actor if Initialization repeatedly fails" in { + val actor = createWorkflowActor(InitializingWorkflowState, initializationMaxRetries = 3) deathwatch watch actor - actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Materialization Failed"))) + + // Expect the WorkflowActor to retry up to three times: + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) + initializationProbe.expectMsg(StartInitializationCommand) + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (2)"))) + initializationProbe.expectMsg(StartInitializationCommand) + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (3)"))) + finalizationProbe.expectMsg(StartFinalizationCommand) actor.stateName should be(FinalizingWorkflowState) actor ! WorkflowFinalizationSucceededResponse @@ -136,6 +148,23 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB deathwatch.expectTerminated(actor) } + "begin execution if Initialization eventually succeeds" in { + val actor = createWorkflowActor(InitializingWorkflowState, initializationMaxRetries = 3) + deathwatch watch actor + + // Expect the WorkflowActor to retry up to three times: + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) + initializationProbe.expectMsg(StartInitializationCommand) + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (2)"))) + initializationProbe.expectMsg(StartInitializationCommand) + actor ! WorkflowInitializationSucceededResponse(AllBackendInitializationData(Map.empty)) + + executionProbe.expectMsg(ExecuteWorkflowCommand) + actor.stateName should be(ExecutingWorkflowState) + // Tidy up: + system.stop(actor) + } + "run Finalization actor if Initialization is aborted" in { val actor = createWorkflowActor(InitializingWorkflowState) deathwatch watch actor @@ -283,7 +312,11 @@ class WorkflowActorWithTestAddons(val finalizationProbe: TestProbe, workflowStoreActor: ActorRef, workflowHeartbeatConfig: WorkflowHeartbeatConfig, totalJobsByRootWf: AtomicInteger, - extraPathBuilderFactory: Option[PathBuilderFactory]) extends WorkflowActor( + extraPathBuilderFactory: Option[PathBuilderFactory], + initializationMaxRetries: Int, + initializationInterval: FiniteDuration, + workflowInitializationActorProbe: TestProbe, + workflowExecutionActorProbe: TestProbe) extends WorkflowActor( workflowToStart = WorkflowToStart(id = workflowId, submissionTime = OffsetDateTime.now, state = startState, @@ -310,6 +343,12 @@ class WorkflowActorWithTestAddons(val finalizationProbe: TestProbe, fileHashCacheActorProps = None, blacklistCache = None) { + override def createInitializationActor(workflowDescriptor: EngineWorkflowDescriptor, name: String): ActorRef = workflowInitializationActorProbe.ref + override def createWorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor, data: WorkflowActorData): ActorRef = workflowExecutionActorProbe.ref + + override val initializationRetryInterval: FiniteDuration = initializationInterval + override val maxInitializationAttempts: Int = initializationMaxRetries + override val pathBuilderFactories: List[PathBuilderFactory] = extraPathBuilderFactory match { case Some(pbf) => EngineFilesystems.configuredPathBuilderFactories :+ pbf case None => EngineFilesystems.configuredPathBuilderFactories From a320207a2d630110ae490000db5ebe1b856d514d Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Wed, 18 Jan 2023 15:32:29 -0500 Subject: [PATCH 3/7] Fixup and improve WorkflowActor tests --- .../engine/workflow/WorkflowActor.scala | 17 ++++-- .../engine/workflow/WorkflowActorSpec.scala | 56 ++++++++++++++++++- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 92ec3c4d44b..81d1941f825 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -423,6 +423,10 @@ class WorkflowActor(workflowToStart: WorkflowToStart, // Otherwise (success or abort), finalize the workflow without failures case Event(_: WorkflowInitializationResponse, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, failures = None) + + case Event(StartInitializing, _) => + // An initialization trigger we no longer need to action. Ignore: + stay() } // In aborting state, we can receive initialization responses or execution responses. @@ -439,6 +443,9 @@ class WorkflowActor(workflowToStart: WorkflowToStart, val failures = data.lastStateReached.failures.getOrElse(List.empty) ++ finalizationFailures goto(WorkflowFailedState) using data.copy(lastStateReached = StateCheckpoint(FinalizingWorkflowState, Option(failures))) case Event(AbortWorkflowCommand, _) => stay() + case Event(StartInitializing, _) => + // An initialization trigger we no longer need to action. Ignore: + stay() } when(MetadataIntegrityValidationState) { @@ -457,17 +464,18 @@ class WorkflowActor(workflowToStart: WorkflowToStart, } def handleAbortCommand(data: WorkflowActorData, workflowDescriptor: EngineWorkflowDescriptor, exceptionCausedAbortOpt: Option[Throwable] = None) = { + val updatedData = data.copy(lastStateReached = StateCheckpoint(stateName, exceptionCausedAbortOpt.map(List(_)))) data.currentLifecycleStateActor match { case Some(currentActor) => currentActor ! EngineLifecycleActorAbortCommand - goto(WorkflowAbortingState) using data.copy(lastStateReached = StateCheckpoint(stateName, exceptionCausedAbortOpt.map(List(_)))) + goto(WorkflowAbortingState) using updatedData case None => if (stateName == InitializingWorkflowState) { workflowLogger.info(s"Received an abort command in state $stateName (while awaiting an initialization retry). Finalizing the workflow.") } else { workflowLogger.warn(s"Received an abort command in state $stateName but there's no lifecycle actor associated. This is an abnormal state, finalizing the workflow anyway.") } - finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, None) + finalizeWorkflow(updatedData, workflowDescriptor, Map.empty, CallOutputs.empty, failures = None, lastStateOverride = Option(WorkflowAbortingState)) } } @@ -675,11 +683,12 @@ class WorkflowActor(workflowToStart: WorkflowToStart, workflowFinalOutputs: CallOutputs, failures: Option[List[Throwable]], workflowAllOutputs: Set[WomValue] = Set.empty, - rootAndSubworkflowIds: Set[WorkflowId] = Set.empty) = { + rootAndSubworkflowIds: Set[WorkflowId] = Set.empty, + lastStateOverride: Option[WorkflowActorState] = None) = { val finalizationActor = makeFinalizationActor(workflowDescriptor, jobExecutionMap, workflowFinalOutputs) finalizationActor ! StartFinalizationCommand goto(FinalizingWorkflowState) using data.copy( - lastStateReached = StateCheckpoint (stateName, failures), + lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures), workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet, workflowAllOutputs = workflowAllOutputs, rootAndSubworkflowIds = rootAndSubworkflowIds diff --git a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala index 6259ce9972b..58029c04820 100644 --- a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala @@ -79,7 +79,7 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB private val workflowHeartbeatConfig = WorkflowHeartbeatConfig(ConfigFactory.load()) - private def createWorkflowActor(state: WorkflowActorState, extraPathBuilderFactory: Option[PathBuilderFactory] = None, initializationMaxRetries: Int = 3) = { + private def createWorkflowActor(state: WorkflowActorState, extraPathBuilderFactory: Option[PathBuilderFactory] = None, initializationMaxRetries: Int = 3, initializationInterval: FiniteDuration = 10.millis) = { val actor = TestFSMRef( factory = new WorkflowActorWithTestAddons( finalizationProbe = finalizationProbe, @@ -105,7 +105,7 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB totalJobsByRootWf = initialJobCtByRootWf, extraPathBuilderFactory = extraPathBuilderFactory, initializationMaxRetries = initializationMaxRetries, - initializationInterval = 10.millis, + initializationInterval = initializationInterval, workflowInitializationActorProbe = initializationProbe, workflowExecutionActorProbe = executionProbe ), @@ -161,8 +161,9 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB executionProbe.expectMsg(ExecuteWorkflowCommand) actor.stateName should be(ExecutingWorkflowState) - // Tidy up: + // Tidy up (and satisfy the deathwatch): system.stop(actor) + deathwatch.expectTerminated(actor) } "run Finalization actor if Initialization is aborted" in { @@ -180,6 +181,55 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB deathwatch.expectTerminated(actor) } + "run Finalization actor if Initialization is aborted during a retry" in { + val actor = createWorkflowActor(InitializingWorkflowState) + deathwatch watch actor + + // Set the stage with a few unfortunate retries: + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) + initializationProbe.expectMsg(StartInitializationCommand) + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (2)"))) + initializationProbe.expectMsg(StartInitializationCommand) + + actor ! AbortWorkflowCommand + eventually { actor.stateName should be(WorkflowAbortingState) } + // Because we failed a few times, the actor test's initial "currentLifecycleActor will have been replaced by this + // new initializationProbe: + initializationProbe.expectMsgPF(TimeoutDuration) { + case EngineLifecycleActorAbortCommand => actor ! WorkflowInitializationAbortedResponse + } + + finalizationProbe.expectMsg(StartFinalizationCommand) + actor.stateName should be(FinalizingWorkflowState) + actor ! WorkflowFinalizationSucceededResponse + workflowManagerActorExpectsSingleWorkCompleteNotification(WorkflowAborted) + deathwatch.expectTerminated(actor) + } + + "run Finalization actor if Initialization is aborted between retries" in { + val actor = createWorkflowActor(InitializingWorkflowState) + deathwatch watch actor + + // Set the stage with a few unfortunate retries: + actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) + eventually { actor.stateData.currentLifecycleStateActor should be(None) } + + actor ! AbortWorkflowCommand + // Because there are no active lifecycle actors, this actor should jump to Finalizing without + // needing any further input: + eventually { actor.stateName should be(FinalizingWorkflowState) } + + // Expect the mailboxes for the initialization actor to be empty (and check "currentLifecycleActor" for good measure) + currentLifecycleActor.expectNoMessage(10.millis) + initializationProbe.expectNoMessage(10.millis) + + finalizationProbe.expectMsg(StartFinalizationCommand) + actor.stateName should be(FinalizingWorkflowState) + actor ! WorkflowFinalizationSucceededResponse + workflowManagerActorExpectsSingleWorkCompleteNotification(WorkflowAborted) + deathwatch.expectTerminated(actor) + } + "run Finalization if Execution fails" in { val actor = createWorkflowActor(ExecutingWorkflowState) deathwatch watch actor From cbe40fcccca7fefcf47ba7909acb0af15509a026 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Wed, 18 Jan 2023 17:24:02 -0500 Subject: [PATCH 4/7] Fix race condition in test --- .../scala/cromwell/engine/workflow/WorkflowActorSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala index 58029c04820..578a6909783 100644 --- a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala @@ -182,14 +182,14 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB } "run Finalization actor if Initialization is aborted during a retry" in { - val actor = createWorkflowActor(InitializingWorkflowState) + val actor = createWorkflowActor(InitializingWorkflowState, initializationInterval = 10.seconds) deathwatch watch actor // Set the stage with a few unfortunate retries: actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) - initializationProbe.expectMsg(StartInitializationCommand) + initializationProbe.expectMsg(20.seconds, StartInitializationCommand) actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (2)"))) - initializationProbe.expectMsg(StartInitializationCommand) + initializationProbe.expectMsg(20.seconds, StartInitializationCommand) actor ! AbortWorkflowCommand eventually { actor.stateName should be(WorkflowAbortingState) } From 86cc07a397234813ebacead0b4ad33d664ea6517 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Thu, 19 Jan 2023 10:41:13 -0500 Subject: [PATCH 5/7] Oops widened the wrong retry interval --- .../cromwell/engine/workflow/WorkflowActorSpec.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala index 578a6909783..d9de71b9125 100644 --- a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala @@ -182,14 +182,14 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB } "run Finalization actor if Initialization is aborted during a retry" in { - val actor = createWorkflowActor(InitializingWorkflowState, initializationInterval = 10.seconds) + val actor = createWorkflowActor(InitializingWorkflowState, initializationInterval = 1.second) deathwatch watch actor // Set the stage with a few unfortunate retries: actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (1)"))) - initializationProbe.expectMsg(20.seconds, StartInitializationCommand) + initializationProbe.expectMsg(StartInitializationCommand) actor ! WorkflowInitializationFailedResponse(Seq(new Exception("Initialization Failed (2)"))) - initializationProbe.expectMsg(20.seconds, StartInitializationCommand) + initializationProbe.expectMsg(StartInitializationCommand) actor ! AbortWorkflowCommand eventually { actor.stateName should be(WorkflowAbortingState) } @@ -207,7 +207,8 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB } "run Finalization actor if Initialization is aborted between retries" in { - val actor = createWorkflowActor(InitializingWorkflowState) + // Set the interval long so that we can guarantee the "between retries" part + val actor = createWorkflowActor(InitializingWorkflowState, initializationInterval = 10.seconds) deathwatch watch actor // Set the stage with a few unfortunate retries: From ef60a58a6a10e279d6c50e0d3b8c02d24281cb23 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Thu, 19 Jan 2023 11:29:07 -0500 Subject: [PATCH 6/7] Fix "Next retry" log message --- .../src/main/scala/cromwell/engine/workflow/WorkflowActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 81d1941f825..3c5c20cad39 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -350,7 +350,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) => val failedInitializationAttempts = data.failedInitializationAttempts + 1 if (failedInitializationAttempts < maxInitializationAttempts) { - workflowLogger.info(s"Initialization failed on attempt ${data.failedInitializationAttempts + 1}. Will retry in 30 seconds", CromwellAggregatedException(reason, "Initialization Failure")) + workflowLogger.info(s"Initialization failed on attempt $failedInitializationAttempts. Will retry up to $maxInitializationAttempts times. Next retry is in $initializationRetryInterval", CromwellAggregatedException(reason, "Initialization Failure")) context.system.scheduler.scheduleOnce(initializationRetryInterval) { self ! StartInitializing} stay() using data.copy(currentLifecycleStateActor = None, failedInitializationAttempts = failedInitializationAttempts) } else { From f646d6b49bfdf1a6471af851c849719010727f8c Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Thu, 19 Jan 2023 12:11:26 -0500 Subject: [PATCH 7/7] Increase total initialization retry window from 5m to 15m --- .../main/scala/cromwell/engine/workflow/WorkflowActor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 3c5c20cad39..b4ccf3de53b 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -262,8 +262,8 @@ class WorkflowActor(workflowToStart: WorkflowToStart, protected val pathBuilderFactories: List[PathBuilderFactory] = EngineFilesystems.configuredPathBuilderFactories - protected val maxInitializationAttempts: Int = 10 - protected val initializationRetryInterval: FiniteDuration = 30.seconds + protected val maxInitializationAttempts: Int = 16 + protected val initializationRetryInterval: FiniteDuration = 1.minute startWith(WorkflowUnstartedState, WorkflowActorData(initialStartableState))