Skip to content

Commit

Permalink
Roll back semaphore change (#59)
Browse files Browse the repository at this point in the history
* Revert "Final removal of actors (#57)"

This reverts commit 2f8d44c.

* Update dock, add types from Atomics, fix Counter bug

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Aug 21, 2022
1 parent 408708c commit 26325a5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public struct DistributorRepeaterState<ID: Hashable & Sendable, Output: Sendable
if case .completion = output { mostRecentDemand = .done }
}
catch { mostRecentDemand = .done }
semaphore.decrement(with: .repeated(id, mostRecentDemand))
await semaphore.decrement(with: .repeated(id, mostRecentDemand))
return mostRecentDemand == .done ? .completion(.exit) : .none
}
}
Expand Down
6 changes: 2 additions & 4 deletions Sources/FreeCombine/Decombinator/DistributorState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,11 @@ public struct DistributorState<Output: Sendable> {
with result: AsyncStream<Output>.Result
) async throws -> Void {
guard currentRepeaters.count > 0 else { return }
var semaphore: Semaphore<[Int], RepeatedAction<Int>>!
try await withResumption { (completedResumption: Resumption<[Int]>) in
// Note that the semaphore's reducer constructs a list of repeaters
// which have responded with .done and that the elements of that list
// are removed at completion of the sends
semaphore = Semaphore<[Int], RepeatedAction<Int>>(
let semaphore = Semaphore<[Int], RepeatedAction<Int>>(
resumption: completedResumption,
reducer: { completedIds, action in
guard case let .repeated(id, .done) = action else { return }
Expand All @@ -169,7 +168,7 @@ public struct DistributorState<Output: Sendable> {
case .enqueued:
()
case .terminated:
semaphore.decrement(with: .repeated(key, .done))
Task { await semaphore.decrement(with: .repeated(key, .done)) }
case .dropped:
fatalError("Should never drop")
@unknown default:
Expand All @@ -180,7 +179,6 @@ public struct DistributorState<Output: Sendable> {
.forEach { key in
repeaters.removeValue(forKey: key)
}
assert(semaphore.count == 0, "Exiting with semaphore unresolved")
}

mutating func process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public struct PromiseRepeaterState<ID: Hashable & Sendable, Output: Sendable>: I
switch action {
case let .complete(output, semaphore):
try? await downstream(output)
semaphore.decrement(with: .repeated(id, .done))
await semaphore.decrement(with: .repeated(id, .done))
return .completion(.exit)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Decombinator/PromiseState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public struct PromiseState<Output: Sendable> {
case .enqueued:
()
case .terminated:
semaphore.decrement(with: .repeated(key, .done))
Task { await semaphore.decrement(with: .repeated(key, .done)) }
case .dropped:
fatalError("Should never drop")
@unknown default:
Expand Down
42 changes: 19 additions & 23 deletions Sources/FreeCombine/Synchronization/Semaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
public class Semaphore<State, Action> {
public actor Semaphore<State, Action> {
public enum Error: Swift.Error {
case complete
}
private let resumption: Resumption<State>
private let reducer: (inout State, Action) -> Void

private var state: State
private var counter: Counter
private var actions: SingleConsumerStack<Action> = .init()

var count: Int { counter.count }
private var count: Int

public init(
resumption: Resumption<State>,
Expand All @@ -40,27 +37,26 @@ public class Semaphore<State, Action> {
self.resumption = resumption
self.reducer = reducer
self.state = initialState
self.counter = .init(count: count)
self.count = count
if count == 0 { resumption.resume(returning: initialState) }
}

public func decrement(
function: String = #function,
file: String = #file,
line: Int = #line,
with action: Action
) -> Void {
actions.push(action)
switch counter.decrement() {
case Int.min ..< 0:
fatalError("Semaphore decremented after complete in \(function) @\(file):\(line)")
case 1 ... Int.max:
return
default:
while let prevAction = actions.pop() {
reducer(&state, prevAction)
}
public func decrement(with action: Action, function: String = #function, file: String = #file, line: Int = #line) -> Void {
guard count > 0 else {
fatalError("Semaphore decremented after complete in \(function) @\(file):\(line)")
}
count -= 1
reducer(&state, action)
if count == 0 {
resumption.resume(returning: state)
}
}

public func increment(with action: Action, function: String = #function, file: String = #file, line: Int = #line) -> Void {
guard count > 0 else {
fatalError("Semaphore incremented after complete in \(function) @\(file):\(line)")
}
resumption.resume(returning: state)
count += 1
reducer(&state, action)
}
}
4 changes: 2 additions & 2 deletions Tests/FreeCombineTests/PromiseTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ final class PromiseTests: XCTestCase {
)
for _ in 0 ..< maxAttempts {
Task {
do { try await promise.succeed(13); succeedCounter.increment(); semaphore.decrement(with: ()) }
catch { failureCounter.increment(); semaphore.decrement(with: ()) }
do { try await promise.succeed(13); succeedCounter.increment(); await semaphore.decrement(with: ()) }
catch { failureCounter.increment(); await semaphore.decrement(with: ()) }
}
}
}
Expand Down

0 comments on commit 26325a5

Please sign in to comment.