Skip to content

Commit

Permalink
Fix #1055
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Aug 22, 2023
1 parent 4cb6364 commit 1d1c1d8
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 5 deletions.
13 changes: 12 additions & 1 deletion R/class_active.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,24 @@ active_class <- R6::R6Class(
self$meta$database$dequeue_rows()
self$scheduler$progress$database$dequeue_rows()
},
poll_meta = function() {
dequeue_meta_time = function() {
self$seconds_dequeued <- self$seconds_dequeued %|||% -Inf
now <- time_seconds_local()
if ((now - self$seconds_dequeued) > self$seconds_interval) {
self$dequeue_meta()
self$seconds_dequeued <- time_seconds_local()
}
},
dequeue_meta_file = function(target) {
settings <- .subset2(target, "settings")
format <- .subset2(settings, "format")
repository <- .subset2(settings, "repository")
dequeue <- (format == "file" || format == "file_fast") &&
(repository == "local")
if (dequeue) {
self$dequeue_meta()
}
},
write_gitignore = function() {
writeLines(
c("*", "!.gitignore", "!meta", "meta/*", "!meta/meta"),
Expand Down Expand Up @@ -145,6 +155,7 @@ active_class <- R6::R6Class(
target_debug(target)
target_update_depend(target, self$pipeline, self$meta)
if (target_should_run(target, self$meta)) {
self$dequeue_meta_file(target)
self$run_target(name)
} else {
self$skip_target(target)
Expand Down
2 changes: 1 addition & 1 deletion R/class_clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ clustermq_class <- R6::R6Class(
self$scheduler$backoff$reset()
},
iterate = function() {
self$poll_meta()
self$dequeue_meta_time()
message <- if_any(
self$workers > 0L,
self$worker_list$receive_data(),
Expand Down
2 changes: 1 addition & 1 deletion R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ crew_class <- R6::R6Class(
target_sync_file_meta(target, self$meta)
},
iterate = function() {
self$poll_meta()
self$dequeue_meta_time()
queue <- self$scheduler$queue
should_dequeue <- queue$should_dequeue()
if (should_dequeue) {
Expand Down
2 changes: 1 addition & 1 deletion R/class_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ future_class <- R6::R6Class(
lapply(names, self$process_worker)
},
iterate = function() {
self$poll_meta()
self$dequeue_meta_time()
self$process_workers()
self$try_submit(wait = TRUE)
},
Expand Down
2 changes: 1 addition & 1 deletion R/class_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ local_class <- R6::R6Class(
self$unload_transient()
},
process_next = function() {
self$poll_meta()
self$dequeue_meta_time()
self$process_target(self$scheduler$queue$dequeue())
},
run = function() {
Expand Down

0 comments on commit 1d1c1d8

Please sign in to comment.