Skip to content

Commit

Permalink
fix dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 2, 2023
1 parent c33e743 commit d7b4f59
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.11.2.9018
Version: 0.11.2.9019
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand All @@ -22,7 +22,7 @@ Encoding: UTF-8
Depends:
R (>= 3.5)
Imports:
nanonext (>= 0.10.4.9024)
nanonext (>= 0.10.4.9025)
Enhances:
parallel,
promises
Expand Down
5 changes: 3 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# mirai 0.11.2.9018 (development)
# mirai 0.11.2.9019 (development)

* Implements `register()` for registering custom serialization and unserialization functions when using daemons.
* Introduces `call_mirai_()`, a user-interruptible version of `call_mirai()` suitable for interactive use.
* Simplification of `mirai()` interface:
+ '.args' will now coerece to a list if an object other than a list is supplied, rather than error.
+ '.signal' argument removed - now all 'mirai' signal if daemons are set up.
* `everywhere()` now returns invisible NULL in the case the specified compute profile is not set up, rather than error.
* Improved memory efficiency and stability at dispatcher.
* Improved error messages and handling for daemon/dispatcher connection errors.
* Requires nanonext >= [0.10.4.9024].
* Requires nanonext >= [0.10.4.9025].

# mirai 0.11.2

Expand Down
5 changes: 3 additions & 2 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,

cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
on.exit(close(sock))
autoexit && pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)
Expand Down Expand Up @@ -148,6 +148,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
nextmode(mark = TRUE)
send(ctx, data = data, mode = 3L)
aio <- recv_aio_signal(ctx, cv = cv, mode = 8L)
wait(cv)
break
}
Expand All @@ -174,7 +175,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
.daemon <- function(url) {

sock <- socket(protocol = "rep", dial = url, autostart = NA)
on.exit(reap(sock))
on.exit(close(sock))
._mirai_. <- recv(sock, mode = 1L, block = TRUE)
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
Expand Down
4 changes: 2 additions & 2 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
length(envir) || return(0L)

if (signal) send_signal(envir = envir)
reap(envir[["sock"]])
length(envir[["sockc"]]) && reap(envir[["sockc"]])
close(envir[["sock"]])
length(envir[["sockc"]]) && close(envir[["sockc"]])
..[[.compute]] <- NULL -> envir

} else if (is.null(envir)) {
Expand Down
21 changes: 10 additions & 11 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,

cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
on.exit(close(sock))
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial)

Expand Down Expand Up @@ -135,12 +135,12 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
queue[[i]] <- create_req(ctx = .context(sock), cv = cv)
}

on.exit(lapply(servers, reap), add = TRUE, after = TRUE)
on.exit(lapply(servers, close), add = TRUE, after = TRUE)

ctrchannel <- is.character(monitor)
if (ctrchannel) {
sockc <- socket(protocol = "rep")
on.exit(reap(sockc), add = TRUE, after = FALSE)
on.exit(close(sockc), add = TRUE, after = FALSE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
recv(sockc, mode = 6L, block = .timelimit) && stop(.messages[["sync_timeout"]])
saio <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
Expand Down Expand Up @@ -173,7 +173,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,

} else if (i < 0L) {
i <- -i
reap(servers[[i]])
close(servers[[i]])
servers[[i]] <- nsock <- req_socket(NULL)
pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE, flag = FALSE)
lock(nsock, cv = active[[i]])
Expand All @@ -197,13 +197,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["req"]])) {
req <- .subset2(queue[[i]][["req"]], "value")
if (is.object(req)) req <- serialize(req, NULL)
exiting <- req[3L]
req <- send_aio(queue[[i]][["ctx"]], data = req, mode = 2L)
send(queue[[i]][["ctx"]], data = req, mode = 2L)
q <- queue[[i]][["daemon"]]
if (exiting) {
reap(attr(servers[[q]], "listener")[[1L]])
attr(servers[[q]], "listener") <- NULL
listen(servers[[q]], url = servernames[q], tls = tls, error = TRUE)
if (req[3L]) {
send(queue[[i]][["rctx"]], NULL, mode = 2L)
reap(queue[[i]][["rctx"]])
} else {
serverfree[q] <- TRUE
}
Expand All @@ -217,7 +215,8 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
for (q in free)
for (i in seq_n) {
if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) {
queue[[i]][["req"]] <- request_signal(.context(servers[[q]]), data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L)
queue[[i]][["rctx"]] <- .context(servers[[q]])
queue[[i]][["req"]] <- request_signal(queue[[i]][["rctx"]], data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L)
queue[[i]][["daemon"]] <- q
serverfree[q] <- FALSE
assigned[q] <- assigned[q] + 1L
Expand Down

0 comments on commit d7b4f59

Please sign in to comment.