Skip to content

Commit

Permalink
Distributed: Propagate package environment to local workers. (#43270)
Browse files Browse the repository at this point in the history
Local workers now inherit the package environment of the main process,
i.e. the active project, LOAD_PATH, and DEPOT_PATH. This behavior
can be overridden by passing the new `env` keyword argument, or by
passing `--project` in the `exeflags` keyword argument.

Fixes #28781, and closes #42089.
  • Loading branch information
fredrikekre authored Feb 25, 2022
1 parent 47416d3 commit 32e4dd1
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ default_addprocs_params() = Dict{Symbol,Any}(
:dir => pwd(),
:exename => joinpath(Sys.BINDIR, julia_exename()),
:exeflags => ``,
:env => [],
:enable_threaded_blas => false,
:lazy => true)

Expand Down
39 changes: 34 additions & 5 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ It is possible to launch multiple processes on a remote host by using a tuple in
workers to be launched on the specified host. Passing `:auto` as the worker count will
launch as many workers as the number of CPU threads on the remote host.
Examples:
**Examples**:
```julia
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
Expand All @@ -76,7 +76,7 @@ addprocs([
])
```
Keyword arguments:
**Keyword arguments**:
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.
Expand Down Expand Up @@ -445,10 +445,17 @@ end
Launch `np` workers on the local host using the in-built `LocalManager`.
*Keyword arguments:*
Local workers inherit the current package environment (i.e., active project,
[`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process.
**Keyword arguments**:
- `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
- `dir`, `exename`, `exeflags`, `topology`, `lazy`, `enable_threaded_blas`: same effect
- `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect
as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref).
!!! compat "Julia 1.9"
The inheriting of the package environment and the `env` keyword argument were
added in Julia 1.9.
"""
function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...)
manager = LocalManager(np, restrict)
Expand All @@ -463,10 +470,32 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
env = Dict{String,String}(params[:env])

# TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
# together with load_path() etc. Might be useful to have when spawning julia
# processes outside of Distributed.jl too.
# JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup,
# but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here.
# Users can opt-out of this by passing `env = ...` to addprocs(...).
pathsep = Sys.iswindows() ? ";" : ":"
if get(env, "JULIA_LOAD_PATH", nothing) === nothing
env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep)
end
if get(env, "JULIA_DEPOT_PATH", nothing) === nothing
env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep)
end
# Set the active project on workers using JULIA_PROJECT.
# Users can opt-out of this by (i) passing `env = ...` or (ii) passing
# `--project=...` as `exeflags` to addprocs(...).
project = Base.ACTIVE_PROJECT[]
if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing
env["JULIA_PROJECT"] = project
end

for i in 1:manager.np
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
io = open(detach(setenv(cmd, dir=dir)), "r+")
io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
write_cookie(io)

wconfig = WorkerConfig()
Expand Down
107 changes: 107 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,113 @@ for p in procs()
@test @fetchfrom(p, i27429) == 27429
end

# Propagation of package environments for local workers (#28781)
let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
project = mkdir(joinpath(tmp, "project"))
depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2"))]
load_path = [mkdir(joinpath(tmp, "load_path")), "@stdlib", "@"]
pathsep = Sys.iswindows() ? ";" : ":"
env = Dict(
"JULIA_DEPOT_PATH" => join(depots, pathsep),
"JULIA_LOAD_PATH" => join(load_path, pathsep),
)
setupcode = """
using Distributed, Test
@everywhere begin
depot_path() = DEPOT_PATH
load_path() = LOAD_PATH
active_project() = Base.ACTIVE_PROJECT[]
end
"""
testcode = setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == DEPOT_PATH
@test remotecall_fetch(load_path, w) == LOAD_PATH
@test remotecall_fetch(Base.load_path, w) == Base.load_path()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[]
@test remotecall_fetch(Base.active_project, w) == Base.active_project()
end
"""
# No active project
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) === Base.ACTIVE_PROJECT[] === nothing
end
"""
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# --project
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[] ==
$(repr(project))
end
"""
cmd = setenv(`$(julia) --project=$(project) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# JULIA_PROJECT
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`,
(env["JULIA_PROJECT"] = project; env))
@test success(cmd)
# Pkg.activate(...)
activateish = """
Base.ACTIVE_PROJECT[] = $(repr(project))
using Distributed
addprocs(1)
"""
cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env)
@test success(cmd)
# JULIA_(LOAD|DEPOT)_PATH
shufflecode = """
d = reverse(DEPOT_PATH)
append!(empty!(DEPOT_PATH), d)
l = reverse(LOAD_PATH)
append!(empty!(LOAD_PATH), l)
"""
addcode = """
using Distributed
addprocs(1) # after shuffling
"""
extracode = """
for w in workers()
@test remotecall_fetch(load_path, w) == $(repr(reverse(load_path)))
@test remotecall_fetch(depot_path, w) == $(repr(reverse(depots)))
end
"""
cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env)
@test success(cmd)
# Mismatch when shuffling after proc addition
failcode = shufflecode * setupcode * """
for w in workers()
@test remotecall_fetch(load_path, w) == reverse(LOAD_PATH) == $(repr(load_path))
@test remotecall_fetch(depot_path, w) == reverse(DEPOT_PATH) == $(repr(depots))
end
"""
cmd = setenv(`$(julia) -p1 -e $(failcode)`, env)
@test success(cmd)
# Passing env or exeflags to addprocs(...) to override defaults
envcode = """
using Distributed
project = mktempdir()
env = Dict(
"JULIA_LOAD_PATH" => LOAD_PATH[1],
"JULIA_DEPOT_PATH" => DEPOT_PATH[1],
)
addprocs(1; env = env, exeflags = `--project=\$(project)`)
env["JULIA_PROJECT"] = project
addprocs(1; env = env)
""" * setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == [DEPOT_PATH[1]]
@test remotecall_fetch(load_path, w) == [LOAD_PATH[1]]
@test remotecall_fetch(active_project, w) == project
@test remotecall_fetch(Base.active_project, w) == joinpath(project, "Project.toml")
end
"""
cmd = setenv(`$(julia) -e $(envcode)`, env)
@test success(cmd)
end end

include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
Expand Down

0 comments on commit 32e4dd1

Please sign in to comment.