Hello Henrik, how are you?
I'm running an R application on an AWS EKS cluster, and I was hoping you could shed some light on a problem I'm facing.
Basically I have one main pod (the scheduler) that distributes the tasks to the worker pods; at the end I uninstall the application so it doesn't keep generating costs, and the EKS cluster has an autoscaler, i.e. it increases the number of nodes depending on the pod load when I start the application. This entire process is started from a remote machine in my domain, which makes the connection to the EKS cluster.
But that's not the problem... I'm using plan(cluster) to connect the main pod to the workers and future_lapply to distribute the tasks between the pods. Each task basically reads data from a database through an API, applies a time series model, and then sends the results to another API. The application runs normally on my local machine.
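Conceptually, the topology is just a socket cluster whose nodes are the worker pods, something along the lines of the sketch below (the hostnames are placeholders, and in my real setup the workers are launched manually inside the pods rather than by the scheduler):

library(future)

# Hypothetical worker pod hostnames (placeholders for the real Kubernetes service names)
pod_hosts <- c("worker-0.forecast.svc.cluster.local",
               "worker-1.forecast.svc.cluster.local")

# One R worker process per pod, each connected back to the scheduler pod over a socket
plan(cluster, workers = pod_hosts)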
However, the process is quite unstable: sometimes it works for a reduced number of models, and other times it fails with errors such as:
Error in unserialize(node$con) : error reading from connection
Calls: ... FUN -> recvData -> recvData.SOCK0node -> unserialize
or
Error in unserialize(node$con) :
Failed to retrieve the value of ClusterFuture (future_lapply-106) from cluster RichSOCKnode #1 (PID 2694 on localhost ‘localhost’). The reason reported was ‘error reading from connection’. Post-mortem diagnostic: No process exists with this PID, i.e. the localhost worker is no longer alive.
Calls: future_lapply ... resolved -> resolved.ClusterFuture -> receiveMessageFromWorker
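The full output doesn't give much more detail than that. If it helps, I can re-run with the future package's debug log turned on in the scheduler and post the result:

# Verbose logging of how each future is created, launched, resolved, and collected
options(future.debug = TRUE)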
The R script is:
packages <- c("fpp3", "tidyverse", "httr", "fable", "fabletools", "future", "future.apply",
              "forecast", "zoo", "rjson", "jsonlite", "readr", "withr")
packages_installed <- packages[!packages %in% installed.packages()[, "Package"]]
if (length(packages_installed) > 0) {
  install.packages(packages_installed, dependencies = TRUE)
}
# Suppressing warning and information messages
suppressMessages({
  sapply(packages, require, character.only = TRUE)
})
ids <- read_delim("/home/rstudio/Application/pending_test.csv",
                  delim = ";", escape_double = FALSE, trim_ws = TRUE) %>%
  as.data.frame()

args <- commandArgs(trailingOnly = TRUE)
if (length(args) > 0) {
  number_workers <- as.integer(args[1])
} else {
  number_workers <- 0  # default: with no argument, fall back to a local multisession plan
}
if (nrow(ids) > 0) {
  source('/home/rstudio/Application/modules.R')

  start.time <- Sys.time()
  if (number_workers == 0) {
    future::plan(multisession)
  } else {
    cat("Connecting to the pods.\n")
    # manual = TRUE: the workers are not launched from here; each worker pod
    # starts its own R process that connects back to this scheduler pod.
    future::plan(cluster, manual = TRUE, quiet = TRUE)
  }
  end.time <- Sys.time()
  cat("Total time for pods to connect: \n")
  print(end.time - start.time)

  cat("##########################################################################\n")
  cat("Number of requests to be made: ", nrow(ids), '\n')
  cat("Number of available workers: ", future::nbrOfWorkers(), "\n")
  test <- future_sapply(seq_len(nbrOfWorkers()), function(i) Sys.info()[["nodename"]])
  cat("They are: ", test, '\n')
  cat("##########################################################################\n")

  cat("Starting function\n")
  options(future.wait.timeout = 300)
  options(future.wait.interval = 5)
  options(future.retry = 3)
  tasks <- 1:nrow(ids)
  start.time <- Sys.time()
  results <- future_lapply(tasks, ProcessForecast, future.seed = TRUE, future.chunk.size = 1)
  end.time <- Sys.time()

  time_vector <- list()
  # Filling the times vector correctly
  for (i in seq_along(tasks)) {
    time_vector[[i]] <- results[[i]]
  }
  results_time_ids <- ids %>% mutate(execution_time = unlist(time_vector))
  readr::write_rds(results_time_ids,
                   paste0('/home/rstudio/Application/Resultados/ids_results-', Sys.Date(), '.rds'))
  # Saving execution errors to perform more detailed validation
  tasks_with_error <- tasks[which(sapply(results, is.na))]
  cat("\nRecords that gave an error:", tasks_with_error, '\n')
  if (length(tasks_with_error) > 0) {
    tasks_with_error <- ids %>%
      filter(row_number() %in% tasks_with_error) %>%
      mutate(ordem_req = tasks_with_error)
    readr::write_rds(tasks_with_error,
                     paste0('/home/rstudio/Application/Resultados/Tasks com erros-', Sys.Date(), '.rds'))
    cat("\nThese were the requests that went wrong, please analyze:\n")
    print(tasks_with_error)
  }
cat("\nTotal application time:\n")
print(end.time - start.time)
cat("\n==========================================================")
cat("\nEnd Task\n")
cat("==========================================================\n")
}else{
print("No application")
}
The ProcessForecast function is defined in modules.R, which I load in the script via source('/home/rstudio/Application/modules.R'); it performs the operations I mentioned above using packages such as tidyverse, httr, fable, fabletools, and so on.
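In simplified form it has roughly this shape (a sketch for illustration only, not the real code; the URLs, column names, and model choice are placeholders):

ProcessForecast <- function(task_id) {
  t0 <- Sys.time()
  tryCatch({
    # 1. Pull the series for this id from the source API
    #    ('ids' is the data frame read at the top of the script; future exports it to the workers)
    resp   <- httr::GET(paste0("https://data-api.example.com/series/", ids[task_id, 1]))
    series <- jsonlite::fromJSON(httr::content(resp, as = "text", encoding = "UTF-8"))

    # 2. Fit a time series model and forecast ahead (model choice is illustrative)
    fit <- forecast::auto.arima(stats::ts(series$value, frequency = 12))
    fc  <- forecast::forecast(fit, h = 12)

    # 3. Send the point forecasts to the results API (placeholder URL)
    httr::POST("https://results-api.example.com/forecasts",
               body = jsonlite::toJSON(list(id = ids[task_id, 1],
                                            forecast = as.numeric(fc$mean))),
               httr::content_type_json())

    # Return the elapsed time; this is what fills the execution_time column in the script
    as.numeric(difftime(Sys.time(), t0, units = "secs"))
  }, error = function(e) NA_real_)  # NA marks a failed task for the is.na() check in the script
}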
I have already run the application with 100, 200, and 500 requests (ML models), but I have also hit these errors with those same workloads...
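One idea I had, but haven't tried yet, is to trap the worker crash on the scheduler and retry, roughly like this (just a sketch, assuming a dead worker surfaces as a FutureError condition; RunWithRetry is a hypothetical wrapper, not something in my current code):

RunWithRetry <- function(tasks, max_tries = 3) {
  for (attempt in seq_len(max_tries)) {
    res <- tryCatch(
      future.apply::future_lapply(tasks, ProcessForecast,
                                  future.seed = TRUE, future.chunk.size = 1),
      FutureError = function(e) {
        # A FutureError means the orchestration itself failed (e.g. a worker
        # process died); errors raised inside ProcessForecast() are not caught here.
        message(sprintf("Attempt %d failed: %s", attempt, conditionMessage(e)))
        NULL
      }
    )
    if (!is.null(res)) return(res)
    # Rebuild the worker pool (same plan() call as in the script) before retrying
    future::plan(future::cluster, manual = TRUE, quiet = TRUE)
  }
  stop("All ", max_tries, " attempts failed")
}

Would that be a sensible approach, or is there a better built-in way to recover from a lost worker?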
Can you give me any idea of what might be going on? That would be really nice, thank you.