slow queueing of tasks when n > workers #293

Closed
r2evans opened this issue Mar 25, 2019 · 4 comments

@r2evans
r2evans commented Mar 25, 2019

First things first, nice package! Thank you.

Up Front: is there a clean way to return "immediately" to the prompt when queueing more jobs than workers?

I notice when queueing multiple tasks (more than the number of workers) that the time-to-queue is not "immediate". I don't think it should be instantaneous (esp since in my actual workflow the data transfer is a factor), but it seems to be waiting until previous tasks are complete before sending the next. I'm not certain if I'm misunderstanding the expected sequence or perhaps where the delay resides.

Working example:

library(future)
d <- data.frame(id = 1:7, val = 11:17)
somefunc <- function(a) { Sys.sleep(10); a$val[1] * 1:3; }
system.time({
  v <- by(d, d$id, function(z) { future({ somefunc(z) }) })
})
#    user  system elapsed 
#    0.03    0.00   21.42 
sapply(v, resolved)
#     1     2     3     4     5     6     7 
#  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE 

It's the "21 seconds" that seems odd to me. In this example, data-transfer is not a factor. What I think is contributing to this are the following:

  • the first 3 (in this example) tasks are queued, all other tasks must wait for a worker to be available in order to be sent to them
  • once the second "batch" of 3 tasks have been sent to workers, then the first one to finish allows the 7th and final task to be sent to a worker, allowing the initial call to return

This operation should take 30 seconds, but I had hoped that the leading R console would return to its prompt in much less than 20 seconds. Is there an option or method already that supports "queue all tasks and return as soon as feasible"? If not, would it make sense?

I realize that job queueing in this fashion relies on knowing which worker will finish first ... not something that you can know ahead of time (ergo a difference between parallel::parLapply and parallel::parLapplyLB).

A complicated option comes to mind, using a background controller: one of the workers is really a coordinator, and I send all data/jobs to that one, who then handles coordinating jobs with all of the remaining workers. I imagine a multi-layer future hierarchy could do this, but am I over-thinking things?

@HenrikBengtsson
Collaborator

Hi, thanks for the feedback/questions. Before anything else, what plan() do you set, and what does nbrOfWorkers() report after that?

@r2evans
Author

r2evans commented Mar 25, 2019

Of course, I missed that in the example.

cl <- makeClusterPSOCK(3)
plan(cluster, workers = cl, gc = TRUE)
nbrOfWorkers()
# [1] 3

(My impetus for using cluster is so that I can use parallel::clusterEvalQ to push something to all workers.)

@HenrikBengtsson
Collaborator

I don't think there is any "slowness" in the "queueing" - the delays you are seeing come from the blocking due to all workers being occupied with other tasks. As soon as they're done, the next future will be resolved - but there's no other delay/slowness involved. Here's a simplified version of your example. It creates seven futures, which are processed in parallel by three workers.

library(future)
cl <- plan(multisession, workers = 3)

fs <- list()
for (ii in 1:7) {
  fs[[ii]] <- future({ Sys.sleep(10); list(ii = ii, time = Sys.time()) })
}
vs <- values(fs)
str(vs)
## List of 7
##  $ :List of 2
##   ..$ ii  : int 1
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:23"
##  $ :List of 2
##   ..$ ii  : int 2
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:23"
##  $ :List of 2
##   ..$ ii  : int 3
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:23"
##  $ :List of 2
##   ..$ ii  : int 4
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:33"
##  $ :List of 2
##   ..$ ii  : int 5
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:33"
##  $ :List of 2
##   ..$ ii  : int 6
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:34"
##  $ :List of 2
##   ..$ ii  : int 7
##   ..$ time: POSIXct[1:1], format: "2019-03-25 11:58:44"

This works as expected and by design. The results show that the first three futures are processed in parallel immediately because all three workers are free. When the fourth future is created, it blocks because there are no available workers. It will block until any of the workers becomes available to take on new tasks. This means that the for loop will also block on ii=4. So, after approx 10 seconds, all three workers have resolved the first three futures and can take on new work. They will then process the next round of three futures. The seventh future (and iteration ii=7) will therefore block until one of the workers is free again.
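
The blocking described above can be observed directly by timing each call to future(). The following is a minimal sketch, not part of the original reply. It assumes a local multisession backend with two workers and 2-second tasks (both chosen only to keep the run short), and `create_secs` is a hypothetical name introduced here.

```r
library(future)
plan(multisession, workers = 2)

# Assumption: 2 workers and 3 tasks, so only the third future() call has to wait.
create_secs <- numeric(3)
fs <- list()
for (ii in 1:3) {
  t0 <- Sys.time()
  fs[[ii]] <- future({ Sys.sleep(2); ii })
  # Seconds spent inside future(), i.e. the time until a worker accepted the task
  create_secs[ii] <- as.numeric(difftime(Sys.time(), t0, units = "secs"))
}
print(round(create_secs, 1))

# Collect the values one by one; this waits for any unfinished futures.
vs <- lapply(fs, value)

plan(sequential)  # shut down the background workers
```

The first two calls should return quickly, while the third should take roughly as long as one task, because it cannot be handed to a worker until one becomes free.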

To your question:

Up Front: is there a clean way to return "immediately" to the prompt when queueing more jobs than workers?
...
Is there an option or method already that supports "queue all tasks and return as soon as feasible"? If not, would it make sense?

The Future API or the future framework itself does not provide a queueing mechanism. As you realized, that would require being able to run a "scheduler/event loop" in the background that pulls futures from the queue, distributes them to workers, and so on. Job schedulers can be quite complicated, and I don't think that is a task for the future framework. Instead, you might be able to build something on top of the future framework that implements such a mechanism. You already propose a first poor man's implementation for this, where you run a simple background process that orchestrates a queue. Also, not 100% sure though, but I think I recall some discussion on having "job queues" in the drake package. Then there's the world of job schedulers from HPC, e.g. SGE, Torque/PBS, and Slurm. They're designed to handle these types of tasks in a proper way, with resource management etc. So if you already have the latter set up on your system, you can indeed make use of those via the future.batchtools backend. That would look something like:

library(future.batchtools)
plan(batchtools_sge)
nbrOfWorkers()
## [1] Inf

The way to interpret this is that the future framework will treat the backend job scheduler as having an infinite number of workers by default. That means futures will not block upon creation. Instead, when created, they'll be submitted to the job scheduler and wait in its queue instead.
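
For setups without an HPC scheduler, the idea of building a queue on top of the future framework might be sketched as follows. This is only an illustration under stated assumptions, not something the future package provides: `make_queue()`, `pump()`, and `results()` are hypothetical names, and the caller is assumed to invoke `pump()` from time to time (here in a simple polling loop). New futures are launched only while fewer futures are running than there are workers, so each call is meant to return promptly.

```r
library(future)
plan(multisession, workers = 2)

# Hypothetical helper (not part of the future API): a tiny task queue.
# `tasks` is a list of zero-argument functions.
make_queue <- function(tasks) {
  pending <- tasks    # tasks not yet launched
  running <- list()   # futures currently assigned to a worker
  done    <- list()   # collected values, in completion order

  pump <- function() {
    # Collect finished futures; resolved() only polls, it does not wait.
    finished <- vapply(running, resolved, logical(1))
    done    <<- c(done, lapply(running[finished], value))
    running <<- running[!finished]

    # Launch new futures only while a worker is free,
    # so that future() should not have to wait for one.
    while (length(pending) > 0 && length(running) < nbrOfWorkers()) {
      task    <- pending[[1]]
      pending <<- pending[-1]
      running <<- c(running, list(future(task())))
    }
    invisible(length(pending) + length(running))  # tasks still outstanding
  }

  list(pump = pump, results = function() done)
}

# Five 1-second tasks on two workers.
tasks <- lapply(1:5, function(i) function() { Sys.sleep(1); i * 10 })
q <- make_queue(tasks)

# Here we simply poll until nothing is left; an interactive user could
# instead call q$pump() occasionally between other commands.
while (q$pump() > 0) Sys.sleep(0.2)
res <- sort(unlist(q$results()))

plan(sequential)  # shut down the background workers
```

The trade-off is that nothing advances unless `pump()` is called, which is exactly the "event loop" that the future framework leaves to other tools.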

Does this answer your questions?

@r2evans
Author

r2evans commented Mar 25, 2019

Yes, it absolutely answers my questions. It (1) confirms my belief, stemming from what I think is a correct interpretation of a simple/elegant design; and (2) confirms that adding a "queue manager" would be the way to get what I was suggesting, but that it does not yet exist and would need careful thought and planning.

I'm working future more into my workflow, so as my familiarity with it grows and my "annoyance with the wait" grows (not annoyance, I hope you know what I mean), perhaps I'll have a few spare moments to look more at how I could control it. I understand how it would fit into future.batchtools, though I wonder if it is really a meta-batchtools, because theoretically it could sit in front of many other types of plans.

I'm assuming that attempting this would result in a double-copy of all data, is that right? (As I said before, my workflow includes data where this transfer is not insignificant.) Granted, I'm using parallelism because the calc time is much higher than data-transfer, so I believe "2 times transfer time" will still be sufficiently under calc-time as to be acceptable.

Thanks again, Henrik! I appreciate the quick and thorough replies.

(BTW: my hope is to push this job I'm working on onto a server with 40 cores and 256GB ram, where I really will have more workers than jobs. That'll be nice, your package makes that effort much easier than I'd jerry-rigged with parallel:: and friends.)

@r2evans r2evans closed this as completed Mar 25, 2019