Bug or misuse? Exceptions not propagating through multithreaded channels #34262

Closed · tro3 opened this issue Jan 5, 2020 · 8 comments
Labels: multithreading (Base.Threads and related functionality)

Comments

tro3 commented Jan 5, 2020

In the following code, one task throws an error that never propagates out to terminate the whole routine. The failing task simply ends without ever producing a result for the Channel, so the receiver waits forever for the last value. Is this a bug in the exception handling, or a flagrant violation of channel / thread usage in some way?

julia> function work(x)
           t1 = time() + 1
           @info "Working on $x (Thread $(Threads.threadid()))"
           while time() < t1  # busy-wait so the thread stays occupied
               nothing
           end
           x == 9 && error("Error!")
           @info "Finished $x"
           return x
       end
work (generic function with 1 method)

julia> channel = Channel{Int}(8)
Channel{Int64}(sz_max:8,sz_curr:0)

julia> begin
         for i in 1:10
           Threads.@spawn put!(channel, work(i))
         end
         for i in 1:10
           println(take!(channel))
         end
       end
[ Info: Working on 4 (Thread 4)
[ Info: Working on 3 (Thread 3)
[ Info: Working on 1 (Thread 6)
[ Info: Working on 7 (Thread 8)
[ Info: Working on 2 (Thread 5)
[ Info: Working on 6 (Thread 7)
[ Info: Working on 10 (Thread 3)
[ Info: Working on 5 (Thread 2)
[ Info: Working on 9 (Thread 4)
[ Info: Working on 8 (Thread 1)
[ Info: Finished 1
[ Info: Finished 4
[ Info: Finished 6
[ Info: Finished 5
[ Info: Finished 2
[ Info: Finished 3
[ Info: Finished 8
[ Info: Finished 7
[ Info: Finished 10
8
5
6
4
2
1
7
3
10
[waits forever on take!(channel), looking for a 9]
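
The exception is not lost outright, by the way: it is stored in the failed Task, and waiting on that task rethrows it (as a TaskFailedException on Julia 1.3+). A minimal sketch of that mechanism, reusing the work function above:

t = Threads.@spawn work(9)  # fails after about a second
wait(t)                     # rethrows the error as a TaskFailedException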
ViralBShah added the multithreading (Base.Threads and related functionality) label Jan 5, 2020
tro3 (Author) commented Jan 5, 2020

Sorry, forgot the version info:

Julia Version 1.3.0
Commit 46ce4d7933 (2019-11-26 06:09 UTC)
Platform Info:
  OS: Windows (x86_64-w64-mingw32)
  CPU: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 8

tkf (Member) commented Jan 5, 2020

I think this is a dup of #32677. A possible solution:

@sync begin
    @async try
        @sync for i in 1:10
            Threads.@spawn put!(channel, work(i))
        end
    finally
        # runs whether the workers succeeded or failed; closing the
        # channel makes the consumer's take! throw instead of block
        close(channel)
    end
    @async for i in 1:10
        println(take!(channel))
    end
end
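
Why this works: close(channel) makes any pending or later take! throw instead of block, so if a worker fails, the consumer task ends too and the outer @sync surfaces the error. A minimal standalone sketch of that Channel behavior:

ch = Channel{Int}(1)
close(ch)
take!(ch)  # throws an InvalidStateException because the channel is closed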

I think we need structured concurrency #33248 to get rid of this kind of problem.

KristofferC (Member) commented

Also, see #34198

tro3 (Author) commented Jan 5, 2020

> I think this is a dup of #32677.

Could be. I didn't find it in a pre-submit search as I was focused on the threading instead of the tasking. The conversation in that issue centers on the task, by the way, but I think the problem common to both issues is the Channel. I already had an original, not-heavily-tested solution without Channels, and I was trying to upgrade to Channels when I ran across this. (The main issue with my original solution was that it was more complex and required some "load balancing" in thread 1, which gets less effective when the scheduler tosses a heavy-load task onto that same thread.)

I'll try to see if an MWE of my original solution, mapped to the same test case above, sheds any light.

tro3 (Author) commented Jan 5, 2020

Okay, here is an MWE of my original solution (kind of), using the same error-generating work function as above. It sadly relies on local polling loops, but the reason it works is that each Task gets explicitly fetched, which gives the exception a chance to propagate. A Channel, by contrast, simply receives data and can't really do that, so I think my statement above about the Channel being the problem is incorrect. I don't know if this provides you any insight, but hopefully it is useful. I have some threading experience from days gone by, and I am happy to help if there is something you want to offload.

# Extend Base's put!/take! so these methods coexist with the Channel ones
import Base: put!, take!

# One slot per thread; nothing marks a free slot
const TaskSlot = Union{Task,Nothing}
const TaskPool = Vector{TaskSlot}
TaskPool() = TaskSlot[nothing for i in 1:Threads.nthreads()]
isslotready(x) = isnothing(x) || istaskdone(x)   # slot can accept new work
isslotdone(x) = !isnothing(x) && istaskdone(x)   # slot holds a finished task

function put!(pool::TaskPool, fn, args...)
    # poll until a slot frees up, then spawn the work into it
    while !any(isslotready, pool)
        sleep(0.01)
    end
    ind = findfirst(isslotready, pool)
    pool[ind] = Threads.@spawn fn(args...)
end

function take!(pool::TaskPool)
    # poll until some task finishes, then fetch it;
    # fetch rethrows the task's exception if it failed
    while !any(isslotdone, pool)
        sleep(0.01)
    end
    ind = findfirst(isslotdone, pool)
    result = fetch(pool[ind])
    pool[ind] = nothing
    return result
end


pool = TaskPool()
@async for i in 1:10
    put!(pool, work, i)
end
for i in 1:10
    println(take!(pool))
end
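
Incidentally, a possible middle ground between the two approaches, sketched here as a hypothetical variant (task_channel is an illustrative name; work is the function from the OP): put the Task objects themselves into a Channel and fetch them on the consumer side, so fetch can rethrow a failed task's exception without the polling loops:

task_channel = Channel{Task}(8)
@async for i in 1:10
    # spawn the work and hand the Task itself to the consumer
    put!(task_channel, Threads.@spawn work(i))
end
for i in 1:10
    # fetch blocks until the task finishes and rethrows its exception,
    # so the failure of work(9) terminates this loop instead of hanging it
    println(fetch(take!(task_channel)))
end

One trade-off: results come out in spawn order rather than completion order.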

tro3 (Author) commented Jan 5, 2020

@KristofferC, from #34198:

> However, in many cases one of the tasks produces data for the others, so if that task errors, the remaining tasks will wait forever.

You're right - that does sound pretty related...

tkf (Member) commented Jan 5, 2020

> Could be. I didn't find it in a pre-submit search as I was focused on the threading instead of the tasking.

Your OP example has the same bug if you replace Threads.@spawn with @async.
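
A minimal sketch of that, assuming the channel and work definitions from the OP:

for i in 1:10
    @async put!(channel, work(i))  # the task for i == 9 dies without putting anything
end
for i in 1:10
    println(take!(channel))  # hangs on the 10th take!, exactly as before
end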

tro3 (Author) commented Jan 5, 2020

Sold. I’ll close this and watch the other.

tro3 closed this as completed Jan 5, 2020