Skip to content

Commit

Permalink
more memory-efficient dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 2, 2023
1 parent 272477c commit c33e743
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
on.exit(reap(sockc), add = TRUE, after = FALSE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
recv(sockc, mode = 6L, block = .timelimit) && stop(.messages[["sync_timeout"]])
req <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
saio <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
}

Expand Down Expand Up @@ -188,14 +188,14 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
} else {
data <- as.integer(c(seq_n, activevec, instance, assigned, complete))
}
req <- send_aio(sockc, data = data, mode = 2L)
saio <- send_aio(sockc, data = data, mode = 2L)
cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
next
}

for (i in seq_n)
if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["res"]])) {
req <- .subset2(queue[[i]][["res"]], "value")
if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["req"]])) {
req <- .subset2(queue[[i]][["req"]], "value")
if (is.object(req)) req <- serialize(req, NULL)
exiting <- req[3L]
req <- send_aio(queue[[i]][["ctx"]], data = req, mode = 2L)
Expand All @@ -217,7 +217,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
for (q in free)
for (i in seq_n) {
if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) {
queue[[i]][["res"]] <- request_signal(.context(servers[[q]]), data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L)
queue[[i]][["req"]] <- request_signal(.context(servers[[q]]), data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L)
queue[[i]][["daemon"]] <- q
serverfree[q] <- FALSE
assigned[q] <- assigned[q] + 1L
Expand Down

0 comments on commit c33e743

Please sign in to comment.