Skip to content

Commit

Permalink
refactor multicaster and combinator (#15)
Browse files Browse the repository at this point in the history
* Begin incorporation of DistributorState into MulticasterState

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Complete incorporation of DistributorState in MulticasterState

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Clean up of Decombinator

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Remove Combinator protocol

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Jun 7, 2022
1 parent 43dd4d0 commit 8faef3d
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/CombineLatestState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Created by Van Simmons on 6/4/22.
//
struct CombineLatestState<Left: Sendable, Right: Sendable>: CombinatorState {
struct CombineLatestState<Left: Sendable, Right: Sendable> {
typealias CombinatorAction = Self.Action
enum Action {
case setLeft(AsyncStream<Left>.Result, UnsafeContinuation<Demand, Swift.Error>)
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/MergeState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Created by Van Simmons on 5/19/22.
//
struct MergeState<Output: Sendable>: CombinatorState {
struct MergeState<Output: Sendable> {
typealias CombinatorAction = Self.Action
enum Action {
case setValue(AsyncStream<(Int, Output)>.Result, UnsafeContinuation<Demand, Swift.Error>)
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/ZipState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Created by Van Simmons on 3/16/22.
//
struct ZipState<Left: Sendable, Right: Sendable>: CombinatorState {
struct ZipState<Left: Sendable, Right: Sendable> {
typealias CombinatorAction = Self.Action
enum Action {
case setLeft(AsyncStream<Left>.Result, UnsafeContinuation<Demand, Swift.Error>)
Expand Down
243 changes: 92 additions & 151 deletions Sources/FreeCombine/Decombinator/MulticasterState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@ public struct MulticasterState<Output: Sendable> {
case pause(UnsafeContinuation<Void, Swift.Error>)
case resume(UnsafeContinuation<Void, Swift.Error>)
case disconnect(UnsafeContinuation<Void, Swift.Error>)

case receive(AsyncStream<Output>.Result, UnsafeContinuation<Demand, Swift.Error>)
case subscribe(
@Sendable (AsyncStream<Output>.Result) async throws -> Demand,
UnsafeContinuation<Cancellable<Demand>, Swift.Error>
)
case unsubscribe(Int)
case distribute(DistributorState<Output>.Action)
}

public enum Error: Swift.Error {
Expand All @@ -33,21 +27,22 @@ public struct MulticasterState<Output: Sendable> {
let downstream: @Sendable (AsyncStream<Output>.Result) async throws -> Demand

var cancellable: Cancellable<Demand>?
var nextKey: Int
var repeaters: [Int: StateTask<RepeaterState<Int, Output>, RepeaterState<Int, Output>.Action>]
var upstreamContinuation: UnsafeContinuation<Demand, Swift.Error>?
var isRunning: Bool = false
var distributor: DistributorState<Output>

public init(
upstream: Publisher<Output>,
channel: Channel<MulticasterState<Output>.Action>
) {
self.upstream = upstream
self.nextKey = 0
self.repeaters = [:]
self.downstream = { r in try await withUnsafeThrowingContinuation { continuation in
channel.yield(.receive(r, continuation))
} }
self.distributor = .init(currentValue: .none, nextKey: 0, downstreams: [:])
self.downstream = { r in
await withUnsafeContinuation { continuation in
channel.yield(.distribute(.receive(r, continuation)))
}
return .more
}
}

static func create(
Expand All @@ -59,18 +54,18 @@ public struct MulticasterState<Output: Sendable> {
static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
switch completion {
case .termination:
await state.process(currentRepeaters: state.repeaters, with: .completion(.finished))
await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.finished))
case .exit:
fatalError("Multicaster should never exit")
case let .failure(error):
await state.process(currentRepeaters: state.repeaters, with: .completion(.failure(error)))
await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.failure(error)))
case .cancel:
await state.process(currentRepeaters: state.repeaters, with: .completion(.cancelled))
await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.cancelled))
}
for (_, repeater) in state.repeaters {
for (_, repeater) in state.distributor.repeaters {
repeater.finish()
}
state.repeaters.removeAll()
state.distributor.repeaters.removeAll()
}

static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
Expand All @@ -80,150 +75,96 @@ public struct MulticasterState<Output: Sendable> {
mutating func reduce(action: Action) async throws -> Reducer<Self, Action>.Effect {
switch action {
case let .connect(continuation):
guard case .none = cancellable else {
continuation.resume(throwing: Error.alreadyConnected)
return .completion(.failure(Error.alreadyConnected))
}
cancellable = await upstream.sink(downstream)
isRunning = true
continuation.resume()
return .none
return try await connect(continuation)
case let .pause(continuation):
guard let _ = cancellable else {
continuation.resume(throwing: Error.disconnected)
return .completion(.failure(Error.disconnected))
}
guard isRunning else {
continuation.resume(throwing: Error.alreadyPaused)
return .completion(.failure(Error.alreadyPaused))
}
isRunning = false
continuation.resume()
return .none
return try await pause(continuation)
case let .resume(continuation):
guard let _ = cancellable else {
continuation.resume(throwing: Error.disconnected)
return .completion(.failure(Error.disconnected))
}
guard !isRunning else {
continuation.resume(throwing: Error.alreadyResumed)
return .completion(.failure(Error.alreadyResumed))
}
isRunning = true
upstreamContinuation?.resume(returning: .more)
continuation.resume()
return .none
return try await resume(continuation)
case let .disconnect(continuation):
guard let _ = cancellable else {
continuation.resume(throwing: Error.alreadyDisconnected)
return .completion(.failure(Error.alreadyDisconnected))
}
isRunning = false
upstreamContinuation?.resume(returning: .done)
continuation.resume()
return .completion(.exit)
case let .receive(result, continuation):
guard case .none = upstreamContinuation else {
upstreamContinuation?.resume(throwing: Error.internalError)
continuation.resume(throwing: Error.internalError)
return .completion(.failure(Error.internalError))
}
await process(currentRepeaters: repeaters, with: result)
if isRunning {
continuation.resume(returning: .more)
upstreamContinuation = .none
} else {
upstreamContinuation = continuation
}
return .none
case let .subscribe(downstream, continuation):
var repeater: Cancellable<Demand>!
let _: Void = await withUnsafeContinuation { outerContinuation in
repeater = process(subscription: downstream, continuation: outerContinuation)
}
continuation.resume(returning: repeater)
return .none
case let .unsubscribe(channelId):
guard let downstream = repeaters.removeValue(forKey: channelId) else {
return .none
}
await process(currentRepeaters: [channelId: downstream], with: .completion(.finished))
return .none
return try await disconnect(continuation)
case let .distribute(action):
return try await distribute(action)
}
}

mutating func process(
currentRepeaters : [Int: StateTask<RepeaterState<Int, Output>, RepeaterState<Int, Output>.Action>],
with result: AsyncStream<Output>.Result
) async -> Void {
guard currentRepeaters.count > 0 else {
return
mutating func connect(
_ continuation: UnsafeContinuation<Void, Swift.Error>
) async throws -> Reducer<Self, Action>.Effect {
guard case .none = cancellable else {
continuation.resume(throwing: Error.alreadyConnected)
return .completion(.failure(Error.alreadyConnected))
}
await withUnsafeContinuation { (completedContinuation: UnsafeContinuation<[Int], Never>) in
// Note that the semaphore's reducer constructs a list of repeaters
// which have responded with .done and that the elements of that list
// are removed at completion of the sends
let semaphore = Semaphore<[Int], RepeatedAction<Int>>(
continuation: completedContinuation,
reducer: { completedIds, action in
guard case let .repeated(id, .done) = action else { return }
completedIds.append(id)
},
initialState: [Int](),
count: currentRepeaters.count
)
cancellable = await upstream.sink(downstream)
isRunning = true
continuation.resume()
return .none
}

for (key, downstreamTask) in currentRepeaters {
let queueStatus = downstreamTask.send(.repeat(result, semaphore))
switch queueStatus {
case .enqueued:
()
case .terminated:
Task { await semaphore.decrement(with: .repeated(key, .done)) }
case .dropped:
fatalError("Should never drop")
@unknown default:
fatalError("Handle new case")
}
}
mutating func pause(
_ continuation: UnsafeContinuation<Void, Swift.Error>
) async throws -> Reducer<Self, Action>.Effect {
guard let _ = cancellable else {
continuation.resume(throwing: Error.disconnected)
return .completion(.failure(Error.disconnected))
}
.forEach { key in
repeaters.removeValue(forKey: key)
guard isRunning else {
continuation.resume(throwing: Error.alreadyPaused)
return .completion(.failure(Error.alreadyPaused))
}
isRunning = false
continuation.resume()
return .none
}

mutating func process(
subscription downstream: @escaping @Sendable (AsyncStream<Output>.Result) async throws -> Demand,
continuation: UnsafeContinuation<Void, Never>?
) -> Cancellable<Demand> {
nextKey += 1
let repeaterState = RepeaterState(id: nextKey, downstream: downstream)
let repeater: StateTask<RepeaterState<Int, Output>, RepeaterState<Int, Output>.Action> = .init(
channel: .init(buffering: .bufferingOldest(1)),
initialState: { _ in repeaterState },
onStartup: continuation,
reducer: Reducer(
onCompletion: RepeaterState.complete,
reducer: RepeaterState.reduce
)
)
repeaters[nextKey] = repeater
return .init(
cancel: {
repeater.cancel()
},
isCancelled: { repeater.isCancelled },
value: {
do {
let value = try await repeater.value
return value.mostRecentDemand
} catch {
fatalError("Could not get demand. Error: \(error)")
mutating func resume(
_ continuation: UnsafeContinuation<Void, Swift.Error>
) async throws -> Reducer<Self, Action>.Effect {
guard let _ = cancellable else {
continuation.resume(throwing: Error.disconnected)
return .completion(.failure(Error.disconnected))
}
guard !isRunning else {
continuation.resume(throwing: Error.alreadyResumed)
return .completion(.failure(Error.alreadyResumed))
}
isRunning = true
upstreamContinuation?.resume(returning: .more)
continuation.resume()
return .none
}

mutating func disconnect(
_ continuation: UnsafeContinuation<Void, Swift.Error>
) async throws -> Reducer<Self, Action>.Effect {
guard let _ = cancellable else {
continuation.resume(throwing: Error.alreadyDisconnected)
return .completion(.failure(Error.alreadyDisconnected))
}
isRunning = false
upstreamContinuation?.resume(returning: .done)
continuation.resume()
return .completion(.exit)
}

mutating func distribute(
_ action: DistributorState<Output>.Action
) async throws -> Reducer<Self, Action>.Effect {
switch try await distributor.reduce(action: action) {
case .none:
return .none
case .published(_):
return .none // FIXME: Need to handle this
case let .completion(completion):
switch completion {
case .termination:
return .completion(.termination)
case .exit:
return .completion(.exit)
case let .failure(error):
return .completion(.failure(error))
case .cancel:
return .completion(.cancel)
}
},
result: {
await repeater.result.map(\.mostRecentDemand)
}
)
}
}
}
29 changes: 12 additions & 17 deletions Sources/FreeCombine/Publishers/Combinator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,27 @@
//
// Created by Van Simmons on 5/8/22.
//
public protocol CombinatorState {
associatedtype CombinatorAction
var mostRecentDemand: Demand { get }
}

public func Combinator<Output: Sendable, State: CombinatorState, Action>(
public func Combinator<Output: Sendable, State, Action>(
initialState: @escaping (@escaping (AsyncStream<Output>.Result) async throws -> Demand) -> (Channel<Action>) async -> State,
buffering: AsyncStream<Action>.Continuation.BufferingPolicy,
reducer: Reducer<State, Action>
) -> Publisher<Output> where State.CombinatorAction == Action {
reducer: Reducer<State, Action>,
extractor: @escaping (State) -> Demand
) -> Publisher<Output> {
.init(
initialState: initialState,
buffering: buffering,
reducer: reducer
reducer: reducer,
extractor: extractor
)
}

public extension Publisher {
init<State: CombinatorState, Action>(
init<State, Action>(
initialState: @escaping (@escaping (AsyncStream<Output>.Result) async throws -> Demand) -> (Channel<Action>) async -> State,
buffering: AsyncStream<Action>.Continuation.BufferingPolicy,
reducer: Reducer<State, Action>
) where State.CombinatorAction == Action {
reducer: Reducer<State, Action>,
extractor: @escaping (State) -> Demand
) {
self = .init { continuation, downstream in
.init {
let stateTask = await Channel(buffering: buffering).stateTask(
Expand All @@ -36,12 +34,9 @@ public extension Publisher {

return try await withTaskCancellationHandler(handler: stateTask.cancel) {
continuation?.resume()
guard !Task.isCancelled else {
throw PublisherError.cancelled
}
guard !Task.isCancelled else { throw PublisherError.cancelled }
let finalState = try await stateTask.value
let mostRecentDemand = finalState.mostRecentDemand
return mostRecentDemand
return extractor(finalState)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion Sources/FreeCombine/Publishers/Combined.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public func combineLatest<Left, Right>(
reducer: Reducer(
onCompletion: CombineLatestState<Left, Right>.complete,
reducer: CombineLatestState<Left, Right>.reduce
)
),
extractor: \.mostRecentDemand
)
}

Expand Down
Loading

0 comments on commit 8faef3d

Please sign in to comment.