Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idleworker() function. #14736

Closed
wants to merge 2 commits into from
Closed

Conversation

samoconnor
Copy link
Contributor

idleworker() returns a worker that is not currently busy (not being waited for by remotecall_fetch or remotecall_wait).

If there are no idle workers, idleworker() blocks until a worker is available.

This is intended as a more general interface for the type of dynamic scheduling provided by pmap.
e.g. pmap might eventually be a simple combination of amap() and idleworker() as described in #14843...

remote(f, args...) = (args...) -> remotecall_fetch(f, idleworker(), args...)
pmap(f, c...) = amap(remote(f), c...; max=nworkers())

Implementation notes:

  • Uses the existing RemoteValue.waitingfor field to identify busy workers.
  • Fixes remotecall_fetch and remotecall_wait to set rv.waitingfor back to 0 after waiting.
  • Adds ProcessGroup.worker_is_idle::Condition to enable waiting for multiple busy workers. idleworker() waits on worker_is_idle. remotecall_fetch and remotecall_wait notify worker_is_idle just after setting rv.waitingfor = 0.

@tkelman tkelman added the needs tests Unit tests are required for this change label Jan 20, 2016
@samoconnor
Copy link
Contributor Author

I'm not sure how to proceed with writing test cases for this.
As far as I understand it, the test infrastructure uses pmap.
I assume that this means that the state of workers() and their busyness or otherwise is dependant on the other tests that are running.
Is there a standard approach to writing test code that needs to have control over all the workers?

@tkelman
Copy link
Contributor

tkelman commented Jan 20, 2016

see test/parallel_exec.jl

edit: sorry, I could've sworn we used to have it set up so parallel.jl waited until other test workers were done then ran at the end, but it looks like that might not be the case any more. hard to tell

@tkelman tkelman added parallelism Parallel or distributed computation and removed needs tests Unit tests are required for this change labels Jan 20, 2016
@samoconnor
Copy link
Contributor Author

see test/parallel_exec.jl

It looks like test/parallel.jl spawns a whole new Julia runtime to execute test/parallel_exec.jl so the workers() should be private to that test regardless of execution order.

@samoconnor
Copy link
Contributor Author

CI passing now.

There may well be a more elegant way to implement idleworker(). This 1st attempt is intended to be minimally disruptive.

Any comments on the interface before I continue to move in the direction described here : #12943 (comment)

@StefanKarpinski, @JeffBezanson, @jakebolewski, @amitmurthy ?


w = workers()
@test length(w) == 3
t1 = now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The format here is very weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @yuyichao, I apologise if this looks weird to you.
I found that putting the temporal comments and assertions into a seperate column made it easier to understand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the code formatting guideline.

@amitmurthy
Copy link
Contributor

  1. Since we allow all-to-all communication, checking waitingfor on the calling process will not be enough to identify "busy" workers. Worker m could have spawned a fetch from worker n.
  2. Remote references are returned by the @spawn macros as also remotecall. They may not be waited upon at the time of checking waitingfor

@samoconnor
Copy link
Contributor Author

Hi @amitmurthy,

Thanks for the feedback.

Re: 1.

I suspect that with all-to-all communication there is no way to identify an idle worker. There would always be a race condition where some other node makes the "idle" node busy right after it is identified as idle.
However, it seems that the pmap style of central dispatch is a common enough use pattern to warrant supporting functions. Some users will want domain-specific all-to-all dispatch and scheduling algorithms, but those users will have to really understand what they are doing to be efficient and to avoid deadlocks. I think the most common use case is likely to involve worker nodes processing work for a central controller.

What if this instead of being called idleworker() this function was just called worker()?
The documentation could say that best efforts will be made to return a worker that is not busy; and that the returned worker is not locally being waited for by remotecall_fetch or remotecall_wait.

Re: 2.

I suppose that it wouldn't be too hard to have a flag to keep track of which workers are busy with locally originated asynchronous remotecalls or @spawns.

I wonder wether it might not be better to encourage use of the blocking remotecall_fetch mechanism with @async if asynchronous behaviour is needed. This would be consistent with the ::IO subsystem where (nearly) everything is blocking but can be wrapped in @async.

In an case, what I'm aiming for here is a function to call when I want to ask for "a worker that I haven't already given a job to." I'm open to suggestions about what the function should be called.

@samoconnor
Copy link
Contributor Author

@amitmurthy, another thing to note is that this PR does not export idleworker.
My initial intention was to put the pieces in place for a cleaner version of #12943 without exporting anything new.

Perhaps we can leave the design of a public "get me a worker" API for later.

@amitmurthy
Copy link
Contributor

Wouldn't it be simpler to run off a Q at an application level?

A contrived example:

addprocs(4)

# Create a Q accessible remotely
rr = RemoteChannel(()->Channel(128))

# Make it global on all workers
for p in procs()
    @spawnat p global WorkQ = rr
end

# mark initial availability
for p in workers()
   @spawnat p put!(WorkQ, myid())
end

# Find a free worker, assign work, and have worker mark availability again.
for i in rand(1:10, 20)
     p = take!(WorkQ)
     remotecall(t->(println("sleep : ", t, " seconds"); sleep(t); println("DONE!"); put!(WorkQ, myid()); nothing), p, i)
end

@samoconnor
Copy link
Contributor Author

Hi @amitmurthy, using a shared queue to keep track of available workers makes a lot of sense.

What to you think of having a built-in default worker queue something like this...

type WorkerPool
    channel::RemoteChannel{Channel{Int}}
end

function WorkerPool(workers::Vector{Int})

    # Create a shared queue of workers...
    pool = RemoteChannel(()->Channel{Int}(128)) 

    # Check that workers are not already part of a pool...
    check = () -> if :_worker_pool in names(Main)
        error("Worker $(myid()) already in a WorkerPool!")
    end
    foreach(fetch, [@spawnat w check() for w in workers])

    # Put each worker into the pool...
    for w in workers
        put!(pool, w)
        @spawnat w global _worker_pool = pool
    end

    WorkerPool(pool)
end

WorkerPool(n::Integer) = WorkerPool(addprocs(n))
WorkerPool() = WorkerPool(addprocs())

Base.take!(pool::WorkerPool) = take!(pool.channel)

function Base.remotecall_fetch(f, pool::WorkerPool, args...)
    l = (args...)->try f(args...) finally put!(_worker_pool, myid()) end
    remotecall_fetch(l, take!(pool), args...)
end

default_worker_pool() = _default_worker_pool
global _default_worker_pool = WorkerPool(workers())

function Base.remotecall_fetch(f, args...)
    remotecall_fetch(f, default_worker_pool(), args...)
end

@amitmurthy
Copy link
Contributor

A worker pool will definitely be useful. I suspect there will be some debate about whether to include it in Base or have it as part of an external package.

Since we do not yet have a "standard library" or "standard packages", I am OK with having it in Base for now.

@samoconnor
Copy link
Contributor Author

superseded by #15073

@samoconnor samoconnor closed this Feb 16, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants