Skip to content

Commit

Permalink
Adds interpolation syntax to Threads.@spawn, to evaluate arguments …
Browse files Browse the repository at this point in the history
…immediately (#33119)

Adds $-interpolation syntax to `@async` and `Threads.@spawn`, to evaluate arguments immediately.

Add the ability to evaluate some parts of a `@spawn`/`@async`
immediately, in the current thread context.

This prevents variables being "boxed" in order to capture them in the
closure, exactly the same as wrapping them in a let-block locally.

For example, `$x` expands like this:
```julia
julia> @macroexpand @async $x + 2
quote
    #= task.jl:361 =#
    let var"##454" = x
        #= task.jl:362 =#
        local var"#9#task" = Base.Task((()->begin
                            #= task.jl:358 =#
                            var"##454" + 2
                        end))
        #= task.jl:363 =#
        if $(Expr(:islocal, Symbol("##sync#95")))
            #= task.jl:364 =#
            Base.push!(var"##sync#95", var"#9#task")
        end
        #= task.jl:366 =#
        Base.schedule(var"#9#task")
        #= task.jl:367 =#
        var"#9#task"
    end
end
```
  • Loading branch information
NHDaly authored and JeffBezanson committed Dec 18, 2019
1 parent 5da74be commit 0df3fe7
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 11 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Language changes
Multi-threading changes
-----------------------

* Values can now be interpolated into `@async` and `@spawn` via `$`, which copies the value directly into the constructed
underlying closure. ([#33119])

Build system changes
--------------------
Expand Down
48 changes: 43 additions & 5 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,58 @@ end
@async
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the aysnchronous code from changes to the variable's value in the current task.
!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
"""
macro async(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(sync_varname)
quote
local task = Task($thunk)
if $(Expr(:islocal, var))
push!($var, task)
let $(letargs...)
local task = Task($thunk)
if $(Expr(:islocal, var))
push!($var, task)
end
schedule(task)
task
end
end
end

# Capture interpolated variables in $() and move them to let-block
function _lift_one_interp!(e)
letargs = Any[] # store the new gensymed arguments
_lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
letargs
end
_lift_one_interp_helper(v, _, _) = v
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
if expr.head == :$
if in_quote_context # This $ is simply interpolating out of the quote
# Now, we're out of the quote, so any _further_ $ is ours.
in_quote_context = false
else
newarg = gensym()
push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
return newarg # Don't recurse into the lifted $() exprs
end
schedule(task)
task
elseif expr.head == :quote
in_quote_context = true # Don't try to lift $ directly out of quotes
end
for (i,e) in enumerate(expr.args)
expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
end
expr
end


# add a wait-able object to the sync pool
macro sync_add(expr)
var = esc(sync_varname)
Expand Down
23 changes: 17 additions & 6 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,33 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref)
to wait and then obtain its return value.
Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the aysnchronous code from changes to the variable's value in the current task.
!!! note
This feature is currently considered experimental.
!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.
!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
"""
macro spawn(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
local task = Task($thunk)
task.sticky = false
if $(Expr(:islocal, var))
push!($var, task)
let $(letargs...)
local task = Task($thunk)
task.sticky = false
if $(Expr(:islocal, var))
push!($var, task)
end
schedule(task)
task
end
schedule(task)
task
end
end
83 changes: 83 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,86 @@ catch ex
@test ex isa LoadError
@test ex.error isa ArgumentError
end

@testset "@spawn interpolation" begin
# Issue #30896: evaluating argumentss immediately
begin
outs = zeros(5)
@sync begin
local i = 1
while i <= 5
Threads.@spawn setindex!(outs, $i, $i)
i += 1
end
end
@test outs == 1:5
end

# Args
@test fetch(Threads.@spawn 2+$2) == 4
@test fetch(Threads.@spawn Int($(2.0))) == 2
a = 2
@test fetch(Threads.@spawn *($a,$a)) == a^2
# kwargs
@test fetch(Threads.@spawn sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
@test fetch(Threads.@spawn sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]

# Supports multiple levels of interpolation
@test fetch(Threads.@spawn "$($a)") == "$a"
let a = 1
# Interpolate the current value of `a` vs the value of `a` in the closure
t = Threads.@spawn :(+($$a, $a, a))
a = 2 # update `a` after spawning
@test fetch(t) == Expr(:call, :+, 1, 2, :a)
end

# Test the difference between different levels of interpolation
let
oneinterp = Vector{Any}(undef, 5)
twointerps = Vector{Any}(undef, 5)
@sync begin
local i = 1
while i <= 5
Threads.@spawn setindex!(oneinterp, :($i), $i)
Threads.@spawn setindex!(twointerps, :($($i)), $i)
i += 1
end
end
# The first definition _didn't_ escape i
@test oneinterp == fill(6, 5)
# The second definition _did_ escape i
@test twointerps == 1:5
end
end

@testset "@async interpolation" begin
# Args
@test fetch(@async 2+$2) == 4
@test fetch(@async Int($(2.0))) == 2
a = 2
@test fetch(@async *($a,$a)) == a^2
# kwargs
@test fetch(@async sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
@test fetch(@async sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]

# Supports multiple levels of interpolation
@test fetch(@async :($a)) == a
@test fetch(@async :($($a))) == a
@test fetch(@async "$($a)") == "$a"
end

# errors inside @threads
function _atthreads_with_error(a, err)
Threads.@threads for i in eachindex(a)
if err
error("failed")
end
a[i] = Threads.threadid()
end
a
end
@test_throws TaskFailedException _atthreads_with_error(zeros(nthreads()), true)
let a = zeros(nthreads())
_atthreads_with_error(a, false)
@test a == [1:nthreads();]
end

0 comments on commit 0df3fe7

Please sign in to comment.