Skip to content

Commit

Permalink
♻️ Add write_s3_fun()
Browse files Browse the repository at this point in the history
  • Loading branch information
ThierryO committed Feb 5, 2025
1 parent 943f650 commit 279bfbb
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 112 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,4 @@ Collate:
'store_model.R'
'union.R'
'valid_object.R'
'write_s3_fun.R'
65 changes: 11 additions & 54 deletions R/store_manifest.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,14 @@ setMethod(
validObject(x, complete = TRUE)

#create dir if it doesn't exist
dir <- file.path(base, project, "manifest") %>%
dir <- file.path(base, project, "manifest") |>
normalizePath(winslash = "/", mustWork = FALSE)
if (!dir.exists(dir)) {
dir.create(dir, recursive = TRUE)
}
dir.create(dir, recursive = TRUE, showWarnings = FALSE)

#test if file exists
fingerprint <- get_file_fingerprint(x)
filename <- list.files(
dir,
pattern = sprintf("%s.manifest$", fingerprint),
full.names = TRUE
dir, pattern = sprintf("%s.manifest$", fingerprint), full.names = TRUE
)
if (length(filename) > 0) {
return(normalizePath(filename, winslash = "/"))
Expand All @@ -53,62 +49,23 @@ setMethod(

#' @rdname store_manifest
#' @importFrom methods setMethod new
#' @importFrom assertthat assert_that is.string
#' @importFrom aws.s3 bucket_exists get_bucket s3write_using
#' @importFrom assertthat assert_that is.string noNA
#' @importFrom utils write.table
#' @include import_s3_classes.R
setMethod(
f = "store_manifest",
signature = signature(base = "s3_bucket"),
definition = function(x, base, project) {
assert_that(inherits(x, "n2kManifest"))
assert_that(is.string(project))
assert_that(inherits(x, "n2kManifest"), is.string(project), noNA(project))
validObject(x, complete = TRUE)

filename <- file.path(
project, "manifest", sprintf(
"%s.manifest",
get_file_fingerprint(x)
), fsep = "/"
fsep = "/", project, "manifest",
sprintf("%s.manifest", get_file_fingerprint(x))
)
write_s3_fun(
object = x@Manifest, bucket = base, key = filename, overwrite = FALSE,
row.names = FALSE, sep = "\t"
)
# check if object with same fingerprint exists
existing <- get_bucket(base, prefix = filename)
if (length(existing) > 0) {
return(existing)
}

# create object if it doesn't exists
# try several times to write to S3 bucket
# avoids errors due to time out
i <- 1
repeat {
bucket_ok <- tryCatch(
s3write_using(
x@Manifest,
write.table,
row.names = FALSE,
sep = "\t",
bucket = base,
object = filename
),
error = function(err) {
err
}
)
if (is.logical(bucket_ok)) {
break
}
if (i > 10) {
stop("Unable to write to S3 bucket")
}
message("attempt ", i, " to write to S3 bucket failed. Trying again...")
i <- i + 1
# waiting time between tries increases with the number of tries
Sys.sleep(i)
}
if (!bucket_ok) {
stop("Unable to write to S3 bucket")
}
get_bucket(base, prefix = filename)
}
)
45 changes: 11 additions & 34 deletions R/store_manifest_yaml.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,31 @@ setGeneric(
#' @export
#' @rdname store_manifest_yaml
#' @importFrom methods setMethod new
#' @importFrom assertthat assert_that is.string
#' @importFrom dplyr %>%
#' @importFrom assertthat assert_that is.string noNA
#' @importFrom purrr map_chr
#' @importFrom yaml write_yaml
setMethod(
f = "store_manifest_yaml",
signature = signature(base = "s3_bucket"),
definition = function(x, base, project, docker, dependencies) {
assert_that(is.string(docker))
assert_that(is.character(dependencies))
assert_that(
is.string(docker), is.character(dependencies), noNA(dependencies),
noNA(docker)
)

stored <- store_manifest(x = x, base = base, project = project)
list(
github = dependencies, docker = docker, bucket = attr(base, "Name"),
project = project,
hash = basename(stored$Contents$Key) |>
hash = basename(stored) |>
gsub(pattern = "\\.manifest", replacement = "")
) -> yaml
filename <- sprintf("%s/yaml/%s.yaml", project, sha1(yaml))
available <- get_bucket(base, prefix = filename, max = Inf)
if (length(available)) {
return(map_chr(available, "Key"))
}

# try several times to write to S3 bucket
# avoids errors due to time out
i <- 1
repeat {
bucket_ok <- tryCatch(
s3write_using(yaml, write_yaml, bucket = base, object = filename),
error = function(err) {
err
}
)
if (is.logical(bucket_ok)) {
break
}
stopifnot("Unable to write to S3 bucket" = i <= 10)
message("attempt ", i, " to write to S3 bucket failed. Trying again...")
i <- i + 1
# waiting time between tries increases with the number of tries
Sys.sleep(i)
}
stopifnot("Unable to write to S3 bucket" = bucket_ok)
get_bucket(base, prefix = filename, max = Inf) |>
map_chr("Key")
write_s3_fun(
object = yaml, bucket = base, key = filename, overwrite = FALSE,
fun = write_yaml
)
}
)

Expand All @@ -89,9 +68,7 @@ setMethod(
if (file.exists(filename)) {
return(filename)
}
if (!dir.exists(dirname(filename))) {
dir.create(dirname(filename), recursive = TRUE)
}
dir.create(dirname(filename), recursive = TRUE, showWarnings = FALSE)
write_yaml(yaml, filename)
return(filename)
}
Expand Down
41 changes: 41 additions & 0 deletions R/write_s3_fun.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#' Write an object to an S3 bucket, retrying when the write fails
#'
#' When `overwrite = FALSE`, the bucket is first listed with `key` as prefix
#' and, if anything is found, the matching keys are returned without writing.
#' Otherwise `object` is written with `s3write_using()`.
#' A write that raises an error is retried, with a waiting time that grows
#' with the number of failed attempts, until `max_attempt` attempts were made.
#'
#' @param object The object to store; passed as `x` to `s3write_using()`.
#' @param bucket The S3 bucket; passed to `get_bucket()` and
#'   `s3write_using()`.
#' @param key The key of the object in the bucket.
#'   It is also used as `prefix` when listing the bucket, so every key that
#'   starts with `key` is reported.
#' @param fun The function `s3write_using()` uses to write `object`.
#'   Defaults to `write.table`.
#' @param overwrite A single `TRUE` or `FALSE`.
#'   `FALSE` (default) keeps an existing object and returns its key.
#'   `TRUE` always writes.
#' @param opts Passed as `opts` to `s3write_using()`.
#' @param ... Further arguments forwarded to `s3write_using()`
#'   (presumably handed on to `fun`; the callers pass e.g. `sep`).
#' @param max_attempt The maximum number of write attempts.
#'   Must be a count. Defaults to 10.
#' @return An unnamed character vector with the keys found under the prefix
#'   `key`.
#' @noRd
#' @importFrom assertthat assert_that is.count is.flag noNA
#' @importFrom aws.s3 bucket_exists get_bucket s3write_using
#' @importFrom purrr map_chr
#' @importFrom utils write.table
write_s3_fun <- function(
  object, bucket, key, fun = write.table, overwrite = FALSE, opts = NULL, ...,
  max_attempt = 10
) {
  assert_that(is.flag(overwrite), noNA(overwrite), is.count(max_attempt))

  # list the keys currently stored under the prefix `key`
  list_keys <- function() {
    get_bucket(bucket, prefix = key) |>
      map_chr("Key") |>
      unname()
  }

  # the existing objects only matter when we are not allowed to overwrite,
  # so skip the listing request otherwise
  if (!overwrite) {
    existing <- list_keys()
    if (length(existing) > 0) {
      return(existing)
    }
  }

  # create object if it doesn't exist or we want to overwrite
  # try several times to write to the S3 bucket
  # avoids errors due to time out
  attempt <- 1
  repeat {
    outcome <- tryCatch(
      s3write_using(
        x = object, FUN = fun, bucket = bucket, object = key, opts = opts, ...
      ),
      error = function(err) {
        err
      }
    )
    if (is.logical(outcome)) {
      break
    }
    # give up once `max_attempt` attempts were made (not `max_attempt + 1`)
    if (attempt >= max_attempt) {
      reason <- if (inherits(outcome, "condition")) {
        conditionMessage(outcome)
      } else {
        "unexpected return value"
      }
      # keep the original message as prefix and append the last failure
      stop("Unable to write to S3 bucket: ", reason, call. = FALSE)
    }
    message(
      "attempt ", attempt, " to write to S3 bucket failed. Trying again..."
    )
    attempt <- attempt + 1
    # waiting time between tries increases with the number of tries
    Sys.sleep(attempt)
  }
  stopifnot("Unable to write to S3 bucket" = outcome)
  list_keys()
}
25 changes: 11 additions & 14 deletions tests/testthat/test_baa_store_manifest.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,25 @@ test_that("store_manifest stores the manifest on a local file system", {

test_that("store_manifest stores the manifest on an S3 bucket", {
skip_if(Sys.getenv("AWS_SECRET_ACCESS_KEY") == "", message = "No AWS access")
bucket <- get_bucket(Sys.getenv("N2KBUCKET"))
project <- "unittest_store_manifest"
bucket <- get_bucket(Sys.getenv("N2KBUCKET"), prefix = project, max = 1)
object <- n2k_manifest(
data.frame(
fingerprint = "1", parent = NA_character_, stringsAsFactors = FALSE
)
)
expect_is(
stored <- store_manifest(
x = object, base = bucket, project = "unittest_store_manifest"
),
"s3_bucket"
expect_type(
stored <- store_manifest(x = object, base = bucket, project = project),
"character"
)
available <- get_bucket(bucket, prefix = "unittest_store_manifest")
expect_equivalent(stored, available)
expect_is(
stored2 <- store_manifest(
x = object, base = bucket, project = "unittest_store_manifest"
),
"s3_bucket"
available <- get_bucket(bucket, prefix = project)
expect_equivalent(stored, map_chr(available, "Key"))
expect_type(
stored2 <- store_manifest(x = object, base = bucket, project = project),
"character"
)
available <- get_bucket(bucket, prefix = "unittest_store_manifest")
expect_equivalent(stored2, available)
expect_equivalent(stored2, map_chr(available, "Key"))
expect_equivalent(stored, stored2)
expect_true(all(sapply(available, delete_object, bucket = bucket)))
})
4 changes: 2 additions & 2 deletions tests/testthat/test_bba_read_manifest.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ test_that("read_manifest reads the manifest on a local file system", {

test_that("read_manifest reads the manifest on an S3 bucket", {
skip_if(Sys.getenv("AWS_SECRET_ACCESS_KEY") == "", message = "No AWS access")
bucket <- get_bucket(Sys.getenv("N2KBUCKET"))
project <- "unittest_read_manifest"
bucket <- get_bucket(Sys.getenv("N2KBUCKET"), prefix = project, max = 1)
object <- n2k_manifest(
data.frame(
fingerprint = "1", parent = NA_character_, stringsAsFactors = FALSE
Expand All @@ -60,7 +60,7 @@ test_that("read_manifest reads the manifest on an S3 bucket", {
)
Sys.sleep(2)
stored <- store_manifest(object2, bucket, project)
expect_equal(read_manifest(bucket, hash = stored$Contents$Key), object2)
expect_equal(read_manifest(bucket, hash = stored), object2)
expect_equal(read_manifest(bucket, project, object2@Fingerprint), object2)
latest <- read_manifest(bucket, project)
expect_equal(latest, object2)
Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test_bbb_store_manifest_yaml.R
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
context("store_manifest_yaml")
test_that("store_manifest_yaml stores the manifest on an S3 bucket", {
skip_if(Sys.getenv("AWS_SECRET_ACCESS_KEY") == "", message = "No AWS access")
bucket <- get_bucket(Sys.getenv("N2KBUCKET"))
object <- n2k_manifest(
data.frame(
fingerprint = "1", parent = NA_character_, stringsAsFactors = FALSE
)
)
project <- "unittest_store_manifest_yaml"
bucket <- get_bucket(Sys.getenv("N2KBUCKET"), prefix = project, max = 1)
docker <- "inbobmk/rn2k:latest"
dependencies <- c("inbo/n2khelper@v0.4.1", "inbo/n2kanalysis@docker")
expect_is(
Expand Down
16 changes: 9 additions & 7 deletions tests/testthat/test_cba_fit_model_manifest.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ test_that("it handles a manifest", {
fit_model(manif, base = base, project = project, verbose = FALSE)
)
y <- store_manifest(manif, base, project)
expect_null(fit_model(y, base = base, project = project))
expect_null(fit_model(y))
expect_null(fit_model(y, base = base, project = project, verbose = FALSE))
expect_null(fit_model(y, verbose = FALSE))
results <- get_result(
x = manif, base = base, project = project, verbose = FALSE
)
Expand All @@ -72,7 +72,9 @@ test_that("it handles a manifest", {
)
expect_true(all(status(results) == "converged"))
expect_s4_class(
results <- get_result(x = manif, base = base, project = project),
results <- get_result(
x = manif, base = base, project = project, verbose = FALSE
),
"n2kResult"
)

Expand Down Expand Up @@ -122,7 +124,9 @@ test_that("it handles a manifest", {
expect_invisible(
fit_model(manif, base = aws_base, project = project, verbose = FALSE)
)
results <- get_result(x = manif, base = aws_base, project = project)
results <- get_result(
x = manif, base = aws_base, project = project, verbose = FALSE
)
expect_s4_class(results, "n2kResult")
expect_identical(
sort(results@AnalysisMetadata$file_fingerprint),
Expand All @@ -131,9 +135,7 @@ test_that("it handles a manifest", {
expect_true(all(status(results) == "converged"))

y <- store_manifest(manif, base = aws_base, project = project)
expect_invisible(fit_model(y$Contents))

expect_null(fit_model(y$Contents$Key, base = aws_base, project = project))
expect_null(fit_model(y, base = aws_base, project = project, verbose = FALSE))

available <- get_bucket(aws_base, prefix = project) |>
sapply("[[", "Key")
Expand Down

0 comments on commit 279bfbb

Please sign in to comment.