Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setcpuaffinity(cmd, cpus) for setting CPU affinity of subprocesses #42469

Merged
merged 23 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ New library functions
---------------------

* `hardlink(src, dst)` can be used to create hard links. ([#41639])
* `setcpuaffinity(cmd, cpus)` can be used to set CPU affinity of sub-processes. ([#42469])

New library features
--------------------
Expand Down
49 changes: 44 additions & 5 deletions base/cmd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,31 @@ struct Cmd <: AbstractCmd
flags::UInt32 # libuv process flags
env::Union{Vector{String},Nothing}
dir::String
cpus::Union{Nothing,Vector{Int}}
tkf marked this conversation as resolved.
Show resolved Hide resolved
Cmd(exec::Vector{String}) =
new(exec, false, 0x00, nothing, "")
Cmd(cmd::Cmd, ignorestatus, flags, env, dir) =
new(exec, false, 0x00, nothing, "", nothing)
Cmd(cmd::Cmd, ignorestatus, flags, env, dir, cpus = nothing) =
new(cmd.exec, ignorestatus, flags, env,
dir === cmd.dir ? dir : cstr(dir))
dir === cmd.dir ? dir : cstr(dir), cpus)
function Cmd(cmd::Cmd; ignorestatus::Bool=cmd.ignorestatus, env=cmd.env, dir::AbstractString=cmd.dir,
cpus::Union{Nothing,Vector{Int}} = cmd.cpus,
tkf marked this conversation as resolved.
Show resolved Hide resolved
detach::Bool = 0 != cmd.flags & UV_PROCESS_DETACHED,
windows_verbatim::Bool = 0 != cmd.flags & UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS,
windows_hide::Bool = 0 != cmd.flags & UV_PROCESS_WINDOWS_HIDE)
flags = detach * UV_PROCESS_DETACHED |
windows_verbatim * UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS |
windows_hide * UV_PROCESS_WINDOWS_HIDE
new(cmd.exec, ignorestatus, flags, byteenv(env),
dir === cmd.dir ? dir : cstr(dir))
dir === cmd.dir ? dir : cstr(dir), cpus)
end
end

has_nondefault_cmd_flags(c::Cmd) =
c.ignorestatus ||
c.flags != 0x00 ||
c.env !== nothing ||
c.dir !== ""
c.dir !== "" ||
c.cpus !== nothing

"""
Cmd(cmd::Cmd; ignorestatus, detach, windows_verbatim, windows_hide, env, dir)
Expand Down Expand Up @@ -114,6 +117,8 @@ function show(io::IO, cmd::Cmd)
print_env = cmd.env !== nothing
print_dir = !isempty(cmd.dir)
(print_env || print_dir) && print(io, "setenv(")
print_cpus = cmd.cpus !== nothing
print_cpus && print(io, "setcpuaffinity(")
print(io, '`')
join(io, map(cmd.exec) do arg
replace(sprint(context=io) do io
Expand All @@ -123,6 +128,11 @@ function show(io::IO, cmd::Cmd)
end, '`' => "\\`")
end, ' ')
print(io, '`')
if print_cpus
print(io, ',')
tkf marked this conversation as resolved.
Show resolved Hide resolved
show(io, cmd.cpus)
print(io, ')')
tkf marked this conversation as resolved.
Show resolved Hide resolved
end
print_env && (print(io, ","); show(io, cmd.env))
print_dir && (print(io, "; dir="); show(io, cmd.dir))
(print_dir || print_env) && print(io, ")")
Expand Down Expand Up @@ -287,6 +297,35 @@ function addenv(cmd::Cmd, env::Vector{<:AbstractString}; inherit::Bool = true)
return addenv(cmd, Dict(k => v for (k, v) in eachsplit.(env, "=")); inherit)
end

"""
setcpuaffinity(original_command::Cmd, cpus) -> command::Cmd

Set the CPU affinity of the `command` by a list of CPU IDs (1-based) `cpus`. Passing
`cpus = nothing` means to unset the CPU affinity if the `original_command` has any.

This is supported on Unix and Windows but not in macOS.

# Examples

In Linux, the `taskset` command line program can be used to see how `setcpuaffinity` works.

```julia
julia> run(setcpuaffinity(`sh -c 'taskset -p \$\$'`, [1, 2, 5]));
pid 2273's current affinity mask: 13
```

Note that the mask value `13` reflects that the first, second, and the fifth bits (counting
from the least significant position) are turned on:

```julia
julia> 0b010011
Copy link
Member

@KristofferC KristofferC Oct 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic required to go from the first command to understand the point of the second feels a bit large here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good point. I added a bit more explanations in the latest commit.

0x13
```
"""
function setcpuaffinity end
setcpuaffinity(cmd::Cmd, ::Nothing) = Cmd(cmd; cpus = nothing)
setcpuaffinity(cmd::Cmd, cpus) = Cmd(cmd; cpus = collect(Int, cpus))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
setcpuaffinity(cmd::Cmd, ::Nothing) = Cmd(cmd; cpus = nothing)
setcpuaffinity(cmd::Cmd, cpus) = Cmd(cmd; cpus = collect(Int, cpus))
setcpuaffinity(cmd::Cmd, ::Nothing) = Cmd(cmd; cpus = nothing)
setcpuaffinity(cmd::Cmd, cpus::AbstractVector{Bool}) = Cmd(cmd; cpus = findall(cpus))
setcpuaffinity(cmd::Cmd, cpus::AbstractVector) = Cmd(cmd; cpus = convert(Vector{UInt16}, cpus))
setcpuaffinity(cmd::Cmd, cpus) = setcpuaffinity(cmd, collect(cpus)::AbstractVector)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm against dispatching on Bool element type to change the meaning (in general). It will change the behavior between cpus = Bool[true] and cpus = Integer[true] (or Any[true]). I prefer to add a distinct function like setcpuaffinitymask or maybe a Mask type for supporting this.


(&)(left::AbstractCmd, right::AbstractCmd) = AndCmds(left, right)
redir_out(src::AbstractCmd, dest::AbstractCmd) = OrCmds(src, dest)
redir_err(src::AbstractCmd, dest::AbstractCmd) = ErrOrCmds(src, dest)
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ export
run,
setenv,
addenv,
setcpuaffinity,
success,
withenv,

Expand Down
15 changes: 14 additions & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,20 @@ end

const SpawnIOs = Vector{Any} # convenience name for readability

as_cpumask(::Nothing) = nothing
function as_cpumask(cpus::Vector{Int})
n = max(maximum(cpus), ccall(:uv_cpumask_size, Cint, ()))
cpumask = zeros(Cchar, n)
tkf marked this conversation as resolved.
Show resolved Hide resolved
for i in cpus
cpumask[i] = true
end
return cpumask
end

# handle marshalling of `Cmd` arguments from Julia to C
@noinline function _spawn_primitive(file, cmd::Cmd, stdio::SpawnIOs)
loop = eventloop()
cpumask = as_cpumask(cmd.cpus)
tkf marked this conversation as resolved.
Show resolved Hide resolved
GC.@preserve stdio begin
iohandles = Tuple{Cint, UInt}[ # assuming little-endian layout
let h = rawhandle(io)
Expand All @@ -89,12 +100,14 @@ const SpawnIOs = Vector{Any} # convenience name for readability
err = ccall(:jl_spawn, Int32,
(Cstring, Ptr{Cstring}, Ptr{Cvoid}, Ptr{Cvoid},
Ptr{Tuple{Cint, UInt}}, Int,
UInt32, Ptr{Cstring}, Cstring, Ptr{Cvoid}),
UInt32, Ptr{Cstring}, Cstring, Ptr{Cchar}, Csize_t, Ptr{Cvoid}),
tkf marked this conversation as resolved.
Show resolved Hide resolved
file, exec, loop, handle,
iohandles, length(iohandles),
flags,
env === nothing ? C_NULL : env,
isempty(dir) ? C_NULL : dir,
cpumask === nothing ? C_NULL : cpumask,
cpumask === nothing ? 0 : length(cpumask),
@cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32)))
end
if err != 0
Expand Down
1 change: 1 addition & 0 deletions doc/src/base/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ Base.Cmd
Base.setenv
Base.addenv
Base.withenv
Base.setcpuaffinity
Base.pipeline(::Any, ::Any, ::Any, ::Any...)
Base.pipeline(::Base.AbstractCmd)
Base.Libc.gethostname
Expand Down
7 changes: 4 additions & 3 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ JL_DLLEXPORT void jl_uv_disassociate_julia_struct(uv_handle_t *handle)
JL_DLLEXPORT int jl_spawn(char *name, char **argv,
uv_loop_t *loop, uv_process_t *proc,
uv_stdio_container_t *stdio, int nstdio,
uint32_t flags, char **env, char *cwd, uv_exit_cb cb)
uint32_t flags, char **env, char *cwd, char* cpumask,
size_t cpumask_size, uv_exit_cb cb)
{
uv_process_options_t opts = {0};
opts.stdio = stdio;
Expand All @@ -300,8 +301,8 @@ JL_DLLEXPORT int jl_spawn(char *name, char **argv,
// unused fields:
//opts.uid = 0;
//opts.gid = 0;
//opts.cpumask = NULL;
//opts.cpumask_size = 0;
opts.cpumask = cpumask;
opts.cpumask_size = cpumask_size;
opts.cwd = cwd;
opts.args = argv;
opts.stdio_count = nstdio;
Expand Down
42 changes: 42 additions & 0 deletions test/print_process_affinity.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

const pthread_t = Culong
tkf marked this conversation as resolved.
Show resolved Hide resolved
const uv_thread_t = pthread_t

function uv_thread_getaffinity()
masksize = ccall(:uv_cpumask_size, Cint, ())
self = ccall(:uv_thread_self, uv_thread_t, ())
selfref = Ref(self)
cpumask = zeros(Cchar, masksize)
err = ccall(
:uv_thread_getaffinity,
Cint,
(Ptr{uv_thread_t}, Ptr{Cchar}, Cssize_t),
selfref,
tkf marked this conversation as resolved.
Show resolved Hide resolved
cpumask,
masksize,
)
@assert err == 0
tkf marked this conversation as resolved.
Show resolved Hide resolved
n = findlast(isone, cpumask)
resize!(cpumask, n)
tkf marked this conversation as resolved.
Show resolved Hide resolved
return cpumask
end

function print_process_affinity()
isfirst = true
for (i, m) in enumerate(uv_thread_getaffinity())
if m != 0
if isfirst
isfirst = false
else
print(",")
end
print(i)
end
end
tkf marked this conversation as resolved.
Show resolved Hide resolved
println()
end

if Base.Filesystem.samefile(PROGRAM_FILE, @__FILE__)
print_process_affinity()
end
8 changes: 8 additions & 0 deletions test/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2325,3 +2325,11 @@ end
@test_repr "T[1;;; 2 3;;; 4]"
@test_repr "T[1;;; 2;;;; 3;;; 4]"
end

@testset "Cmd" begin
@test sprint(show, `true`) == "`true`"
@test sprint(show, setenv(`true`, "A" => "B")) == """setenv(`true`,["A=B"])"""
@test sprint(show, setcpuaffinity(`true`, [1, 2])) == "setcpuaffinity(`true`,[1, 2])"
@test sprint(show, setenv(setcpuaffinity(`true`, [1, 2]), "A" => "B")) ==
"""setenv(setcpuaffinity(`true`,[1, 2]),["A=B"])"""
tkf marked this conversation as resolved.
Show resolved Hide resolved
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find where show(::IO, ::Cmd) was tested. It looks like the original commit dee7f6c didn't add the test for show with setenv. So, I just threw this in for completeness.

16 changes: 12 additions & 4 deletions test/threads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no thr
end
end

function run_with_affinity(cpus)
script = joinpath(@__DIR__, "print_process_affinity.jl")
return readchomp(setcpuaffinity(`$(Base.julia_cmd()) $script`, cpus))
end

# issue #34415 - make sure external affinity settings work
if Sys.islinux()
const SYS_rrcall_check_presence = 1008
global running_under_rr() = 0 == ccall(:syscall, Int,
(Int, Int, Int, Int, Int, Int, Int),
SYS_rrcall_check_presence, 0, 0, 0, 0, 0, 0)
if Sys.CPU_THREADS > 1 && Sys.which("taskset") !== nothing && !running_under_rr()
run_with_affinity(spec) = readchomp(`taskset -c $spec $(Base.julia_cmd()) -e "run(\`taskset -p \$(getpid())\`)"`)
@test endswith(run_with_affinity("1"), "2")
@test endswith(run_with_affinity("0,1"), "3")
else
global running_under_rr() = false
end
if Sys.islinux() || Sys.iswindows() || Sys.isfreebsd()
if Sys.CPU_THREADS > 1 && !running_under_rr()
tkf marked this conversation as resolved.
Show resolved Hide resolved
@test run_with_affinity([2]) == "2"
@test run_with_affinity([1, 2]) == "1,2"
end
end

Expand Down