Skip to content

Commit

Permalink
make cpus, gpus, and memory retryable options
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 12, 2024
1 parent 57e22f3 commit fd8322a
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 36 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Reduce argument clutter with `crew_options_aws_batch()`. Supports direct inputs for CPUs, GPUs, and memory without having to specify a complicated `containerOverrides` list.
* Sanitize job names.
* Use `crashes_error` from `crew`.
* Make `cpus`, `gpus`, and `memory` retryable options.

# crew.aws.batch 0.0.6

Expand Down
37 changes: 19 additions & 18 deletions R/crew_launcher_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -170,27 +170,24 @@ crew_class_launcher_aws_batch <- R6::R6Class(
region = private$.options_aws_batch$region
)
},
.args_submit = function(call, name) {
container_overrides <- as.list(
private$.options_aws_batch$container_overrides
)
.args_submit = function(call, name, attempt) {
options <- crew_options_slice(private$.options_aws_batch, attempt)
container_overrides <- as.list(options$container_overrides)
container_overrides$command <- list("Rscript", "-e", call)
out <- list(
jobName = crew.aws.batch::crew_aws_batch_job_name(name),
jobQueue = private$.options_aws_batch$job_queue,
shareIdentifier = private$.options_aws_batch$share_identifier,
schedulingPriorityOverride =
private$.options_aws_batch$scheduling_priority_override,
jobDefinition = private$.options_aws_batch$job_definition,
parameters = private$.options_aws_batch$parameters,
jobQueue = options$job_queue,
shareIdentifier = options$share_identifier,
schedulingPriorityOverride = options$scheduling_priority_override,
jobDefinition = options$job_definition,
parameters = options$parameters,
containerOverrides = container_overrides,
nodeOverrides = private$.options_aws_batch$node_overrides,
retryStrategy = private$.options_aws_batch$retry_strategy,
propagateTags = private$.options_aws_batch$propagate_tags,
timeout = private$.options_aws_batch$timeout,
tags = private$.options_aws_batch$tags,
eksPropertiesOverride =
private$.options_aws_batch$eks_properties_override
nodeOverrides = options$node_overrides,
retryStrategy = options$retry_strategy,
propagateTags = options$propagate_tags,
timeout = options$timeout,
tags = options$tags,
eksPropertiesOverride = options$eks_properties_override
)
non_null(out)
}
Expand Down Expand Up @@ -300,7 +297,11 @@ crew_class_launcher_aws_batch <- R6::R6Class(
),
data = list(
args_client = private$.args_client(),
args_submit = private$.args_submit(call = call, name = name)
args_submit = private$.args_submit(
call = call,
name = name,
attempt = self$crashes(index = worker) + 1L
)
)
)
# nocov end
Expand Down
40 changes: 31 additions & 9 deletions R/crew_options_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
#' @keywords plugin_aws_batch
#' @description Options for the AWS Batch controller.
#' @return A classed list of options for the controller.
#' @section Retryable options:
#' Arguments `cpus`, `gpus`, and `memory` are retryable options.
#' Each of these arguments can be a vector where each successive element is
#' used during a retry if the worker previously exited without
#' completing all its assigned tasks.
#' The last element of the vector is used if there are more retries than
#' the length of the vector.
#' Control the number of allowable retries with the `crashes_error`
#' argument of the controller.
#' @param job_definition Character of length 1, name of the AWS
#' Batch job definition to use. There is no default for this argument,
#' and a job definition must be created prior to running the controller.
Expand All @@ -15,13 +24,26 @@
#' Batch job queue to use. There is no default for this argument,
#' and a job queue must be created prior to running the controller.
#' Please see <https://docs.aws.amazon.com/batch/> for details.
#' @param cpus Number of virtual CPUs to request per job. Can be `NULL`
#' @param cpus Positive numeric vector, usually with a single element.
#' Supply a vector to make `cpus` a retryable option
#' (see the "Retryable options" section for details).
#'
#' `cpus` is the number of virtual CPUs to request per job. Can be `NULL`
#' to go with the defaults in the job definition. Ignored if
#' `container_overrides` is not `NULL`.
#' @param gpus Number of GPUs to request per job. Can be `NULL`
#' @param gpus Positive numeric vector, usually with a single element.
#' Supply a vector to make `gpus` a retryable option
#' (see the "Retryable options" section for details).
#'
#' `gpus` is the number of GPUs to request per job. Can be `NULL`
#' to go with the defaults in the job definition. Ignored if
#' `container_overrides` is not `NULL`.
#' @param memory Positive number, amount of memory to request per job.
#' @param memory Positive numeric vector, usually with a single
#' element.
#' Supply a vector to make `memory` a retryable option
#' (see the "Retryable options" section for details).
#'
#' `memory` is the amount of random access memory (RAM) to request per job.
#' Choose the units of memory with the `memory_units` argument.
#' Fargate instances can only be certain discrete values of mebibytes,
#' so please choose `memory_units = "mebibytes"` in that case.
Expand Down Expand Up @@ -136,26 +158,26 @@ crew_options_aws_batch <- function(
crew::crew_assert(
cpus %|||% 1,
is.numeric(.),
length(.) == 1L,
length(.) >= 1L,
is.finite(.),
. > 0,
message = "cpus must be NULL or a single positive number"
message = "cpus must be NULL or a numeric vector"
)
crew::crew_assert(
gpus %|||% 1,
is.numeric(.),
length(.) == 1L,
length(.) >= 1L,
is.finite(.),
. > 0,
message = "gpus must be NULL or a single positive number"
message = "gpus must be NULL or a numeric vector"
)
crew::crew_assert(
memory %|||% 0,
is.numeric(.),
length(.) == 1L,
length(.) >= 1L,
is.finite(.),
. >= 0,
message = "memory must be NULL or a single positive number"
message = "memory must be NULL or a numeric vector"
)
container_overrides <- container_overrides %|||% make_container_overrides(
cpus = cpus,
Expand Down
14 changes: 14 additions & 0 deletions R/utils_options.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Resolve the retryable resource options for one launch attempt.
#
# For each of the "cpus", "gpus", and "memory" entries under
# `options$container_overrides$resourceRequirements`, the `value` field is
# replaced by whatever `slice_bounded()` selects from it for `index`.
# Returns `options` with those values replaced.
crew_options_slice <- function(options, index) {
  resources <- c("cpus", "gpus", "memory")
  for (resource in resources) {
    requirements <- options$container_overrides$resourceRequirements
    options$container_overrides$resourceRequirements[[resource]]$value <-
      slice_bounded(requirements[[resource]]$value, index)
  }
  options
}

# Select the element of `x` at position `index`, falling back to the last
# element when `index` is greater than `length(x)`. Zero-length input
# (including `NULL`) comes back zero-length because the position is then 0.
slice_bounded <- function(x, index) {
  position <- min(index, length(x))
  x[position]
}
5 changes: 4 additions & 1 deletion README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ controller <- crew_controller_aws_batch(
job_queue = "YOUR_JOB_QUEUE_NAME",
cpus = 2,
gpus = 0,
memory = 4,
# Launch workers with 4 GB memory, then 8 GB if the worker crashes,
# then 16 GB on all subsequent launches. Go back to 4 GB if the worker
# completes all its tasks before exiting.
memory = c(4, 8, 16),
memory_units = "gigabytes"
)
Expand Down
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,10 @@ controller <- crew_controller_aws_batch(
job_queue = "YOUR_JOB_QUEUE_NAME",
cpus = 2,
gpus = 0,
memory = 4,
# Launch workers with 4 GB memory, then 8 GB if the worker crashes,
# then 16 GB on all subsequent launches. Go back to 4 GB if the worker
# completes all its tasks before exiting.
memory = c(4, 8, 16),
memory_units = "gigabytes"
)

Expand Down Expand Up @@ -418,7 +421,7 @@ citation("crew.aws.batch")
To cite package 'crew.aws.batch' in publications use:

Landau WM (????). _crew.aws.batch: A Crew Launcher Plugin for AWS
Batch_. R package version 0.0.6.9010,
Batch_. R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch,
<https://wlandau.github.io/crew.aws.batch/>.

Expand All @@ -427,7 +430,7 @@ A BibTeX entry for LaTeX users is
@Manual{,
title = {crew.aws.batch: A Crew Launcher Plugin for AWS Batch},
author = {William Michael Landau},
note = {R package version 0.0.6.9010,
note = {R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch},
url = {https://wlandau.github.io/crew.aws.batch/},
}
Expand Down
1 change: 1 addition & 0 deletions inst/WORDLIST
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Fargate
FitzJohn
GCP
vCPUs
retryable
GPU
Gábor
Gao
Expand Down
1 change: 1 addition & 0 deletions man/crew_class_launcher_aws_batch.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions man/crew_options_aws_batch.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 64 additions & 1 deletion tests/testthat/test-crew_controller_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test_that("AWS batch controller", {
sort(names(private$.args_client())),
sort(c("config", "credentials", "endpoint", "region"))
)
out <- private$.args_submit(call = "run", name = "x")
out <- private$.args_submit(call = "run", name = "x", attempt = 2L)
expect_true(is.list(out))
expect_equal(out$jobName, "x")
expect_equal(out$jobDefinition, "crew-definition")
Expand All @@ -38,3 +38,66 @@ test_that("AWS batch controller", {
)
)
})

test_that("AWS batch controller retryable options", {
  options <- crew_options_aws_batch(
    job_definition = "crew-definition",
    job_queue = "crew-queue",
    cpus = c(2.5, 3.5, 1.7),
    gpus = c(3, 2),
    memory = c(1234, 1157),
    memory_units = "mebibytes"
  )
  x <- crew_controller_aws_batch(options_aws_batch = options)
  private <- crew_private(x$launcher)
  # Build the submit arguments for one launch attempt and check that the
  # requested resources are the expected elements of the option vectors.
  expect_attempt <- function(attempt, memory, cpus, gpus) {
    out <- private$.args_submit(call = "run", name = "x", attempt = attempt)
    expect_true(is.list(out))
    expect_equal(out$jobName, "x")
    expect_equal(out$jobDefinition, "crew-definition")
    expect_equal(out$jobQueue, "crew-queue")
    expect_equal(
      out$containerOverrides,
      list(
        resourceRequirements = list(
          memory = list(value = memory, type = "MEMORY"),
          cpus = list(value = cpus, type = "VCPU"),
          gpus = list(value = gpus, type = "GPU")
        ),
        command = list("Rscript", "-e", "run")
      )
    )
  }
  expect_attempt(1L, memory = "1234", cpus = "2.5", gpus = "3")
  expect_attempt(2L, memory = "1157", cpus = "3.5", gpus = "2")
  # Attempts past the end of an option vector must reuse its last element.
  # The loop variable is passed as the attempt so that attempts 4 to 6 are
  # really exercised, not just attempt 3 repeated.
  for (index in seq(3L, 6L)) {
    expect_attempt(index, memory = "1157", cpus = "1.7", gpus = "2")
  }
})
2 changes: 1 addition & 1 deletion tests/testthat/test-crew_launcher_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test_that("AWS batch launcher", {
sort(names(private$.args_client())),
sort(c("config", "credentials", "endpoint", "region"))
)
out <- private$.args_submit(call = "run", name = "x")
out <- private$.args_submit(call = "run", name = "x", attempt = 1L)
expect_true(is.list(out))
expect_equal(out$jobName, "x")
expect_equal(out$jobDefinition, "crew-definition")
Expand Down
13 changes: 13 additions & 0 deletions tests/testthat/test-utils_options.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
test_that("slice_bounded", {
  # Zero-length inputs come back unchanged whatever the index.
  empties <- list(NULL, character(0L), integer(0L))
  for (index in seq_len(3L)) {
    for (empty in empties) {
      expect_equal(slice_bounded(empty, index), empty)
    }
  }
  # In-range indexes pick the matching element.
  values <- c("a", "b", "c")
  expect_equal(slice_bounded(values, 1L), "a")
  expect_equal(slice_bounded(values, 2L), "b")
  # Indexes at or past the end all resolve to the last element.
  for (index in seq(3L, 10L)) {
    expect_equal(slice_bounded(values, index), "c")
  }
})

0 comments on commit fd8322a

Please sign in to comment.