Skip to content

Commit

Permalink
Final removal of actors (#57)
Browse files Browse the repository at this point in the history
* Cleanup TimeNever

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Documentation update

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Final removal actors.

Semaphore may still need some thought, but tests pass

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Aug 20, 2022
1 parent 79dd3df commit 408708c
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public struct DistributorRepeaterState<ID: Hashable & Sendable, Output: Sendable
if case .completion = output { mostRecentDemand = .done }
}
catch { mostRecentDemand = .done }
await semaphore.decrement(with: .repeated(id, mostRecentDemand))
semaphore.decrement(with: .repeated(id, mostRecentDemand))
return mostRecentDemand == .done ? .completion(.exit) : .none
}
}
Expand Down
6 changes: 4 additions & 2 deletions Sources/FreeCombine/Decombinator/DistributorState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ public struct DistributorState<Output: Sendable> {
with result: AsyncStream<Output>.Result
) async throws -> Void {
guard currentRepeaters.count > 0 else { return }
var semaphore: Semaphore<[Int], RepeatedAction<Int>>!
try await withResumption { (completedResumption: Resumption<[Int]>) in
// Note that the semaphore's reducer constructs a list of repeaters
// which have responded with .done and that the elements of that list
// are removed at completion of the sends
let semaphore = Semaphore<[Int], RepeatedAction<Int>>(
semaphore = Semaphore<[Int], RepeatedAction<Int>>(
resumption: completedResumption,
reducer: { completedIds, action in
guard case let .repeated(id, .done) = action else { return }
Expand All @@ -168,7 +169,7 @@ public struct DistributorState<Output: Sendable> {
case .enqueued:
()
case .terminated:
Task { await semaphore.decrement(with: .repeated(key, .done)) }
semaphore.decrement(with: .repeated(key, .done))
case .dropped:
fatalError("Should never drop")
@unknown default:
Expand All @@ -179,6 +180,7 @@ public struct DistributorState<Output: Sendable> {
.forEach { key in
repeaters.removeValue(forKey: key)
}
assert(semaphore.count == 0, "Exiting with semaphore unresolved")
}

mutating func process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public struct PromiseRepeaterState<ID: Hashable & Sendable, Output: Sendable>: I
switch action {
case let .complete(output, semaphore):
try? await downstream(output)
await semaphore.decrement(with: .repeated(id, .done))
semaphore.decrement(with: .repeated(id, .done))
return .completion(.exit)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Decombinator/PromiseState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public struct PromiseState<Output: Sendable> {
case .enqueued:
()
case .terminated:
Task { await semaphore.decrement(with: .repeated(key, .done)) }
semaphore.decrement(with: .repeated(key, .done))
case .dropped:
fatalError("Should never drop")
@unknown default:
Expand Down
12 changes: 6 additions & 6 deletions Sources/FreeCombine/State/StateTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
/*:
#actor problems

1. no oneway funcs (can't call from synchronous code)
2. can't selectively block callers (to pass a continuation to an actor requires spawning a task which gives up ordering guarantees)
3. can't block calling tasks on internal state (can only block with async call to another task)
4. no concept of cancellation (cannot perform orderly shutdown with outstanding requests in flight)
5. execute on global actor queues (generally not needed or desirable)
6. No way of possible failure to enqueue on an overburdened actor, all requests enter an unbounded queue
1. no oneway funcs (i.e. they can’t be called from synchronous code)
2. cant selectively block callers in order (i.e. passing a continuation to an actor requires spawning a task which gives up ordering guarantees)
3. cant block calling tasks on internal state (can only block with async call to another task)
4. have no concept of cancellation (cannot perform orderly shutdown with outstanding requests in flight)
5. they execute on global actor queues (generally not needed or desirable to go off-Task for these things)
6. No way to allow possible failure to enqueue on an overburdened actor, all requests enter an unbounded queue

#actor solutions: StateTask - a swift implementation of the Haskell ST monad

Expand Down
135 changes: 135 additions & 0 deletions Sources/FreeCombine/Synchronization/LockFreeQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//
// LockFreeQueue.swift
//
//
// Created by Van Simmons on 8/20/22.
// Derived from: https://github.com/apple/swift-atomics/blob/main/Tests/AtomicsTests/LockFreeQueue.swift
// As per, the Apache license this is a derivative work.
//
// Copyright 2022, ComputeCycles, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// A lock-free concurrent queue implementation adapted from
// M. Michael and M. Scott's 1996 paper [Michael 1996].
//
// [Michael 1996]: https://doi.org/10.1145/248052.248106
//
// While this is a nice illustration of the use of atomic strong references,
// this is a somewhat sloppy implementation of an old algorithm. If you need a
// lock-free queue for actual production use, it would probably be a good idea
// to look at some more recent algorithms before deciding on this one.
//
// Note: because this implementation uses reference counting, we don't need
// to implement a free list to resolve the original algorithm's use-after-free
// problem.

import Atomics

private let nodeCount = ManagedAtomic<Int>(0)

class LockFreeQueue<Element> {
final class Node: AtomicReference {
let next: ManagedAtomic<Node?>
var value: Element?

init(value: Element?, next: Node?) {
self.value = value
self.next = ManagedAtomic(next)
nodeCount.wrappingIncrement(ordering: .relaxed)
}

deinit {
var values = 0
// Prevent stack overflow when reclaiming a long queue
var node = self.next.exchange(nil, ordering: .relaxed)
while node != nil && isKnownUniquelyReferenced(&node) {
let next = node!.next.exchange(nil, ordering: .relaxed)
withExtendedLifetime(node) {
values += 1
}
node = next
}
if values > 0 {
fatalError("Deinit of lock free queue failed to dereference values")
}
nodeCount.wrappingDecrement(ordering: .relaxed)
}
}

let head: ManagedAtomic<Node>
let tail: ManagedAtomic<Node>

// Used to distinguish removed nodes from active nodes with a nil `next`.
let marker = Node(value: nil, next: nil)

init() {
let dummy = Node(value: nil, next: nil)
self.head = ManagedAtomic(dummy)
self.tail = ManagedAtomic(dummy)
}

func enqueue(_ newValue: Element) {
let new = Node(value: newValue, next: nil)
var tail = self.tail.load(ordering: .acquiring)
while true {
let next = tail.next.load(ordering: .acquiring)
if tail === marker || next === marker {
tail = self.tail.load(ordering: .acquiring)
continue
}
if let next = next {
let (exchanged, original) = self.tail.compareExchange(
expected: tail,
desired: next,
ordering: .acquiringAndReleasing
)
tail = (exchanged ? next : original)
continue
}
let (exchanged, current) = tail.next.compareExchange(
expected: nil,
desired: new,
ordering: .acquiringAndReleasing
)
if exchanged {
_ = self.tail.compareExchange(expected: tail, desired: new, ordering: .releasing)
return
}
tail = current!
}
}

func dequeue() -> Element? {
while true {
let head = self.head.load(ordering: .acquiring)
let next = head.next.load(ordering: .acquiring)
if next === marker { continue }
guard let n = next else { return nil }
let tail = self.tail.load(ordering: .acquiring)
if head === tail {
_ = self.tail.compareExchange(expected: tail, desired: n, ordering: .acquiringAndReleasing)
}
if self.head.compareExchange(expected: head, desired: n, ordering: .releasing).exchanged {
let result = n.value!
n.value = nil
// To prevent threads that are suspended in `enqueue`/`dequeue` from
// holding onto arbitrarily long chains of removed nodes, we unlink
// removed nodes by replacing their `next` value with the special
// `marker`.
head.next.store(marker, ordering: .releasing)
return result
}
}
}
}
42 changes: 23 additions & 19 deletions Sources/FreeCombine/Synchronization/Semaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
public actor Semaphore<State, Action> {
public class Semaphore<State, Action> {
public enum Error: Swift.Error {
case complete
}
private let resumption: Resumption<State>
private let reducer: (inout State, Action) -> Void

private var state: State
private var count: Int
private var counter: Counter
private var actions: SingleConsumerStack<Action> = .init()

var count: Int { counter.count }

public init(
resumption: Resumption<State>,
Expand All @@ -37,26 +40,27 @@ public actor Semaphore<State, Action> {
self.resumption = resumption
self.reducer = reducer
self.state = initialState
self.count = count
self.counter = .init(count: count)
if count == 0 { resumption.resume(returning: initialState) }
}

public func decrement(with action: Action, function: String = #function, file: String = #file, line: Int = #line) -> Void {
guard count > 0 else {
fatalError("Semaphore decremented after complete in \(function) @\(file):\(line)")
}
count -= 1
reducer(&state, action)
if count == 0 {
resumption.resume(returning: state)
}
}

public func increment(with action: Action, function: String = #function, file: String = #file, line: Int = #line) -> Void {
guard count > 0 else {
fatalError("Semaphore incremented after complete in \(function) @\(file):\(line)")
public func decrement(
function: String = #function,
file: String = #file,
line: Int = #line,
with action: Action
) -> Void {
actions.push(action)
switch counter.decrement() {
case Int.min ..< 0:
fatalError("Semaphore decremented after complete in \(function) @\(file):\(line)")
case 1 ... Int.max:
return
default:
while let prevAction = actions.pop() {
reducer(&state, prevAction)
}
}
count += 1
reducer(&state, action)
resumption.resume(returning: state)
}
}
39 changes: 25 additions & 14 deletions Sources/FreeCombine/Synchronization/SingleConsumerStack.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Atomics open source project
// Semaphore.swift
//
// Copyright (c) 2020-2021 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
// Created by Van Simmons on 4/24/22.
// Derived from: https://github.com/apple/swift-atomics/blob/main/Tests/AtomicsTests/LockFreeSingleConsumerStack.swift
// As per, the Apache license this is a derivative work.
//
// Copyright 2022, ComputeCycles, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//===----------------------------------------------------------------------===//
import Atomics

class LockFreeSingleConsumerStack<Element> {
class SingleConsumerStack<Element> {
struct Node {
let value: Element
var next: UnsafeMutablePointer<Node>?
Expand All @@ -24,7 +35,7 @@ class LockFreeSingleConsumerStack<Element> {

deinit {
// Discard remaining nodes
while let _ = pop() {}
while let _ = pop() { }
_last.destroy()
_consumerCount.destroy()
}
Expand All @@ -42,17 +53,17 @@ class LockFreeSingleConsumerStack<Element> {
(done, current) = _last.compareExchange(
expected: current,
desired: new,
ordering: .releasing)
ordering: .releasing
)
}
}

// Pop and return the topmost element from the stack.
// This method does not support multiple overlapping concurrent calls.
func pop() -> Element? {
precondition(
_consumerCount.loadThenWrappingIncrement(ordering: .acquiring) == 0,
"Multiple consumers detected"
)
guard _consumerCount.loadThenWrappingIncrement(ordering: .acquiring) == 0 else {
return .none
}
defer { _consumerCount.wrappingDecrement(ordering: .releasing) }
var done = false
var current = _last.load(ordering: .acquiring)
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Utils/Counter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public struct Counter {
}

@discardableResult
public func decrement(by: Int = -1) -> Int {
public func decrement(by: Int = 1) -> Int {
var c = atomicValue.load(ordering: .sequentiallyConsistent)
while !atomicValue.compareExchange(
expected: c,
Expand Down
17 changes: 15 additions & 2 deletions Sources/Time/TimeNever.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@
//
// Created by Van Simmons on 7/4/22.
//

// Copyright 2022, ComputeCycles, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
enum TimeNever {

// Provide a type for this package which is only used for testing
}
4 changes: 2 additions & 2 deletions Tests/FreeCombineTests/PromiseTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ final class PromiseTests: XCTestCase {
)
for _ in 0 ..< maxAttempts {
Task {
do { try await promise.succeed(13); succeedCounter.increment(); await semaphore.decrement(with: ()) }
catch { failureCounter.increment(); await semaphore.decrement(with: ()) }
do { try await promise.succeed(13); succeedCounter.increment(); semaphore.decrement(with: ()) }
catch { failureCounter.increment(); semaphore.decrement(with: ()) }
}
}
}
Expand Down

0 comments on commit 408708c

Please sign in to comment.