diff --git a/Sources/FreeCombine/Combinator/CombineLatestState.swift b/Sources/FreeCombine/Combinator/CombineLatestState.swift index b53458e..4af5686 100644 --- a/Sources/FreeCombine/Combinator/CombineLatestState.swift +++ b/Sources/FreeCombine/Combinator/CombineLatestState.swift @@ -4,7 +4,7 @@ // // Created by Van Simmons on 6/4/22. // -struct CombineLatestState: CombinatorState { +struct CombineLatestState { typealias CombinatorAction = Self.Action enum Action { case setLeft(AsyncStream.Result, UnsafeContinuation) diff --git a/Sources/FreeCombine/Combinator/MergeState.swift b/Sources/FreeCombine/Combinator/MergeState.swift index 65324c8..98f27fc 100644 --- a/Sources/FreeCombine/Combinator/MergeState.swift +++ b/Sources/FreeCombine/Combinator/MergeState.swift @@ -4,7 +4,7 @@ // // Created by Van Simmons on 5/19/22. // -struct MergeState: CombinatorState { +struct MergeState { typealias CombinatorAction = Self.Action enum Action { case setValue(AsyncStream<(Int, Output)>.Result, UnsafeContinuation) diff --git a/Sources/FreeCombine/Combinator/ZipState.swift b/Sources/FreeCombine/Combinator/ZipState.swift index 2963db3..5a2e408 100644 --- a/Sources/FreeCombine/Combinator/ZipState.swift +++ b/Sources/FreeCombine/Combinator/ZipState.swift @@ -4,7 +4,7 @@ // // Created by Van Simmons on 3/16/22. // -struct ZipState: CombinatorState { +struct ZipState { typealias CombinatorAction = Self.Action enum Action { case setLeft(AsyncStream.Result, UnsafeContinuation) diff --git a/Sources/FreeCombine/Decombinator/MulticasterState.swift b/Sources/FreeCombine/Decombinator/MulticasterState.swift index dc9bafd..9bb578d 100644 --- a/Sources/FreeCombine/Decombinator/MulticasterState.swift +++ b/Sources/FreeCombine/Decombinator/MulticasterState.swift @@ -11,13 +11,7 @@ public struct MulticasterState { case pause(UnsafeContinuation) case resume(UnsafeContinuation) case disconnect(UnsafeContinuation) - - case receive(AsyncStream.Result, UnsafeContinuation) - case subscribe( - @Sendable (AsyncStream.Result) async throws -> Demand, - UnsafeContinuation, Swift.Error> - ) - case unsubscribe(Int) + case distribute(DistributorState.Action) } public enum Error: Swift.Error { @@ -33,21 +27,22 @@ public struct MulticasterState { let downstream: @Sendable (AsyncStream.Result) async throws -> Demand var cancellable: Cancellable? - var nextKey: Int - var repeaters: [Int: StateTask, RepeaterState.Action>] var upstreamContinuation: UnsafeContinuation? var isRunning: Bool = false + var distributor: DistributorState public init( upstream: Publisher, channel: Channel.Action> ) { self.upstream = upstream - self.nextKey = 0 - self.repeaters = [:] - self.downstream = { r in try await withUnsafeThrowingContinuation { continuation in - channel.yield(.receive(r, continuation)) - } } + self.distributor = .init(currentValue: .none, nextKey: 0, downstreams: [:]) + self.downstream = { r in + await withUnsafeContinuation { continuation in + channel.yield(.distribute(.receive(r, continuation))) + } + return .more + } } static func create( @@ -59,18 +54,18 @@ public struct MulticasterState { static func complete(state: inout Self, completion: Reducer.Completion) async -> Void { switch completion { case .termination: - await state.process(currentRepeaters: state.repeaters, with: .completion(.finished)) + await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.finished)) case .exit: fatalError("Multicaster should never exit") case let .failure(error): - await state.process(currentRepeaters: state.repeaters, with: .completion(.failure(error))) + await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.failure(error))) case .cancel: - await state.process(currentRepeaters: state.repeaters, with: .completion(.cancelled)) + await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.cancelled)) } - for (_, repeater) in state.repeaters { + for (_, repeater) in state.distributor.repeaters { repeater.finish() } - state.repeaters.removeAll() + state.distributor.repeaters.removeAll() } static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer.Effect { @@ -80,150 +75,96 @@ public struct MulticasterState { mutating func reduce(action: Action) async throws -> Reducer.Effect { switch action { case let .connect(continuation): - guard case .none = cancellable else { - continuation.resume(throwing: Error.alreadyConnected) - return .completion(.failure(Error.alreadyConnected)) - } - cancellable = await upstream.sink(downstream) - isRunning = true - continuation.resume() - return .none + return try await connect(continuation) case let .pause(continuation): - guard let _ = cancellable else { - continuation.resume(throwing: Error.disconnected) - return .completion(.failure(Error.disconnected)) - } - guard isRunning else { - continuation.resume(throwing: Error.alreadyPaused) - return .completion(.failure(Error.alreadyPaused)) - } - isRunning = false - continuation.resume() - return .none + return try await pause(continuation) case let .resume(continuation): - guard let _ = cancellable else { - continuation.resume(throwing: Error.disconnected) - return .completion(.failure(Error.disconnected)) - } - guard !isRunning else { - continuation.resume(throwing: Error.alreadyResumed) - return .completion(.failure(Error.alreadyResumed)) - } - isRunning = true - upstreamContinuation?.resume(returning: .more) - continuation.resume() - return .none + return try await resume(continuation) case let .disconnect(continuation): - guard let _ = cancellable else { - continuation.resume(throwing: Error.alreadyDisconnected) - return .completion(.failure(Error.alreadyDisconnected)) - } - isRunning = false - upstreamContinuation?.resume(returning: .done) - continuation.resume() - return .completion(.exit) - case let .receive(result, continuation): - guard case .none = upstreamContinuation else { - upstreamContinuation?.resume(throwing: Error.internalError) - continuation.resume(throwing: Error.internalError) - return .completion(.failure(Error.internalError)) - } - await process(currentRepeaters: repeaters, with: result) - if isRunning { - continuation.resume(returning: .more) - upstreamContinuation = .none - } else { - upstreamContinuation = continuation - } - return .none - case let .subscribe(downstream, continuation): - var repeater: Cancellable! - let _: Void = await withUnsafeContinuation { outerContinuation in - repeater = process(subscription: downstream, continuation: outerContinuation) - } - continuation.resume(returning: repeater) - return .none - case let .unsubscribe(channelId): - guard let downstream = repeaters.removeValue(forKey: channelId) else { - return .none - } - await process(currentRepeaters: [channelId: downstream], with: .completion(.finished)) - return .none + return try await disconnect(continuation) + case let .distribute(action): + return try await distribute(action) } } - mutating func process( - currentRepeaters : [Int: StateTask, RepeaterState.Action>], - with result: AsyncStream.Result - ) async -> Void { - guard currentRepeaters.count > 0 else { - return + mutating func connect( + _ continuation: UnsafeContinuation + ) async throws -> Reducer.Effect { + guard case .none = cancellable else { + continuation.resume(throwing: Error.alreadyConnected) + return .completion(.failure(Error.alreadyConnected)) } - await withUnsafeContinuation { (completedContinuation: UnsafeContinuation<[Int], Never>) in - // Note that the semaphore's reducer constructs a list of repeaters - // which have responded with .done and that the elements of that list - // are removed at completion of the sends - let semaphore = Semaphore<[Int], RepeatedAction>( - continuation: completedContinuation, - reducer: { completedIds, action in - guard case let .repeated(id, .done) = action else { return } - completedIds.append(id) - }, - initialState: [Int](), - count: currentRepeaters.count - ) + cancellable = await upstream.sink(downstream) + isRunning = true + continuation.resume() + return .none + } - for (key, downstreamTask) in currentRepeaters { - let queueStatus = downstreamTask.send(.repeat(result, semaphore)) - switch queueStatus { - case .enqueued: - () - case .terminated: - Task { await semaphore.decrement(with: .repeated(key, .done)) } - case .dropped: - fatalError("Should never drop") - @unknown default: - fatalError("Handle new case") - } - } + mutating func pause( + _ continuation: UnsafeContinuation + ) async throws -> Reducer.Effect { + guard let _ = cancellable else { + continuation.resume(throwing: Error.disconnected) + return .completion(.failure(Error.disconnected)) } - .forEach { key in - repeaters.removeValue(forKey: key) + guard isRunning else { + continuation.resume(throwing: Error.alreadyPaused) + return .completion(.failure(Error.alreadyPaused)) } + isRunning = false + continuation.resume() + return .none } - mutating func process( - subscription downstream: @escaping @Sendable (AsyncStream.Result) async throws -> Demand, - continuation: UnsafeContinuation? - ) -> Cancellable { - nextKey += 1 - let repeaterState = RepeaterState(id: nextKey, downstream: downstream) - let repeater: StateTask, RepeaterState.Action> = .init( - channel: .init(buffering: .bufferingOldest(1)), - initialState: { _ in repeaterState }, - onStartup: continuation, - reducer: Reducer( - onCompletion: RepeaterState.complete, - reducer: RepeaterState.reduce - ) - ) - repeaters[nextKey] = repeater - return .init( - cancel: { - repeater.cancel() - }, - isCancelled: { repeater.isCancelled }, - value: { - do { - let value = try await repeater.value - return value.mostRecentDemand - } catch { - fatalError("Could not get demand. Error: \(error)") + mutating func resume( + _ continuation: UnsafeContinuation + ) async throws -> Reducer.Effect { + guard let _ = cancellable else { + continuation.resume(throwing: Error.disconnected) + return .completion(.failure(Error.disconnected)) + } + guard !isRunning else { + continuation.resume(throwing: Error.alreadyResumed) + return .completion(.failure(Error.alreadyResumed)) + } + isRunning = true + upstreamContinuation?.resume(returning: .more) + continuation.resume() + return .none + } + + mutating func disconnect( + _ continuation: UnsafeContinuation + ) async throws -> Reducer.Effect { + guard let _ = cancellable else { + continuation.resume(throwing: Error.alreadyDisconnected) + return .completion(.failure(Error.alreadyDisconnected)) + } + isRunning = false + upstreamContinuation?.resume(returning: .done) + continuation.resume() + return .completion(.exit) + } + + mutating func distribute( + _ action: DistributorState.Action + ) async throws -> Reducer.Effect { + switch try await distributor.reduce(action: action) { + case .none: + return .none + case .published(_): + return .none // FIXME: Need to handle this + case let .completion(completion): + switch completion { + case .termination: + return .completion(.termination) + case .exit: + return .completion(.exit) + case let .failure(error): + return .completion(.failure(error)) + case .cancel: + return .completion(.cancel) } - }, - result: { - await repeater.result.map(\.mostRecentDemand) - } - ) + } } } diff --git a/Sources/FreeCombine/Publishers/Combinator.swift b/Sources/FreeCombine/Publishers/Combinator.swift index e445716..e5dcb7f 100644 --- a/Sources/FreeCombine/Publishers/Combinator.swift +++ b/Sources/FreeCombine/Publishers/Combinator.swift @@ -4,29 +4,27 @@ // // Created by Van Simmons on 5/8/22. // -public protocol CombinatorState { - associatedtype CombinatorAction - var mostRecentDemand: Demand { get } -} - -public func Combinator( +public func Combinator( initialState: @escaping (@escaping (AsyncStream.Result) async throws -> Demand) -> (Channel) async -> State, buffering: AsyncStream.Continuation.BufferingPolicy, - reducer: Reducer -) -> Publisher where State.CombinatorAction == Action { + reducer: Reducer, + extractor: @escaping (State) -> Demand +) -> Publisher { .init( initialState: initialState, buffering: buffering, - reducer: reducer + reducer: reducer, + extractor: extractor ) } public extension Publisher { - init( + init( initialState: @escaping (@escaping (AsyncStream.Result) async throws -> Demand) -> (Channel) async -> State, buffering: AsyncStream.Continuation.BufferingPolicy, - reducer: Reducer - ) where State.CombinatorAction == Action { + reducer: Reducer, + extractor: @escaping (State) -> Demand + ) { self = .init { continuation, downstream in .init { let stateTask = await Channel(buffering: buffering).stateTask( @@ -36,12 +34,9 @@ public extension Publisher { return try await withTaskCancellationHandler(handler: stateTask.cancel) { continuation?.resume() - guard !Task.isCancelled else { - throw PublisherError.cancelled - } + guard !Task.isCancelled else { throw PublisherError.cancelled } let finalState = try await stateTask.value - let mostRecentDemand = finalState.mostRecentDemand - return mostRecentDemand + return extractor(finalState) } } } diff --git a/Sources/FreeCombine/Publishers/Combined.swift b/Sources/FreeCombine/Publishers/Combined.swift index 240e027..2957994 100644 --- a/Sources/FreeCombine/Publishers/Combined.swift +++ b/Sources/FreeCombine/Publishers/Combined.swift @@ -30,7 +30,8 @@ public func combineLatest( reducer: Reducer( onCompletion: CombineLatestState.complete, reducer: CombineLatestState.reduce - ) + ), + extractor: \.mostRecentDemand ) } diff --git a/Sources/FreeCombine/Publishers/Decombinator.swift b/Sources/FreeCombine/Publishers/Decombinator.swift index 466ad92..3fee3eb 100644 --- a/Sources/FreeCombine/Publishers/Decombinator.swift +++ b/Sources/FreeCombine/Publishers/Decombinator.swift @@ -74,7 +74,7 @@ public extension Publisher { self = .init { continuation, downstream in let t: Task, Swift.Error> = .init { let c: Cancellable = try await withUnsafeThrowingContinuation { demandContinuation in - let enqueueStatus = stateTask.send(.subscribe(downstream, demandContinuation)) + let enqueueStatus = stateTask.send(.distribute(.subscribe(downstream, demandContinuation))) guard case .enqueued = enqueueStatus else { return demandContinuation.resume(throwing: PublisherError.enqueueError) } @@ -83,20 +83,11 @@ public extension Publisher { return c } return .init( - cancel: { Task { - await t.result.map { - $0.cancel() - } - } }, + cancel: { Task { await t.result.map { $0.cancel() } } }, isCancelled: { t.isCancelled }, - value: { - let cancellable = try await t.value - let value = try await cancellable.value - return value - }, + value: { try await t.value.value }, result: { - let r = await t.result - switch r { + switch await t.result { case let .success(result): return await result.result case let .failure(error): return .failure(error) } diff --git a/Sources/FreeCombine/Publishers/Merged.swift b/Sources/FreeCombine/Publishers/Merged.swift index 84c514f..ae5d307 100644 --- a/Sources/FreeCombine/Publishers/Merged.swift +++ b/Sources/FreeCombine/Publishers/Merged.swift @@ -54,6 +54,7 @@ public func merge( } }, reducer: MergeState.reduce - ) + ), + extractor: \.mostRecentDemand ) } diff --git a/Sources/FreeCombine/Publishers/Zipped.swift b/Sources/FreeCombine/Publishers/Zipped.swift index c01d865..a9abe0d 100644 --- a/Sources/FreeCombine/Publishers/Zipped.swift +++ b/Sources/FreeCombine/Publishers/Zipped.swift @@ -30,7 +30,8 @@ public func zip( reducer: Reducer( onCompletion: ZipState.complete, reducer: ZipState.reduce - ) + ), + extractor: \.mostRecentDemand ) } diff --git a/Sources/FreeCombine/State/StateTask.swift b/Sources/FreeCombine/State/StateTask.swift index 5c25479..1f60a87 100644 --- a/Sources/FreeCombine/State/StateTask.swift +++ b/Sources/FreeCombine/State/StateTask.swift @@ -7,7 +7,7 @@ #actor problems 1. no oneway funcs (can't call from synchronous code) - 2. can't selectively block callers + 2. can't selectively block callers (to pass a continuation to an actor requires spawning a task which can introduce a race condition and is really heavy-weight) 3. can't block on internal state (can only block with async call to another task) 4. no concept of cancellation 5. execute on global actor queues (generally not needed or desirable)