Skip to content

Commit

Permalink
Merge pull request #1370 from ropensci/1364-ref3
Browse files Browse the repository at this point in the history
Reduce memory consumption using lightweight references
  • Loading branch information
wlandau authored Nov 10, 2024
2 parents ac7344c + 49a6697 commit 5439145
Show file tree
Hide file tree
Showing 23 changed files with 499 additions and 133 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.8.0.9009
Version: 1.8.0.9010
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,16 @@ S3method(target_patternview_meta,default)
S3method(target_patternview_meta,tar_branch)
S3method(target_prepare,default)
S3method(target_prepare,tar_builder)
S3method(target_produce_child,tar_pattern)
S3method(target_produce_child,tar_stem)
S3method(target_produce_junction,tar_pattern)
S3method(target_produce_junction,tar_stem)
S3method(target_produce_record,tar_branch)
S3method(target_produce_record,tar_pattern)
S3method(target_produce_record,tar_stem)
S3method(target_produce_reference,default)
S3method(target_produce_reference,tar_branch)
S3method(target_produce_reference,tar_bud)
S3method(target_read_value,tar_bud)
S3method(target_read_value,tar_builder)
S3method(target_read_value,tar_pattern)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# targets 1.8.0.9009 (development)
# targets 1.8.0.9010 (development)

* Un-break workflows that use `format = "file_fast"` (#1339, @koefoeden).
* Fix deadlock in `error = "trim"` (#1340, @koefoeden).
Expand All @@ -12,6 +12,7 @@
* Avoid `store_assert_format()` and `store_convert_object()` is `storage` is `"none"`.
* Add a `list()` method to `tar_repository_cas()` to make it easier and more efficient to specify custom CAS repositories (#1366).
* Improve speed and reduce memory consumption by avoiding deep copies of inner environments of target definition objects (#1368).
* Reduce memory consumption by storing buds and branches as lightweight references when `memory` is `"transient"` (#1364).

# targets 1.8.0

Expand Down
11 changes: 11 additions & 0 deletions R/class_branch.R
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,14 @@ target_patternview_errored.tar_branch <- function(
parent <- pipeline_get_target(pipeline, target_get_parent(target))
patternview_register_errored(parent$patternview, parent, scheduler)
}

#' @export
target_produce_reference.tar_branch <- function(target) {
file <- .subset2(target, "file")
reference_init(
parent = target_get_parent(target),
path = .subset2(file, "path"),
stage = .subset2(file, "stage"),
hash = .subset2(file, "hash")
)
}
5 changes: 5 additions & 0 deletions R/class_bud.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ target_read_value.tar_bud <- function(target, pipeline) {
value_init(object, parent$settings$iteration)
}

#' @export
target_produce_reference.tar_bud <- function(target) {
reference_init(parent = target_get_parent(target))
}

#' @export
target_validate.tar_bud <- function(target) {
tar_assert_correct_fields(target, bud_new, optional = "value")
Expand Down
23 changes: 7 additions & 16 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ target_bootstrap.tar_builder <- function(
record <- target_bootstrap_record(target, meta)
target$store <- record_bootstrap_store(record)
target$file <- record_bootstrap_file(record)
pipeline_set_target(pipeline, target)
invisible()
}

Expand Down Expand Up @@ -191,6 +192,7 @@ target_skip.tar_builder <- function(
) {
target_update_queue(target, scheduler)
file_repopulate(target$file, meta$get_record(target_get_name(target)))
pipeline_set_target(pipeline, target)
if (active) {
builder_ensure_workspace(
target = target,
Expand Down Expand Up @@ -240,6 +242,7 @@ builder_completed <- function(target, pipeline, scheduler, meta) {
target_ensure_buds(target, pipeline, scheduler)
meta$insert_record(target_produce_record(target, pipeline, meta))
target_patternview_meta(target, pipeline, meta)
pipeline_set_target(pipeline, target)
pipeline_register_loaded(pipeline, target_get_name(target))
scheduler$progress$register_completed(target)
scheduler$reporter$report_completed(target, scheduler$progress)
Expand Down Expand Up @@ -306,26 +309,13 @@ builder_ensure_deps <- function(target, pipeline, retrieval) {
if (!identical(target$settings$retrieval, retrieval)) {
return()
}
tryCatch(
target_ensure_deps(target, pipeline),
error = function(error) {
message <- paste0(
"could not load dependencies of target ",
target_get_name(target),
". ",
conditionMessage(error)
)
expr <- as.expression(as.call(list(quote(stop), message)))
target$command$expr <- expr
target$settings$deployment <- "main"
}
)
target_ensure_deps(target, pipeline)
}

builder_update_subpipeline <- function(target, pipeline) {
target$subpipeline <- pipeline_produce_subpipeline(
pipeline,
target_get_name(target)
target
)
}

Expand All @@ -344,7 +334,7 @@ builder_unmarshal_subpipeline <- function(target) {
pipeline_unmarshal_values(target$subpipeline)
}
patterns <- fltr(
names(subpipeline$targets),
pipeline_get_names(subpipeline),
~inherits(pipeline_get_target(subpipeline, .x), "tar_pattern")
)
map(
Expand Down Expand Up @@ -390,6 +380,7 @@ builder_error_null <- function(target, pipeline, scheduler, meta) {
record$data <- "error"
meta$insert_record(record)
target_patternview_meta(target, pipeline, meta)
pipeline_set_target(pipeline, target)
pipeline_register_loaded(pipeline, target_get_name(target))
scheduler$progress$register_errored(target)
}
Expand Down
1 change: 0 additions & 1 deletion R/class_clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ clustermq_class <- R6::R6Class(
if (is.null(target)) {
return()
}
pipeline_set_target(self$pipeline, target)
self$unmarshal_target(target)
target_conclude(
target,
Expand Down
1 change: 0 additions & 1 deletion R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ crew_class <- R6::R6Class(
msg = paste("target", result$name, "error:", result$error)
)
target <- result$result[[1]]
pipeline_set_target(self$pipeline, target)
self$unmarshal_target(target)
target_conclude(
target,
Expand Down
1 change: 0 additions & 1 deletion R/class_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ future_class <- R6::R6Class(
},
conclude_worker_target = function(value, name) {
target <- future_value_target(value, name, self$pipeline)
pipeline_set_target(self$pipeline, target)
self$unmarshal_target(target)
target_conclude(
target,
Expand Down
50 changes: 32 additions & 18 deletions R/class_junction.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,55 @@ junction_init <- function(
deps = list()
) {
splits <- make.unique(splits, sep = "_")
names(deps) <- names(deps) %|||% seq_along(deps)
index <- seq_along(splits)
names(index) <- splits
deps <- as_data_frame(deps)
junction_new(nexus, splits, deps)
has_deps <- nrow(deps) > 0L
junction_new(nexus, index, deps, has_deps)
}

junction_new <- function(nexus = NULL, splits = NULL, deps = NULL) {
junction_new <- function(
nexus = NULL,
index = NULL,
deps = NULL,
has_deps = NULL
) {
out <- new.env(parent = emptyenv(), hash = FALSE)
out$nexus <- nexus
out$splits <- splits
out$index <- index
out$deps <- deps
out$has_deps <- has_deps
out
}

junction_upstream_edges <- function(junction) {
from <- utils::stack(junction$deps)$values
to <- rep(junction$splits, times = ncol(junction$deps))
from <- unlist(junction$deps, use.names = FALSE)
to <- rep(junction_splits(junction), times = ncol(junction$deps))
data_frame(from = from, to = to)
}

junction_get_splits <- function(junction) {
as.character(junction$splits)
junction_length <- function(junction) {
length(.subset2(junction, "index"))
}

junction_splits <- function(junction) {
names(.subset2(junction, "index"))
}

junction_transpose <- function(junction) {
splits <- junction$splits
deps <- junction$deps
out <- map_rows(deps, ~list(deps = unname(.x))) %||%
replicate(length(splits), list(deps = character(0)), simplify = FALSE)
for (index in seq_along(splits)) {
out[[index]]$split <- splits[index]
junction_extract_index <- function(junction, name) {
as.integer(.subset2(.subset2(junction, "index"), name))
}

junction_extract_deps <- function(junction, index) {
if (.subset2(junction, "has_deps")) {
as.character(vctrs::vec_slice(x = .subset2(junction, "deps"), i = index))
} else {
character(0L)
}
out
}

junction_invalidate <- function(junction) {
junction$splits <- rep(NA_character_, length(junction$splits))
names(junction$index) <- rep(NA_character_, length(junction$index))
}

junction_validate_deps <- function(deps) {
Expand All @@ -52,6 +65,7 @@ junction_validate <- function(junction) {
tar_assert_correct_fields(junction, junction_new)
tar_assert_scalar(junction$nexus)
tar_assert_chr(junction$nexus)
tar_assert_chr(junction$splits)
tar_assert_int(junction$index)
tar_assert_chr(junction_splits(junction))
junction_validate_deps(junction$deps)
}
57 changes: 31 additions & 26 deletions R/class_pattern.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pattern_s3_class <- c("tar_pattern", "tar_target")

#' @export
target_get_children.tar_pattern <- function(target) {
junction_get_splits(target$junction)
junction_splits(target$junction)
}

#' @export
Expand Down Expand Up @@ -96,13 +96,13 @@ target_conclude.tar_pattern <- function(target, pipeline, scheduler, meta) {
#' @export
target_read_value.tar_pattern <- function(target, pipeline) {
branches <- target_get_children(target)
map(
branches,
~target_ensure_value(pipeline_get_target(pipeline, .x), pipeline)
)
objects <- map(
branches,
~pipeline_get_target(pipeline, .x)$value$object
~ {
target <- pipeline_get_target(pipeline, .x)
target_ensure_value(target, pipeline)
target$value$object
}
)
names(objects) <- branches
value <- value_init(iteration = target$settings$iteration)
Expand Down Expand Up @@ -204,6 +204,11 @@ target_unmarshal_value.tar_pattern <- function(target) {
target$value <- NULL
}

#' @export
target_produce_child.tar_pattern <- function(target, name) {
pattern_produce_branch(target, name)
}

#' @export
print.tar_pattern <- function(x, ...) {
cat(
Expand Down Expand Up @@ -247,27 +252,27 @@ pattern_prepend_branches <- function(target, scheduler) {
scheduler$queue$prepend(children, ranks)
}

pattern_produce_branch <- function(target, name) {
junction <- .subset2(target, "junction")
index <- junction_extract_index(junction, name)
branch_init(
name = name,
command = .subset2(target, "command"),
deps_parent = .subset2(target, "deps"),
deps_child = junction_extract_deps(junction, index),
settings = .subset2(target, "settings"),
cue = .subset2(target, "cue"),
store = .subset2(target, "store"),
index = index
)
}

pattern_set_branches <- function(target, pipeline) {
command <- target$command
deps_parent <- target$deps
settings <- target$settings
cue <- target$cue
store <- target$store
specs <- junction_transpose(target$junction)
for (index in seq_along(specs)) {
spec <- .subset2(specs, index)
branch <- branch_init(
name = .subset2(spec, "split"),
command = command,
deps_parent = deps_parent,
deps_child = .subset2(spec, "deps"),
settings = settings,
cue = cue,
store = store,
index = index
)
pipeline_set_target(pipeline, branch)
}
pipeline_initialize_references_children(
pipeline = pipeline,
name_parent = target_get_name(target),
names_children = junction_splits(target$junction)
)
}

pattern_insert_branches <- function(target, pipeline, scheduler) {
Expand Down
Loading

0 comments on commit 5439145

Please sign in to comment.