
Commit 897c3cf
Merge pull request #44721 from JuliaLang/jn/partr-scale
partr: add dynamic scaling and optimize code
vtjnash authored Mar 28, 2022
2 parents 9112135 + 0c32aa7 commit 897c3cf
Showing 9 changed files with 143 additions and 150 deletions.
base/condition.jl (8 changes: 4 additions & 4 deletions)
@@ -61,12 +61,12 @@ Abstract implementation of a condition object
for synchronizing tasks objects with a given lock.
"""
struct GenericCondition{L<:AbstractLock}
- waitq::InvasiveLinkedList{Task}
+ waitq::IntrusiveLinkedList{Task}
lock::L

- GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L())
- GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l)
- GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l)
+ GenericCondition{L}() where {L<:AbstractLock} = new{L}(IntrusiveLinkedList{Task}(), L())
+ GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(IntrusiveLinkedList{Task}(), l)
+ GenericCondition(l::AbstractLock) = new{typeof(l)}(IntrusiveLinkedList{Task}(), l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
base/linked_list.jl (36 changes: 18 additions & 18 deletions)
@@ -1,23 +1,23 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

- mutable struct InvasiveLinkedList{T}
+ mutable struct IntrusiveLinkedList{T}
# Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue >: U{ILL{T}, Nothing}`
head::Union{T, Nothing}
tail::Union{T, Nothing}
- InvasiveLinkedList{T}() where {T} = new{T}(nothing, nothing)
+ IntrusiveLinkedList{T}() where {T} = new{T}(nothing, nothing)
end

#const list_append!! = append!
#const list_deletefirst! = delete!

- eltype(::Type{<:InvasiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any
+ eltype(::Type{<:IntrusiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any

- iterate(q::InvasiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h))
- iterate(q::InvasiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h))
+ iterate(q::IntrusiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h))
+ iterate(q::IntrusiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h))

- isempty(q::InvasiveLinkedList) = (q.head === nothing)
+ isempty(q::IntrusiveLinkedList) = (q.head === nothing)

- function length(q::InvasiveLinkedList)
+ function length(q::IntrusiveLinkedList)
i = 0
head = q.head
while head !== nothing
@@ -27,7 +27,7 @@ function length(q::InvasiveLinkedList)
return i
end

- function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) where T
+ function list_append!!(q::IntrusiveLinkedList{T}, q2::IntrusiveLinkedList{T}) where T
q === q2 && error("can't append list to itself")
head2 = q2.head
if head2 !== nothing
@@ -49,7 +49,7 @@ function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) where T
return q
end

- function push!(q::InvasiveLinkedList{T}, val::T) where T
+ function push!(q::IntrusiveLinkedList{T}, val::T) where T
val.queue === nothing || error("val already in a list")
val.queue = q
tail = q.tail
@@ -62,7 +62,7 @@ function push!(q::InvasiveLinkedList{T}, val::T) where T
return q
end

- function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T
+ function pushfirst!(q::IntrusiveLinkedList{T}, val::T) where T
val.queue === nothing || error("val already in a list")
val.queue = q
head = q.head
@@ -75,20 +75,20 @@ function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T
return q
end

- function pop!(q::InvasiveLinkedList{T}) where {T}
+ function pop!(q::IntrusiveLinkedList{T}) where {T}
val = q.tail::T
list_deletefirst!(q, val) # expensive!
return val
end

- function popfirst!(q::InvasiveLinkedList{T}) where {T}
+ function popfirst!(q::IntrusiveLinkedList{T}) where {T}
val = q.head::T
list_deletefirst!(q, val) # cheap
return val
end

# this function assumes `val` is found in `q`
- function list_deletefirst!(q::InvasiveLinkedList{T}, val::T) where T
+ function list_deletefirst!(q::IntrusiveLinkedList{T}, val::T) where T
val.queue === q || return
head = q.head::T
if head === val
@@ -125,20 +125,20 @@ end
mutable struct LinkedListItem{T}
# Adapter class to use any `T` in a LinkedList
next::Union{LinkedListItem{T}, Nothing}
- queue::Union{InvasiveLinkedList{LinkedListItem{T}}, Nothing}
+ queue::Union{IntrusiveLinkedList{LinkedListItem{T}}, Nothing}
value::T
LinkedListItem{T}(value::T) where {T} = new{T}(nothing, nothing, value)
end
- const LinkedList{T} = InvasiveLinkedList{LinkedListItem{T}}
+ const LinkedList{T} = IntrusiveLinkedList{LinkedListItem{T}}

# delegate methods, as needed
eltype(::Type{<:LinkedList{T}}) where {T} = @isdefined(T) ? T : Any
iterate(q::LinkedList) = (h = q.head; h === nothing ? nothing : (h.value, h))
- iterate(q::InvasiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h))
+ iterate(q::IntrusiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h))
push!(q::LinkedList{T}, val::T) where {T} = push!(q, LinkedListItem{T}(val))
pushfirst!(q::LinkedList{T}, val::T) where {T} = pushfirst!(q, LinkedListItem{T}(val))
- pop!(q::LinkedList) = invoke(pop!, Tuple{InvasiveLinkedList,}, q).value
- popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{InvasiveLinkedList,}, q).value
+ pop!(q::LinkedList) = invoke(pop!, Tuple{IntrusiveLinkedList,}, q).value
+ popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{IntrusiveLinkedList,}, q).value
function list_deletefirst!(q::LinkedList{T}, val::T) where T
h = q.head
while h !== nothing
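The rename from InvasiveLinkedList to IntrusiveLinkedList adopts the standard name for this structure: the `next` and `queue` links live inside the elements themselves (the contract the struct comment imposes on `T`), so enqueueing allocates nothing and "is this element already queued?" is a single field check. A minimal self-contained sketch of the idea; `Node`, `MiniIntrusiveList`, `push_node!`, and `popfirst_node!` are illustrative names, not Base API:

```julia
# Sketch of an intrusive singly linked queue in the spirit of
# IntrusiveLinkedList: the element carries its own `next` link plus a
# back-pointer to the queue it currently belongs to.
mutable struct Node
    value::Int
    next::Union{Node, Nothing}
    queue::Any                     # the owning list, or nothing
    Node(v) = new(v, nothing, nothing)
end

mutable struct MiniIntrusiveList
    head::Union{Node, Nothing}
    tail::Union{Node, Nothing}
    MiniIntrusiveList() = new(nothing, nothing)
end

function push_node!(q::MiniIntrusiveList, n::Node)
    n.queue === nothing || error("node already in a list")
    n.queue = q
    if q.tail === nothing
        q.head = q.tail = n
    else
        q.tail.next = n
        q.tail = n
    end
    return q
end

function popfirst_node!(q::MiniIntrusiveList)
    n = q.head::Node               # assumes non-empty, like Base's popfirst!
    q.head = n.next
    q.head === nothing && (q.tail = nothing)
    n.next = nothing
    n.queue = nothing
    return n
end

q = MiniIntrusiveList()
push_node!(q, Node(1)); push_node!(q, Node(2))
@assert popfirst_node!(q).value == 1
```

The `LinkedListItem` adapter above exists so that arbitrary values, which lack these fields, can still ride in such a list.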
base/partr.jl (123 changes: 69 additions & 54 deletions)
@@ -2,30 +2,23 @@

module Partr

- using ..Threads: SpinLock
+ using ..Threads: SpinLock, nthreads

- # a task heap
+ # a task minheap
mutable struct taskheap
const lock::SpinLock
const tasks::Vector{Task}
@atomic ntasks::Int32
@atomic priority::UInt16
- taskheap() = new(SpinLock(), Vector{Task}(undef, tasks_per_heap), zero(Int32), typemax(UInt16))
+ taskheap() = new(SpinLock(), Vector{Task}(undef, 256), zero(Int32), typemax(UInt16))
end

- # multiqueue parameters
- const heap_d = UInt32(8)
- const heap_c = UInt32(2)
-
- # size of each heap
- const tasks_per_heap = Int32(65536) # TODO: this should be smaller by default, but growable!
-
- # the multiqueue's heaps
- global heaps::Vector{taskheap}
- global heap_p::UInt32 = 0
-
- # unbias state for the RNG
- global cong_unbias::UInt32 = 0
+ # multiqueue minheap state
+ const heap_d = UInt32(8)
+ global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0)
+ const heaps_lock = SpinLock()
+ global cong_unbias::UInt32 = typemax(UInt32)


cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)
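`cong(max, unbias)` draws a uniform index in `1:max` from the runtime's per-thread RNG (`jl_rand_ptls`), and the cached `cong_unbias` threshold lets it avoid modulo bias. The exact `unbias_cong` formula sits in the collapsed hunk below, but the idea is ordinary rejection sampling; a sketch under that assumption, with `unbiased_draw` as an illustrative stand-in:

```julia
# Rejection sampling to remove modulo bias, the idea behind cong/unbias_cong.
# Illustrative only: the real draw happens in the runtime's jl_rand_ptls.
function unbiased_draw(max::UInt32)
    # Largest multiple of `max` not exceeding typemax(UInt32); draws at or
    # above it would over-represent small residues, so reject and redraw.
    limit = typemax(UInt32) - typemax(UInt32) % max
    while true
        r = rand(UInt32)
        r < limit && return r % max + UInt32(1)   # 1-based, like cong
    end
end

@assert 1 <= unbiased_draw(UInt32(7)) <= 7
```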
@@ -35,18 +28,7 @@ function unbias_cong(max::UInt32)
end


- function multiq_init(nthreads)
- global heap_p = heap_c * nthreads
- global cong_unbias = unbias_cong(UInt32(heap_p))
- global heaps = Vector{taskheap}(undef, heap_p)
- for i = UInt32(1):heap_p
- heaps[i] = taskheap()
- end
- nothing
- end
-
-
- function sift_up(heap::taskheap, idx::Int32)
+ function multiq_sift_up(heap::taskheap, idx::Int32)
while idx > Int32(1)
parent = (idx - Int32(2)) ÷ heap_d + Int32(1)
if heap.tasks[idx].priority < heap.tasks[parent].priority
Expand All @@ -61,45 +43,76 @@ function sift_up(heap::taskheap, idx::Int32)
end


- function sift_down(heap::taskheap, idx::Int32)
+ function multiq_sift_down(heap::taskheap, idx::Int32)
if idx <= heap.ntasks
for child = (heap_d * idx - heap_d + Int32(2)):(heap_d * idx + Int32(1))
- child > tasks_per_heap && break
- if isassigned(heap.tasks, child) &&
+ child > length(heap.tasks) && break
+ if isassigned(heap.tasks, Int(child)) &&
heap.tasks[child].priority < heap.tasks[idx].priority
t = heap.tasks[idx]
heap.tasks[idx] = heap.tasks[child]
heap.tasks[child] = t
- sift_down(heap, child)
+ multiq_sift_down(heap, child)
end
end
end
end


+ function multiq_size()
+ heap_c = UInt32(2)
+ heap_p = UInt32(length(heaps))
+ nt = UInt32(nthreads())
+
+ if heap_c * nt <= heap_p
+ return heap_p
+ end
+
+ @lock heaps_lock begin
+ heap_p = UInt32(length(heaps))
+ nt = UInt32(nthreads())
+ if heap_c * nt <= heap_p
+ return heap_p
+ end
+
+ heap_p += heap_c * nt
+ newheaps = Vector{taskheap}(undef, heap_p)
+ copyto!(newheaps, heaps)
+ for i = (1 + length(heaps)):heap_p
+ newheaps[i] = taskheap()
+ end
+ global heaps = newheaps
+ global cong_unbias = unbias_cong(heap_p)
+ end
+
+ return heap_p
+ end
+
+
function multiq_insert(task::Task, priority::UInt16)
task.priority = priority

+ heap_p = multiq_size()
rn = cong(heap_p, cong_unbias)
while !trylock(heaps[rn].lock)
rn = cong(heap_p, cong_unbias)
end

- if heaps[rn].ntasks >= tasks_per_heap
- unlock(heaps[rn].lock)
- # multiq insertion failed, increase #tasks per heap
- return false
+ heap = heaps[rn]
+ if heap.ntasks >= length(heap.tasks)
+ resize!(heap.tasks, length(heap.tasks) * 2)
end

- ntasks = heaps[rn].ntasks + Int32(1)
- @atomic :monotonic heaps[rn].ntasks = ntasks
- heaps[rn].tasks[ntasks] = task
- sift_up(heaps[rn], ntasks)
- priority = heaps[rn].priority
+ ntasks = heap.ntasks + Int32(1)
+ @atomic :monotonic heap.ntasks = ntasks
+ heap.tasks[ntasks] = task
+ multiq_sift_up(heap, ntasks)
+ priority = heap.priority
if task.priority < priority
- @atomic :monotonic heaps[rn].priority = task.priority
+ @atomic :monotonic heap.priority = task.priority
end
- unlock(heaps[rn].lock)
+ unlock(heap.lock)
+
return true
end

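With the fixed `tasks_per_heap` bound gone, each heap starts at 256 slots (see the `taskheap` constructor above) and `multiq_insert` doubles the backing array on demand, so insertion can no longer fail; `multiq_size` meanwhile grows the heap pool toward `2 * nthreads()` under `heaps_lock`, re-checking the size after acquiring the lock so concurrent callers don't grow it twice. The heaps are 8-ary min-heaps (`heap_d = 8`), and the index arithmetic in `multiq_sift_up`/`multiq_sift_down` is the usual 1-based d-ary layout, sketched here with illustrative helper names:

```julia
# 1-based d-ary heap index arithmetic, matching multiq_sift_up/_sift_down
# with d = heap_d = 8. heap_parent/heap_children are illustrative helpers.
const d = 8
heap_parent(idx) = (idx - 2) ÷ d + 1
heap_children(idx) = (d * idx - d + 2):(d * idx + 1)

@assert heap_children(1) == 2:9                             # the root's children
@assert all(heap_parent(c) == 1 for c in heap_children(1))
@assert all(heap_parent(c) == 2 for c in heap_children(2))  # slots 10..17
```

A wide fan-out keeps these heaps shallow, which suits the insert-heavy, pop-min workload of a task scheduler.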
@@ -110,7 +123,8 @@ function multiq_deletemin()

@label retry
GC.safepoint()
- for i = UInt32(1):heap_p
+ heap_p = UInt32(length(heaps))
+ for i = UInt32(0):heap_p
if i == heap_p
return nothing
end
@@ -132,30 +146,31 @@
end
end

- task = heaps[rn1].tasks[1]
+ heap = heaps[rn1]
+ task = heap.tasks[1]
tid = Threads.threadid()
if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0
- unlock(heaps[rn1].lock)
+ unlock(heap.lock)
@goto retry
end
- ntasks = heaps[rn1].ntasks
- @atomic :monotonic heaps[rn1].ntasks = ntasks - Int32(1)
- heaps[rn1].tasks[1] = heaps[rn1].tasks[ntasks]
- Base._unsetindex!(heaps[rn1].tasks, Int(ntasks))
+ ntasks = heap.ntasks
+ @atomic :monotonic heap.ntasks = ntasks - Int32(1)
+ heap.tasks[1] = heap.tasks[ntasks]
+ Base._unsetindex!(heap.tasks, Int(ntasks))
prio1 = typemax(UInt16)
if ntasks > 1
- sift_down(heaps[rn1], Int32(1))
- prio1 = heaps[rn1].tasks[1].priority
+ multiq_sift_down(heap, Int32(1))
+ prio1 = heap.tasks[1].priority
end
- @atomic :monotonic heaps[rn1].priority = prio1
- unlock(heaps[rn1].lock)
+ @atomic :monotonic heap.priority = prio1
+ unlock(heap.lock)

return task
end


function multiq_check_empty()
- for i = UInt32(1):heap_p
+ for i = UInt32(1):length(heaps)
if heaps[i].ntasks != 0
return false
end
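`multiq_deletemin` now re-reads `length(heaps)` on each retry because the pool can grow concurrently. The collapsed hunk (`@@ -132,30 +146,31 @@`) presumably keeps partr's two-choice sampling, picking two random heaps and popping from the one whose cached top priority is more urgent; that stays close to the global minimum without any central lock. A sketch of that selection step, assuming the standard multiqueue pattern; `ToyHeap` and `pick_heap` are illustrative, not Base API:

```julia
using Base.Threads: SpinLock

# Toy stand-in for taskheap: a lock plus the cached priority of its best task.
mutable struct ToyHeap
    lock::SpinLock
    priority::UInt16
    ToyHeap(p) = new(SpinLock(), p)
end

# Power-of-two-choices selection over a pool of locked min-heaps.
function pick_heap(pool::Vector{ToyHeap})
    while true
        rn1 = rand(1:length(pool))
        rn2 = rand(1:length(pool))
        # Prefer the heap whose best task is more urgent (smaller value).
        if pool[rn2].priority < pool[rn1].priority
            rn1 = rn2
        end
        # Take the heap only if it is uncontended; otherwise redraw
        # rather than wait, as multiq_insert's trylock loop does.
        trylock(pool[rn1].lock) && return rn1
    end
end

pool = [ToyHeap(rand(UInt16)) for _ in 1:8]
i = pick_heap(pool)
unlock(pool[i].lock)
```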
base/pcre.jl (19 changes: 5 additions & 14 deletions)
@@ -32,15 +32,17 @@ _tid() = Int(ccall(:jl_threadid, Int16, ())) + 1
_nth() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))

function get_local_match_context()
- global THREAD_MATCH_CONTEXTS
tid = _tid()
ctxs = THREAD_MATCH_CONTEXTS
if length(ctxs) < tid
# slow path to allocate it
l = PCRE_COMPILE_LOCK::Threads.SpinLock
lock(l)
try
- THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, _nth()), THREAD_MATCH_CONTEXTS)
+ ctxs = THREAD_MATCH_CONTEXTS
+ if length(ctxs) < tid
+ global THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, _nth()), ctxs)
+ end
finally
unlock(l)
end
Expand All @@ -49,18 +51,7 @@ function get_local_match_context()
if ctx == C_NULL
# slow path to allocate it
ctx = create_match_context()
- l = PCRE_COMPILE_LOCK
- if l === nothing
- THREAD_MATCH_CONTEXTS[tid] = ctx
- else
- l = l::Threads.SpinLock
- lock(l)
- try
- THREAD_MATCH_CONTEXTS[tid] = ctx
- finally
- unlock(l)
- end
- end
+ THREAD_MATCH_CONTEXTS[tid] = ctx
end
return ctx
end
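The rewritten `get_local_match_context` reads `THREAD_MATCH_CONTEXTS` without a lock, takes `PCRE_COMPILE_LOCK` only when the array must grow, and re-checks the length once inside the lock: classic double-checked locking. The final per-slot store no longer needs the lock at all, since each thread writes only its own slot. A generic sketch of the pattern; `TABLE`, `GROW_LOCK`, and `ensure_slot!` are illustrative names, not the PCRE module's globals:

```julia
using Base.Threads: SpinLock

# Double-checked growth of a lazily sized per-thread table, mirroring the
# shape of get_local_match_context.
const GROW_LOCK = SpinLock()
TABLE = fill(C_NULL, 1)

function ensure_slot!(tid::Int, nthreads::Int)
    tbl = TABLE                    # unlocked fast-path read
    if length(tbl) < tid
        lock(GROW_LOCK)
        try
            tbl = TABLE            # re-check under the lock: another thread
            if length(tbl) < tid   # may have grown the table already
                global TABLE = tbl = copyto!(fill(C_NULL, nthreads), tbl)
            end
        finally
            unlock(GROW_LOCK)
        end
    end
    return tbl
end

@assert length(ensure_slot!(4, 4)) == 4
```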
(Diffs for the remaining five changed files did not load.)