diff --git a/base/condition.jl b/base/condition.jl
index c536eceec17a0..4965b43a7019b 100644
--- a/base/condition.jl
+++ b/base/condition.jl
@@ -61,12 +61,12 @@ Abstract implementation of a condition object
 for synchronizing tasks objects with a given lock.
 """
 struct GenericCondition{L<:AbstractLock}
-    waitq::InvasiveLinkedList{Task}
+    waitq::IntrusiveLinkedList{Task}
     lock::L
 
-    GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L())
-    GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l)
-    GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l)
+    GenericCondition{L}() where {L<:AbstractLock} = new{L}(IntrusiveLinkedList{Task}(), L())
+    GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(IntrusiveLinkedList{Task}(), l)
+    GenericCondition(l::AbstractLock) = new{typeof(l)}(IntrusiveLinkedList{Task}(), l)
 end
 
 assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
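For readers who have not met this type before: `GenericCondition` is the task-parking primitive behind `Threads.Condition` (which is `GenericCondition{ReentrantLock}`); the hunk above only renames the type of its wait queue. A minimal usage sketch written against the public `Threads.Condition` API, with a `ready` flag added so the notification cannot be lost:

    cond = Threads.Condition()      # GenericCondition{ReentrantLock} underneath
    ready = Ref(false)

    waiter = Threads.@spawn begin
        lock(cond)
        try
            ready[] = true
            wait(cond)              # parks this task on cond.waitq, releasing the lock
            :woken
        finally
            unlock(cond)
        end
    end

    while true                      # notify only once the waiter is actually parked
        lock(cond)
        try
            ready[] && (notify(cond); break)
        finally
            unlock(cond)
        end
        yield()
    end
    fetch(waiter)                   # :woken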
diff --git a/base/linked_list.jl b/base/linked_list.jl
index 113607f63a2ff..c477dc56bdb2b 100644
--- a/base/linked_list.jl
+++ b/base/linked_list.jl
@@ -1,23 +1,23 @@
 # This file is a part of Julia. License is MIT: https://julialang.org/license
 
-mutable struct InvasiveLinkedList{T}
+mutable struct IntrusiveLinkedList{T}
     # Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue >: U{ILL{T}, Nothing}`
     head::Union{T, Nothing}
     tail::Union{T, Nothing}
-    InvasiveLinkedList{T}() where {T} = new{T}(nothing, nothing)
+    IntrusiveLinkedList{T}() where {T} = new{T}(nothing, nothing)
 end
 
 #const list_append!! = append!
 #const list_deletefirst! = delete!
 
-eltype(::Type{<:InvasiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any
+eltype(::Type{<:IntrusiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any
 
-iterate(q::InvasiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h))
-iterate(q::InvasiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h))
+iterate(q::IntrusiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h))
+iterate(q::IntrusiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h))
 
-isempty(q::InvasiveLinkedList) = (q.head === nothing)
+isempty(q::IntrusiveLinkedList) = (q.head === nothing)
 
-function length(q::InvasiveLinkedList)
+function length(q::IntrusiveLinkedList)
     i = 0
     head = q.head
     while head !== nothing
@@ -27,7 +27,7 @@ function length(q::InvasiveLinkedList)
     return i
 end
 
-function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) where T
+function list_append!!(q::IntrusiveLinkedList{T}, q2::IntrusiveLinkedList{T}) where T
     q === q2 && error("can't append list to itself")
     head2 = q2.head
     if head2 !== nothing
@@ -49,7 +49,7 @@ function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) wher
     return q
 end
 
-function push!(q::InvasiveLinkedList{T}, val::T) where T
+function push!(q::IntrusiveLinkedList{T}, val::T) where T
     val.queue === nothing || error("val already in a list")
     val.queue = q
     tail = q.tail
@@ -62,7 +62,7 @@ function push!(q::InvasiveLinkedList{T}, val::T) where T
     return q
 end
 
-function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T
+function pushfirst!(q::IntrusiveLinkedList{T}, val::T) where T
     val.queue === nothing || error("val already in a list")
     val.queue = q
     head = q.head
@@ -75,20 +75,20 @@ function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T
     return q
 end
 
-function pop!(q::InvasiveLinkedList{T}) where {T}
+function pop!(q::IntrusiveLinkedList{T}) where {T}
     val = q.tail::T
     list_deletefirst!(q, val) # expensive!
     return val
 end
 
-function popfirst!(q::InvasiveLinkedList{T}) where {T}
+function popfirst!(q::IntrusiveLinkedList{T}) where {T}
     val = q.head::T
     list_deletefirst!(q, val) # cheap
     return val
 end
 
 # this function assumes `val` is found in `q`
-function list_deletefirst!(q::InvasiveLinkedList{T}, val::T) where T
+function list_deletefirst!(q::IntrusiveLinkedList{T}, val::T) where T
     val.queue === q || return
     head = q.head::T
     if head === val
@@ -125,20 +125,20 @@ end
 mutable struct LinkedListItem{T}
     # Adapter class to use any `T` in a LinkedList
     next::Union{LinkedListItem{T}, Nothing}
-    queue::Union{InvasiveLinkedList{LinkedListItem{T}}, Nothing}
+    queue::Union{IntrusiveLinkedList{LinkedListItem{T}}, Nothing}
     value::T
     LinkedListItem{T}(value::T) where {T} = new{T}(nothing, nothing, value)
 end
-const LinkedList{T} = InvasiveLinkedList{LinkedListItem{T}}
+const LinkedList{T} = IntrusiveLinkedList{LinkedListItem{T}}
 
 # delegate methods, as needed
 eltype(::Type{<:LinkedList{T}}) where {T} = @isdefined(T) ? T : Any
 iterate(q::LinkedList) = (h = q.head; h === nothing ? nothing : (h.value, h))
-iterate(q::InvasiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h))
+iterate(q::IntrusiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h))
 push!(q::LinkedList{T}, val::T) where {T} = push!(q, LinkedListItem{T}(val))
 pushfirst!(q::LinkedList{T}, val::T) where {T} = pushfirst!(q, LinkedListItem{T}(val))
-pop!(q::LinkedList) = invoke(pop!, Tuple{InvasiveLinkedList,}, q).value
-popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{InvasiveLinkedList,}, q).value
+pop!(q::LinkedList) = invoke(pop!, Tuple{IntrusiveLinkedList,}, q).value
+popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{IntrusiveLinkedList,}, q).value
 function list_deletefirst!(q::LinkedList{T}, val::T) where T
     h = q.head
     while h !== nothing
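The comment at the top of this file states the contract that makes the list "intrusive": the element type itself carries the `.next` and `.queue` fields (as `Task` does), so enqueueing, membership checks, and removal never allocate wrapper nodes. A small sketch with a hypothetical element type; `Node` is illustrative and the list methods are internal Base API:

    mutable struct Node
        value::Int
        next::Union{Node, Nothing}                              # intrusive link to the next element
        queue::Union{Base.IntrusiveLinkedList{Node}, Nothing}   # back-pointer to the owning list
        Node(v) = new(v, nothing, nothing)
    end

    q = Base.IntrusiveLinkedList{Node}()
    push!(q, Node(1))
    push!(q, Node(2))
    [n.value for n in q]                  # [1, 2]
    Base.list_deletefirst!(q, first(q))   # unlinks the head in place; nothing is allocated
    isempty(q)                            # false, the Node(2) element remains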
diff --git a/base/partr.jl b/base/partr.jl
index 159cba1e9021a..debf38fb72930 100644
--- a/base/partr.jl
+++ b/base/partr.jl
@@ -2,30 +2,23 @@
 
 module Partr
 
-using ..Threads: SpinLock
+using ..Threads: SpinLock, nthreads
 
-# a task heap
+# a task minheap
 mutable struct taskheap
     const lock::SpinLock
     const tasks::Vector{Task}
     @atomic ntasks::Int32
     @atomic priority::UInt16
-    taskheap() = new(SpinLock(), Vector{Task}(undef, tasks_per_heap), zero(Int32), typemax(UInt16))
+    taskheap() = new(SpinLock(), Vector{Task}(undef, 256), zero(Int32), typemax(UInt16))
 end
 
-# multiqueue parameters
-const heap_d = UInt32(8)
-const heap_c = UInt32(2)
-
-# size of each heap
-const tasks_per_heap = Int32(65536) # TODO: this should be smaller by default, but growable!
-
-# the multiqueue's heaps
-global heaps::Vector{taskheap}
-global heap_p::UInt32 = 0
-
-# unbias state for the RNG
-global cong_unbias::UInt32 = 0
+# multiqueue minheap state
+const heap_d = UInt32(8)
+global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0)
+const heaps_lock = SpinLock()
+global cong_unbias::UInt32 = typemax(UInt32)
 
 
 cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)
@@ -35,18 +28,7 @@ function unbias_cong(max::UInt32)
 end
 
 
-function multiq_init(nthreads)
-    global heap_p = heap_c * nthreads
-    global cong_unbias = unbias_cong(UInt32(heap_p))
-    global heaps = Vector{taskheap}(undef, heap_p)
-    for i = UInt32(1):heap_p
-        heaps[i] = taskheap()
-    end
-    nothing
-end
-
-
-function sift_up(heap::taskheap, idx::Int32)
+function multiq_sift_up(heap::taskheap, idx::Int32)
     while idx > Int32(1)
         parent = (idx - Int32(2)) ÷ heap_d + Int32(1)
         if heap.tasks[idx].priority < heap.tasks[parent].priority
@@ -61,45 +43,76 @@ end
 
 
-function sift_down(heap::taskheap, idx::Int32)
+function multiq_sift_down(heap::taskheap, idx::Int32)
     if idx <= heap.ntasks
         for child = (heap_d * idx - heap_d + Int32(2)):(heap_d * idx + Int32(1))
-            child > tasks_per_heap && break
-            if isassigned(heap.tasks, child) &&
+            child > length(heap.tasks) && break
+            if isassigned(heap.tasks, Int(child)) &&
                     heap.tasks[child].priority < heap.tasks[idx].priority
                 t = heap.tasks[idx]
                 heap.tasks[idx] = heap.tasks[child]
                 heap.tasks[child] = t
-                sift_down(heap, child)
+                multiq_sift_down(heap, child)
            end
         end
     end
 end
 
 
+function multiq_size()
+    heap_c = UInt32(2)
+    heap_p = UInt32(length(heaps))
+    nt = UInt32(nthreads())
+
+    if heap_c * nt <= heap_p
+        return heap_p
+    end
+
+    @lock heaps_lock begin
+        heap_p = UInt32(length(heaps))
+        nt = UInt32(nthreads())
+        if heap_c * nt <= heap_p
+            return heap_p
+        end
+
+        heap_p += heap_c * nt
+        newheaps = Vector{taskheap}(undef, heap_p)
+        copyto!(newheaps, heaps)
+        for i = (1 + length(heaps)):heap_p
+            newheaps[i] = taskheap()
+        end
+        global heaps = newheaps
+        global cong_unbias = unbias_cong(heap_p)
+    end
+
+    return heap_p
+end
+
+
 function multiq_insert(task::Task, priority::UInt16)
     task.priority = priority
 
+    heap_p = multiq_size()
     rn = cong(heap_p, cong_unbias)
     while !trylock(heaps[rn].lock)
         rn = cong(heap_p, cong_unbias)
     end
 
-    if heaps[rn].ntasks >= tasks_per_heap
-        unlock(heaps[rn].lock)
-        # multiq insertion failed, increase #tasks per heap
-        return false
+    heap = heaps[rn]
+    if heap.ntasks >= length(heap.tasks)
+        resize!(heap.tasks, length(heap.tasks) * 2)
     end
 
-    ntasks = heaps[rn].ntasks + Int32(1)
-    @atomic :monotonic heaps[rn].ntasks = ntasks
-    heaps[rn].tasks[ntasks] = task
-    sift_up(heaps[rn], ntasks)
-    priority = heaps[rn].priority
+    ntasks = heap.ntasks + Int32(1)
+    @atomic :monotonic heap.ntasks = ntasks
+    heap.tasks[ntasks] = task
+    multiq_sift_up(heap, ntasks)
+    priority = heap.priority
     if task.priority < priority
-        @atomic :monotonic heaps[rn].priority = task.priority
+        @atomic :monotonic heap.priority = task.priority
     end
-    unlock(heaps[rn].lock)
+    unlock(heap.lock)
+
     return true
 end
 
@@ -110,7 +123,8 @@ function multiq_deletemin()
 
     @label retry
     GC.safepoint()
-    for i = UInt32(1):heap_p
+    heap_p = UInt32(length(heaps))
+    for i = UInt32(0):heap_p
         if i == heap_p
             return nothing
         end
@@ -132,30 +146,31 @@ end
     end
 
-    task = heaps[rn1].tasks[1]
+    heap = heaps[rn1]
+    task = heap.tasks[1]
     tid = Threads.threadid()
     if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0
-        unlock(heaps[rn1].lock)
+        unlock(heap.lock)
         @goto retry
     end
 
-    ntasks = heaps[rn1].ntasks
-    @atomic :monotonic heaps[rn1].ntasks = ntasks - Int32(1)
-    heaps[rn1].tasks[1] = heaps[rn1].tasks[ntasks]
-    Base._unsetindex!(heaps[rn1].tasks, Int(ntasks))
+    ntasks = heap.ntasks
+    @atomic :monotonic heap.ntasks = ntasks - Int32(1)
+    heap.tasks[1] = heap.tasks[ntasks]
+    Base._unsetindex!(heap.tasks, Int(ntasks))
     prio1 = typemax(UInt16)
     if ntasks > 1
-        sift_down(heaps[rn1], Int32(1))
-        prio1 = heaps[rn1].tasks[1].priority
+        multiq_sift_down(heap, Int32(1))
+        prio1 = heap.tasks[1].priority
     end
-    @atomic :monotonic heaps[rn1].priority = prio1
-    unlock(heaps[rn1].lock)
+    @atomic :monotonic heap.priority = prio1
+    unlock(heap.lock)
 
     return task
 end
 
 
 function multiq_check_empty()
-    for i = UInt32(1):heap_p
+    for i = UInt32(1):length(heaps)
         if heaps[i].ntasks != 0
             return false
         end
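In the rewritten scheduler queue above, a full heap doubles its `tasks` vector instead of failing (so `multiq_insert` now always returns `true`), and `multiq_size()` lazily grows the heap table toward `2 * nthreads()`, re-checking the length under `heaps_lock` so two racing growers cannot both publish a new table. The 8-ary heap index arithmetic is unchanged; a quick sanity check written against the formulas in `multiq_sift_up`/`multiq_sift_down` (the `children`/`parent` helper names are illustrative only):

    heap_d = 8
    children(idx) = (heap_d * idx - heap_d + 2):(heap_d * idx + 1)   # as in multiq_sift_down
    parent(idx) = (idx - 2) ÷ heap_d + 1                             # as in multiq_sift_up

    children(1)                                        # 2:9, the root's children
    @assert all(parent(c) == 3 for c in children(3))   # node 3 owns slots 18:25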
diff --git a/base/pcre.jl b/base/pcre.jl
index 81e9b1d4d0ff8..d689e9be29113 100644
--- a/base/pcre.jl
+++ b/base/pcre.jl
@@ -32,7 +32,6 @@ _tid() = Int(ccall(:jl_threadid, Int16, ())) + 1
 _nth() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
 
 function get_local_match_context()
-    global THREAD_MATCH_CONTEXTS
     tid = _tid()
     ctxs = THREAD_MATCH_CONTEXTS
     if length(ctxs) < tid
@@ -40,7 +39,10 @@ function get_local_match_context()
         l = PCRE_COMPILE_LOCK::Threads.SpinLock
         lock(l)
         try
-            THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, _nth()), THREAD_MATCH_CONTEXTS)
+            ctxs = THREAD_MATCH_CONTEXTS
+            if length(ctxs) < tid
+                global THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, _nth()), ctxs)
+            end
         finally
             unlock(l)
         end
@@ -49,18 +51,7 @@ function get_local_match_context()
     if ctx == C_NULL
         # slow path to allocate it
         ctx = create_match_context()
-        l = PCRE_COMPILE_LOCK
-        if l === nothing
-            THREAD_MATCH_CONTEXTS[tid] = ctx
-        else
-            l = l::Threads.SpinLock
-            lock(l)
-            try
-                THREAD_MATCH_CONTEXTS[tid] = ctx
-            finally
-                unlock(l)
-            end
-        end
+        THREAD_MATCH_CONTEXTS[tid] = ctx
     end
     return ctx
 end
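The rewritten `get_local_match_context` is a double-checked grow: read `THREAD_MATCH_CONTEXTS` without the lock, and only when it is too short take `PCRE_COMPILE_LOCK`, re-check, and publish a freshly copied, larger array; the per-thread slot itself is then written without locking because only thread `tid` ever touches index `tid`. A generic sketch of the same pattern, with illustrative names that are not Base API:

    const SLOTS_LOCK = Threads.SpinLock()
    global SLOTS = fill(C_NULL, 1)

    function slot_for(tid::Int)
        slots = SLOTS
        if length(slots) < tid                  # fast path: lock-free once the array is long enough
            lock(SLOTS_LOCK)
            try
                slots = SLOTS
                if length(slots) < tid          # re-check: another thread may have grown it already
                    global SLOTS = slots = copyto!(fill(C_NULL, Threads.nthreads()), slots)
                end
            finally
                unlock(SLOTS_LOCK)
            end
        end
        if slots[tid] == C_NULL
            slots[tid] = Ptr{Cvoid}(tid)        # stand-in for create_match_context(); only thread tid writes slot tid
        end
        return slots[tid]
    end

    slot_for(Threads.threadid())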
diff --git a/base/task.jl b/base/task.jl
index 40725edff6a29..6d5ce6c39eb20 100644
--- a/base/task.jl
+++ b/base/task.jl
@@ -656,14 +656,14 @@ end
 
 ## scheduler and work queue
 
-struct InvasiveLinkedListSynchronized{T}
-    queue::InvasiveLinkedList{T}
+struct IntrusiveLinkedListSynchronized{T}
+    queue::IntrusiveLinkedList{T}
     lock::Threads.SpinLock
-    InvasiveLinkedListSynchronized{T}() where {T} = new(InvasiveLinkedList{T}(), Threads.SpinLock())
+    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
 end
-isempty(W::InvasiveLinkedListSynchronized) = isempty(W.queue)
-length(W::InvasiveLinkedListSynchronized) = length(W.queue)
-function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
+isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
+length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
+function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
     lock(W.lock)
     try
         push!(W.queue, t)
@@ -672,7 +672,7 @@ function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
     end
     return W
 end
-function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
+function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
     lock(W.lock)
     try
         pushfirst!(W.queue, t)
@@ -681,7 +681,7 @@ function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
     end
     return W
 end
-function pop!(W::InvasiveLinkedListSynchronized)
+function pop!(W::IntrusiveLinkedListSynchronized)
     lock(W.lock)
     try
         return pop!(W.queue)
@@ -689,7 +689,7 @@ function pop!(W::InvasiveLinkedListSynchronized)
         unlock(W.lock)
     end
 end
-function popfirst!(W::InvasiveLinkedListSynchronized)
+function popfirst!(W::IntrusiveLinkedListSynchronized)
     lock(W.lock)
     try
         return popfirst!(W.queue)
@@ -697,7 +697,7 @@ function popfirst!(W::InvasiveLinkedListSynchronized)
         unlock(W.lock)
     end
 end
-function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
+function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
     lock(W.lock)
     try
         list_deletefirst!(W.queue, t)
@@ -707,30 +707,36 @@ function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
     return W
 end
 
-const StickyWorkqueue = InvasiveLinkedListSynchronized{Task}
-global const Workqueues = [StickyWorkqueue()]
-global const Workqueue = Workqueues[1] # default work queue is thread 1
-function __preinit_threads__()
-    nt = Threads.nthreads()
-    if length(Workqueues) < nt
-        resize!(Workqueues, nt)
-        for i = 2:nt
-            Workqueues[i] = StickyWorkqueue()
+const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
+global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
+const Workqueues_lock = Threads.SpinLock()
+const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
+
+function workqueue_for(tid::Int)
+    qs = Workqueues
+    if length(qs) >= tid && isassigned(qs, tid)
+        return @inbounds qs[tid]
+    end
+    # slow path to allocate it
+    l = Workqueues_lock
+    @lock l begin
+        qs = Workqueues
+        if length(qs) < tid
+            nt = Threads.nthreads()
+            @assert tid <= nt
+            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
         end
+        if !isassigned(qs, tid)
+            @inbounds qs[tid] = StickyWorkqueue()
+        end
+        return @inbounds qs[tid]
     end
-    Partr.multiq_init(nt)
-    nothing
 end
 
 function enq_work(t::Task)
     (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
-    tid = Threads.threadid(t)
-    # Note there are three reasons a Task might be put into a sticky queue
-    # even if t.sticky == false:
-    # 1. The Task's stack is currently being used by the scheduler for a certain thread.
-    # 2. There is only 1 thread.
-    # 3. The multiq is full (can be fixed by making it growable).
     if t.sticky || Threads.nthreads() == 1
+        tid = Threads.threadid(t)
         if tid == 0
             # Issue #41324
             # t.sticky && tid == 0 is a task that needs to be co-scheduled with
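`workqueue_for` replaces the eagerly sized `Workqueues` table (and the `__preinit_threads__` hook that used to fill it, deleted from `src/init.c` further down): a thread's sticky queue now materializes the first time someone asks for it. A toy model of the growth rule, assuming an initial table of length 1 and `nthreads() == 4`; the variable names are illustrative:

    qs = Vector{Vector{Task}}(undef, 1); qs[1] = Task[]   # stands in for Workqueues
    nt, tid = 4, 3

    # grow by `length(qs) + nt - 1`, exactly the rule workqueue_for uses
    qs = copyto!(Vector{Vector{Task}}(undef, length(qs) + nt - 1), qs)
    length(qs)              # 4, enough for every tid <= nt
    isassigned(qs, tid)     # false; untouched slots stay #undef
    qs[tid] = Task[]        # only the asking thread's slot gets allocated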
@@ -741,18 +747,10 @@ function enq_work(t::Task)
             tid = Threads.threadid()
             ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
         end
-        push!(Workqueues[tid], t)
+        push!(workqueue_for(tid), t)
     else
-        if !Partr.multiq_insert(t, t.priority)
-            # if multiq is full, give to a random thread (TODO fix)
-            if tid == 0
-                tid = mod(time_ns() % Int, Threads.nthreads()) + 1
-                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
-            end
-            push!(Workqueues[tid], t)
-        else
-            tid = 0
-        end
+        Partr.multiq_insert(t, t.priority)
+        tid = 0
     end
     ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
     return t
@@ -893,12 +891,12 @@ end
 
 function ensure_rescheduled(othertask::Task)
     ct = current_task()
-    W = Workqueues[Threads.threadid()]
+    W = workqueue_for(Threads.threadid())
     if ct !== othertask && othertask._state === task_state_runnable
         # we failed to yield to othertask
         # return it to the head of a queue to be retried later
         tid = Threads.threadid(othertask)
-        Wother = tid == 0 ? W : Workqueues[tid]
+        Wother = tid == 0 ? W : workqueue_for(tid)
         pushfirst!(Wother, othertask)
     end
     # if the current task was queued,
@@ -925,9 +923,7 @@ function trypoptask(W::StickyWorkqueue)
     return Partr.multiq_deletemin()
 end
 
-function checktaskempty()
-    return Partr.multiq_check_empty()
-end
+checktaskempty = Partr.multiq_check_empty
 
 @noinline function poptask(W::StickyWorkqueue)
     task = trypoptask(W)
@@ -940,7 +936,7 @@ end
 
 function wait()
     GC.safepoint()
-    W = Workqueues[Threads.threadid()]
+    W = workqueue_for(Threads.threadid())
     poptask(W)
     result = try_yieldto(ensure_rescheduled)
     process_events()
diff --git a/doc/src/manual/multi-threading.md b/doc/src/manual/multi-threading.md
index 0d5f30e5c7631..cc6c7f897353e 100644
--- a/doc/src/manual/multi-threading.md
+++ b/doc/src/manual/multi-threading.md
@@ -364,7 +364,7 @@ There are a few approaches to dealing with this problem:
 
 3. A related third strategy is to use a yield-free queue. We don't currently have
    a lock-free queue implemented in Base, but
-   `Base.InvasiveLinkedListSynchronized{T}` is suitable. This can frequently be a
+   `Base.IntrusiveLinkedListSynchronized{T}` is suitable. This can frequently be a
    good strategy to use for code with event loops. For example, this strategy is
    employed by `Gtk.jl` to manage lifetime ref-counting. In this approach, we don't
    do any explicit work inside the `finalizer`, and instead add it to a queue
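The manual text above describes the strategy only in prose. A minimal sketch of what it can look like in practice; `Resource`, `PendingCleanup`, `new_resource`, and `drain!` are illustrative names, not part of the manual or of Base's public API, and `IntrusiveLinkedListSynchronized` is internal and requires the element type to carry the intrusive `next`/`queue` fields:

    mutable struct Resource
        handle::Int
        next::Union{Resource, Nothing}                             # intrusive-list links
        queue::Union{Base.IntrusiveLinkedList{Resource}, Nothing}
        Resource(h) = new(h, nothing, nothing)
    end

    const PendingCleanup = Base.IntrusiveLinkedListSynchronized{Resource}()
    release(h) = nothing    # stand-in for the real cleanup, e.g. a ccall into a C library

    function new_resource(h)
        r = Resource(h)
        # The finalizer must not yield, so it only links the object into the queue.
        finalizer(r) do obj
            push!(PendingCleanup, obj)
            nothing
        end
        return r
    end

    function drain!()       # called from the event loop, where yielding is allowed
        while !isempty(PendingCleanup)
            release(popfirst!(PendingCleanup).handle)
        end
    end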
diff --git a/src/init.c b/src/init.c
index 64af677564209..c6ff7842e3c57 100644
--- a/src/init.c
+++ b/src/init.c
@@ -731,17 +731,7 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
         post_boot_hooks();
     }
 
-    if (jl_base_module != NULL) {
-        // Do initialization needed before starting child threads
-        jl_value_t *f = jl_get_global(jl_base_module, jl_symbol("__preinit_threads__"));
-        if (f) {
-            size_t last_age = ct->world_age;
-            ct->world_age = jl_get_world_counter();
-            jl_apply(&f, 1);
-            ct->world_age = last_age;
-        }
-    }
-    else {
+    if (jl_base_module == NULL) {
         // nthreads > 1 requires code in Base
         jl_n_threads = 1;
     }
diff --git a/src/partr.c b/src/partr.c
index 74b0e2c8db6a9..52c4a468c77c2 100644
--- a/src/partr.c
+++ b/src/partr.c
@@ -72,6 +72,7 @@ extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
 JL_DLLEXPORT uint32_t jl_rand_ptls(uint32_t max, uint32_t unbias)
 {
     jl_ptls_t ptls = jl_current_task->ptls;
+    // one-extend unbias back to 64-bits
     return cong(max, -(uint64_t)-unbias, &ptls->rngseed);
 }
 
diff --git a/test/threads_exec.jl b/test/threads_exec.jl
index ca8ec03b685e4..9cd5992d90a74 100644
--- a/test/threads_exec.jl
+++ b/test/threads_exec.jl
@@ -588,12 +588,12 @@ function test_thread_too_few_iters()
 end
 test_thread_too_few_iters()
 
-@testset "InvasiveLinkedList" begin
-    @test eltype(Base.InvasiveLinkedList{Integer}) == Integer
+@testset "IntrusiveLinkedList" begin
+    @test eltype(Base.IntrusiveLinkedList{Integer}) == Integer
     @test eltype(Base.LinkedList{Integer}) == Integer
-    @test eltype(Base.InvasiveLinkedList{<:Integer}) == Any
+    @test eltype(Base.IntrusiveLinkedList{<:Integer}) == Any
     @test eltype(Base.LinkedList{<:Integer}) == Any
-    @test eltype(Base.InvasiveLinkedList{<:Base.LinkedListItem{Integer}}) == Any
+    @test eltype(Base.IntrusiveLinkedList{<:Base.LinkedListItem{Integer}}) == Any
     t = Base.LinkedList{Integer}()
     @test eltype(t) == Integer
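The testset above only checks `eltype`; for completeness, a short interactive sketch of the adapter it constructs (internal Base API, shown for illustration only):

    t = Base.LinkedList{Integer}()    # an IntrusiveLinkedList of LinkedListItem wrappers
    push!(t, 1); push!(t, 2); pushfirst!(t, 0)
    collect(t)       # Integer[0, 1, 2]; iteration unwraps the LinkedListItem nodes
    pop!(t)          # 2, via invoke(pop!, Tuple{IntrusiveLinkedList,}, t).value
    popfirst!(t)     # 0
    length(t)        # 1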