Skip to content

Commit

Permalink
attempt number two to kill zombies
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-buerkner committed Nov 13, 2024
1 parent 31a6518 commit 2f6dc0d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 29 deletions.
6 changes: 4 additions & 2 deletions R/brmsfit-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -845,11 +845,13 @@ arg_names <- function(method) {
}

# validate 'cores' argument for use in post-processing functions
validate_cores_post_processing <- function(cores) {
validate_cores_post_processing <- function(cores, use_mc_cores = FALSE) {
if (is.null(cores)) {
if (os_is_windows()) {
if (os_is_windows() || !use_mc_cores) {
# multi cores often leads to a slowdown on windows
# in post-processing functions as discussed in #1129
# multi cores may also lead to zombie workers
# on unix systems as discussed in #1658
cores <- 1L
} else {
cores <- getOption("mc.cores", 1L)
Expand Down
22 changes: 1 addition & 21 deletions R/log_lik.R
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,7 @@ log_lik.brmsprep <- function(object, cores = NULL, ...) {
object$dpars[[dp]] <- get_dpar(object, dpar = dp)
}
N <- choose_N(object)

# out <- plapply(seq_len(N), log_lik_fun, cores = cores, prep = object)
# copy over plapply here
if (cores == 1) {
out <- lapply(seq_len(N), log_lik_fun, prep = object)
} else {
if (!os_is_windows()) {
out <- parallel::mclapply(
X = seq_len(N), FUN = log_lik_fun,
mc.cores = cores, prep = object
)
} else {
cl <- parallel::makePSOCKcluster(cores)
on.exit(parallel::stopCluster(cl))
out <- parallel::parLapply(
cl = cl, X = seq_len(N), fun = log_lik_fun,
mc.cores = cores, prep = object
)
}
}

out <- plapply(seq_len(N), log_lik_fun, .cores = cores, prep = object)
out <- do_call(cbind, out)
colnames(out) <- NULL
old_order <- object$old_order
Expand Down
13 changes: 8 additions & 5 deletions R/misc.R
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,17 @@ cblapply <- function(X, FUN, ...) {
}

# parallel lapply sensitive to the operating system
plapply <- function(X, FUN, cores = 1, ...) {
if (cores == 1) {
# args:
# .psock: use a PSOCK cluster? Default is TRUE until
#. the zombie worker issue #1658 has been fully resolved
plapply <- function(X, FUN, .cores = 1, .psock = TRUE, ...) {
if (.cores == 1) {
out <- lapply(X, FUN, ...)
} else {
if (!os_is_windows()) {
out <- parallel::mclapply(X = X, FUN = FUN, mc.cores = cores, ...)
if (!os_is_windows() && !.psock) {
out <- parallel::mclapply(X = X, FUN = FUN, mc.cores = .cores, ...)
} else {
cl <- parallel::makePSOCKcluster(cores)
cl <- parallel::makePSOCKcluster(.cores)
on.exit(parallel::stopCluster(cl))
out <- parallel::parLapply(cl = cl, X = X, fun = FUN, ...)
}
Expand Down
2 changes: 1 addition & 1 deletion R/posterior_predict.R
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ posterior_predict.brmsprep <- function(object, transform = NULL, sort = FALSE,
pp_fun <- paste0("posterior_predict_", object$family$fun)
pp_fun <- get(pp_fun, asNamespace("brms"))
N <- choose_N(object)
out <- plapply(seq_len(N), pp_fun, cores = cores, prep = object, ...)
out <- plapply(seq_len(N), pp_fun, .cores = cores, prep = object, ...)
if (grepl("_mv$", object$family$fun)) {
out <- do_call(abind, c(out, along = 3))
out <- aperm(out, perm = c(1, 3, 2))
Expand Down

0 comments on commit 2f6dc0d

Please sign in to comment.