
Feature request: a work stealing threaded for loop #21017

Open
skariel opened this issue Mar 14, 2017 · 15 comments
Labels
multithreading Base.Threads and related functionality

Comments

@skariel
Contributor

skariel commented Mar 14, 2017

The current @threads for loop breaks the work into equal-length chunks and runs them on the available pooled threads. This could be improved by using a work-stealing technique like the one used by Rust's Rayon, in which the work to be done is queued and threads that have free time pop work from this queue.

Work stealing can result in faster parallel execution due to these significant advantages:

  1. More efficient cache utilization: all threads work on nearby memory locations. Unlike the current implementation, where memory is divided into large chunks, with work stealing each thread works on a single item at a time.

  2. Load balancing: when an item takes more time to process, other threads do not wait for it to finish; instead they continue stealing work. In the current implementation, one thread can finish its chunk significantly faster than the others and then just wait idly for all to finish instead of working.

I haven't written an implementation yet; I just wanted to start a discussion.
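
To make the idea concrete, here is a toy sketch in Julia (illustration only: the function name is made up, the locking is naive, and idle threads busy-wait instead of parking). Each thread pops from its own deque and steals from the front of a random victim's deque when it runs dry:

using Base.Threads

function work_stealing_foreach(f, items)
    n = nthreads()
    deques = [eltype(items)[] for _ in 1:n]
    locks = [ReentrantLock() for _ in 1:n]
    for (k, item) in enumerate(items)              # deal the items out round-robin
        push!(deques[mod1(k, n)], item)
    end
    @threads for t in 1:n
        while true
            item = nothing
            lock(locks[t]) do                      # pop from our own end
                isempty(deques[t]) || (item = pop!(deques[t]))
            end
            if item === nothing                    # our deque is empty: try to steal
                v = rand(1:n)
                lock(locks[v]) do
                    isempty(deques[v]) || (item = popfirst!(deques[v]))
                end
            end
            if item === nothing
                all(isempty, deques) && break      # no work left anywhere
            else
                f(item)
            end
        end
    end
end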

@timholy
Member

timholy commented Mar 14, 2017

The current implementation can be used to implement this, of course; see ImageFiltering, which queues up a task list and then uses @threads to iterate over the list, with each thread grabbing the next task.
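
A minimal sketch of that pattern (the function name is hypothetical; ImageFiltering's actual code differs): each thread claims the next task through a shared atomic counter, so one long task no longer stalls a whole chunk.

using Base.Threads

function foreach_task(f, tasks)
    next = Atomic{Int}(1)
    @threads for _ in 1:nthreads()
        while true
            i = atomic_add!(next, 1)   # atomic_add! returns the old value: our task index
            i > length(tasks) && break
            f(tasks[i])
        end
    end
end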

@andreasnoack andreasnoack added the multithreading Base.Threads and related functionality label Mar 14, 2017
@JeffBezanson
Member

This is on our roadmap for multithreading.

@JeffBezanson
Member

Admittedly it's not obvious, but this is implied by issues such as #14494 and #18335. We could use some refactoring of the issues on this topic.

@kpamnany
Contributor

Standard work stealing does not improve cache efficiency; on the contrary. You need some very clever locality-awareness tricks to get that. See "The Data Locality of Work Stealing", for instance.

Load balancing, for sure.

But I'm hoping we'll get the best of all worlds with PDF (parallel depth-first) scheduling. Coming soon!

@miguelraz
Contributor

This sounds like cool work!
@JeffBezanson, would it be in order if I opened a post on Discourse with a general roadmap/to-do regarding the future design of multithreading?
Some of us on Gitter today were wondering if there is a handy reference for the roadmap you mention.
This could help us keep up with the developing ideas.
Thanks!

@Sacha0
Member

Sacha0 commented Mar 23, 2017

(@kpamnany, thanks for sharing the parallel depth first (PDF) scheduling paper! If you would recommend additional material along those lines, please share!)

yuyichao added a commit that referenced this issue Apr 20, 2017
* Print a warning if an error occurs in the threaded loop (Helps #17532)
* Make recursive threaded loops "work" (Fix #18335).

  The proper fix will be tracked by #21017
@Balinus

Balinus commented Jun 5, 2017

I did some tests with nested threaded loops and performance is severely degraded. Here's the code and the benchmarking. The code is taken from the tests of the nested-threading fix referenced above.

Edit: I assumed that 0.6-rc2 includes the "band aid". Perhaps not?

versioninfo()
Julia Version 0.6.0-rc2.0
Commit 68e911be53* (2017-05-18 02:31 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i5-4570 CPU @ 3.20GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, haswell)

function testnestedloop()
  a = zeros(Int, 10000, 10000)
  Threads.@threads for i in 1:10000
    Threads.@threads for j in 1:10000
      a[j, i] = i + j
    end
  end
  return a
end

function testloop()
  a = zeros(Int, 10000, 10000)
  for i in 1:10000
    for j in 1:10000
      a[j, i] = i + j
    end
  end
  return a
end
@benchmark testnestedloop()
BenchmarkTools.Trial: 
  memory estimate:  5.05 GiB
  allocs estimate:  236655143
  --------------
  minimum time:     13.538 s (7.93% GC)
  median time:      13.538 s (7.93% GC)
  mean time:        13.538 s (7.93% GC)
  maximum time:     13.538 s (7.93% GC)
  --------------
  samples:          1
  evals/sample:     1

vs.

@benchmark testloop()
BenchmarkTools.Trial: 
  memory estimate:  762.94 MiB
  allocs estimate:  2
  --------------
  minimum time:     155.824 ms (1.54% GC)
  median time:      163.314 ms (1.47% GC)
  mean time:        170.175 ms (5.50% GC)
  maximum time:     233.299 ms (31.00% GC)
  --------------
  samples:          30
  evals/sample:     1

Getting rid of one of the Threads.@threads brings back the performance.

function testnestedloop()
    a = zeros(Int, 10000, 10000)
    Threads.@threads for i in 1:10000
        for j in 1:10000
            a[j, i] = i + j
        end
    end
    return a
end
testnestedloop (generic function with 1 method)

julia> @benchmark testnestedloop()
BenchmarkTools.Trial: 
  memory estimate:  762.94 MiB
  allocs estimate:  3
  --------------
  minimum time:     149.190 ms (0.00% GC)
  median time:      157.355 ms (0.00% GC)
  mean time:        171.763 ms (8.29% GC)
  maximum time:     261.659 ms (33.20% GC)
  --------------
  samples:          30
  evals/sample:     1

@yuyichao
Contributor

yuyichao commented Jun 5, 2017

The band aid is to make it work, not necessarily to make it fast. It won't be faster than a normal loop and it'll still have all the other problems (i.e. closure lowering). The slowdown has nothing to do with scheduling, though; given the large number of allocations, this is due to #15276 and can be worked around in a similar way.

Just checked locally and that's indeed the case. The lowered AST for the first loop body is:

Variables:
  #self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}
  onethread::Bool
  i@_3::Core.Box
  i@_4::Int64
  range::Core.Box
  threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}}
  #temp#@_7::Int64
  r@_8::UnitRange{Int64}
  lenr::Int64
  tid::Int64
  #temp#@_11::Int64
  len::Int64
  rem::Int64
  f::Int64
  l::Int64
  #temp#@_16::Bool
  z@_17::Int64
  b@_18::Bool
  #temp#@_19::Int64
  z@_20::Int64
  b@_21::Bool
  #temp#@_22::Int64
  r@_23::Int64
  ret::Int64

Body:
  begin
      NewvarNode(:(tid::Int64))
      NewvarNode(:(len::Int64))
      NewvarNode(:(rem::Int64))
      NewvarNode(:(f::Int64))
      NewvarNode(:(l::Int64))
      r@_8::UnitRange{Int64} = (Core.getfield)(#self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}, Symbol("#1#range"))::UnitRange{Int64}
      #= line 31 =#
      $(Expr(:inbounds, false))
      # meta: location range.jl length 393
      # meta: location checked.jl checked_sub 221
      SSAValue(9) = (Base.Checked.checked_ssub_int)((Core.getfield)(r@_8::UnitRange{Int64}, :stop)::Int64, (Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64)::Tuple{Int64,Bool}
      SSAValue(12) = (Base.getfield)(SSAValue(9), 1)::Int64
      SSAValue(14) = (Base.getfield)(SSAValue(9), 2)::Bool
      #= line 222 =#
      unless SSAValue(14) goto 17
      (Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
      17: 
      # meta: pop location
      # meta: location checked.jl checked_add 164
      SSAValue(16) = (Base.Checked.checked_sadd_int)(SSAValue(12), 1)::Tuple{Int64,Bool}
      SSAValue(19) = (Base.getfield)(SSAValue(16), 1)::Int64
      SSAValue(21) = (Base.getfield)(SSAValue(16), 2)::Bool
      #= line 165 =#
      unless SSAValue(21) goto 26
      (Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
      26: 
      # meta: pop location
      # meta: pop location
      $(Expr(:inbounds, :pop))
      #= line 33 =#
      unless onethread::Bool goto 38
      #= line 34 =#
      tid::Int64 = 1
      #= line 35 =#
      len::Int64 = SSAValue(19)
      rem::Int64 = 0
      goto 60
      38: 
      #= line 37 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl threadid 10
      SSAValue(23) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      tid::Int64 = (Base.add_int)((Base.sext_int)(Int64, SSAValue(23))::Int64, 1)::Int64
      #= line 38 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl nthreads 19
      SSAValue(24) = (Base.Threads.cglobal)(:jl_n_threads, Base.Threads.Cint)::Ptr{Int32}
      SSAValue(25) = (Base.pointerref)(SSAValue(24), 1, 1)::Int32
      # meta: pop location
      $(Expr(:inbounds, :pop))
      SSAValue(26) = (Base.sext_int)(Int64, SSAValue(25))::Int64
      SSAValue(30) = (Base.checked_sdiv_int)(SSAValue(19), SSAValue(26))::Int64
      SSAValue(31) = (Base.checked_srem_int)(SSAValue(19), SSAValue(26))::Int64
      SSAValue(32) = (Base.add_int)(1, 1)::Int64
      len::Int64 = SSAValue(30)
      SSAValue(33) = (Base.add_int)(2, 1)::Int64
      rem::Int64 = SSAValue(31)
      60: 
      #= line 41 =#
      unless (len::Int64 === 0)::Bool goto 71
      #= line 42 =#
      unless (Base.slt_int)(rem::Int64, tid::Int64)::Bool goto 67
      #= line 43 =#
      return
      67: 
      #= line 45 =#
      len::Int64 = 1
      rem::Int64 = 0
      71: 
      #= line 48 =#
      f::Int64 = (Base.add_int)(1, (Base.mul_int)((Base.sub_int)(tid::Int64, 1)::Int64, len::Int64)::Int64)::Int64
      #= line 49 =#
      l::Int64 = (Base.sub_int)((Base.add_int)(f::Int64, len::Int64)::Int64, 1)::Int64
      #= line 51 =#
      unless (Base.slt_int)(0, rem::Int64)::Bool goto 90
      #= line 52 =#
      unless (Base.sle_int)(tid::Int64, rem::Int64)::Bool goto 85
      #= line 53 =#
      f::Int64 = (Base.add_int)(f::Int64, (Base.sub_int)(tid::Int64, 1)::Int64)::Int64
      #= line 54 =#
      l::Int64 = (Base.add_int)(l::Int64, tid::Int64)::Int64
      goto 90
      85: 
      #= line 56 =#
      f::Int64 = (Base.add_int)(f::Int64, rem::Int64)::Int64
      #= line 57 =#
      l::Int64 = (Base.add_int)(l::Int64, rem::Int64)::Int64
      90: 
      #= line 61 =#
      SSAValue(34) = f::Int64
      SSAValue(35) = (Base.select_value)((Base.sle_int)(f::Int64, l::Int64)::Bool, l::Int64, (Base.sub_int)(f::Int64, 1)::Int64)::Int64
      #temp#@_7::Int64 = SSAValue(34)
      95: 
      unless (Base.not_int)((#temp#@_7::Int64 === (Base.add_int)(SSAValue(35), 1)::Int64)::Bool)::Bool goto 166
      i@_3::Core.Box = $(Expr(:new, :(Core.Box)))
      range::Core.Box = $(Expr(:new, :(Core.Box)))
      SSAValue(36) = #temp#@_7::Int64
      SSAValue(37) = (Base.add_int)(#temp#@_7::Int64, 1)::Int64
      i@_4::Int64 = SSAValue(36)
      #temp#@_7::Int64 = SSAValue(37)
      #= line 62 =#
      $(Expr(:inbounds, false))
      # meta: location abstractarray.jl unsafe_getindex 884
      $(Expr(:inbounds, true))
      SSAValue(27) = i@_4::Int64
      $(Expr(:inbounds, false))
      # meta: location range.jl getindex 476
      ret::Int64 = (Base.sub_int)((Base.add_int)((Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64, SSAValue(27))::Int64, 1)::Int64
      #= line 477 =#
      112: 
      113: 
      # meta: pop location
      $(Expr(:inbounds, :pop))
      $(Expr(:inbounds, :pop))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      (Core.setfield!)(i@_3::Core.Box, :contents, ret::Int64)::Int64
      #= line 63 =#
      # meta: location REPL[1]
      #= line 4 =#
      # meta: location threadingconstructs.jl
      #= line 28 =#
      SSAValue(6) = $(Expr(:new, UnitRange{Int64}, 1, :((Base.select_value)((Base.sle_int)(1, 10000)::Bool, 10000, (Base.sub_int)(1, 1)::Int64)::Int64)))
      (Core.setfield!)(range::Core.Box, :contents, SSAValue(6))::UnitRange{Int64}
      #= line 29 =#
      threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}} = $(Expr(:new, ##14#threadsfor_fun#2{Array{Int64,2}}, :(i@_3), :(range), :((Core.getfield)(#self#, :a)::Array{Int64,2})))
      #= line 67 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl threadid 10
      SSAValue(29) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      SSAValue(7) = (Base.not_int)(((Base.add_int)((Base.sext_int)(Int64, SSAValue(29))::Int64, 1)::Int64 === 1)::Bool)::Bool
      unless SSAValue(7) goto 139
      #temp#@_16::Bool = SSAValue(7)
      goto 141
      139:
      #temp#@_16::Bool = (Core.getfield)(Base.Threads.in_threaded_loop, :x)::Bool
      141:
      unless #temp#@_16::Bool goto 146
      #= line 69 =#
      $(Expr(:invoke, MethodInstance for (::##14#threadsfor_fun#2{Array{Int64,2}})(::Bool), :(threadsfor_fun), true))::Void
      goto 161
      146:
      #= line 71 =#
      $(Expr(:inbounds, false))
      # meta: location refpointer.jl setindex! 121
      (Core.setfield!)(Base.Threads.in_threaded_loop, :x, true)::Bool
      # meta: pop location
      $(Expr(:inbounds, :pop))
      #= line 73 =#
      $(Expr(:foreigncall, :(:jl_threading_run), Ref{Void}, svec(Any), :(threadsfor_fun), 0))
      #= line 74 =#
      $(Expr(:inbounds, false))
      # meta: location refpointer.jl setindex! 121
      (Core.setfield!)(Base.Threads.in_threaded_loop, :x, false)::Bool
      # meta: pop location
      $(Expr(:inbounds, :pop))
      161:
      # meta: pop location
      # meta: pop location
      164:
      goto 95
      166:
      return
  end::Void

Note that the range and i are boxed.
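
For reference, here is a sketch of the usual #15276 workaround (not taken from this thread): rebind the captured loop variable with let so the inner closure captures a fresh local instead of a Core.Box. Whether it eliminates every box in this particular lowering is worth verifying with @code_warntype.

function testnestedloop_workaround()
    a = zeros(Int, 10000, 10000)
    Threads.@threads for i in 1:10000
        let i = i  # fresh binding: the closure made by the inner @threads captures this local
            Threads.@threads for j in 1:10000
                a[j, i] = i + j
            end
        end
    end
    return a
end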

@Balinus

Balinus commented Jun 5, 2017

Thanks for the feedback and info!

I wasn't expecting faster loops with nested threading, but was surprised by the results and was wondering how to solve that. :)

@ViralBShah
Member

Should we close this in favour of the more recent #32207?

@tkf
Member

tkf commented Sep 18, 2020

FYI, FLoops.jl has a parallel for loop with load balancing, a very flexible reduction syntax, and an extensible executor infrastructure.
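
A short sketch of the syntax, following the FLoops.jl README (ThreadedEx is the default threaded executor; basesize controls the chunking and hence the load balancing):

using FLoops

@floop ThreadedEx(basesize = 1000) for x in 1:1_000_000
    y = x^2
    @reduce(s += y)          # parallel-friendly reduction into s
end
s == sum(abs2, 1:1_000_000)  # true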

@IanButterworth
Member

I find myself using @ChrisRackauckas's @par macro from #32207 quite a lot

macro par(expr)
    thunk = esc(:(()->($expr)))
    quote
        local task = Task($thunk)
        task.sticky = false
        schedule(task)
        task
    end
end

Is something like this, but doing a load-balanced Threads.@threads, possible to get into 1.7?

@tkf
Member

tkf commented May 14, 2021

I don't think you need such a macro in any released Julia version. The @par macro you quoted is just a subset of Threads.@spawn that does not support @sync. I think it was useful only around 1.3-DEV. You can just use Threads.@spawn.
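
A minimal sketch of that replacement (process is a hypothetical worker function):

@sync for chunk in Iterators.partition(1:1_000_000, 10_000)
    Threads.@spawn process(chunk)   # unlike @par, these tasks participate in the enclosing @sync
end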

As I mentioned just above, FLoops.jl provides a scheduler-agnostic API for parallel for loops, with load balancing configurable via the basesize option. It can be combined with many schedulers, including the work-stealing (continuation-stealing) scheduler from FoldsThreads.jl.

Ref: [ANN] FoldsThreads.jl: A zoo of pluggable thread-based data-parallel execution mechanisms - Community / Package announcements - JuliaLang
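
For illustration, a sketch of moving a loop onto the work-stealing scheduler (assuming the WorkStealingEx executor name from the announcement above):

using FLoops, FoldsThreads

@floop WorkStealingEx() for x in 1:1_000_000
    @reduce(s += x)   # same loop body; only the executor changes
end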

@laborg
Contributor

laborg commented Feb 24, 2022

@tkf does the new :dynamic default in Threads.@threads count as work stealing?

@tkf
Member

tkf commented Feb 24, 2022

It's not work stealing, but it's a step towards better load balancing. The caching aspect is much harder since it interacts with other generic tasks, so it's not something we can solve in @threads alone. (But I'm working on it.)

Perhaps we can close this issue without implementing work stealing per se? I don't think most users care about the exact scheduling algorithm used; the more important thing is that they get load balancing. If so, I think we can close it once we have what @IanButterworth commented here: #44168 (comment)
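
For reference, the per-loop schedule selection that appeared in the 1.8 development cycle (work is a placeholder for the loop body):

Threads.@threads :dynamic for i in 1:100   # newer default; tasks can be load balanced
    work(i)
end

Threads.@threads :static for i in 1:100    # old behavior: one equal chunk pinned per thread
    work(i)
end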
