Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WM-1616: Allow repeating attempts at initialization (take 2) #6985

Merged
merged 7 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 89 additions & 50 deletions engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ object WorkflowActor {

/**
 * Records a workflow actor state together with any failures associated with it. Stored in the actor data
 * as `lastStateReached` when the workflow moves on to finalization or to the failed state.
 *
 * @param state    the state being recorded
 * @param failures the failures associated with that state, if any (defaults to None)
 */
case class StateCheckpoint(state: WorkflowActorState, failures: Option[List[Throwable]] = None)

/**
* Message sent from WorkflowActor to itself to trigger an initialization attempt. Allows us to idle
* in the initializing state without an assigned lifecycle actor while waiting out the retry interval after a failed attempt.
*/
case object StartInitializing

/**
* @param currentLifecycleStateActor The current lifecycle stage, represented by an ActorRef.
*/
Expand All @@ -145,7 +151,8 @@ object WorkflowActor {
effectiveStartableState: StartableState,
workflowFinalOutputs: Set[WomValue] = Set.empty,
workflowAllOutputs: Set[WomValue] = Set.empty,
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty)
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty,
failedInitializationAttempts: Int = 0)
object WorkflowActorData {
def apply(startableState: StartableState): WorkflowActorData = WorkflowActorData(
currentLifecycleStateActor = None,
Expand Down Expand Up @@ -255,6 +262,9 @@ class WorkflowActor(workflowToStart: WorkflowToStart,

protected val pathBuilderFactories: List[PathBuilderFactory] = EngineFilesystems.configuredPathBuilderFactories

/** Total number of initialization attempts allowed; once this many have failed the workflow is finalized with the failure. */
protected val maxInitializationAttempts: Int = 16
/** Delay between a failed initialization attempt and the next StartInitializing trigger. */
protected val initializationRetryInterval: FiniteDuration = 1.minute

startWith(WorkflowUnstartedState, WorkflowActorData(initialStartableState))

pushCurrentStateToMetadataService(workflowId, WorkflowUnstartedState.workflowState)
Expand Down Expand Up @@ -299,18 +309,8 @@ class WorkflowActor(workflowToStart: WorkflowToStart,

when(MaterializingWorkflowDescriptorState) {
case Event(MaterializeWorkflowDescriptorSuccessResponse(workflowDescriptor), data) =>
val initializerActor = context.actorOf(
WorkflowInitializationActor.props(
workflowIdForLogging,
rootWorkflowIdForLogging,
workflowDescriptor,
ioActor,
serviceRegistryActor,
restarting
),
name = s"WorkflowInitializationActor-$workflowId")
initializerActor ! StartInitializationCommand
goto(InitializingWorkflowState) using data.copy(currentLifecycleStateActor = Option(initializerActor), workflowDescriptor = Option(workflowDescriptor))
self ! StartInitializing
goto(InitializingWorkflowState) using data.copy(workflowDescriptor = Option(workflowDescriptor))
case Event(MaterializeWorkflowDescriptorFailureResponse(reason: Throwable), data) =>
goto(WorkflowFailedState) using data.copy(lastStateReached = StateCheckpoint(MaterializingWorkflowDescriptorState, Option(List(reason))))
// If the workflow is not being restarted then we can abort it immediately as nothing happened yet
Expand All @@ -320,65 +320,91 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
/* ************************** */
/* ****** Initializing ****** */
/* ************************** */

when(InitializingWorkflowState) {
case Event(WorkflowInitializationSucceededResponse(initializationData), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
val executionActor = context.actorOf(WorkflowExecutionActor.props(
protected def createInitializationActor(workflowDescriptor: EngineWorkflowDescriptor, name: String): ActorRef = {
context.actorOf(
WorkflowInitializationActor.props(
workflowIdForLogging,
rootWorkflowIdForLogging,
workflowDescriptor,
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
callCacheWriteActor = callCacheWriteActor,
workflowDockerLookupActor = workflowDockerLookupActor,
jobRestartCheckTokenDispenserActor = jobRestartCheckTokenDispenserActor,
jobExecutionTokenDispenserActor = jobExecutionTokenDispenserActor,
backendSingletonCollection,
initializationData,
startState = data.effectiveStartableState,
rootConfig = conf,
totalJobsByRootWf = totalJobsByRootWf,
fileHashCacheActor = fileHashCacheActorProps map context.system.actorOf,
blacklistCache = blacklistCache), name = s"WorkflowExecutionActor-$workflowId")
ioActor,
serviceRegistryActor,
restarting
),
name)
}

when(InitializingWorkflowState) {
case Event(StartInitializing, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
val initializerActor = createInitializationActor(workflowDescriptor, s"WorkflowInitializationActor-$workflowId-${data.failedInitializationAttempts + 1}")
initializerActor ! StartInitializationCommand
goto(InitializingWorkflowState) using data.copy(currentLifecycleStateActor = Option(initializerActor), workflowDescriptor = Option(workflowDescriptor))
case Event(WorkflowInitializationSucceededResponse(initializationData), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
val dataWithInitializationData = data.copy(initializationData = initializationData)
val executionActor = createWorkflowExecutionActor(workflowDescriptor, dataWithInitializationData)
executionActor ! ExecuteWorkflowCommand

val nextState = data.effectiveStartableState match {
case RestartableAborting => WorkflowAbortingState
case _ => ExecutingWorkflowState
}
goto(nextState) using data.copy(currentLifecycleStateActor = Option(executionActor), initializationData = initializationData)
case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList))
goto(nextState) using dataWithInitializationData.copy(currentLifecycleStateActor = Option(executionActor))
case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
val failedInitializationAttempts = data.failedInitializationAttempts + 1
if (failedInitializationAttempts < maxInitializationAttempts) {
workflowLogger.info(s"Initialization failed on attempt $failedInitializationAttempts. Will retry up to $maxInitializationAttempts times. Next retry is in $initializationRetryInterval", CromwellAggregatedException(reason, "Initialization Failure"))
context.system.scheduler.scheduleOnce(initializationRetryInterval) { self ! StartInitializing}
stay() using data.copy(currentLifecycleStateActor = None, failedInitializationAttempts = failedInitializationAttempts)
} else {
finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList))
}

// If the workflow is not restarting, handle the Abort command normally and send an abort message to the init actor
case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) if !restarting =>
case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) if !restarting =>
handleAbortCommand(data, workflowDescriptor)
}

/* ********************* */
/* ****** Running ****** */
/* ********************* */

/**
 * Builds the WorkflowExecutionActor for this workflow as a child of this actor.
 *
 * @param workflowDescriptor descriptor of the workflow to execute
 * @param data               current actor data; supplies the initialization data and the startable state
 *                           handed to the execution actor
 * @return the ActorRef of the newly created child, named after the workflow id
 */
def createWorkflowExecutionActor(workflowDescriptor: EngineWorkflowDescriptor, data: WorkflowActorData): ActorRef = {
  val executionActorName = s"WorkflowExecutionActor-$workflowId"
  val executionActorProps = WorkflowExecutionActor.props(
    workflowDescriptor,
    ioActor = ioActor,
    serviceRegistryActor = serviceRegistryActor,
    jobStoreActor = jobStoreActor,
    subWorkflowStoreActor = subWorkflowStoreActor,
    callCacheReadActor = callCacheReadActor,
    callCacheWriteActor = callCacheWriteActor,
    workflowDockerLookupActor = workflowDockerLookupActor,
    jobRestartCheckTokenDispenserActor = jobRestartCheckTokenDispenserActor,
    jobExecutionTokenDispenserActor = jobExecutionTokenDispenserActor,
    backendSingletonCollection,
    data.initializationData,
    startState = data.effectiveStartableState,
    rootConfig = conf,
    totalJobsByRootWf = totalJobsByRootWf,
    // The file hash cache actor is created from the actor system, not as a child of this actor.
    fileHashCacheActor = fileHashCacheActorProps map context.system.actorOf,
    blacklistCache = blacklistCache
  )
  context.actorOf(executionActorProps, name = executionActorName)
}

// Handles workflow completion events from the WEA and abort command
val executionResponseHandler: StateFunction = {
// Workflow responses
case Event(WorkflowExecutionSucceededResponse(jobKeys, rootAndSubworklowIds, finalOutputs, allOutputs),
data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, jobKeys, finalOutputs, None, allOutputs, rootAndSubworklowIds)
case Event(WorkflowExecutionFailedResponse(jobKeys, failures),
data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, jobKeys, CallOutputs.empty, Option(List(failures)))
case Event(WorkflowExecutionAbortedResponse(jobKeys),
data @ WorkflowActorData(_, Some(workflowDescriptor), _, StateCheckpoint(_, failures), _, _, _, _)) =>
data @ WorkflowActorData(_, Some(workflowDescriptor), _, StateCheckpoint(_, failures), _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, jobKeys, CallOutputs.empty, failures)

// Whether we're running or aborting, restarting or not, pass along the abort command.
// Note that aborting a workflow multiple times will result in as many abort commands sent to the execution actor
case Event(AbortWorkflowWithExceptionCommand(ex), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
case Event(AbortWorkflowWithExceptionCommand(ex), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
handleAbortCommand(data, workflowDescriptor, Option(ex))
case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
case Event(AbortWorkflowCommand, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
handleAbortCommand(data, workflowDescriptor)
}

Expand All @@ -391,12 +417,16 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
// Handles initialization responses we can get if the abort came in when we were initializing the workflow
val abortHandler: StateFunction = {
// If the initialization failed, record the failure in the data and finalize the workflow
case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
case Event(WorkflowInitializationFailedResponse(reason), data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, Option(reason.toList))

// Otherwise (success or abort), finalize the workflow without failures
case Event(_: WorkflowInitializationResponse, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _)) =>
case Event(_: WorkflowInitializationResponse, data @ WorkflowActorData(_, Some(workflowDescriptor), _, _, _, _, _, _, _)) =>
finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, failures = None)

case Event(StartInitializing, _) =>
// An initialization trigger we no longer need to action. Ignore:
stay()
}

// In aborting state, we can receive initialization responses or execution responses.
Expand All @@ -413,6 +443,9 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
val failures = data.lastStateReached.failures.getOrElse(List.empty) ++ finalizationFailures
goto(WorkflowFailedState) using data.copy(lastStateReached = StateCheckpoint(FinalizingWorkflowState, Option(failures)))
case Event(AbortWorkflowCommand, _) => stay()
case Event(StartInitializing, _) =>
// An initialization trigger we no longer need to action. Ignore:
stay()
}

when(MetadataIntegrityValidationState) {
Expand All @@ -431,13 +464,18 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
}

def handleAbortCommand(data: WorkflowActorData, workflowDescriptor: EngineWorkflowDescriptor, exceptionCausedAbortOpt: Option[Throwable] = None) = {
val updatedData = data.copy(lastStateReached = StateCheckpoint(stateName, exceptionCausedAbortOpt.map(List(_))))
data.currentLifecycleStateActor match {
case Some(currentActor) =>
currentActor ! EngineLifecycleActorAbortCommand
goto(WorkflowAbortingState) using data.copy(lastStateReached = StateCheckpoint(stateName, exceptionCausedAbortOpt.map(List(_))))
goto(WorkflowAbortingState) using updatedData
case None =>
workflowLogger.warn(s"Received an abort command in state $stateName but there's no lifecycle actor associated. This is an abnormal state, finalizing the workflow anyway.")
finalizeWorkflow(data, workflowDescriptor, Map.empty, CallOutputs.empty, None)
if (stateName == InitializingWorkflowState) {
workflowLogger.info(s"Received an abort command in state $stateName (while awaiting an initialization retry). Finalizing the workflow.")
} else {
workflowLogger.warn(s"Received an abort command in state $stateName but there's no lifecycle actor associated. This is an abnormal state, finalizing the workflow anyway.")
}
finalizeWorkflow(updatedData, workflowDescriptor, Map.empty, CallOutputs.empty, failures = None, lastStateOverride = Option(WorkflowAbortingState))
}
}

Expand Down Expand Up @@ -645,11 +683,12 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
workflowFinalOutputs: CallOutputs,
failures: Option[List[Throwable]],
workflowAllOutputs: Set[WomValue] = Set.empty,
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty) = {
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty,
lastStateOverride: Option[WorkflowActorState] = None) = {
val finalizationActor = makeFinalizationActor(workflowDescriptor, jobExecutionMap, workflowFinalOutputs)
finalizationActor ! StartFinalizationCommand
goto(FinalizingWorkflowState) using data.copy(
lastStateReached = StateCheckpoint (stateName, failures),
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures),
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet,
workflowAllOutputs = workflowAllOutputs,
rootAndSubworkflowIds = rootAndSubworkflowIds
Expand Down
Loading