diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5fa58aaa8..2d856a116fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional - The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information. - Fixes a bug with not being able to recover jobs on Cromwell restart. - Fixes machine type selection to match the Google Cloud Life Sciences backend, including default n1 non shared-core machine types and correct handling of `cpuPlatform` to select n2 or n2d machine types as appropriate. -- Fixes the preemption error handling, now, the correct error message is printed, this also handles the other potential exit codes. +- Fixes preemption and maxRetries behavior. In particular, once a task has exhausted its allowed preemptible attempts, the task will be scheduled again on a non-preemptible VM. - Fixes error message reporting for failed jobs. - Fixes the "retry with more memory" feature. - Fixes the reference disk feature. diff --git a/centaur/src/main/resources/standardTestCases/checkpointing.test b/centaur/src/main/resources/standardTestCases/checkpointing.test index 89a1d3dc633..9abbfbbdcfb 100644 --- a/centaur/src/main/resources/standardTestCases/checkpointing.test +++ b/centaur/src/main/resources/standardTestCases/checkpointing.test @@ -1,6 +1,6 @@ name: checkpointing testFormat: workflowsuccess -backends: [Papiv2, GCPBATCH] +backends: [Papiv2, GCPBATCH_ALT] files { workflow: checkpointing/checkpointing.wdl diff --git a/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl b/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl new file mode 100644 index 00000000000..eccccfcb5e6 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/checkpointing/gcpbatch_checkpointing.wdl @@ -0,0 +1,70 @@ +version 1.0 + +workflow checkpointing { + call count { input: count_to = 100 } + output { + String preempted = count.preempted + } +} + +task count { + input { + Int count_to + } + + meta { + volatile: true + } + + command <<< + # Read from the my_checkpoint file if there's content there: + FROM_CKPT=$(cat my_checkpoint | tail -n1 | awk '{ print $1 }') + FROM_CKPT=${FROM_CKPT:-1} + + # We don't want any single VM run the entire count, so work out the max counter value for this attempt: + MAX="$(($FROM_CKPT + 66))" + + INSTANCE_NAME=$(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") + echo "Discovered instance: $INSTANCE_NAME" + + # Run the counter: + echo '--' >> my_checkpoint + for i in $(seq $FROM_CKPT ~{count_to}) + do + echo $i + echo $i ${INSTANCE_NAME} $(date) >> my_checkpoint + + # If we're over our max, "preempt" the VM by simulating a maintenance event: + if [ "${i}" -gt "${MAX}" ] + then + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + fi + + sleep 1 + done + + # Prove that we got preempted at least once: + FIRST_INSTANCE=$(cat my_checkpoint | head -n1 | awk '{ print $2 }') + LAST_INSTANCE=$(cat my_checkpoint | tail -n1 | awk '{ print $2 }') + if [ "${FIRST_INSTANCE}" != "LAST_INSTANCE" ] + then + echo "GOTPREEMPTED" > preempted.txt + else + echo "NEVERPREEMPTED" > preempted.txt + fi + >>> + + runtime { + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + preemptible: 3 + checkpointFile: "my_checkpoint" + } + + output { + File checkpoint_log = "my_checkpoint" + String preempted = read_string("preempted.txt") + } +} diff --git a/centaur/src/main/resources/standardTestCases/error_10_preemptible.test b/centaur/src/main/resources/standardTestCases/error_10_preemptible.test deleted file mode 100644 index aa2e15b108d..00000000000 --- a/centaur/src/main/resources/standardTestCases/error_10_preemptible.test +++ /dev/null @@ -1,12 +0,0 @@ -name: error_10_preemptible -testFormat: workflowsuccess -# Try to fake a preemption which doesn't seem to work on GCP Batch but probably shouldn't be working on PAPI v2 -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] - -files { - workflow: error_10_preemptible/error_10_preemptible.wdl -} - -metadata { - status: Succeeded -} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test b/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test new file mode 100644 index 00000000000..a683cbd5fcb --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_checkpointing.test @@ -0,0 +1,13 @@ +name: gcpbatch_checkpointing +testFormat: workflowsuccess +backends: [GCPBATCH] + +files { + workflow: checkpointing/gcpbatch_checkpointing.wdl +} + +metadata { + workflowName: checkpointing + status: Succeeded + "outputs.checkpointing.preempted": "GOTPREEMPTED" +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test index d63dc5a7e35..64a379ddc35 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_delocalization_required_files.test @@ -11,5 +11,5 @@ metadata { "calls.required_files.check_it.executionStatus": "Done" "calls.required_files.do_it.executionStatus": "Failed" "calls.required_files.do_it.retryableFailure": "false" - "calls.required_files.do_it.failures.0.message": ~~"failed" + "calls.required_files.do_it.failures.0.message": ~~"Job exited without an error, exit code 0. Batch error code 0. Job failed with an unknown reason" } diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test new file mode 100644 index 00000000000..0be13eb0ea4 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_papi_preemptible_and_max_retries.test @@ -0,0 +1,13 @@ +name: gcpbatch_papi_preemptible_and_max_retries +testFormat: workflowfailure +backends: [GCPBATCH] + +files { + workflow: papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl +} + +metadata { + workflowName: papi_preemptible_and_max_retries + status: Failed + "papi_preemptible_and_max_retries.delete_self.-1.attempt": 3 +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test new file mode 100644 index 00000000000..dfbcd9fcf41 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_and_memory_retry.test @@ -0,0 +1,28 @@ +name: gcpbatch_preemptible_and_memory_retry +testFormat: workflowfailure +# The original version of this test was tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own +# VM deletion as a preemption event. However GCP Batch perhaps more correctly diagnoses VM deletion as a weird +# non-preemption event. The GCPBATCH version of this test uses `gcloud beta compute instances simulate-maintenance-event` +# to simulate a preemption in a way that GCP Batch actually perceives as a preemption. +backends: [GCPBATCH] + +files { + workflow: retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl + options: retry_with_more_memory/retry_with_more_memory.options +} + +metadata { + workflowName: preemptible_and_memory_retry + status: Failed + "failures.0.message": "Workflow failed" + "failures.0.causedBy.0.message": "stderr for job `preemptible_and_memory_retry.imitate_oom_error_on_preemptible:NA:3` contained one of the `memory-retry-error-keys: [OutOfMemory,Killed]` specified in the Cromwell config. Job might have run out of memory." + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.preemptible": "true" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.executionStatus": "RetryableFailure" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.1.runtimeAttributes.memory": "1 GB" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.preemptible": "false" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.executionStatus": "RetryableFailure" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.2.runtimeAttributes.memory": "1 GB" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.preemptible": "false" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.executionStatus": "Failed" + "preemptible_and_memory_retry.imitate_oom_error_on_preemptible.-1.3.runtimeAttributes.memory": "1.1 GB" +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test new file mode 100644 index 00000000000..accc2ceda51 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_preemptible_basic.test @@ -0,0 +1,11 @@ +name: gcpbatch_preemptible_basic +testFormat: workflowsuccess +backends: [GCPBATCH] + +files { + workflow: preemptible_basic/gcpbatch_preemptible_basic.wdl +} + +metadata { + status: Succeeded +} diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test b/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test index 7df55d2389f..e4b0a2fba2a 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_requester_pays_localization_negative.test @@ -14,5 +14,5 @@ metadata { workflowName: requester_pays_localization status: Failed "failures.0.message": "Workflow failed" - "failures.0.causedBy.0.message": ~~"failed" + "failures.0.causedBy.0.message": ~~"The job was stopped before the command finished. Batch error code 0. Job failed with an unknown reason" } diff --git a/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test b/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test index fb8847b52cf..cdb626e658b 100644 --- a/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test +++ b/centaur/src/main/resources/standardTestCases/gcpbatch_retry_with_more_memory.test @@ -1,5 +1,5 @@ name: gcpbatch_retry_with_more_memory -testFormat: workflowfailure +testFormat: workflowsuccess backends: [GCPBATCH] files { @@ -9,13 +9,10 @@ files { metadata { workflowName: retry_with_more_memory - status: Failed - "failures.0.message": "Workflow failed" - "failures.0.causedBy.0.message": "stderr for job `retry_with_more_memory.imitate_oom_error:NA:3` contained one of the `memory-retry-error-keys: [OutOfMemory,Killed]` specified in the Cromwell config. Job might have run out of memory." + status: Succeeded "retry_with_more_memory.imitate_oom_error.-1.1.executionStatus": "RetryableFailure" "retry_with_more_memory.imitate_oom_error.-1.1.runtimeAttributes.memory": "1 GB" "retry_with_more_memory.imitate_oom_error.-1.2.executionStatus": "RetryableFailure" "retry_with_more_memory.imitate_oom_error.-1.2.runtimeAttributes.memory": "1.1 GB" - "retry_with_more_memory.imitate_oom_error.-1.3.executionStatus": "Failed" - "retry_with_more_memory.imitate_oom_error.-1.3.runtimeAttributes.memory": "1.2100000000000002 GB" + "outputs.retry_with_more_memory.memory_output": "1.2100000000000002 GB" } diff --git a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test index 0cfbc9e13d5..74ea22f8cb2 100644 --- a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test +++ b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries.test @@ -1,7 +1,7 @@ name: papi_preemptible_and_max_retries testFormat: workflowfailure -# faking own preemption doesn't work on GCP Batch -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] +# Faking own preemption has to be done differently on GCP Batch +backends: [Papiv2, GCPBATCH_ALT] files { workflow: papi_preemptible_and_max_retries/papi_preemptible_and_max_retries.wdl diff --git a/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl new file mode 100644 index 00000000000..9614d0656de --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/papi_preemptible_and_max_retries/gcpbatch_papi_preemptible_and_max_retries.wdl @@ -0,0 +1,31 @@ +version 1.0 + +task delete_self { + + command { + preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") + + # Simulate a maintenance event on ourselves if running on a preemptible VM, otherwise delete ourselves. + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + + if [ "$preemptible" = "TRUE" ]; then + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + else + # We need to actually delete ourselves if the VM is not preemptible; simulated maintenance events don't seem to + # precipitate the demise of on-demand VMs. + gcloud compute instances delete $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + fi + } + + runtime { + preemptible: 1 + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + maxRetries: 1 + } +} + +workflow papi_preemptible_and_max_retries { + call delete_self +} diff --git a/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test b/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test index 1ef397d95ae..58f882e9d26 100644 --- a/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test +++ b/centaur/src/main/resources/standardTestCases/preemptible_and_memory_retry.test @@ -1,8 +1,10 @@ name: preemptible_and_memory_retry testFormat: workflowfailure -# The original version of this test seems to have been tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own VM deletion as a preemption event. GCP Batch perhaps more correctly diagnoses the VM deletion as a weird non-preemption happening, but that frustrates the logic of this test. -# Disabling this as it's not possible to induce a real preemption. -backends: [Papiv2, GCPBATCH_TESTING_PAPIV2_QUIRKS] +# The original version of this test was tailored to the quirks of Papi v2 in depending on the misdiagnosis of its own +# VM deletion as a preemption event. However GCP Batch perhaps more correctly diagnoses VM deletion as a weird +# non-preemption event. The GCPBATCH version of this test uses `gcloud beta compute instances simulate-maintenance-event` +# to simulate a preemption in a way that GCP Batch actually perceives as a preemption. +backends: [Papiv2, GCPBATCH_ALT] files { workflow: retry_with_more_memory/preemptible_and_memory_retry.wdl diff --git a/centaur/src/main/resources/standardTestCases/preemptible_basic.test b/centaur/src/main/resources/standardTestCases/preemptible_basic.test new file mode 100644 index 00000000000..1c77e9265bc --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic.test @@ -0,0 +1,11 @@ +name: preemptible_basic +testFormat: workflowsuccess +backends: [Papiv2, GCPBATCH_ALT] + +files { + workflow: preemptible_basic/preemptible_basic.wdl +} + +metadata { + status: Succeeded +} diff --git a/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl b/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl new file mode 100644 index 00000000000..3217b772a33 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic/gcpbatch_preemptible_basic.wdl @@ -0,0 +1,33 @@ +version 1.0 + +task delete_self_if_preemptible { + + command <<< + # Prepend date, time and pwd to xtrace log entries. + PS4='\D{+%F %T} \w $ ' + set -o errexit -o nounset -o pipefail -o xtrace + + preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") + + # Perform a maintenance event on this VM if it is preemptible, which should cause it to be preempted. + # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. + if [ "$preemptible" = "TRUE" ]; then + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) + zone=$(basename "$fully_qualified_zone") + + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 + fi + + >>> + + runtime { + preemptible: 1 + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + } +} + + +workflow preemptible_basic { + call delete_self_if_preemptible +} diff --git a/centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl b/centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl similarity index 96% rename from centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl rename to centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl index bac7c838121..3b8bd649210 100644 --- a/centaur/src/main/resources/standardTestCases/error_10_preemptible/error_10_preemptible.wdl +++ b/centaur/src/main/resources/standardTestCases/preemptible_basic/preemptible_basic.wdl @@ -9,7 +9,7 @@ task delete_self_if_preemptible { # Delete self if running on a preemptible VM. This should produce an "error 10" which Cromwell should treat as a preemption. # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. if [ "$preemptible" = "TRUE" ]; then - + fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) zone=$(basename "$fully_qualified_zone") @@ -25,6 +25,6 @@ task delete_self_if_preemptible { } -workflow error_10_preemptible { +workflow preemptible_basic { call delete_self_if_preemptible } diff --git a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl index 43e9f57c9a1..98adc76f03a 100644 --- a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl +++ b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/preemptible_and_memory_retry.wdl @@ -12,13 +12,14 @@ task imitate_oom_error_on_preemptible { preemptible=$(curl -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible") - # Delete self if running on a preemptible VM + # Simulate a maintenance event on ourselves if running on a preemptible VM # Since `preemptible: 1` the job should be restarted on a non-preemptible VM. if [ "$preemptible" = "TRUE" ]; then fully_qualified_zone=$(curl -s -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone) zone=$(basename "$fully_qualified_zone") - gcloud compute instances delete $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + gcloud beta compute instances simulate-maintenance-event $(curl -s "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google") --zone=$zone -q + sleep 60 fi # Should reach here on the second attempt diff --git a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl index c9efea52dd3..2c50ed34c86 100644 --- a/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl +++ b/centaur/src/main/resources/standardTestCases/retry_with_more_memory/gcpbatch/retry_with_more_memory.wdl @@ -2,12 +2,21 @@ version 1.0 task imitate_oom_error { command { - printf "Exception in thread "main" java.lang.OutOfMemoryError: testing\n\tat Test.main(Test.java:1)\n" >&2 && (exit 1) - # As a simulation of an OOM condition, do not create the 'foo' file. Cromwell should still be able to delocalize important detritus. - # touch foo + echo "$MEM_SIZE $MEM_UNIT" + + # Current bashes do not do floating point arithmetic, Python to the rescue. + LESS=$(python -c "print($MEM_SIZE < 1.21)") + + if [[ "$LESS" = "True" ]] + then + printf "Exception in thread "main" java.lang.OutOfMemoryError: testing\n\tat Test.main(Test.java:1)\n" >&2 + exit 1 + fi + + echo "$MEM_SIZE $MEM_UNIT" > memory_output.txt } output { - File foo = "foo" + String memory_output = read_string("memory_output.txt") } runtime { docker: "python:latest" @@ -19,4 +28,8 @@ task imitate_oom_error { workflow retry_with_more_memory { call imitate_oom_error + + output { + String memory_output = imitate_oom_error.memory_output + } } diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md index 857e5245dba..061c2dfc5a8 100644 --- a/docs/RuntimeAttributes.md +++ b/docs/RuntimeAttributes.md @@ -328,7 +328,13 @@ runtime { } ``` - +In GCP Batch, preempted jobs can be identified in job metadata (`gcloud batch jobs describe`) by a `statusEvent` with a description that looks like: +``` +Job state is set from RUNNING to FAILED for job projects/abc/locations/us-central1/jobs/job-abc.Job +failed due to task failure. Specifically, task with index 0 failed due to the +following task event: "Task state is updated from RUNNING to FAILED on zones/us-central1-b/instances/8675309 +due to Spot VM preemption with exit code 50001." +``` ### `bootDiskSizeGb` diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala index c1585ffe9fa..dd59c910e09 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala @@ -7,6 +7,11 @@ import com.google.cloud.batch.v1.BatchServiceSettings import com.google.common.collect.ImmutableMap import com.typesafe.scalalogging.StrictLogging import cromwell.backend._ +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{ + preemptionCountKey, + robustBuildAttributes, + unexpectedRetryCountKey +} import cromwell.backend.google.batch.actors._ import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler} import cromwell.backend.google.batch.authentication.GcpBatchDockerCredentials @@ -30,7 +35,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, ) extends StandardLifecycleActorFactory with GcpPlatform { - import GcpBatchBackendLifecycleActorFactory._ + override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey, unexpectedRetryCountKey) override def jobIdKey: String = "__gcp_batch" protected val googleConfig: GoogleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig) @@ -133,6 +138,8 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, } object GcpBatchBackendLifecycleActorFactory extends StrictLogging { + val preemptionCountKey = "PreemptionCount" + val unexpectedRetryCountKey = "UnexpectedRetryCount" private[batch] def robustBuildAttributes(buildAttributes: () => GcpBatchConfigurationAttributes, maxAttempts: Int = 3, diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 9aa98c8da8b..5dc80b89251 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -1,9 +1,11 @@ package cromwell.backend.google.batch.actors +import _root_.io.grpc.{Status => GrpcStatus} import akka.actor.{ActorLogging, ActorRef} import akka.http.scaladsl.model.{ContentType, ContentTypes} import akka.pattern.AskSupport import cats.data.NonEmptyList +import cats.data.Validated.{Invalid, Valid} import cats.implicits._ import com.google.cloud.batch.v1.JobName import com.google.cloud.storage.contrib.nio.CloudStorageOptions @@ -14,8 +16,10 @@ import cromwell.backend.async.{ AbortedExecutionHandle, ExecutionHandle, FailedNonRetryableExecutionHandle, + FailedRetryableExecutionHandle, PendingExecutionHandle } +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory import cromwell.backend.google.batch.api.GcpBatchRequestFactory._ import cromwell.backend.google.batch.io._ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration @@ -25,42 +29,33 @@ import cromwell.backend.google.batch.models._ import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage} import cromwell.backend.google.batch.runnable.WorkflowOptionKeys import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, RuntimeOutputMapping} -import cromwell.filesystems.gcs.GcsPathBuilder -import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath - -import java.io.FileNotFoundException -import cromwell.backend.standard.{ - ScriptPreambleData, - StandardAdHocValue, - StandardAsyncExecutionActor, - StandardAsyncExecutionActorParams, - StandardAsyncJob -} +import cromwell.backend.standard._ import cromwell.core._ import cromwell.core.io.IoCommandBuilder import cromwell.core.path.{DefaultPathBuilder, Path} import cromwell.core.retry.SimpleExponentialBackoff import cromwell.filesystems.drs.{DrsPath, DrsResolver} +import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder} +import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder -import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.http.HttpPath import cromwell.filesystems.sra.SraPath import cromwell.services.instrumentation.CromwellInstrumentation -import cromwell.services.keyvalue.KeyValueServiceActor.KvJobKey +import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys import mouse.all._ -import shapeless.Coproduct import org.apache.commons.codec.digest.DigestUtils import org.apache.commons.csv.{CSVFormat, CSVPrinter} import org.apache.commons.io.output.ByteArrayOutputStream +import shapeless.Coproduct +import wom.callable.AdHocValue import wom.callable.Callable.OutputDefinition import wom.callable.MetaValueElement.{MetaValueElementBoolean, MetaValueElementObject} -import wom.callable.AdHocValue import wom.core.FullyQualifiedName import wom.expression.{FileEvaluation, NoIoFunctionSet} import wom.values._ -import java.io.OutputStreamWriter +import java.io.{FileNotFoundException, OutputStreamWriter} import java.net.SocketTimeoutException import java.nio.charset.Charset import java.util.Base64 @@ -68,13 +63,25 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.io.Source import scala.language.postfixOps -import scala.util.{Failure, Success, Try} import scala.util.control.NoStackTrace +import scala.util.{Failure, Success, Try} object GcpBatchAsyncBackendJobExecutionActor { - def StandardException(message: String, jobTag: String): Exception = - new Exception(s"Task $jobTag failed: $message") + def StandardException(errorCode: GrpcStatus, + message: String, + jobTag: String, + returnCodeOption: Option[Int], + stderrPath: Path + ): Exception = { + val returnCodeMessage = returnCodeOption match { + case Some(returnCode) if returnCode == 0 => "Job exited without an error, exit code 0." + case Some(returnCode) => s"Job exit code $returnCode. Check $stderrPath for more information." + case None => "The job was stopped before the command finished." + } + + new Exception(s"Task $jobTag failed. $returnCodeMessage Batch error code ${errorCode.getCode.value}. $message") + } // GCS path regexes comments: // - The (?s) option at the start makes '.' expression to match any symbol, including '\n' @@ -175,6 +182,15 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override def dockerImageUsed: Option[String] = Option(jobDockerImage) + // Need to add previousRetryReasons and preemptible in order to get preemptible to work in the tests + protected val previousRetryReasons: ErrorOr[PreviousRetryReasons] = + PreviousRetryReasons.tryApply(jobDescriptor.prefetchedKvStoreEntries, jobDescriptor.key.attempt) + + override lazy val preemptible: Boolean = previousRetryReasons match { + case Valid(PreviousRetryReasons(p, _)) => p < maxPreemption + case _ => false + } + override def tryAbort(job: StandardAsyncJob): Unit = abortJob(workflowId = workflowId, jobName = JobName.parse(job.jobId), @@ -644,6 +660,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar projectId = googleProject(jobDescriptor.workflowDescriptor), computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor), googleLabels = backendLabels ++ customLabels, + preemptible = preemptible, batchTimeout = batchConfiguration.batchTimeout, jobShell = batchConfiguration.jobShell, privateDockerKeyAndEncryptedToken = dockerKeyAndToken, @@ -1067,13 +1084,20 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case _ => false } - // returnCode is provided by cromwell, so far, this is empty for all the tests I ran + private lazy val standardPaths = jobPaths.standardPaths + override def handleExecutionFailure(runStatus: RunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = { - def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus): ExecutionHandle = + val prettyPrintedError = "Job failed with an unknown reason" + + // Inner function: Handles a 'Failed' runStatus (or Preempted if preemptible was false) + def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus, returnCode: Option[Int]): ExecutionHandle = FailedNonRetryableExecutionHandle( StandardException( - message = runStatus.prettyPrintedError, - jobTag = jobTag + runStatus.errorCode, + prettyPrintedError, + jobTag, + returnCode, + standardPaths.error ), returnCode, None @@ -1082,17 +1106,71 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar Future.fromTry { Try { runStatus match { - case RunStatus.Aborted(_, _) => AbortedExecutionHandle - case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus) + case _: RunStatus.Aborted => AbortedExecutionHandle + case preemption: RunStatus.Preempted if preemptible => + handlePreemption(preemption, returnCode, prettyPrintedError) + case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode) case unknown => throw new RuntimeException( - s"Method handleExecutionFailure not called with RunStatus.Failed or RunStatus.Preempted. Instead got $unknown" + s"handleExecutionFailure not called with RunStatus.Failed or RunStatus.Preempted. Instead got $unknown" ) } } } } + private def nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p: Int, ur: Int): Seq[KvPair] = + Seq( + KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey), + ur.toString + ), + KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.preemptionCountKey), p.toString) + ) + + private def handlePreemption( + runStatus: RunStatus.UnsuccessfulRunStatus, + jobReturnCode: Option[Int], + prettyPrintedError: String + ): ExecutionHandle = { + import common.numeric.IntegerUtil._ + + val errorCode: GrpcStatus = runStatus.errorCode + previousRetryReasons match { + case Valid(PreviousRetryReasons(p, ur)) => + val thisPreemption = p + 1 + val taskName = s"${workflowDescriptor.id}:${call.localName}" + val baseMsg = s"Task $taskName was preempted for the ${thisPreemption.toOrdinal} time." + + val preemptionAndUnexpectedRetryCountsKvPairs = + nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(thisPreemption, ur) + if (thisPreemption < maxPreemption) { + // Increment preemption count and unexpectedRetryCount stays the same + val msg = + s"$baseMsg The call will be restarted with another preemptible VM (max preemptible attempts number is " + + s"$maxPreemption). Error code $errorCode.$prettyPrintedError" + FailedRetryableExecutionHandle( + StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + kvPairsToSave = Option(preemptionAndUnexpectedRetryCountsKvPairs) + ) + } else { + val msg = s"$baseMsg The maximum number of preemptible attempts ($maxPreemption) has been reached. The " + + s"call will be restarted with a non-preemptible VM. Error code $errorCode.$prettyPrintedError)" + FailedRetryableExecutionHandle( + StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + kvPairsToSave = Option(preemptionAndUnexpectedRetryCountsKvPairs) + ) + } + case Invalid(_) => + FailedNonRetryableExecutionHandle( + StandardException(errorCode, prettyPrintedError, jobTag, jobReturnCode, standardPaths.error), + jobReturnCode, + None + ) + } + } + override lazy val startMetadataKeyValues: Map[String, Any] = super[GcpBatchJobCachingActorHelper].startMetadataKeyValues diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala index bc97ab24cde..620bd880bdf 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala @@ -25,6 +25,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { batchConfiguration.runtimeConfig ) + lazy val maxPreemption: Int = runtimeAttributes.preemptible + + def preemptible: Boolean + lazy val workingDisk: GcpBatchAttachedDisk = runtimeAttributes.disks.find(_.name == GcpBatchWorkingDisk.Name).get lazy val callRootPath: Path = gcpBatchCallPaths.callExecutionRoot @@ -71,9 +75,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { .get(WorkflowOptionKeys.GoogleProject) .getOrElse(batchAttributes.project) - Map[String, String]( + Map[String, Any]( GcpBatchMetadataKeys.GoogleProject -> googleProject, - GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString + GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString, + "preemptible" -> preemptible ) ++ originalLabelEvents } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala index 7c203a8c3ae..443cdff8ded 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala @@ -84,6 +84,7 @@ object GcpBatchRequestFactory { projectId: String, computeServiceAccount: String, googleLabels: Seq[GcpLabel], + preemptible: Boolean, batchTimeout: FiniteDuration, jobShell: String, privateDockerKeyAndEncryptedToken: Option[CreateBatchDockerKeyAndToken], diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala index 0ad12904c06..090f0f6f5ee 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala @@ -126,7 +126,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe private def createTaskSpec(runnables: List[Runnable], computeResource: ComputeResource, - retryCount: Int, durationInSeconds: Long, volumes: List[Volume] ) = @@ -134,7 +133,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe .addAllRunnables(runnables.asJava) .setComputeResource(computeResource) .addAllVolumes(volumes.asJava) - .setMaxRetryCount(retryCount) .setMaxRunDuration( Duration.newBuilder .setSeconds(durationInSeconds) @@ -180,7 +178,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val createParameters = data.createParameters val runtimeAttributes = createParameters.runtimeAttributes - val retryCount = runtimeAttributes.preemptible val allDisksToBeMounted: Seq[GcpBatchAttachedDisk] = createParameters.disks ++ createParameters.referenceDisksForLocalizationOpt.getOrElse(List.empty) val gcpBootDiskSizeMb = convertGbToMib(runtimeAttributes) @@ -221,7 +218,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val taskCount: Long = 1 // parse preemption value and set value for Spot. Spot is replacement for preemptible - val spotModel = toProvisioningModel(runtimeAttributes.preemptible) + val spotModel = toProvisioningModel(createParameters.preemptible) // Set GPU accelerators val accelerators = runtimeAttributes.gpuResource.map(toAccelerator) @@ -260,7 +257,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe ) val computeResource = createComputeResource(cpuCores, memory, gcpBootDiskSizeMb) - val taskSpec = createTaskSpec(sortedRunnables, computeResource, retryCount, durationInSeconds, allVolumes) + val taskSpec = createTaskSpec(sortedRunnables, computeResource, durationInSeconds, allVolumes) val taskGroup: TaskGroup = createTaskGroup(taskCount, taskSpec) val machineType = GcpBatchMachineConstraints.machineType(runtimeAttributes.memory, runtimeAttributes.cpu, diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala index 38ebc66cf49..8411827a650 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala @@ -10,11 +10,13 @@ import cromwell.backend.google.batch.actors.BatchApiAbortClient.{ } import cromwell.backend.google.batch.api.BatchApiRequestManager._ import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse} -import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus} +import cromwell.backend.google.batch.models.RunStatus import cromwell.core.ExecutionEvent +import io.grpc.Status import cromwell.services.cost.InstantiatedVmInfo import cromwell.services.metadata.CallMetadataKeys +import java.util.regex.Pattern import scala.annotation.unused import scala.concurrent.{ExecutionContext, Future, Promise} import scala.jdk.CollectionConverters.ListHasAsScala @@ -26,6 +28,9 @@ trait BatchRequestExecutor { } object BatchRequestExecutor { + private val VM_PREEMPTION_PATTERN = Pattern.compile( + "failed due to the following task event: \"Task state is updated from RUNNING to FAILED on zones/\\S+ due to Spot VM preemption with exit code 50001.\"" + ) class CloudImpl(batchSettings: BatchServiceSettings) extends BatchRequestExecutor with LazyLogging { @@ -122,7 +127,7 @@ object BatchRequestExecutor { } catch { // A job can't be cancelled but deleted, which is why we consider 404 status as the job being cancelled successfully case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.NOT_FOUND => - BatchApiResponse.StatusQueried(RunStatus.Aborted(Seq.empty)) + BatchApiResponse.StatusQueried(RunStatus.Aborted(io.grpc.Status.NOT_FOUND)) // We don't need to detect preemptible VMs because that's handled automatically by GCP case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.RESOURCE_EXHAUSTED => @@ -130,6 +135,9 @@ object BatchRequestExecutor { } private[request] def interpretOperationStatus(job: Job): RunStatus = { + def isPreemption(events: List[ExecutionEvent]): Boolean = + events.exists(e => VM_PREEMPTION_PATTERN.matcher(e.name).find()) + lazy val events = getEventList( Option(job) .flatMap(e => Option(e.getStatus)) @@ -137,7 +145,6 @@ object BatchRequestExecutor { .map(_.asScala.toList) .getOrElse(List.empty) ) - lazy val exitCode = findBatchExitCode(events) // Get vm info for this job val allocationPolicy = job.getAllocationPolicy @@ -162,17 +169,17 @@ object BatchRequestExecutor { } else if (job.getStatus.getState == JobStatus.State.RUNNING) { RunStatus.Running(events, instantiatedVmInfo) } else if (job.getStatus.getState == JobStatus.State.FAILED) { - RunStatus.Failed(exitCode, events, instantiatedVmInfo) + if (isPreemption(events)) { + RunStatus.Preempted(Status.OK, events, instantiatedVmInfo) + } else { + // Status.OK is hardcoded because the request succeeded, we don't have access to the internal response code + RunStatus.Failed(Status.OK, events, instantiatedVmInfo) + } } else { RunStatus.Initializing(events, instantiatedVmInfo) } } - private def findBatchExitCode(events: List[ExecutionEvent]): Option[GcpBatchExitCode] = - events.flatMap { e => - GcpBatchExitCode.fromEventMessage(e.name.toLowerCase) - }.headOption - private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = { val startedRegex = ".*SCHEDULED to RUNNING.*".r val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala deleted file mode 100644 index 9ef9d7c88db..00000000000 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchExitCode.scala +++ /dev/null @@ -1,37 +0,0 @@ -package cromwell.backend.google.batch.models - -sealed abstract class GcpBatchExitCode(val code: Int) extends Product with Serializable - -/** - * Represents the possible exit codes from Batch. - * - * See: https://cloud.google.com/batch/docs/troubleshooting#reserved-exit-codes - */ -object GcpBatchExitCode { - case object VMPreemption extends GcpBatchExitCode(50001) - - case object VMReportingTimeout extends GcpBatchExitCode(50002) - - case object VMRebootedDuringExecution extends GcpBatchExitCode(50003) - - case object VMAndTaskAreUnresponsive extends GcpBatchExitCode(50004) - - case object TaskRunsOverMaximumRuntime extends GcpBatchExitCode(50005) - - case object VMRecreatedDuringExecution extends GcpBatchExitCode(50006) - - val values: List[GcpBatchExitCode] = List( - VMPreemption, - VMReportingTimeout, - VMRebootedDuringExecution, - VMAndTaskAreUnresponsive, - TaskRunsOverMaximumRuntime, - VMRecreatedDuringExecution - ) - - def fromEventMessage(message: String): Option[GcpBatchExitCode] = - values.find { target => - message.contains(s"exit code ${target.code}") - } - -} diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala index 7e908dd92af..c346297e5f3 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala @@ -51,7 +51,6 @@ final case class GcpBatchRuntimeAttributes(cpu: Int Refined Positive, object GcpBatchRuntimeAttributes { val ZonesKey = "zones" - private val ZonesDefaultValue = WomString("us-central1-b") val PreemptibleKey = "preemptible" diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala new file mode 100644 index 00000000000..0bd3c1e1535 --- /dev/null +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/PreviousRetryReasons.scala @@ -0,0 +1,44 @@ +package cromwell.backend.google.batch.models + +import cats.syntax.apply._ +import cats.syntax.validated._ +import common.validation.ErrorOr.ErrorOr +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{preemptionCountKey, unexpectedRetryCountKey} +import cromwell.services.keyvalue.KeyValueServiceActor._ + +import scala.util.{Failure, Success, Try} + +case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int) + +object PreviousRetryReasons { + + def tryApply(prefetchedKvEntries: Map[String, KvResponse], attemptNumber: Int): ErrorOr[PreviousRetryReasons] = { + val validatedPreemptionCount = validatedKvResponse(prefetchedKvEntries.get(preemptionCountKey), preemptionCountKey) + val validatedUnexpectedRetryCount = + validatedKvResponse(prefetchedKvEntries.get(unexpectedRetryCountKey), unexpectedRetryCountKey) + + (validatedPreemptionCount, validatedUnexpectedRetryCount) mapN PreviousRetryReasons.apply + } + + def apply(knownPreemptedCount: Int, knownUnexpectedRetryCount: Int, attempt: Int): PreviousRetryReasons = { + // If we have anything unaccounted for, we can top up the unexpected retry count. + // NB: 'attempt' is 1-indexed, so, magic number: + // NB2: for sanity's sake, I won't let this unaccounted for drop below 0, just in case... + val unaccountedFor = Math.max(attempt - 1 - knownPreemptedCount - knownUnexpectedRetryCount, 0) + PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor) + } + + private def validatedKvResponse(r: Option[KvResponse], fromKey: String): ErrorOr[Int] = r match { + case Some(KvPair(_, v)) => validatedInt(v, fromKey) + case Some(_: KvKeyLookupFailed) => 0.validNel + case Some(KvFailure(_, failure)) => s"Failed to get key $fromKey: ${failure.getMessage}".invalidNel + case Some(_: KvPutSuccess) => s"Programmer Error: Got a KvPutSuccess from a Get request...".invalidNel + case None => s"Programmer Error: Engine made no effort to prefetch $fromKey".invalidNel + } + + private def validatedInt(s: String, fromKey: String): ErrorOr[Int] = + Try(s.toInt) match { + case Success(i) => i.validNel + case Failure(_) => s"Unexpected value found in the KV store: $fromKey='$s'".invalidNel + } +} diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala index 80c1b63f405..9f20b7bbf42 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala @@ -1,6 +1,7 @@ package cromwell.backend.google.batch.models import cromwell.core.ExecutionEvent +import io.grpc.Status import cromwell.services.cost.InstantiatedVmInfo sealed trait RunStatus { @@ -31,49 +32,29 @@ object RunStatus { } sealed trait UnsuccessfulRunStatus extends TerminalRunStatus { - val exitCode: Option[GcpBatchExitCode] - val prettyPrintedError: String + val errorCode: Status } final case class Failed( - exitCode: Option[GcpBatchExitCode], + errorCode: Status, eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty ) extends UnsuccessfulRunStatus { override def toString = "Failed" - - override val prettyPrintedError: String = - exitCode match { - case Some(code) => - code match { - case GcpBatchExitCode.VMPreemption => "A Spot VM for the job was preempted during run time" - case GcpBatchExitCode.VMReportingTimeout => - "There was a timeout in the backend that caused a VM for the job to no longer receive updates" - case GcpBatchExitCode.VMRebootedDuringExecution => "A VM for the job unexpectedly rebooted during run time" - case GcpBatchExitCode.VMAndTaskAreUnresponsive => - "A task reached the unresponsive time limit and cannot be cancelled" - case GcpBatchExitCode.TaskRunsOverMaximumRuntime => - "A task's run time exceeded the time limit specified in the maxRunDuration, or, a runnable's run time exceeded the time limit specified in the timeout" - case GcpBatchExitCode.VMRecreatedDuringExecution => - "A VM for a job is unexpectedly recreated during run time" - } - case None => - // Take the last event as that is more likely to be indicative of what killed the job than the first event. - eventList.lastOption - .map(_.name) - .getOrElse( - "The job has failed but the exit code couldn't be derived, there isn't an event message either, please review the logs and report a bug" - ) - } } - final case class Aborted(eventList: Seq[ExecutionEvent], - instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty - ) extends UnsuccessfulRunStatus { + final case class Aborted(errorCode: Status, instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty) + extends UnsuccessfulRunStatus { override def toString = "Aborted" - override val exitCode: Option[GcpBatchExitCode] = None + override def eventList: Seq[ExecutionEvent] = List.empty + } - override val prettyPrintedError: String = "The job was aborted" + final case class Preempted( + errorCode: Status, + eventList: Seq[ExecutionEvent], + instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty + ) extends UnsuccessfulRunStatus { + override def toString = "Preempted" } } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala index d69502295cd..09208b96865 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/RunnableBuilder.scala @@ -6,6 +6,7 @@ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsT import cromwell.backend.google.batch.models.{BatchParameter, GcpBatchInput, GcpBatchOutput} import cromwell.core.path.Path import mouse.all.anySyntaxMouse +import wom.format.MemorySize import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.jdk.CollectionConverters._ @@ -147,7 +148,8 @@ object RunnableBuilder { scriptContainerPath: String, jobShell: String, volumes: List[Volume], - dockerhubCredentials: (String, String) + dockerhubCredentials: (String, String), + memory: MemorySize ): Runnable.Builder = { val container = (dockerhubCredentials._1, dockerhubCredentials._2) match { @@ -164,9 +166,20 @@ object RunnableBuilder { .setEntrypoint(jobShell) .addCommands(scriptContainerPath) } + + // adding memory as environment variables makes it easy for a user to retrieve the new value of memory + // on the machine to utilize in their command blocks if needed + val environment = + Environment + .newBuilder() + .putAllVariables( + Map("MEM_UNIT" -> memory.unit.toString, "MEM_SIZE" -> memory.amount.toString).asJava + ) + Runnable .newBuilder() .setContainer(container) + .setEnvironment(environment) .withVolumes(volumes) .putLabels(Key.Tag, Value.UserRunnable) } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala index cba665dbf9e..05b6334ccb4 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/UserRunnable.scala @@ -12,7 +12,8 @@ trait UserRunnable { scriptContainerPath = createParameters.commandScriptContainerPath.pathAsString, jobShell = "/bin/bash", volumes = volumes, - dockerhubCredentials = createParameters.dockerhubCredentials + dockerhubCredentials = createParameters.dockerhubCredentials, + memory = createParameters.runtimeAttributes.memory ) val describeRunnable = RunnableBuilder.describeDocker("user runnable", userRunnable) diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala index 239424ed73f..1698c68192e 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/BatchUtilityConversions.scala @@ -36,10 +36,8 @@ trait BatchUtilityConversions { (memory.amount * 1024).toLong // set Standard or Spot instances - def toProvisioningModel(preemption: Int): ProvisioningModel = preemption compare 0 match { - case 0 => ProvisioningModel.STANDARD - case 1 => ProvisioningModel.SPOT - } + def toProvisioningModel(preemptible: Boolean): ProvisioningModel = + if (preemptible) ProvisioningModel.SPOT else ProvisioningModel.STANDARD def toDisks(disks: Seq[GcpBatchAttachedDisk]): List[AttachedDisk] = disks.map(toDisk).toList diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index f95dd13595e..12ddd8f8f47 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -1,6 +1,7 @@ package cromwell.backend.google.batch package actors +import _root_.io.grpc.Status import _root_.wdl.draft2.model._ import akka.actor.{ActorRef, Props} import akka.testkit.{ImplicitSender, TestActorRef, TestDuration, TestProbe} @@ -14,6 +15,8 @@ import common.collections.EnhancedCollections._ import common.mock.MockSugar import cromwell.backend.BackendJobExecutionActor.BackendJobExecutionResponse import cromwell.backend._ +import cromwell.backend.async.AsyncBackendJobExecutionActor.{Execute, ExecutionMode} +import cromwell.backend.async.{ExecutionHandle, FailedNonRetryableExecutionHandle} import cromwell.backend.google.batch.actors.GcpBatchAsyncBackendJobExecutionActor.GcpBatchPendingExecutionHandle import cromwell.backend.google.batch.api.GcpBatchRequestFactory import cromwell.backend.google.batch.io.{DiskType, GcpBatchWorkingDisk} @@ -34,7 +37,10 @@ import cromwell.core.logging.JobLogger import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.drs.DrsPathBuilder import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder, MockGcsPathBuilder} +import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage +import cromwell.services.instrumentation.{CromwellBucket, CromwellIncrement} import cromwell.services.keyvalue.InMemoryKvServiceActor +import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys import cromwell.services.metadata.MetadataService.PutMetadataAction import cromwell.services.metrics.bard.BardEventing.BardEventRequest @@ -63,6 +69,7 @@ import java.time.temporal.ChronoUnit import java.util.UUID import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.language.postfixOps import scala.util.Success class GcpBatchAsyncBackendJobExecutionActorSpec @@ -220,8 +227,130 @@ class GcpBatchAsyncBackendJobExecutionActorSpec |} """.stripMargin + private def buildPreemptibleJobDescriptor(preemptible: Int, + previousPreemptions: Int, + previousUnexpectedRetries: Int, + failedRetriesCountOpt: Option[Int] = None + ): BackendJobDescriptor = { + val attempt = previousPreemptions + previousUnexpectedRetries + 1 + val wdlNamespace = WdlNamespaceWithWorkflow + .load(YoSup.replace("[PREEMPTIBLE]", s"preemptible: $preemptible"), Seq.empty[Draft2ImportResolver]) + .get + val womDefinition = wdlNamespace.workflow + .toWomWorkflowDefinition(isASubworkflow = false) + .getOrElse(fail("failed to get WomDefinition from WdlWorkflow")) + + wdlNamespace.toWomExecutable(Option(Inputs.toJson.compactPrint), NoIoFunctionSet, strictValidation = true) match { + case Right(womExecutable) => + val inputs = for { + combined <- womExecutable.resolvedExecutableInputs + (port, resolvedInput) = combined + value <- resolvedInput.select[WomValue] + } yield port -> value + + val workflowDescriptor = BackendWorkflowDescriptor( + WorkflowId.randomId(), + womDefinition, + inputs, + NoOptions, + Labels.empty, + HogGroup("foo"), + List.empty, + None + ) + + val job = workflowDescriptor.callable.taskCallNodes.head + val key = BackendJobDescriptorKey(job, None, attempt) + val runtimeAttributes = makeRuntimeAttributes(job) + val prefetchedKvEntries = Map( + GcpBatchBackendLifecycleActorFactory.preemptionCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, KvJobKey(key), GcpBatchBackendLifecycleActorFactory.preemptionCountKey), + previousPreemptions.toString + ), + GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, + KvJobKey(key), + GcpBatchBackendLifecycleActorFactory.unexpectedRetryCountKey + ), + previousUnexpectedRetries.toString + ) + ) + val prefetchedKvEntriesUpd = if (failedRetriesCountOpt.isEmpty) { + prefetchedKvEntries + } else { + prefetchedKvEntries + (BackendLifecycleActorFactory.FailedRetryCountKey -> KvPair( + ScopedKey(workflowDescriptor.id, KvJobKey(key), BackendLifecycleActorFactory.FailedRetryCountKey), + failedRetriesCountOpt.get.toString + )) + } + BackendJobDescriptor(workflowDescriptor, + key, + runtimeAttributes, + fqnWdlMapToDeclarationMap(Inputs), + NoDocker, + None, + prefetchedKvEntriesUpd + ) + case Left(badtimes) => fail(badtimes.toList.mkString(", ")) + } + } + + private def executionActor(jobDescriptor: BackendJobDescriptor, + promise: Promise[BackendJobExecutionResponse], + batchSingletonActor: ActorRef, + shouldBePreemptible: Boolean, + serviceRegistryActor: ActorRef, + referenceInputFilesOpt: Option[Set[GcpBatchInput]] + ): ActorRef = { + + val job = generateStandardAsyncJob + val run = Run(job) + val handle = new GcpBatchPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + + class ExecuteOrRecoverActor + extends TestableGcpBatchJobExecutionActor(jobDescriptor, + promise, + gcpBatchConfiguration, + batchSingletonActor = batchSingletonActor, + serviceRegistryActor = serviceRegistryActor + ) { + override def executeOrRecover(mode: ExecutionMode)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { + sendIncrementMetricsForReferenceFiles(referenceInputFilesOpt) + + if (preemptible == shouldBePreemptible) Future.successful(handle) + else Future.failed(new Exception(s"Test expected preemptible to be $shouldBePreemptible but got $preemptible")) + } + } + + system.actorOf(Props(new ExecuteOrRecoverActor), "ExecuteOrRecoverActor-" + UUID.randomUUID) + } + + def buildPreemptibleTestActorRef(attempt: Int, + preemptible: Int, + failedRetriesCountOpt: Option[Int] = None + ): TestActorRef[TestableGcpBatchJobExecutionActor] = { + // For this test we say that all previous attempts were preempted: + val jobDescriptor = buildPreemptibleJobDescriptor(preemptible, + attempt - 1, + previousUnexpectedRetries = 0, + failedRetriesCountOpt = failedRetriesCountOpt + ) + val props = Props( + new TestableGcpBatchJobExecutionActor(jobDescriptor, + Promise(), + gcpBatchConfiguration, + TestableGcpBatchExpressionFunctions, + emptyActor, + failIoActor + ) + ) + TestActorRef(props, s"TestableGcpBatchJobExecutionActor-${jobDescriptor.workflowDescriptor.id}") + } + behavior of "GcpBatchAsyncBackendJobExecutionActor" + private val timeout = 25 seconds + it should "group files by bucket" in { def makeInput(bucket: String, name: String): GcpBatchFileInput = { @@ -302,6 +431,104 @@ class GcpBatchAsyncBackendJobExecutionActorSpec s"drs://drs.example.org/aaa,$MountPoint/path/to/aaa.bai\r\ndrs://drs.example.org/bbb,$MountPoint/path/to/bbb.bai\r\n" } + it should "send proper value for \"number of reference files used gauge\" metric, or don't send anything if reference disks feature is disabled" in { + + val expectedInput1 = GcpBatchFileInput(name = "testfile1", + relativeHostPath = + DefaultPathBuilder.build(Paths.get(s"test/reference/path/file1")), + mount = null, + cloudPath = null + ) + val expectedInput2 = GcpBatchFileInput(name = "testfile2", + relativeHostPath = + DefaultPathBuilder.build(Paths.get(s"test/reference/path/file2")), + mount = null, + cloudPath = null + ) + val expectedReferenceInputFiles = Set[GcpBatchInput](expectedInput1, expectedInput2) + + val expectedMsg1 = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput1.relativeHostPath.pathAsString)) + ) + ) + val expectedMsg2 = InstrumentationServiceMessage( + CromwellIncrement( + CromwellBucket(List.empty, NonEmptyList.of("referencefiles", expectedInput2.relativeHostPath.pathAsString)) + ) + ) + + val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0) + val serviceRegistryProbe = TestProbe() + + val backend1 = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + referenceInputFilesOpt = Option(expectedReferenceInputFiles) + ) + backend1 ! Execute + serviceRegistryProbe.expectMsgAllOf(expectedMsg1, expectedMsg2) + + val backend2 = executionActor( + jobDescriptor, + Promise[BackendJobExecutionResponse](), + TestProbe().ref, + shouldBePreemptible = false, + serviceRegistryActor = serviceRegistryProbe.ref, + referenceInputFilesOpt = None + ) + backend2 ! Execute + serviceRegistryProbe.expectNoMessage(timeout) + } + + it should "not restart 2 of 1 unexpected shutdowns without another preemptible VM" in { + + val actorRef = buildPreemptibleTestActorRef(2, 1) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + val failedStatus = RunStatus.Failed( + Status.ABORTED, + Seq.empty + ) + val executionResult = batchBackend.handleExecutionResult(failedStatus, handle) + val result = Await.result(executionResult, timeout) + result.isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + val failedHandle = result.asInstanceOf[FailedNonRetryableExecutionHandle] + failedHandle.returnCode shouldBe None + } + + it should "handle Failure Status for various errors" in { + + val actorRef = buildPreemptibleTestActorRef(1, 1) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + def checkFailedResult(errorCode: Status, errorMessage: Option[String]): ExecutionHandle = { + val failed = RunStatus.Failed( + errorCode, + Seq.empty + ) + Await.result(batchBackend.handleExecutionResult(failed, handle), timeout) + } + + checkFailedResult(Status.ABORTED, Option("15: Other type of error.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.OUT_OF_RANGE, Option("14: Wrong errorCode.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, Option("Weird error message.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, Option("UnparsableInt: Even weirder error message.")) + .isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + checkFailedResult(Status.ABORTED, None).isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + actorRef.stop() + } + it should "map GCS paths and *only* GCS paths to local" in { val wdlString = s"""|workflow wf { @@ -1135,7 +1362,31 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "gs://path/to/gcs_root/w/e6236763-c518-41d0-9688-432549a8bf7d/call-B/shard-2/B-2.log" } + it should "return preemptible = true only in the correct cases" in { + + def attempt(max: Int, attempt: Int): GcpBatchAsyncBackendJobExecutionActor = + buildPreemptibleTestActorRef(attempt, max).underlyingActor + def attempt1(max: Int) = attempt(max, 1) + def attempt2(max: Int) = attempt(max, 2) + + val descriptorWithMax0AndKey1 = attempt1(max = 0) + descriptorWithMax0AndKey1.preemptible shouldBe false + + val descriptorWithMax1AndKey1 = attempt1(max = 1) + descriptorWithMax1AndKey1.preemptible shouldBe true + + val descriptorWithMax2AndKey1 = attempt1(max = 2) + descriptorWithMax2AndKey1.preemptible shouldBe true + + val descriptorWithMax1AndKey2 = attempt2(max = 1) + descriptorWithMax1AndKey2.preemptible shouldBe false + + val descriptorWithMax2AndKey2 = attempt2(max = 2) + descriptorWithMax2AndKey2.preemptible shouldBe true + } + it should "return the project from the workflow options in the start metadata" in { + val googleProject = "baa-ram-ewe" val batchGcsRoot = "gs://anorexic/duck" val workflowId = WorkflowId.randomId() @@ -1190,6 +1441,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "gcpBatch:googleProject" -> googleProject, "labels:cromwell-workflow-id" -> s"cromwell-$workflowId", "labels:wdl-task-name" -> "goodbye", + "preemptible" -> "false", "runtimeAttributes:bootDiskSizeGb" -> "10", "runtimeAttributes:continueOnReturnCode" -> "0", "runtimeAttributes:cpu" -> "1", @@ -1374,9 +1626,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec val pollResult0 = RunStatus.Initializing(Seq.empty) val pollResult1 = RunStatus.Running(Seq(ExecutionEvent("fakeEvent", expectedJobStart))) val pollResult2 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, expectedVmStart))) - val abortStatus = RunStatus.Aborted( - Seq(ExecutionEvent("got aborted", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))) - ) + val abortStatus = RunStatus.Aborted(Status.NOT_FOUND) val serviceRegistryProbe = TestProbe() @@ -1423,4 +1673,9 @@ class GcpBatchAsyncBackendJobExecutionActorSpec evaluatedAttributes.getOrElse(fail("Failed to evaluate runtime attributes")) ) } + + private def generateStandardAsyncJob = + StandardAsyncJob( + JobName.newBuilder().setJob(UUID.randomUUID().toString).setProject("test").setLocation("local").build().toString + ) } diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala deleted file mode 100644 index db5943759a3..00000000000 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchExitCodeSpec.scala +++ /dev/null @@ -1,52 +0,0 @@ -package cromwell.backend.google.batch.models - -import org.scalatest.OptionValues._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class GcpBatchExitCodeSpec extends AnyWordSpec with Matchers { - - "fromEventMessage" should { - "detect VMPreemption error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to Spot Preemption with exit code 50001." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMPreemption) - } - - "detect VMReportingTimeout error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to Batch no longer receives VM updates with exit code 50002." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMReportingTimeout) - } - - "detect VMRebootedDuringExecution error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to VM is rebooted during task execution with exit code 50003." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMRebootedDuringExecution) - } - - "detect VMAndTaskAreUnresponsive error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to tasks cannot be canceled with exit code 50004." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMAndTaskAreUnresponsive) - } - - "detect TaskRunsOverMaximumRuntime error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to task runs over the maximum runtime with exit code 50005." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.TaskRunsOverMaximumRuntime) - } - - "detect VMRecreatedDuringExecution error" in { - val msg = - "Task state is updated from PRE-STATE to FAILED on zones/ZONE/instances/INSTANCE_ID due to VM is recreated during task execution with exit code 50006." - val result = GcpBatchExitCode.fromEventMessage(msg) - result.value should be(GcpBatchExitCode.VMRecreatedDuringExecution) - } - } -}