From 1dabea0bb5b607732060c2735063c6abc50a6889 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Fri, 31 Jan 2025 15:18:49 -0500 Subject: [PATCH 1/9] Revert "WX-1595 Refactor the preemption error handling from GCP Batch backend (#7457)" This reverts commit 49d675dbb95d7ba8562560eb00bc2868a30c9e6a. --- CHANGELOG.md | 1 - ...GcpBatchBackendLifecycleActorFactory.scala | 9 +- ...cpBatchAsyncBackendJobExecutionActor.scala | 107 ++++- .../GcpBatchJobCachingActorHelper.scala | 9 +- .../batch/api/GcpBatchRequestFactory.scala | 1 + .../api/request/BatchRequestExecutor.scala | 14 +- .../batch/models/GcpBatchExitCode.scala | 37 -- .../models/GcpBatchRuntimeAttributes.scala | 29 +- .../batch/models/PreviousRetryReasons.scala | 44 +++ .../google/batch/models/RunStatus.scala | 41 +- ...tchAsyncBackendJobExecutionActorSpec.scala | 370 +++++++++++++++++- .../batch/models/GcpBatchExitCodeSpec.scala | 52 --- 12 files changed, 556 insertions(+), 158 deletions(-) delete mode 100644 supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala create mode 100644 supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala delete mode 100644 supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala diff --git a/CHANGELOG.md b/CHANGELOG.md index bd3dea3baf2..daff03c251f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,6 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional ### GCP Batch - The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information. -- Fixes the preemption error handling, now, the correct error message is printed, this also handles the other potential exit codes. - Fixes pulling Docker image metadata from private GCR repositories. ### Improved handling of Life Sciences API quota errors diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala index c1585ffe9fa..dd59c910e09 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala @@ -7,6 +7,11 @@ import com.google.cloud.batch.v1.BatchServiceSettings import com.google.common.collect.ImmutableMap import com.typesafe.scalalogging.StrictLogging import cromwell.backend._ +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{ + preemptionCountKey, + robustBuildAttributes, + unexpectedRetryCountKey +} import cromwell.backend.google.batch.actors._ import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler} import cromwell.backend.google.batch.authentication.GcpBatchDockerCredentials @@ -30,7 +35,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, ) extends StandardLifecycleActorFactory with GcpPlatform { - import GcpBatchBackendLifecycleActorFactory._ + override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey, unexpectedRetryCountKey) override def jobIdKey: String = "__gcp_batch" protected val googleConfig: GoogleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig) @@ -133,6 +138,8 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, } object GcpBatchBackendLifecycleActorFactory extends StrictLogging { + val preemptionCountKey = "PreemptionCount" + val unexpectedRetryCountKey = "UnexpectedRetryCount" private[batch] def robustBuildAttributes(buildAttributes: () => GcpBatchConfigurationAttributes, maxAttempts: Int = 3, diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 26229236646..0a2a5a4a61c 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -1,9 +1,11 @@ package cromwell.backend.google.batch.actors +import _root_.io.grpc.{Status => GrpcStatus} import akka.actor.{ActorLogging, ActorRef} import akka.http.scaladsl.model.{ContentType, ContentTypes} import akka.pattern.AskSupport import cats.data.NonEmptyList +import cats.data.Validated.{Invalid, Valid} import cats.implicits._ import com.google.cloud.batch.v1.JobName import com.google.cloud.storage.contrib.nio.CloudStorageOptions @@ -14,8 +16,10 @@ import cromwell.backend.async.{ AbortedExecutionHandle, ExecutionHandle, FailedNonRetryableExecutionHandle, + FailedRetryableExecutionHandle, PendingExecutionHandle } +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory import cromwell.backend.google.batch.api.GcpBatchRequestFactory._ import cromwell.backend.google.batch.io._ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration @@ -46,7 +50,7 @@ import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.http.HttpPath import cromwell.filesystems.sra.SraPath import cromwell.services.instrumentation.CromwellInstrumentation -import cromwell.services.keyvalue.KeyValueServiceActor.KvJobKey +import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys import mouse.all._ import shapeless.Coproduct @@ -73,8 +77,20 @@ import scala.util.control.NoStackTrace object GcpBatchAsyncBackendJobExecutionActor { - def StandardException(message: String, jobTag: String): Exception = - new Exception(s"Task $jobTag failed: $message") + def StandardException(errorCode: GrpcStatus, + message: String, + jobTag: String, + returnCodeOption: Option[Int], + stderrPath: Path + ): Exception = { + val returnCodeMessage = returnCodeOption match { + case Some(returnCode) if returnCode == 0 => "Job exited without an error, exit code 0." + case Some(returnCode) => s"Job exit code $returnCode. Check $stderrPath for more information." + case None => "The job was stopped before the command finished." + } + + new Exception(s"Task $jobTag failed. $returnCodeMessage Batch error code ${errorCode.getCode.value}. $message") + } // GCS path regexes comments: // - The (?s) option at the start makes '.' expression to match any symbol, including '\n' @@ -174,6 +190,15 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override def dockerImageUsed: Option[String] = Option(jobDockerImage) + // Need to add previousRetryReasons and preemptible in order to get preemptible to work in the tests + protected val previousRetryReasons: ErrorOr[PreviousRetryReasons] = + PreviousRetryReasons.tryApply(jobDescriptor.prefetchedKvStoreEntries, jobDescriptor.key.attempt) + + override lazy val preemptible: Boolean = previousRetryReasons match { + case Valid(PreviousRetryReasons(p, _)) => p < maxPreemption + case _ => false + } + override def tryAbort(job: StandardAsyncJob): Unit = abortJob(workflowId = workflowId, jobName = JobName.parse(job.jobId), @@ -639,6 +664,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar projectId = googleProject(jobDescriptor.workflowDescriptor), computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor), googleLabels = backendLabels ++ customLabels, + preemptible = preemptible, batchTimeout = batchConfiguration.batchTimeout, jobShell = batchConfiguration.jobShell, privateDockerKeyAndEncryptedToken = dockerKeyAndToken, @@ -1084,13 +1110,20 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case _ => false } - // returnCode is provided by cromwell, so far, this is empty for all the tests I ran + private lazy val standardPaths = jobPaths.standardPaths + override def handleExecutionFailure(runStatus: RunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = { - def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus): ExecutionHandle = + val prettyPrintedError = "Job failed with an unknown reason" + + // Inner function: Handles a 'Failed' runStatus (or Preempted if preemptible was false) + def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus, returnCode: Option[Int]): ExecutionHandle = FailedNonRetryableExecutionHandle( StandardException( - message = runStatus.prettyPrintedError, - jobTag = jobTag + runStatus.errorCode, + prettyPrintedError, + jobTag, + returnCode, + standardPaths.error ), returnCode, None @@ -1099,17 +1132,71 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar Future.fromTry { Try { runStatus match { - case RunStatus.Aborted => AbortedExecutionHandle - case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus) + case preemptedStatus: RunStatus.Preempted if preemptible => + handlePreemption(preemptedStatus, returnCode, prettyPrintedError) + case _: RunStatus.Aborted => AbortedExecutionHandle + case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode) case unknown => throw new RuntimeException( - s"Method handleExecutionFailure not called with RunStatus.Failed or RunStatus.Preempted. Instead got $unknown" + s"handleExecutionFailure not called with RunStatus.Failed or RunStatus.Preempted. Instead got $unknown" ) } } } } + private def nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p: Int, ur: Int): Seq[KvPair] = + Seq( + KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey), + ur.toString + ), + KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.preemptionCountKey), p.toString) + ) + + private def handlePreemption( + runStatus: RunStatus.Preempted, + jobReturnCode: Option[Int], + prettyPrintedError: String + ): ExecutionHandle = { + import common.numeric.IntegerUtil._ + + val errorCode: GrpcStatus = runStatus.errorCode + previousRetryReasons match { + case Valid(PreviousRetryReasons(p, ur)) => + val thisPreemption = p + 1 + val taskName = s"${workflowDescriptor.id}:${call.localName}" + val baseMsg = s"Task $taskName was preempted for the ${thisPreemption.toOrdinal} time." + + val preemptionAndUnexpectedRetryCountsKvPairs = + nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(thisPreemption, ur) + if (thisPreemption < maxPreemption) { + // Increment preemption count and unexpectedRetryCount stays the same + val msg = + s"$baseMsg The call will be restarted with another preemptible VM (max preemptible attempts number is " + + s"$maxPreemption). Error code $errorCode.$prettyPrintedError" + FailedRetryableExecutionHandle( + StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + kvPairsToSave = Option(preemptionAndUnexpectedRetryCountsKvPairs) + ) + } else { + val msg = s"$baseMsg The maximum number of preemptible attempts ($maxPreemption) has been reached. The " + + s"call will be restarted with a non-preemptible VM. Error code $errorCode.$prettyPrintedError)" + FailedRetryableExecutionHandle( + StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + kvPairsToSave = Option(preemptionAndUnexpectedRetryCountsKvPairs) + ) + } + case Invalid(_) => + FailedNonRetryableExecutionHandle( + StandardException(errorCode, prettyPrintedError, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + None + ) + } + } + override lazy val startMetadataKeyValues: Map[String, Any] = super[GcpBatchJobCachingActorHelper].startMetadataKeyValues diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala index 773222fb07f..15ed54e387c 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala @@ -25,6 +25,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { batchConfiguration.runtimeConfig ) + lazy val maxPreemption: Int = runtimeAttributes.preemptible + + def preemptible: Boolean + lazy val workingDisk: GcpBatchAttachedDisk = runtimeAttributes.disks.find(_.name == GcpBatchWorkingDisk.Name).get lazy val callRootPath: Path = gcpBatchCallPaths.callExecutionRoot @@ -72,9 +76,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { .get(WorkflowOptionKeys.GoogleProject) .getOrElse(batchAttributes.project) - Map[String, String]( + Map[String, Any]( GcpBatchMetadataKeys.GoogleProject -> googleProject, - GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString + GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString, + "preemptible" -> preemptible ) ++ originalLabelEvents } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala index ef38c8972c1..5ff60678265 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala @@ -75,6 +75,7 @@ object GcpBatchRequestFactory { projectId: String, computeServiceAccount: String, googleLabels: Seq[GcpLabel], + preemptible: Boolean, batchTimeout: FiniteDuration, jobShell: String, privateDockerKeyAndEncryptedToken: Option[CreateBatchDockerKeyAndToken], diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala index 1ec59cc1dc3..46a87cc110b 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala @@ -9,8 +9,9 @@ import cromwell.backend.google.batch.actors.BatchApiAbortClient.{ } import cromwell.backend.google.batch.api.BatchApiRequestManager._ import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse} -import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus} +import cromwell.backend.google.batch.models.RunStatus import cromwell.core.ExecutionEvent +import io.grpc.Status import scala.annotation.unused import scala.concurrent.{ExecutionContext, Future, Promise} @@ -120,7 +121,7 @@ object BatchRequestExecutor { } catch { // A job can't be cancelled but deleted, which is why we consider 404 status as the job being cancelled successfully case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.NOT_FOUND => - BatchApiResponse.StatusQueried(RunStatus.Aborted) + BatchApiResponse.StatusQueried(RunStatus.Aborted(io.grpc.Status.NOT_FOUND)) // We don't need to detect preemptible VMs because that's handled automatically by GCP case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.RESOURCE_EXHAUSTED => @@ -135,24 +136,19 @@ object BatchRequestExecutor { .map(_.asScala.toList) .getOrElse(List.empty) ) - lazy val exitCode = findBatchExitCode(events) if (job.getStatus.getState == JobStatus.State.SUCCEEDED) { RunStatus.Success(events) } else if (job.getStatus.getState == JobStatus.State.RUNNING) { RunStatus.Running } else if (job.getStatus.getState == JobStatus.State.FAILED) { - RunStatus.Failed(exitCode, events) + // Status.OK is hardcoded because the request succeeded, we don't have access to the internal response code + RunStatus.Failed(Status.OK, events) } else { RunStatus.Initializing } } - private def findBatchExitCode(events: List[ExecutionEvent]): Option[GcpBatchExitCode] = - events.flatMap { e => - GcpBatchExitCode.fromEventMessage(e.name.toLowerCase) - }.headOption - private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = events.map { e => val time = java.time.Instant diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala deleted file mode 100644 index 9ef9d7c88db..00000000000 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala +++ /dev/null @@ -1,37 +0,0 @@ -package cromwell.backend.google.batch.models - -sealed abstract class GcpBatchExitCode(val code: Int) extends Product with Serializable - -/** - * Represents the possible exit codes from Batch. - * - * See: https://cloud.google.com/batch/docs/troubleshooting#reserved-exit-codes - */ -object GcpBatchExitCode { - case object VMPreemption extends GcpBatchExitCode(50001) - - case object VMReportingTimeout extends GcpBatchExitCode(50002) - - case object VMRebootedDuringExecution extends GcpBatchExitCode(50003) - - case object VMAndTaskAreUnresponsive extends GcpBatchExitCode(50004) - - case object TaskRunsOverMaximumRuntime extends GcpBatchExitCode(50005) - - case object VMRecreatedDuringExecution extends GcpBatchExitCode(50006) - - val values: List[GcpBatchExitCode] = List( - VMPreemption, - VMReportingTimeout, - VMRebootedDuringExecution, - VMAndTaskAreUnresponsive, - TaskRunsOverMaximumRuntime, - VMRecreatedDuringExecution - ) - - def fromEventMessage(message: String): Option[GcpBatchExitCode] = - values.find { target => - message.contains(s"exit code ${target.code}") - } - -} diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala index 550d2c79866..ef9e2389d4b 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala @@ -55,7 +55,6 @@ final case class GcpBatchRuntimeAttributes(cpu: Int Refined Positive, object GcpBatchRuntimeAttributes { val ZonesKey = "zones" - private val ZonesDefaultValue = WomString("us-central1-b") val PreemptibleKey = "preemptible" @@ -229,20 +228,20 @@ object GcpBatchRuntimeAttributes { ) new GcpBatchRuntimeAttributes( - cpu = cpu, - cpuPlatform = cpuPlatform, - gpuResource = gpuResource, - zones = zones, - preemptible = preemptible, - bootDiskSize = bootDiskSize, - memory = memory, - disks = disks, - dockerImage = docker, - failOnStderr = failOnStderr, - continueOnReturnCode = continueOnReturnCode, - noAddress = noAddress, - useDockerImageCache = useDockerImageCache, - checkpointFilename = checkpointFileName + cpu, + cpuPlatform, + gpuResource, + zones, + preemptible, + bootDiskSize, + memory, + disks, + docker, + failOnStderr, + continueOnReturnCode, + noAddress, + useDockerImageCache, + checkpointFileName ) } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala new file mode 100644 index 00000000000..0bd3c1e1535 --- /dev/null +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala @@ -0,0 +1,44 @@ +package cromwell.backend.google.batch.models + +import cats.syntax.apply._ +import cats.syntax.validated._ +import common.validation.ErrorOr.ErrorOr +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{preemptionCountKey, unexpectedRetryCountKey} +import cromwell.services.keyvalue.KeyValueServiceActor._ + +import scala.util.{Failure, Success, Try} + +case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int) + +object PreviousRetryReasons { + + def tryApply(prefetchedKvEntries: Map[String, KvResponse], attemptNumber: Int): ErrorOr[PreviousRetryReasons] = { + val validatedPreemptionCount = validatedKvResponse(prefetchedKvEntries.get(preemptionCountKey), preemptionCountKey) + val validatedUnexpectedRetryCount = + validatedKvResponse(prefetchedKvEntries.get(unexpectedRetryCountKey), unexpectedRetryCountKey) + + (validatedPreemptionCount, validatedUnexpectedRetryCount) mapN PreviousRetryReasons.apply + } + + def apply(knownPreemptedCount: Int, knownUnexpectedRetryCount: Int, attempt: Int): PreviousRetryReasons = { + // If we have anything unaccounted for, we can top up the unexpected retry count. + // NB: 'attempt' is 1-indexed, so, magic number: + // NB2: for sanity's sake, I won't let this unaccounted for drop below 0, just in case... + val unaccountedFor = Math.max(attempt - 1 - knownPreemptedCount - knownUnexpectedRetryCount, 0) + PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor) + } + + private def validatedKvResponse(r: Option[KvResponse], fromKey: String): ErrorOr[Int] = r match { + case Some(KvPair(_, v)) => validatedInt(v, fromKey) + case Some(_: KvKeyLookupFailed) => 0.validNel + case Some(KvFailure(_, failure)) => s"Failed to get key $fromKey: ${failure.getMessage}".invalidNel + case Some(_: KvPutSuccess) => s"Programmer Error: Got a KvPutSuccess from a Get request...".invalidNel + case None => s"Programmer Error: Engine made no effort to prefetch $fromKey".invalidNel + } + + private def validatedInt(s: String, fromKey: String): ErrorOr[Int] = + Try(s.toInt) match { + case Success(i) => i.validNel + case Failure(_) => s"Unexpected value found in the KV store: $fromKey='$s'".invalidNel + } +} diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala index 58e3887111d..c541358b691 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala @@ -1,6 +1,7 @@ package cromwell.backend.google.batch.models import cromwell.core.ExecutionEvent +import io.grpc.Status sealed trait RunStatus @@ -21,47 +22,27 @@ object RunStatus { } sealed trait UnsuccessfulRunStatus extends TerminalRunStatus { - val exitCode: Option[GcpBatchExitCode] - val prettyPrintedError: String + val errorCode: Status } final case class Failed( - exitCode: Option[GcpBatchExitCode], + errorCode: Status, eventList: Seq[ExecutionEvent] ) extends UnsuccessfulRunStatus { override def toString = "Failed" - - override val prettyPrintedError: String = - exitCode match { - case Some(code) => - code match { - case GcpBatchExitCode.VMPreemption => "A Spot VM for the job was preempted during run time" - case GcpBatchExitCode.VMReportingTimeout => - "There was a timeout in the backend that caused a VM for the job to no longer receive updates" - case GcpBatchExitCode.VMRebootedDuringExecution => "A VM for the job unexpectedly rebooted during run time" - case GcpBatchExitCode.VMAndTaskAreUnresponsive => - "A task reached the unresponsive time limit and cannot be cancelled" - case GcpBatchExitCode.TaskRunsOverMaximumRuntime => - "A task's run time exceeded the time limit specified in the maxRunDuration, or, a runnable's run time exceeded the time limit specified in the timeout" - case GcpBatchExitCode.VMRecreatedDuringExecution => - "A VM for a job is unexpectedly recreated during run time" - } - case None => - eventList.headOption - .map(_.name) - .getOrElse( - "The job has failed but the exit code couldn't be derived, there isn't an event message either, please review the logs and report a bug" - ) - } } - final case object Aborted extends UnsuccessfulRunStatus { + final case class Aborted(errorCode: Status) extends UnsuccessfulRunStatus { override def toString = "Aborted" - override val exitCode: Option[GcpBatchExitCode] = None - override def eventList: Seq[ExecutionEvent] = List.empty + } - override val prettyPrintedError: String = "The job was aborted" + // TODO: Use this when detecting a preemption or remove it + final case class Preempted( + errorCode: Status, + eventList: Seq[ExecutionEvent] + ) extends UnsuccessfulRunStatus { + override def toString = "Preempted" } } diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index d3bcb87c016..e71b2c90131 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -1,9 +1,10 @@ package cromwell.backend.google.batch package actors +import _root_.io.grpc.Status import _root_.wdl.draft2.model._ import akka.actor.{ActorRef, Props} -import akka.testkit.{ImplicitSender, TestActorRef, TestDuration} +import akka.testkit.{ImplicitSender, TestActorRef, TestDuration, TestProbe} import cats.data.NonEmptyList import cloud.nio.impl.drs.DrsCloudNioFileProvider.DrsReadInterpreter import cloud.nio.impl.drs.{DrsCloudNioFileSystemProvider, GoogleOauthDrsCredentials} @@ -14,6 +15,9 @@ import common.collections.EnhancedCollections._ import common.mock.MockSugar import cromwell.backend.BackendJobExecutionActor.BackendJobExecutionResponse import cromwell.backend._ +import cromwell.backend.async.AsyncBackendJobExecutionActor.{Execute, ExecutionMode} +import cromwell.backend.async.{ExecutionHandle, FailedNonRetryableExecutionHandle} +import cromwell.backend.google.batch.actors.GcpBatchAsyncBackendJobExecutionActor.GcpBatchPendingExecutionHandle import cromwell.backend.google.batch.api.GcpBatchRequestFactory import cromwell.backend.google.batch.io.{DiskType, GcpBatchWorkingDisk} import cromwell.backend.google.batch.models._ @@ -22,6 +26,7 @@ import cromwell.backend.io.JobPathsSpecHelper._ import cromwell.backend.standard.{ DefaultStandardAsyncExecutionActorParams, StandardAsyncExecutionActorParams, + StandardAsyncJob, StandardExpressionFunctionsParams } import cromwell.core._ @@ -31,7 +36,10 @@ import cromwell.core.logging.JobLogger import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.drs.DrsPathBuilder import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder, MockGcsPathBuilder} +import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage +import cromwell.services.instrumentation.{CromwellBucket, CromwellIncrement} import cromwell.services.keyvalue.InMemoryKvServiceActor +import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.util.JsonFormatting.WomValueJsonFormatter._ import cromwell.util.SampleWdl import org.mockito.Mockito._ @@ -56,6 +64,7 @@ import java.time.temporal.ChronoUnit import java.util.UUID import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.language.postfixOps import scala.util.Success class GcpBatchAsyncBackendJobExecutionActorSpec @@ -200,8 +209,143 @@ class GcpBatchAsyncBackendJobExecutionActorSpec |} """.stripMargin + private def buildPreemptibleJobDescriptor(preemptible: Int, + previousPreemptions: Int, + previousUnexpectedRetries: Int, + failedRetriesCountOpt: Option[Int] = None + ): BackendJobDescriptor = { + val attempt = previousPreemptions + previousUnexpectedRetries + 1 + val wdlNamespace = WdlNamespaceWithWorkflow + .load(YoSup.replace("[PREEMPTIBLE]", s"preemptible: $preemptible"), Seq.empty[Draft2ImportResolver]) + .get + val womDefinition = wdlNamespace.workflow + .toWomWorkflowDefinition(isASubworkflow = false) + .getOrElse(fail("failed to get WomDefinition from WdlWorkflow")) + + wdlNamespace.toWomExecutable(Option(Inputs.toJson.compactPrint), NoIoFunctionSet, strictValidation = true) match { + case Right(womExecutable) => + val inputs = for { + combined <- womExecutable.resolvedExecutableInputs + (port, resolvedInput) = combined + value <- resolvedInput.select[WomValue] + } yield port -> value + + val workflowDescriptor = BackendWorkflowDescriptor( + WorkflowId.randomId(), + womDefinition, + inputs, + NoOptions, + Labels.empty, + HogGroup("foo"), + List.empty, + None + ) + + val job = workflowDescriptor.callable.taskCallNodes.head + val key = BackendJobDescriptorKey(job, None, attempt) + val runtimeAttributes = makeRuntimeAttributes(job) + val prefetchedKvEntries = Map( + GcpBatchBackendLifecycleActorFactory.preemptionCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, KvJobKey(key), GcpBatchBackendLifecycleActorFactory.preemptionCountKey), + previousPreemptions.toString + ), + GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, + KvJobKey(key), + GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey + ), + previousUnexpectedRetries.toString + ) + ) + val prefetchedKvEntriesUpd = if (failedRetriesCountOpt.isEmpty) { + prefetchedKvEntries + } else { + prefetchedKvEntries + (BackendLifecycleActorFactory.FailedRetryCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, KvJobKey(key), BackendLifecycleActorFactory.FailedRetryCountKey), + failedRetriesCountOpt.get.toString + )) + } + BackendJobDescriptor(workflowDescriptor, + key, + runtimeAttributes, + fqnWdlMapToDeclarationMap(Inputs), + NoDocker, + None, + prefetchedKvEntriesUpd + ) + case Left(badtimes) => fail(badtimes.toList.mkString(", ")) + } + } + + private case class DockerImageCacheTestingParameters(dockerImageCacheDiskOpt: Option[String], + dockerImageAsSpecifiedByUser: String, + isDockerImageCacheUsageRequested: Boolean + ) + + private def executionActor(jobDescriptor: BackendJobDescriptor, + promise: Promise[BackendJobExecutionResponse], + batchSingletonActor: ActorRef, + shouldBePreemptible: Boolean, + serviceRegistryActor: ActorRef, + referenceInputFilesOpt: Option[Set[GcpBatchInput]] = None, + dockerImageCacheTestingParamsOpt: Option[DockerImageCacheTestingParameters] = None + ): ActorRef = { + + val job = generateStandardAsyncJob + val run = Run(job) + val handle = new GcpBatchPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + + class ExecuteOrRecoverActor + extends TestableGcpBatchJobExecutionActor(jobDescriptor, + promise, + gcpBatchConfiguration, + batchSingletonActor = batchSingletonActor, + serviceRegistryActor = serviceRegistryActor + ) { + override def executeOrRecover(mode: ExecutionMode)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { + sendIncrementMetricsForReferenceFiles(referenceInputFilesOpt) + dockerImageCacheTestingParamsOpt.foreach { dockerImageCacheTestingParams => + sendIncrementMetricsForDockerImageCache( + dockerImageCacheTestingParams.dockerImageCacheDiskOpt, + dockerImageCacheTestingParams.dockerImageAsSpecifiedByUser, + dockerImageCacheTestingParams.isDockerImageCacheUsageRequested + ) + } + + if (preemptible == shouldBePreemptible) Future.successful(handle) + else Future.failed(new Exception(s"Test expected preemptible to be $shouldBePreemptible but got $preemptible")) + } + } + + system.actorOf(Props(new ExecuteOrRecoverActor), "ExecuteOrRecoverActor-" + UUID.randomUUID) + } + + def buildPreemptibleTestActorRef(attempt: Int, + preemptible: Int, + failedRetriesCountOpt: Option[Int] = None + ): TestActorRef[TestableGcpBatchJobExecutionActor] = { + // For this test we say that all previous attempts were preempted: + val jobDescriptor = buildPreemptibleJobDescriptor(preemptible, + attempt - 1, + previousUnexpectedRetries = 0, + failedRetriesCountOpt = failedRetriesCountOpt + ) + val props = Props( + new TestableGcpBatchJobExecutionActor(jobDescriptor, + Promise(), + gcpBatchConfiguration, + TestableGcpBatchExpressionFunctions, + emptyActor, + failIoActor + ) + ) + TestActorRef(props, s"TestableGcpBatchJobExecutionActor-${jobDescriptor.workflowDescriptor.id}") + } + behavior of "GcpBatchAsyncBackendJobExecutionActor" + private val timeout = 25 seconds + it should "group files by bucket" in { def makeInput(bucket: String, name: String): GcpBatchFileInput = { @@ -282,6 +426,200 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "drs://drs.example.org/aaa,/mnt/disks/cromwell_root/path/to/aaa.bai\r\ndrs://drs.example.org/bbb,/mnt/disks/cromwell_root/path/to/bbb.bai\r\n" } + it should "send proper value for \"number of reference files used gauge\" metric, or don't send anything if reference disks feature is disabled" in { + + val expectedInput1 = GcpBatchFileInput(name = "testfile1", + relativeHostPath = + DefaultPathBuilder.build(Paths.get(s"test/reference/path/file1")), + mount = null, + cloudPath = null + ) + val expectedInput2 = GcpBatchFileInput(name = "testfile2", + relativeHostPath = + DefaultPathBuilder.build(Paths.get(s"test/reference/path/file2")), + mount = null, + cloudPath = null + ) + val expectedReferenceInputFiles = Set[GcpBatchInput](expectedInput1, expectedInput2) + + val expectedMsg1 = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput1.relativeHostPath.pathAsString)) + ) + ) + val expectedMsg2 = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput2.relativeHostPath.pathAsString)) + ) + ) + + val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0) + val serviceRegistryProbe = TestProbe() + + val backend1 = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + referenceInputFilesOpt = Option(expectedReferenceInputFiles) + ) + backend1 ! Execute + serviceRegistryProbe.expectMsgAllOf(expectedMsg1, expectedMsg2) + + val backend2 = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + referenceInputFilesOpt = None + ) + backend2 ! Execute + serviceRegistryProbe.expectNoMessage(timeout) + } + + it should "sends proper metrics for docker image cache feature" in { + + val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0) + val serviceRegistryProbe = TestProbe() + val madeUpDockerImageName = "test_madeup_docker_image_name" + + val expectedMessageWhenRequestedNotFound = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, + NonEmptyList("docker", List("image", "cache", "image_not_in_cache", madeUpDockerImageName)) + ) + ) + ) + val backendDockerCacheRequestedButNotFound = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + dockerImageCacheTestingParamsOpt = Option( + DockerImageCacheTestingParameters( + None, + "test_madeup_docker_image_name", + isDockerImageCacheUsageRequested = true + ) + ) + ) + backendDockerCacheRequestedButNotFound ! Execute + serviceRegistryProbe.expectMsg(expectedMessageWhenRequestedNotFound) + + val expectedMessageWhenRequestedAndFound = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, + NonEmptyList("docker", List("image", "cache", "used_image_from_cache", madeUpDockerImageName)) + ) + ) + ) + val backendDockerCacheRequestedAndFound = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + dockerImageCacheTestingParamsOpt = Option( + DockerImageCacheTestingParameters( + Option("test_madeup_disk_image_name"), + "test_madeup_docker_image_name", + isDockerImageCacheUsageRequested = true + ) + ) + ) + backendDockerCacheRequestedAndFound ! Execute + serviceRegistryProbe.expectMsg(expectedMessageWhenRequestedAndFound) + + val expectedMessageWhenNotRequestedButFound = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, + NonEmptyList("docker", List("image", "cache", "cached_image_not_used", madeUpDockerImageName)) + ) + ) + ) + val backendDockerCacheNotRequestedButFound = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + dockerImageCacheTestingParamsOpt = Option( + DockerImageCacheTestingParameters( + Option("test_madeup_disk_image_name"), + "test_madeup_docker_image_name", + isDockerImageCacheUsageRequested = false + ) + ) + ) + backendDockerCacheNotRequestedButFound ! Execute + serviceRegistryProbe.expectMsg(expectedMessageWhenNotRequestedButFound) + + val backendDockerCacheNotRequestedNotFound = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + dockerImageCacheTestingParamsOpt = Option( + DockerImageCacheTestingParameters( + None, + "test_madeup_docker_image_name", + isDockerImageCacheUsageRequested = false + ) + ) + ) + backendDockerCacheNotRequestedNotFound ! Execute + serviceRegistryProbe.expectNoMessage(timeout) + } + + it should "not restart 2 of 1 unexpected shutdowns without another preemptible VM" in { + + val actorRef = buildPreemptibleTestActorRef(2, 1) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + val failedStatus = RunStatus.Failed( + Status.ABORTED, + Seq.empty + ) + val executionResult = batchBackend.handleExecutionResult(failedStatus, handle) + val result = Await.result(executionResult, timeout) + result.isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + val failedHandle = result.asInstanceOf[FailedNonRetryableExecutionHandle] + failedHandle.returnCode shouldBe None + } + + it should "handle Failure Status for various errors" in { + + val actorRef = buildPreemptibleTestActorRef(1, 1) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + def checkFailedResult(errorCode: Status, errorMessage: Option[String]): ExecutionHandle = { + val failed = RunStatus.Failed( + errorCode, + Seq.empty + ) + Await.result(batchBackend.handleExecutionResult(failed, handle), timeout) + } + + checkFailedResult(Status.ABORTED, Option("15: Other type of error.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.OUT_OF_RANGE, Option("14: Wrong errorCode.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, Option("Weird error message.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, Option("UnparsableInt: Even weirder error message.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, None).isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + actorRef.stop() + } + it should "map GCS paths and *only* GCS paths to local" in { val wdlString = s"""|workflow wf { @@ -1115,7 +1453,31 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "gs://path/to/gcs_root/w/e6236763-c518-41d0-9688-432549a8bf7d/call-B/shard-2/B-2.log" } + it should "return preemptible = true only in the correct cases" in { + + def attempt(max: Int, attempt: Int): GcpBatchAsyncBackendJobExecutionActor = + buildPreemptibleTestActorRef(attempt, max).underlyingActor + def attempt1(max: Int) = attempt(max, 1) + def attempt2(max: Int) = attempt(max, 2) + + val descriptorWithMax0AndKey1 = attempt1(max = 0) + descriptorWithMax0AndKey1.preemptible shouldBe false + + val descriptorWithMax1AndKey1 = attempt1(max = 1) + descriptorWithMax1AndKey1.preemptible shouldBe true + + val descriptorWithMax2AndKey1 = attempt1(max = 2) + descriptorWithMax2AndKey1.preemptible shouldBe true + + val descriptorWithMax1AndKey2 = attempt2(max = 1) + descriptorWithMax1AndKey2.preemptible shouldBe false + + val descriptorWithMax2AndKey2 = attempt2(max = 2) + descriptorWithMax2AndKey2.preemptible shouldBe true + } + it should "return the project from the workflow options in the start metadata" in { + val googleProject = "baa-ram-ewe" val batchGcsRoot = "gs://anorexic/duck" val workflowId = WorkflowId.randomId() @@ -1170,6 +1532,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "gcpBatch:googleProject" -> googleProject, "labels:cromwell-workflow-id" -> s"cromwell-$workflowId", "labels:wdl-task-name" -> "goodbye", + "preemptible" -> "false", "runtimeAttributes:bootDiskSizeGb" -> "10", "runtimeAttributes:continueOnReturnCode" -> "0", "runtimeAttributes:cpu" -> "1", @@ -1224,4 +1587,9 @@ class GcpBatchAsyncBackendJobExecutionActorSpec evaluatedAttributes.getOrElse(fail("Failed to evaluate runtime attributes")) ) } + + private def generateStandardAsyncJob = + StandardAsyncJob( + JobName.newBuilder().setJob(UUID.randomUUID().toString).setProject("test").setLocation("local").build().toString + ) } diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala deleted file mode 100644 index db5943759a3..00000000000 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala +++ /dev/null @@ -1,52 +0,0 @@ -package cromwell.backend.google.batch.models - -import org.scalatest.OptionValues._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class GcpBatchExitCodeSpec extends AnyWordSpec with Matchers { - - "fromEventMessage" should { - "detect VMPreemption error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to Spot Preemption with exit code 50001." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMPreemption) - } - - "detect VMReportingTimeout error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to Batch no longer receives VM updates with exit code 50002." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMReportingTimeout) - } - - "detect VMRebootedDuringExecution error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to VM is rebooted during task execution with exit code 50003." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMRebootedDuringExecution) - } - - "detect VMAndTaskAreUnresponsive error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to tasks cannot be canceled with exit code 50004." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMAndTaskAreUnresponsive) - } - - "detect TaskRunsOverMaximumRuntime error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to task runs over the maximum runtime with exit code 50005." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.TaskRunsOverMaximumRuntime) - } - - "detect VMRecreatedDuringExecution error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to VM is recreated during task execution with exit code 50006." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMRecreatedDuringExecution) - } - } -} From 4df2a07382c908dc79799a78bec839e0245fb3cd Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Fri, 31 Jan 2025 16:26:26 -0500 Subject: [PATCH 2/9] scalafmt --- .../cromwell/backend/google/batch/models/RunStatus.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala index ecbd50060fb..f565692347d 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala @@ -43,9 +43,8 @@ object RunStatus { override def toString = "Failed" } - final case class Aborted(errorCode: Status, - instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty - ) extends UnsuccessfulRunStatus { + final case class Aborted(errorCode: Status, instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty) + extends UnsuccessfulRunStatus { override def toString = "Aborted" override def eventList: Seq[ExecutionEvent] = List.empty From 78bfd5365acc3ef1c9f9ebe20982fef75c3b9b07 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Sat, 1 Feb 2025 01:50:02 -0500 Subject: [PATCH 3/9] fixups --- ...tchAsyncBackendJobExecutionActorSpec.scala | 112 ++---------------- 1 file changed, 7 insertions(+), 105 deletions(-) diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index 456de7c5c37..07216f057fc 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -24,7 +24,12 @@ import cromwell.backend.google.batch.models._ import cromwell.backend.google.batch.runnable.RunnableUtils.MountPoint import cromwell.backend.google.batch.util.BatchExpressionFunctions import cromwell.backend.io.JobPathsSpecHelper._ -import cromwell.backend.standard.{DefaultStandardAsyncExecutionActorParams, StandardAsyncExecutionActorParams, StandardAsyncJob, StandardExpressionFunctionsParams} +import cromwell.backend.standard.{ + DefaultStandardAsyncExecutionActorParams, + StandardAsyncExecutionActorParams, + StandardAsyncJob, + StandardExpressionFunctionsParams +} import cromwell.core._ import cromwell.core.callcaching.NoDocker import cromwell.core.labels.Labels @@ -290,19 +295,12 @@ class GcpBatchAsyncBackendJobExecutionActorSpec } } - private case class DockerImageCacheTestingParameters(dockerImageCacheDiskOpt: Option[String], - dockerImageAsSpecifiedByUser: String, - isDockerImageCacheUsageRequested: Boolean - ) - private def executionActor(jobDescriptor: BackendJobDescriptor, promise: Promise[BackendJobExecutionResponse], batchSingletonActor: ActorRef, shouldBePreemptible: Boolean, serviceRegistryActor: ActorRef, - referenceInputFilesOpt: Option[Set[GcpBatchInput]] = None, - dockerImageCacheTestingParamsOpt: Option[DockerImageCacheTestingParameters] = None - ): ActorRef = { + referenceInputFilesOpt: Option[Set[GcpBatchInput]]): ActorRef = { val job = generateStandardAsyncJob val run = Run(job) @@ -485,102 +483,6 @@ class GcpBatchAsyncBackendJobExecutionActorSpec serviceRegistryProbe.expectNoMessage(timeout) } - it should "sends proper metrics for docker image cache feature" in { - - val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0) - val serviceRegistryProbe = TestProbe() - val madeUpDockerImageName = "test_madeup_docker_image_name" - - val expectedMessageWhenRequestedNotFound = InstrumentationServiceMessage( - CromwellIncrement( - CromwellBucket(List.empty, - NonEmptyList("docker", List("image", "cache", "image_not_in_cache", madeUpDockerImageName)) - ) - ) - ) - val backendDockerCacheRequestedButNotFound = executionActor( - jobDescriptor, - Promise[BackendJobExecutionResponse](), - TestProbe().ref, - shouldBePreemptible = false, - serviceRegistryActor = serviceRegistryProbe.ref, - dockerImageCacheTestingParamsOpt = Option( - DockerImageCacheTestingParameters( - None, - "test_madeup_docker_image_name", - isDockerImageCacheUsageRequested = true - ) - ) - ) - backendDockerCacheRequestedButNotFound ! Execute - serviceRegistryProbe.expectMsg(expectedMessageWhenRequestedNotFound) - - val expectedMessageWhenRequestedAndFound = InstrumentationServiceMessage( - CromwellIncrement( - CromwellBucket(List.empty, - NonEmptyList("docker", List("image", "cache", "used_image_from_cache", madeUpDockerImageName)) - ) - ) - ) - val backendDockerCacheRequestedAndFound = executionActor( - jobDescriptor, - Promise[BackendJobExecutionResponse](), - TestProbe().ref, - shouldBePreemptible = false, - serviceRegistryActor = serviceRegistryProbe.ref, - dockerImageCacheTestingParamsOpt = Option( - DockerImageCacheTestingParameters( - Option("test_madeup_disk_image_name"), - "test_madeup_docker_image_name", - isDockerImageCacheUsageRequested = true - ) - ) - ) - backendDockerCacheRequestedAndFound ! Execute - serviceRegistryProbe.expectMsg(expectedMessageWhenRequestedAndFound) - - val expectedMessageWhenNotRequestedButFound = InstrumentationServiceMessage( - CromwellIncrement( - CromwellBucket(List.empty, - NonEmptyList("docker", List("image", "cache", "cached_image_not_used", madeUpDockerImageName)) - ) - ) - ) - val backendDockerCacheNotRequestedButFound = executionActor( - jobDescriptor, - Promise[BackendJobExecutionResponse](), - TestProbe().ref, - shouldBePreemptible = false, - serviceRegistryActor = serviceRegistryProbe.ref, - dockerImageCacheTestingParamsOpt = Option( - DockerImageCacheTestingParameters( - Option("test_madeup_disk_image_name"), - "test_madeup_docker_image_name", - isDockerImageCacheUsageRequested = false - ) - ) - ) - backendDockerCacheNotRequestedButFound ! Execute - serviceRegistryProbe.expectMsg(expectedMessageWhenNotRequestedButFound) - - val backendDockerCacheNotRequestedNotFound = executionActor( - jobDescriptor, - Promise[BackendJobExecutionResponse](), - TestProbe().ref, - shouldBePreemptible = false, - serviceRegistryActor = serviceRegistryProbe.ref, - dockerImageCacheTestingParamsOpt = Option( - DockerImageCacheTestingParameters( - None, - "test_madeup_docker_image_name", - isDockerImageCacheUsageRequested = false - ) - ) - ) - backendDockerCacheNotRequestedNotFound ! Execute - serviceRegistryProbe.expectNoMessage(timeout) - } - it should "not restart 2 of 1 unexpected shutdowns without another preemptible VM" in { val actorRef = buildPreemptibleTestActorRef(2, 1) From 4e2562bb2de2787df601de59c2a32924659a5797 Mon Sep 17 00:00:00 2001 From: LizBaldo Date: Tue, 4 Feb 2025 16:21:03 -0500 Subject: [PATCH 4/9] [AN-393] Add waiting for quota to quota messages (#7686) Co-authored-by: Adam Nichols --- .../cromwell/backend/async/KnownJobFailureException.scala | 4 +++- .../main/resources/standardTestCases/quota_fail_retry.test | 1 + .../backend/google/pipelines/common/errors/package.scala | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala index e106f06fcd2..23ad85e957d 100644 --- a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala +++ b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala @@ -5,7 +5,9 @@ import common.exception.ThrowableAggregation import cromwell.core.path.Path import wom.expression.{NoIoFunctionSet, WomExpression} -abstract class KnownJobFailureException extends Exception { +import scala.util.control.NoStackTrace + +abstract class KnownJobFailureException extends Exception with NoStackTrace { def stderrPath: Option[Path] } diff --git a/centaur/src/main/resources/standardTestCases/quota_fail_retry.test b/centaur/src/main/resources/standardTestCases/quota_fail_retry.test index db6d29e1d8f..7203a9958ae 100644 --- a/centaur/src/main/resources/standardTestCases/quota_fail_retry.test +++ b/centaur/src/main/resources/standardTestCases/quota_fail_retry.test @@ -1,3 +1,4 @@ +ignore: true # GCP seems to have fixed the quota handling bug, which makes this test unnecessary name: quota_fail_retry testFormat: workflowfailure # In PAPI v2 there seems to be a quota exhaustion message in a reasonably timely manner, while in GCP Batch the job diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala index ade5f84e00f..2a90f588803 100644 --- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala +++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala @@ -7,7 +7,8 @@ package object errors { "usage too high", "no available zones", "resource_exhausted", - "quota too low" + "quota too low", + "waiting for quota" ) def isQuotaMessage(msg: String): Boolean = From 01a294a1758c8b459e70033a30be55e690741a1a Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Sat, 1 Feb 2025 14:33:32 -0500 Subject: [PATCH 5/9] Fix preemptible / maxRetries on GCP Batch [AN-274] --- .../standardTestCases/checkpointing.test | 2 +- .../checkpointing/gcpbatch_checkpointing.wdl | 70 +++++++++++++++++++ .../error_10_preemptible.test | 12 ---- .../gcpbatch_checkpointing.test | 13 ++++ ...ch_papi_delocalization_required_files.test | 2 +- ...atch_papi_preemptible_and_max_retries.test | 13 ++++ ...gcpbatch_preemptible_and_memory_retry.test | 28 ++++++++ .../gcpbatch_preemptible_basic.test | 11 +++ ..._requester_pays_localization_negative.test | 2 +- .../papi_preemptible_and_max_retries.test | 4 +- ...batch_papi_preemptible_and_max_retries.wdl | 31 ++++++++ .../preemptible_and_memory_retry.test | 8 ++- .../standardTestCases/preemptible_basic.test | 11 +++ .../gcpbatch_preemptible_basic.wdl | 33 +++++++++ .../preemptible_basic.wdl} | 4 +- .../gcpbatch/preemptible_and_memory_retry.wdl | 5 +- ...cpBatchAsyncBackendJobExecutionActor.scala | 18 ++++- .../api/GcpBatchRequestFactoryImpl.scala | 7 +- .../batch/util/BatchUtilityConversions.scala | 6 +- ...tchAsyncBackendJobExecutionActorSpec.scala | 3 +- 20 files changed, 246 insertions(+), 37 deletions(-) create mode 100644 centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl delete mode 100644 centaur/src/main/resources/standardTestCases/error_10_preemptible.test create mode 100644 centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test create mode 100644 centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test create mode 100644 centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test create mode 100644 centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test create mode 100644 centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl create mode 100644 centaur/src/main/resources/standardTestCases/preemptible_basic.test create mode 100644 centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl rename centaur/src/main/resources/standardTestCases/{error_10_preemptible/error_10_preemptible.wdl => preemptible_basic/preemptible_basic.wdl} (96%) diff --git a/centaur/src/main/resources/standardTestCases/checkpointing.test b/centaur/src/main/resources/standardTestCases/checkpointing.test index 89a1d3dc633..9abbfbbdcfb 100644 --- a/centaur/src/main/resources/standardTestCases/checkpointing.test +++ b/centaur/src/main/resources/standardTestCases/checkpointing.test @@ -1,6 +1,6 @@ name: checkpointing testFormat: workflowsuccess -backends: [Papiv2, GCPBATCH] +backends: [Papiv2, GCPBATCH_ALT] files { workflow: checkpointing/checkpointing.wdl diff --git a/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl b/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl new file mode 100644 index 00000000000..eccccfcb5e6 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl @@ -0,0 +1,70 @@ +version 1.0 + +workflow checkpointing { + call count { input: count_to = 100 } + output { + String preempted = count.preempted + } +} + +task count { + input { + Int count_to + } + + meta { + volatile: true + } + + command <<< + # Read from the my_checkpoint file if there's content there: + FROM_CKPT=$(cat my_checkpoint | tail -n1 | awk '{ print $1 }') + FROM_CKPT=${FROM_CKPT:-1} + + # We don't want any single VM run the entire count, so work out the max counter value for this attempt: + MAX="$(($FROM_CKPT + 66))" + + INSTANCE_NAME=$(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") + echo "Discovered instance: $INSTANCE_NAME" + + # Run the counter: + echo '--' >> my_checkpoint + for i in $(seq $FROM_CKPT ~{count_to}) + do + echo $i + echo $i ${INSTANCE_NAME} $(date) >> my_checkpoint + + # If we're over our max, "preempt" the VM by simulating a maintenance event: + if [ "${i}" -gt "${MAX}" ] + then + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + fi + + sleep 1 + done + + # Prove that we got preempted at least once: + FIRST_INSTANCE=$(cat my_checkpoint | head -n1 | awk '{ print $2 }') + LAST_INSTANCE=$(cat my_checkpoint | tail -n1 | awk '{ print $2 }') + if [ "${FIRST_INSTANCE}" != "LAST_INSTANCE" ] + then + echo "GOTPREEMPTED" > preempted.txt + else + echo "NEVERPREEMPTED" > preempted.txt + fi + >>> + + runtime { + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + preemptible: 3 + checkpointFile: "my_checkpoint" + } + + output { + File checkpoint_log = "my_checkpoint" + String preempted = read_string("preempted.txt") + } +} diff --git a/centaur/src/main/resources/standardTestCases/error_10_preemptible.test b/centaur/src/main/resources/standardTestCases/error_10_preemptible.test deleted file mode 100644 index aa2e15b108d..00000000000 --- a/centaur/src/main/resources/standardTestCases/error_10_preemptible.test +++ /dev/null @@ -1,12 +0,0 @@ -name: error_10_preemptible -testFormat: workflowsuccess -# Try to fake a preemption which doesn't seem to work on GCP Batch but probably shouldn't be working on PAPI v2 -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] - -files { - workflow: error_10_preemptible/error_10_preemptible.wdl -} - -metadata { - status: Succeeded -} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test b/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test new file mode 100644 index 00000000000..a683cbd5fcb --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test @@ -0,0 +1,13 @@ +name: gcpbatch_checkpointing +testFormat: workflowsuccess +backends: [GCPBATCH] + +files { + workflow: checkpointing/gcpbatch_checkpointing.wdl +} + +metadata { + workflowName: checkpointing + status: Succeeded + "outputs.checkpointing.preempted": "GOTPREEMPTED" +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test index dc1f2b7a2ce..64a379ddc35 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test @@ -11,5 +11,5 @@ metadata { "calls.required_files.check_it.executionStatus": "Done" "calls.required_files.do_it.executionStatus": "Failed" "calls.required_files.do_it.retryableFailure": "false" - "calls.required_files.do_it.failures.0.message": ~~"Job failed due to task failure. Specifically, task with index 0 failed due to the following task event: \"Task state is updated from RUNNING to FAILED" + "calls.required_files.do_it.failures.0.message": ~~"Job exited without an error, exit code 0. Batch error code 0. Job failed with an unknown reason" } diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test new file mode 100644 index 00000000000..0be13eb0ea4 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test @@ -0,0 +1,13 @@ +name: gcpbatch_papi_preemptible_and_max_retries +testFormat: workflowfailure +backends: [GCPBATCH] + +files { + workflow: papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl +} + +metadata { + workflowName: papi_preemptible_and_max_retries + status: Failed + "papi_preemptible_and_max_retries.delete_self.-1.attempt": 3 +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test new file mode 100644 index 00000000000..dfbcd9fcf41 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test @@ -0,0 +1,28 @@ +name: gcpbatch_preemptible_and_memory_retry +testFormat: workflowfailure +# The original version of this test was tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own +# VM deletion as a preemption event. However GCP Batch perhaps more correctly diagnoses VM deletion as a weird +# non-preemption event. The GCPBATCH version of this test uses `gcloud beta compute instances simulate-maintenance-event` +# to simulate a preemption in a way that GCP Batch actually perceives as a preemption. +backends: [GCPBATCH] + +files { + workflow: retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl + options: retry_with_more_memory/retry_with_more_memory.options +} + +metadata { + workflowName: preemptible_and_memory_retry + status: Failed + "failures.0.message": "Workflow failed" + "failures.0.causedBy.0.message": "stderr for job `preemptible_and_memory_retry.imitate_oom_error_on_preemptible:NA:3` contained one of the `memory-retry-error-keys: [OutOfMemory,Killed]` specified in the Cromwell config. Job might have run out of memory." + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.preemptible": "true" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.executionStatus": "RetryableFailure" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.runtimeAttributes.memory": "1 GB" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.preemptible": "false" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.executionStatus": "RetryableFailure" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.runtimeAttributes.memory": "1 GB" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.preemptible": "false" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.executionStatus": "Failed" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.runtimeAttributes.memory": "1.1 GB" +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test new file mode 100644 index 00000000000..accc2ceda51 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test @@ -0,0 +1,11 @@ +name: gcpbatch_preemptible_basic +testFormat: workflowsuccess +backends: [GCPBATCH] + +files { + workflow: preemptible_basic/gcpbatch_preemptible_basic.wdl +} + +metadata { + status: Succeeded +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test b/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test index 7a34545d6c1..e4b0a2fba2a 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test @@ -14,5 +14,5 @@ metadata { workflowName: requester_pays_localization status: Failed "failures.0.message": "Workflow failed" - "failures.0.causedBy.0.message": ~~"Job failed due to task failure. Specifically, task with index 0 failed due to the following task event: \"Task state is updated from RUNNING to FAILED" + "failures.0.causedBy.0.message": ~~"The job was stopped before the command finished. Batch error code 0. Job failed with an unknown reason" } diff --git a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test index 0cfbc9e13d5..74ea22f8cb2 100644 --- a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test +++ b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test @@ -1,7 +1,7 @@ name: papi_preemptible_and_max_retries testFormat: workflowfailure -# faking own preemption doesn't work on GCP Batch -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] +# Faking own preemption has to be done differently on GCP Batch +backends: [Papiv2, GCPBATCH_ALT] files { workflow: papi_preemptible_and_max_retries/papi_preemptible_and_max_retries.wdl diff --git a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl new file mode 100644 index 00000000000..9614d0656de --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl @@ -0,0 +1,31 @@ +version 1.0 + +task delete_self { + + command { + preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") + + # Simulate a maintenance event on ourselves if running on a preemptible VM, otherwise delete ourselves. + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + + if [ "$preemptible" = "TRUE" ]; then + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + else + # We need to actually delete ourselves if the VM is not preemptible; simulated maintenance events don't seem to + # precipitate the demise of on-demand VMs. + gcloud compute instances delete $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + fi + } + + runtime { + preemptible: 1 + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + maxRetries: 1 + } +} + +workflow papi_preemptible_and_max_retries { + call delete_self +} diff --git a/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test b/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test index 1ef397d95ae..58f882e9d26 100644 --- a/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test +++ b/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test @@ -1,8 +1,10 @@ name: preemptible_and_memory_retry testFormat: workflowfailure -# The original version of this test seems to have been tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own VM deletion as a preemption event. GCP Batch perhaps more correctly diagnoses the VM deletion as a weird non-preemption happening, but that frustrates the logic of this test. -# Disabling this as it's not possible to induce a real preemption. -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] +# The original version of this test was tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own +# VM deletion as a preemption event. However GCP Batch perhaps more correctly diagnoses VM deletion as a weird +# non-preemption event. The GCPBATCH version of this test uses `gcloud beta compute instances simulate-maintenance-event` +# to simulate a preemption in a way that GCP Batch actually perceives as a preemption. +backends: [Papiv2, GCPBATCH_ALT] files { workflow: retry_with_more_memory/preemptible_and_memory_retry.wdl diff --git a/centaur/src/main/resources/standardTestCases/preemptible_basic.test b/centaur/src/main/resources/standardTestCases/preemptible_basic.test new file mode 100644 index 00000000000..1c77e9265bc --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic.test @@ -0,0 +1,11 @@ +name: preemptible_basic +testFormat: workflowsuccess +backends: [Papiv2, GCPBATCH_ALT] + +files { + workflow: preemptible_basic/preemptible_basic.wdl +} + +metadata { + status: Succeeded +} diff --git a/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl b/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl new file mode 100644 index 00000000000..3217b772a33 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl @@ -0,0 +1,33 @@ +version 1.0 + +task delete_self_if_preemptible { + + command <<< + # Prepend date, time and pwd to xtrace log entries. + PS4='\D{+%F %T} \w $ ' + set -o errexit -o nounset -o pipefail -o xtrace + + preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") + + # Perform a maintenance event on this VM if it is preemptible, which should cause it to be preempted. + # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. + if [ "$preemptible" = "TRUE" ]; then + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + fi + + >>> + + runtime { + preemptible: 1 + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + } +} + + +workflow preemptible_basic { + call delete_self_if_preemptible +} diff --git a/centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl b/centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl similarity index 96% rename from centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl rename to centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl index bac7c838121..3b8bd649210 100644 --- a/centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl @@ -9,7 +9,7 @@ task delete_self_if_preemptible { # Delete self if running on a preemptible VM. This should produce an "error 10" which Cromwell should treat as a preemption. # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. if [ "$preemptible" = "TRUE" ]; then - + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) zone=$(basename "$fully_qualified_zone") @@ -25,6 +25,6 @@ task delete_self_if_preemptible { } -workflow error_10_preemptible { +workflow preemptible_basic { call delete_self_if_preemptible } diff --git a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl index 43e9f57c9a1..98adc76f03a 100644 --- a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl +++ b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl @@ -12,13 +12,14 @@ task imitate_oom_error_on_preemptible { preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") - # Delete self if running on a preemptible VM + # Simulate a maintenance event on ourselves if running on a preemptible VM # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. if [ "$preemptible" = "TRUE" ]; then fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) zone=$(basename "$fully_qualified_zone") - gcloud compute instances delete $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 fi # Should reach here on the second attempt diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 5e4c0e753f9..47be345291d 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -68,6 +68,7 @@ import java.io.OutputStreamWriter import java.net.SocketTimeoutException import java.nio.charset.Charset import java.util.Base64 +import java.util.regex.Pattern import scala.concurrent.Future import scala.concurrent.duration._ import scala.io.Source @@ -77,6 +78,10 @@ import scala.util.control.NoStackTrace object GcpBatchAsyncBackendJobExecutionActor { + private val VM_PREEMPTION_PATTERN = Pattern.compile( + "failed due to the following task event: \"Task state is updated from RUNNING to FAILED on zones/\\S+ due to Spot VM preemption with exit code 50001.\"" + ) + def StandardException(errorCode: GrpcStatus, message: String, jobTag: String, @@ -1112,12 +1117,19 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar None ) + def isPreemption(maybePreemption: RunStatus.UnsuccessfulRunStatus): Boolean = + maybePreemption match { + case _: RunStatus.Failed => + maybePreemption.eventList.exists(e => VM_PREEMPTION_PATTERN.matcher(e.name).find()) + case _ => false + } + Future.fromTry { Try { runStatus match { - case preemptedStatus: RunStatus.Preempted if preemptible => - handlePreemption(preemptedStatus, returnCode, prettyPrintedError) case _: RunStatus.Aborted => AbortedExecutionHandle + case maybePreemption: RunStatus.UnsuccessfulRunStatus if preemptible && isPreemption(maybePreemption) => + handlePreemption(maybePreemption, returnCode, prettyPrintedError) case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode) case unknown => throw new RuntimeException( @@ -1137,7 +1149,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar ) private def handlePreemption( - runStatus: RunStatus.Preempted, + runStatus: RunStatus.UnsuccessfulRunStatus, jobReturnCode: Option[Int], prettyPrintedError: String ): ExecutionHandle = { diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala index 0ad12904c06..090f0f6f5ee 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala @@ -126,7 +126,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe private def createTaskSpec(runnables: List[Runnable], computeResource: ComputeResource, - retryCount: Int, durationInSeconds: Long, volumes: List[Volume] ) = @@ -134,7 +133,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe .addAllRunnables(runnables.asJava) .setComputeResource(computeResource) .addAllVolumes(volumes.asJava) - .setMaxRetryCount(retryCount) .setMaxRunDuration( Duration.newBuilder .setSeconds(durationInSeconds) @@ -180,7 +178,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val createParameters = data.createParameters val runtimeAttributes = createParameters.runtimeAttributes - val retryCount = runtimeAttributes.preemptible val allDisksToBeMounted: Seq[GcpBatchAttachedDisk] = createParameters.disks ++ createParameters.referenceDisksForLocalizationOpt.getOrElse(List.empty) val gcpBootDiskSizeMb = convertGbToMib(runtimeAttributes) @@ -221,7 +218,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val taskCount: Long = 1 // parse preemption value and set value for Spot. Spot is replacement for preemptible - val spotModel = toProvisioningModel(runtimeAttributes.preemptible) + val spotModel = toProvisioningModel(createParameters.preemptible) // Set GPU accelerators val accelerators = runtimeAttributes.gpuResource.map(toAccelerator) @@ -260,7 +257,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe ) val computeResource = createComputeResource(cpuCores, memory, gcpBootDiskSizeMb) - val taskSpec = createTaskSpec(sortedRunnables, computeResource, retryCount, durationInSeconds, allVolumes) + val taskSpec = createTaskSpec(sortedRunnables, computeResource, durationInSeconds, allVolumes) val taskGroup: TaskGroup = createTaskGroup(taskCount, taskSpec) val machineType = GcpBatchMachineConstraints.machineType(runtimeAttributes.memory, runtimeAttributes.cpu, diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala index 239424ed73f..1698c68192e 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala @@ -36,10 +36,8 @@ trait BatchUtilityConversions { (memory.amount * 1024).toLong // set Standard or Spot instances - def toProvisioningModel(preemption: Int): ProvisioningModel = preemption compare 0 match { - case 0 => ProvisioningModel.STANDARD - case 1 => ProvisioningModel.SPOT - } + def toProvisioningModel(preemptible: Boolean): ProvisioningModel = + if (preemptible) ProvisioningModel.SPOT else ProvisioningModel.STANDARD def toDisks(disks: Seq[GcpBatchAttachedDisk]): List[AttachedDisk] = disks.map(toDisk).toList diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index 07216f057fc..7735c9e25ac 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -300,7 +300,8 @@ class GcpBatchAsyncBackendJobExecutionActorSpec batchSingletonActor: ActorRef, shouldBePreemptible: Boolean, serviceRegistryActor: ActorRef, - referenceInputFilesOpt: Option[Set[GcpBatchInput]]): ActorRef = { + referenceInputFilesOpt: Option[Set[GcpBatchInput]] + ): ActorRef = { val job = generateStandardAsyncJob val run = Run(job) From 4a92bbcbbae91458d697c4f735a13eb8f085e2f1 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Tue, 4 Feb 2025 12:06:26 -0500 Subject: [PATCH 6/9] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5fa58aaa8..2d856a116fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional - The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information. - Fixes a bug with not being able to recover jobs on Cromwell restart. - Fixes machine type selection to match the Google Cloud Life Sciences backend, including default n1 non shared-core machine types and correct handling of `cpuPlatform` to select n2 or n2d machine types as appropriate. -- Fixes the preemption error handling, now, the correct error message is printed, this also handles the other potential exit codes. +- Fixes preemption and maxRetries behavior. In particular, once a task has exhausted its allowed preemptible attempts, the task will be scheduled again on a non-preemptible VM. - Fixes error message reporting for failed jobs. - Fixes the "retry with more memory" feature. - Fixes the reference disk feature. From f87aff296e3de9c809ad760407491971cf35c867 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Wed, 5 Feb 2025 11:09:18 -0500 Subject: [PATCH 7/9] Retry with more memory MEM_SIZE/MEM_UNIT Centaur test --- .../gcpbatch_retry_with_more_memory.test | 9 +++----- .../gcpbatch/retry_with_more_memory.wdl | 21 +++++++++++++++---- .../batch/runnable/RunnableBuilder.scala | 15 ++++++++++++- .../google/batch/runnable/UserRunnable.scala | 3 ++- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test b/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test index fb8847b52cf..cdb626e658b 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test @@ -1,5 +1,5 @@ name: gcpbatch_retry_with_more_memory -testFormat: workflowfailure +testFormat: workflowsuccess backends: [GCPBATCH] files { @@ -9,13 +9,10 @@ files { metadata { workflowName: retry_with_more_memory - status: Failed - "failures.0.message": "Workflow failed" - "failures.0.causedBy.0.message": "stderr for job `retry_with_more_memory.imitate_oom_error:NA:3` contained one of the `memory-retry-error-keys: [OutOfMemory,Killed]` specified in the Cromwell config. Job might have run out of memory." + status: Succeeded "retry_with_more_memory.imitate_oom_error.-1.1.executionStatus": "RetryableFailure" "retry_with_more_memory.imitate_oom_error.-1.1.runtimeAttributes.memory": "1 GB" "retry_with_more_memory.imitate_oom_error.-1.2.executionStatus": "RetryableFailure" "retry_with_more_memory.imitate_oom_error.-1.2.runtimeAttributes.memory": "1.1 GB" - "retry_with_more_memory.imitate_oom_error.-1.3.executionStatus": "Failed" - "retry_with_more_memory.imitate_oom_error.-1.3.runtimeAttributes.memory": "1.2100000000000002 GB" + "outputs.retry_with_more_memory.memory_output": "1.2100000000000002 GB" } diff --git a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl index c9efea52dd3..2c50ed34c86 100644 --- a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl +++ b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl @@ -2,12 +2,21 @@ version 1.0 task imitate_oom_error { command { - printf "Exception in thread "main" java.lang.OutOfMemoryError: testing\n\tat Test.main(Test.java:1)\n" >&2 && (exit 1) - # As a simulation of an OOM condition, do not create the 'foo' file. Cromwell should still be able to delocalize important detritus. - # touch foo + echo "$MEM_SIZE $MEM_UNIT" + + # Current bashes do not do floating point arithmetic, Python to the rescue. + LESS=$(python -c "print($MEM_SIZE < 1.21)") + + if [[ "$LESS" = "True" ]] + then + printf "Exception in thread "main" java.lang.OutOfMemoryError: testing\n\tat Test.main(Test.java:1)\n" >&2 + exit 1 + fi + + echo "$MEM_SIZE $MEM_UNIT" > memory_output.txt } output { - File foo = "foo" + String memory_output = read_string("memory_output.txt") } runtime { docker: "python:latest" @@ -19,4 +28,8 @@ task imitate_oom_error { workflow retry_with_more_memory { call imitate_oom_error + + output { + String memory_output = imitate_oom_error.memory_output + } } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala index d69502295cd..09208b96865 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala @@ -6,6 +6,7 @@ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsT import cromwell.backend.google.batch.models.{BatchParameter, GcpBatchInput, GcpBatchOutput} import cromwell.core.path.Path import mouse.all.anySyntaxMouse +import wom.format.MemorySize import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.jdk.CollectionConverters._ @@ -147,7 +148,8 @@ object RunnableBuilder { scriptContainerPath: String, jobShell: String, volumes: List[Volume], - dockerhubCredentials: (String, String) + dockerhubCredentials: (String, String), + memory: MemorySize ): Runnable.Builder = { val container = (dockerhubCredentials._1, dockerhubCredentials._2) match { @@ -164,9 +166,20 @@ object RunnableBuilder { .setEntrypoint(jobShell) .addCommands(scriptContainerPath) } + + // adding memory as environment variables makes it easy for a user to retrieve the new value of memory + // on the machine to utilize in their command blocks if needed + val environment = + Environment + .newBuilder() + .putAllVariables( + Map("MEM_UNIT" -> memory.unit.toString, "MEM_SIZE" -> memory.amount.toString).asJava + ) + Runnable .newBuilder() .setContainer(container) + .setEnvironment(environment) .withVolumes(volumes) .putLabels(Key.Tag, Value.UserRunnable) } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala index cba665dbf9e..05b6334ccb4 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala @@ -12,7 +12,8 @@ trait UserRunnable { scriptContainerPath = createParameters.commandScriptContainerPath.pathAsString, jobShell = "/bin/bash", volumes = volumes, - dockerhubCredentials = createParameters.dockerhubCredentials + dockerhubCredentials = createParameters.dockerhubCredentials, + memory = createParameters.runtimeAttributes.memory ) val describeRunnable = RunnableBuilder.describeDocker("user runnable", userRunnable) From a35bc4e2a7b37ddd93f2206dd764230065b321cf Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Thu, 6 Feb 2025 14:44:53 -0500 Subject: [PATCH 8/9] PR feedback, cleanup --- ...cpBatchAsyncBackendJobExecutionActor.scala | 39 +++++-------------- .../api/request/BatchRequestExecutor.scala | 15 ++++++- .../google/batch/models/RunStatus.scala | 1 - 3 files changed, 22 insertions(+), 33 deletions(-) diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 47be345291d..5dc80b89251 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -29,59 +29,45 @@ import cromwell.backend.google.batch.models._ import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage} import cromwell.backend.google.batch.runnable.WorkflowOptionKeys import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, RuntimeOutputMapping} -import cromwell.filesystems.gcs.GcsPathBuilder -import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath - -import java.io.FileNotFoundException -import cromwell.backend.standard.{ - ScriptPreambleData, - StandardAdHocValue, - StandardAsyncExecutionActor, - StandardAsyncExecutionActorParams, - StandardAsyncJob -} +import cromwell.backend.standard._ import cromwell.core._ import cromwell.core.io.IoCommandBuilder import cromwell.core.path.{DefaultPathBuilder, Path} import cromwell.core.retry.SimpleExponentialBackoff import cromwell.filesystems.drs.{DrsPath, DrsResolver} +import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder} +import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder -import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.http.HttpPath import cromwell.filesystems.sra.SraPath import cromwell.services.instrumentation.CromwellInstrumentation import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys import mouse.all._ -import shapeless.Coproduct import org.apache.commons.codec.digest.DigestUtils import org.apache.commons.csv.{CSVFormat, CSVPrinter} import org.apache.commons.io.output.ByteArrayOutputStream +import shapeless.Coproduct +import wom.callable.AdHocValue import wom.callable.Callable.OutputDefinition import wom.callable.MetaValueElement.{MetaValueElementBoolean, MetaValueElementObject} -import wom.callable.AdHocValue import wom.core.FullyQualifiedName import wom.expression.{FileEvaluation, NoIoFunctionSet} import wom.values._ -import java.io.OutputStreamWriter +import java.io.{FileNotFoundException, OutputStreamWriter} import java.net.SocketTimeoutException import java.nio.charset.Charset import java.util.Base64 -import java.util.regex.Pattern import scala.concurrent.Future import scala.concurrent.duration._ import scala.io.Source import scala.language.postfixOps -import scala.util.{Failure, Success, Try} import scala.util.control.NoStackTrace +import scala.util.{Failure, Success, Try} object GcpBatchAsyncBackendJobExecutionActor { - private val VM_PREEMPTION_PATTERN = Pattern.compile( - "failed due to the following task event: \"Task state is updated from RUNNING to FAILED on zones/\\S+ due to Spot VM preemption with exit code 50001.\"" - ) - def StandardException(errorCode: GrpcStatus, message: String, jobTag: String, @@ -1117,19 +1103,12 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar None ) - def isPreemption(maybePreemption: RunStatus.UnsuccessfulRunStatus): Boolean = - maybePreemption match { - case _: RunStatus.Failed => - maybePreemption.eventList.exists(e => VM_PREEMPTION_PATTERN.matcher(e.name).find()) - case _ => false - } - Future.fromTry { Try { runStatus match { case _: RunStatus.Aborted => AbortedExecutionHandle - case maybePreemption: RunStatus.UnsuccessfulRunStatus if preemptible && isPreemption(maybePreemption) => - handlePreemption(maybePreemption, returnCode, prettyPrintedError) + case preemption: RunStatus.Preempted if preemptible => + handlePreemption(preemption, returnCode, prettyPrintedError) case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode) case unknown => throw new RuntimeException( diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala index 464cd4862a5..8411827a650 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala @@ -16,6 +16,7 @@ import io.grpc.Status import cromwell.services.cost.InstantiatedVmInfo import cromwell.services.metadata.CallMetadataKeys +import java.util.regex.Pattern import scala.annotation.unused import scala.concurrent.{ExecutionContext, Future, Promise} import scala.jdk.CollectionConverters.ListHasAsScala @@ -27,6 +28,9 @@ trait BatchRequestExecutor { } object BatchRequestExecutor { + private val VM_PREEMPTION_PATTERN = Pattern.compile( + "failed due to the following task event: \"Task state is updated from RUNNING to FAILED on zones/\\S+ due to Spot VM preemption with exit code 50001.\"" + ) class CloudImpl(batchSettings: BatchServiceSettings) extends BatchRequestExecutor with LazyLogging { @@ -131,6 +135,9 @@ object BatchRequestExecutor { } private[request] def interpretOperationStatus(job: Job): RunStatus = { + def isPreemption(events: List[ExecutionEvent]): Boolean = + events.exists(e => VM_PREEMPTION_PATTERN.matcher(e.name).find()) + lazy val events = getEventList( Option(job) .flatMap(e => Option(e.getStatus)) @@ -162,8 +169,12 @@ object BatchRequestExecutor { } else if (job.getStatus.getState == JobStatus.State.RUNNING) { RunStatus.Running(events, instantiatedVmInfo) } else if (job.getStatus.getState == JobStatus.State.FAILED) { - // Status.OK is hardcoded because the request succeeded, we don't have access to the internal response code - RunStatus.Failed(Status.OK, events, instantiatedVmInfo) + if (isPreemption(events)) { + RunStatus.Preempted(Status.OK, events, instantiatedVmInfo) + } else { + // Status.OK is hardcoded because the request succeeded, we don't have access to the internal response code + RunStatus.Failed(Status.OK, events, instantiatedVmInfo) + } } else { RunStatus.Initializing(events, instantiatedVmInfo) } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala index f565692347d..9f20b7bbf42 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala @@ -50,7 +50,6 @@ object RunStatus { override def eventList: Seq[ExecutionEvent] = List.empty } - // TODO: Use this when detecting a preemption or remove it final case class Preempted( errorCode: Status, eventList: Seq[ExecutionEvent], From ba64037e942c8826e21f377dde36e4bfb93d0b03 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Fri, 7 Feb 2025 11:24:56 -0500 Subject: [PATCH 9/9] docs --- docs/RuntimeAttributes.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md index 857e5245dba..061c2dfc5a8 100644 --- a/docs/RuntimeAttributes.md +++ b/docs/RuntimeAttributes.md @@ -328,7 +328,13 @@ runtime { } ``` - +In GCP Batch, preempted jobs can be identified in job metadata (`gcloud batch jobs describe`) by a `statusEvent` with a description that looks like: +``` +Job state is set from RUNNING to FAILED for job projects/abc/locations/us-central1/jobs/job-abc.Job +failed due to task failure. Specifically, task with index 0 failed due to the +following task event: "Task state is updated from RUNNING to FAILED on zones/us-central1-b/instances/8675309 +due to Spot VM preemption with exit code 50001." +``` ### `bootDiskSizeGb`