From d7b4f59828216e078c99fc139c208d2246cc1b18 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 2 Dec 2023 17:29:41 +0000 Subject: [PATCH] fix dispatcher --- DESCRIPTION | 4 ++-- NEWS.md | 5 +++-- R/daemon.R | 5 +++-- R/daemons.R | 4 ++-- R/dispatcher.R | 21 ++++++++++----------- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index a16b09500..56f5d0b0f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.11.2.9018 +Version: 0.11.2.9019 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. @@ -22,7 +22,7 @@ Encoding: UTF-8 Depends: R (>= 3.5) Imports: - nanonext (>= 0.10.4.9024) + nanonext (>= 0.10.4.9025) Enhances: parallel, promises diff --git a/NEWS.md b/NEWS.md index 16b823f25..3ecd76ef6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# mirai 0.11.2.9018 (development) +# mirai 0.11.2.9019 (development) * Implements `register()` for registering custom serialization and unserialization functions when using daemons. * Introduces `call_mirai_()`, a user-interruptible version of `call_mirai()` suitable for interactive use. @@ -6,8 +6,9 @@ + '.args' will now coerece to a list if an object other than a list is supplied, rather than error. + '.signal' argument removed - now all 'mirai' signal if daemons are set up. * `everywhere()` now returns invisible NULL in the case the specified compute profile is not set up, rather than error. +* Improved memory efficiency and stability at dispatcher. * Improved error messages and handling for daemon/dispatcher connection errors. -* Requires nanonext >= [0.10.4.9024]. +* Requires nanonext >= [0.10.4.9025]. # mirai 0.11.2 diff --git a/R/daemon.R b/R/daemon.R index 120c31883..62884bacd 100644 --- a/R/daemon.R +++ b/R/daemon.R @@ -106,7 +106,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE, cv <- cv() sock <- socket(protocol = "rep") - on.exit(reap(sock)) + on.exit(close(sock)) autoexit && pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE) if (length(tls)) tls <- tls_config(client = tls) dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls) @@ -148,6 +148,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE, (count >= maxtasks || count > timerstart && mclock() - start >= walltime) && { nextmode(mark = TRUE) send(ctx, data = data, mode = 3L) + aio <- recv_aio_signal(ctx, cv = cv, mode = 8L) wait(cv) break } @@ -174,7 +175,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE, .daemon <- function(url) { sock <- socket(protocol = "rep", dial = url, autostart = NA) - on.exit(reap(sock)) + on.exit(close(sock)) ._mirai_. <- recv(sock, mode = 1L, block = TRUE) data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL), error = mk_mirai_error, interrupt = mk_interrupt_error) diff --git a/R/daemons.R b/R/daemons.R index 191553951..2936471d2 100644 --- a/R/daemons.R +++ b/R/daemons.R @@ -331,8 +331,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., length(envir) || return(0L) if (signal) send_signal(envir = envir) - reap(envir[["sock"]]) - length(envir[["sockc"]]) && reap(envir[["sockc"]]) + close(envir[["sock"]]) + length(envir[["sockc"]]) && close(envir[["sockc"]]) ..[[.compute]] <- NULL -> envir } else if (is.null(envir)) { diff --git a/R/dispatcher.R b/R/dispatcher.R index 8a4861d0a..523959ce8 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -79,7 +79,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, cv <- cv() sock <- socket(protocol = "rep") - on.exit(reap(sock)) + on.exit(close(sock)) pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE) dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial) @@ -135,12 +135,12 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, queue[[i]] <- create_req(ctx = .context(sock), cv = cv) } - on.exit(lapply(servers, reap), add = TRUE, after = TRUE) + on.exit(lapply(servers, close), add = TRUE, after = TRUE) ctrchannel <- is.character(monitor) if (ctrchannel) { sockc <- socket(protocol = "rep") - on.exit(reap(sockc), add = TRUE, after = FALSE) + on.exit(close(sockc), add = TRUE, after = FALSE) dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial) recv(sockc, mode = 6L, block = .timelimit) && stop(.messages[["sync_timeout"]]) saio <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L) @@ -173,7 +173,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, } else if (i < 0L) { i <- -i - reap(servers[[i]]) + close(servers[[i]]) servers[[i]] <- nsock <- req_socket(NULL) pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE, flag = FALSE) lock(nsock, cv = active[[i]]) @@ -197,13 +197,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["req"]])) { req <- .subset2(queue[[i]][["req"]], "value") if (is.object(req)) req <- serialize(req, NULL) - exiting <- req[3L] - req <- send_aio(queue[[i]][["ctx"]], data = req, mode = 2L) + send(queue[[i]][["ctx"]], data = req, mode = 2L) q <- queue[[i]][["daemon"]] - if (exiting) { - reap(attr(servers[[q]], "listener")[[1L]]) - attr(servers[[q]], "listener") <- NULL - listen(servers[[q]], url = servernames[q], tls = tls, error = TRUE) + if (req[3L]) { + send(queue[[i]][["rctx"]], NULL, mode = 2L) + reap(queue[[i]][["rctx"]]) } else { serverfree[q] <- TRUE } @@ -217,7 +215,8 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, for (q in free) for (i in seq_n) { if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) { - queue[[i]][["req"]] <- request_signal(.context(servers[[q]]), data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L) + queue[[i]][["rctx"]] <- .context(servers[[q]]) + queue[[i]][["req"]] <- request_signal(queue[[i]][["rctx"]], data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L) queue[[i]][["daemon"]] <- q serverfree[q] <- FALSE assigned[q] <- assigned[q] + 1L