diff --git a/Playgrounds/02 - Continuation Passing Style.playground/Pages/10 - Push - Pull Duality.xcplaygroundpage/Contents.swift b/Playgrounds/02 - Continuation Passing Style.playground/Pages/10 - Push - Pull Duality.xcplaygroundpage/Contents.swift new file mode 100644 index 0000000..2d90498 --- /dev/null +++ b/Playgrounds/02 - Continuation Passing Style.playground/Pages/10 - Push - Pull Duality.xcplaygroundpage/Contents.swift @@ -0,0 +1,371 @@ +//: [Previous](@previous) + +// Quick review +// Make and infix version of apply +precedencegroup ApplicationPrecedence { + associativity: left + higherThan: AssignmentPrecedence +} +precedencegroup CompositionPrecedence { + associativity: right + higherThan: ApplicationPrecedence + lowerThan: MultiplicationPrecedence, AdditionPrecedence +} + +// let's do composition as an operator in infix form +infix operator >>> : CompositionPrecedence // Application +infix operator |> : ApplicationPrecedence // Application + +// Three functions that compose together +func doubler(_ value: Int) -> Double { .init(value * 2) } +func toString(_ value: Double) -> String { "\(value)" } +func encloseInSpaces(_ value: String) -> String { " \(value) " } + +// what does it mean to say that functions "compose" +encloseInSpaces(toString(doubler(14))) + +// Note that functions have type and it is the type that allows them to compose +type(of: doubler) + +// I can compose (Int) -> Double with: +// (Double) -> String with: +// (String) -> String + +// I can take _any_ function of one variable and make it +// a computed var on the type of the variable +// This is possible because computed vars are really functions themselves +extension Double { + // underneath this is a function (Double) -> String + var toString: String { "\(self)" } +} +extension String { + // underneath this is a function (String) -> String + var encloseInSpaces: String { " \(self) " } +} + +extension Int { + // (Int) -> Double + var doubler: Double { .init(self * 2) } +} + +// Importantly, _EVERYTHING_ you think of as a `method` on an object +// is, in fact, a curried function +extension Int { + // (Int) -> () -> Double + func yetAnotherDoubler() -> Double { .init(self * 2) } + + // (Int) -> (Double) -> Double + func add(someDouble: Double) -> Double { Double(self) + someDouble } + + // (Int) -> (Int, Double) -> Double + func multiply(by anInt: Int, andAdd aDouble: Double) -> Double { Double(self * anInt) + aDouble } +} + +extension Int { + static func anotherDoubler(_ anInt: Int) -> Double { .init(anInt * 2) } +} + +type(of: Int.anotherDoubler) // (Int) -> Double +type(of: Int.yetAnotherDoubler) // (Int) -> () -> Double +type(of: Int.add) +type(of: Int.multiply) + +func yetAnotherDoubler(_ `self`: Int) -> () -> Double { + { .init(`self` * 2) } +} +type(of: yetAnotherDoubler) // (Int) -> () -> Double + +14.doubler +14.yetAnotherDoubler() + +// NB The compiler lies to us about computed vars, it won't tell us the type like +// it will functions +type(of: \Int.doubler) + +// These statements are exactly equivalent, however +doubler(14) +14.doubler + +// Reminder I can _always_ rearrange arguments to a function +func flip( + _ f: @escaping (A, B) -> C +) -> (B, A) -> C { + { b, a in f(a,b) } +} + +func flip( + _ f: @escaping (A) -> (B) -> C +) -> (B) -> (A) -> C { + { b in { a in f(a)(b) } } +} + +func flip( + _ f: @escaping (A) -> () -> C +) -> () -> (A) -> C { + { { a in f(a)() } } +} + +flip(yetAnotherDoubler)() + +// Infix form +// `+` is a function, just in "infix" form +14 + 13 + +// we can compose functions +public func compose( + _ f: @escaping (A) -> B, + _ g: @escaping (B) -> C +) -> (A) -> C { + { a in g(f(a)) } +} + + +// This EXACTLY the same as `compose` just named to be an operation in infix notation. +public func >>>( + _ f: @escaping (A) -> B, + _ g: @escaping (B) -> C +) -> (A) -> C { + { a in g(f(a)) } +} + +encloseInSpaces(toString(doubler(14))) +let composedDoubleString = compose(doubler, compose(toString, encloseInSpaces)) +composedDoubleString(14) +let doubleString = doubler >>> toString >>> encloseInSpaces +doubleString(14) + + +// inline the compositions "by hand" +let toStringExplicit = { innerInt in toString(doubler(innerInt) ) } +let encloseInSpacesExplicit = { outerInt in encloseInSpaces(toStringExplicit(outerInt) ) } +// lambda calculus form +let doubleStringExplicit = { outerInt in encloseInSpaces({ innerInt in toString(doubler(innerInt)) }(outerInt) ) } +doubleStringExplicit(14) + +let value = 14 +doubleString(value) +doubleStringExplicit(value) +(doubler >>> toString >>> encloseInSpaces)(value) + +// Introduce apply as reversed invocation +// Note: with @inlinable apply(a, f) == f(a) +func apply( + _ a: A, + _ f: (A) -> R +) -> R { + f(a) +} + + apply(14, doubler) + apply(apply(14, doubler), toString) +apply(apply(apply(14, doubler), toString), encloseInSpaces) +apply(apply(apply(14, \Int.doubler), \Double.toString), \String.encloseInSpaces) +apply(apply(apply(14, \.doubler), \.toString), \.encloseInSpaces) + +apply(14, doubleString) + +// Note that this is _exactly_ the apply function, just in `infix` form +public func |>( + _ a: A, + _ f: (A) -> R +) -> R { + f(a) +} + +// Given the above, all of these are EXACTLY the same thing +// we're just sprinkling on some "syntactic sugar" +apply(14, doubleString) +14 |> doubleString +14 |> \Int.doubler +14 |> \.doubler + +// if we used the infix form of apply, these would be the same as: +14.doubler +doubler(14) + +// Now lets compare and contrast apply and compose +// reminder, this is composition +(doubler >>> toString >>> encloseInSpaces)(14) // Direct Style + +// each of these is different, but all yield the same result +14 |> doubler >>> toString >>> encloseInSpaces // direct style +14 |> \.doubler >>> \.toString >>> \.encloseInSpaces // direct style +14 |> doubler |> toString >>> encloseInSpaces // mixed style +14 |> doubler >>> toString |> encloseInSpaces // mixed style +14 |> doubler |> toString |> encloseInSpaces // Continuation Passing Style +14 |> \.doubler |> \.toString |> \.encloseInSpaces // Continuation Passing Style +14 .doubler .toString .encloseInSpaces // OOP native style + +// writing the above out long hand.. + +// The last two (when inlined) +apply(apply(apply(14, doubler), toString), encloseInSpaces) +encloseInSpaces(apply(apply(14, doubler), toString)) +encloseInSpaces(toString(apply(14, doubler))) +encloseInSpaces(toString(doubler(14))) + +// The first one is in lambda calculus form +//14 |> doubler >>> toString >>> encloseInSpaces // direct style +compose(doubler, compose(toString, encloseInSpaces))(14) +({ outerInt in encloseInSpaces({ innerInt in toString(doubler(innerInt)) }(outerInt) ) })(14) + +// The compiler can also optimize _THAT_ to: +encloseInSpaces(toString(doubler(14))) + +// Interestingly the compiler _actually_ uses +// Single Static Assignment (SSA) form in the final output +// or you can think of it as what the compiler would put out: +let a = 14 |> doubler // or doubler(14) +let b = a |> toString // or toString(a) +let c = b |> encloseInSpaces // or encloseInSpaces(b) + +// In summary, all of these _do_ the same thing +// but _are not_ the same thing +(doubler >>> toString >>> encloseInSpaces)(14) // Direct Style +compose(doubler, compose(toString, encloseInSpaces))(14) +14 |> doubler >>> toString >>> encloseInSpaces // mixed style +apply(14, compose(doubler, compose(toString, encloseInSpaces))) + +14 |> doubler |> toString >>> encloseInSpaces // mixed style +apply(apply(14, doubler), compose(toString, encloseInSpaces)) + +14 |> doubler >>> toString |> encloseInSpaces // mixed style +apply(apply(14, compose(doubler, toString)), encloseInSpaces) + +14 |> doubler |> toString |> encloseInSpaces // Continuation Passing Style +apply(apply(apply(14, doubler), toString), encloseInSpaces) + +// Looking closely at that last one we discover that application +// and the continuation passing style are something that we +// already knew as the Object-Oriented style +14 |> doubler |> toString |> encloseInSpaces // Continuation Passing Style +14 .doubler .toString .encloseInSpaces // Object-Oriented Style +// [[[14 doubler] toString] encloseInSpaces] // ObjC syntax +// encloseInSpaces(toString(doubler(14))) + +// OO with immutable types is exactly equivalent to CPS + + +// Now, what does it look like if we _curry_ +// our apply function +// ((A) -> B) -> B This is a Continuation +func curriedApply( + _ a: A +) -> (@escaping (A) -> R) -> R { + { f in f(a) } +} + + apply( apply( apply(14, doubler), toString), encloseInSpaces) +curriedApply(curriedApply(curriedApply(14)(doubler))(toString))(encloseInSpaces) + +// BTW, it is instructive to curry compose as well, bc we will be coming +// back to this. +public func curriedCompose( + _ f: @escaping (A) -> B +) -> (@escaping (B) -> C) -> (A) -> C { + { g in { a in g(f(a)) } } +} + +compose (doubler, compose(toString, encloseInSpaces))(14) +curriedCompose(doubler)(curriedCompose(toString)(encloseInSpaces))(14) + +// Again, note that these all give EXACTLY IDENTICAL results +// And that translating from one form to another ia a completely +// mechanical process + apply( apply( apply(14, doubler), toString), encloseInSpaces) +curriedApply(curriedApply(curriedApply(14)(doubler))(toString))(encloseInSpaces) + +// Note that apply nests left, compose nests right + compose(doubler, compose(toString, encloseInSpaces))(14) +curriedCompose(doubler)(curriedCompose(toString)(encloseInSpaces))(14) + +// Also note that we are able to do this to ANY function that is in +// the standard one-argument form. And that it is only slightly +// more complicated in the multi-argument form +// And that we can do this only because we have generics. Generics +// are absolutely critical to this ability to compose. + + +// Interesting side-note: suppose we _flip_ the arguments to apply + +// Flipped form of apply (here called invoke) turns out to be the natively supported invocation form + +// reminder this is the apply from above: +//public func apply( +// _ a: A, +// _ f: (A) -> R +//) -> R { +// f(a) +//} + +// Flipping that yields: +public func invoke( + _ f: (A) -> R, + _ a: A +) -> R { + f(a) +} + +// And then we _curry_ invoke + +func curriedInvoke( + _ f: @escaping (A) -> R +) -> (A) -> R { + f +} +// curried invoke just turns out to be the identity +// function operating on the supplied funcion. Making it @inlinable allows the +// compiler to, in fact remove it and just use the +// native function invocation operation that is built in +// to the language + + +// The Big Leap +// Lifting curriedApply from a structural type, i.e. just a function +// to be a nominal type, i.e. a struct with a function as it's only member. + +// Up to this point we've just been playing with Swift's +// syntax for functions, rearranging things in various ways +// and showing that different rearrangements produce identical +// results. NOW we make real use of what generics give us + +// Notes: +// 1. trailing function from curriedApply becomes the let +// 2. there are TWO inits: the default init + the init that takes the _leading_ value from curriedApply +// 3. We add a callAsFunction to denote that this is in fact a lifted function +// 4. the let is exactly the shape of the Haskell Continuation monad right down to the A and the R + +public struct Continuation { + public let sink: (@escaping (A) -> R) -> R + public init(sink: @escaping (@escaping (A) -> R) -> R) { + self.sink = sink + } + public init(_ a: A) { + self = .init { downstream in + downstream(a) + } + } + public func callAsFunction(_ f: @escaping (A) -> R) -> R { + sink(f) + } +} + +//// If this is correct, we should be able to +//// implement curriedApply using the new type +//// and have it just work.. +func continuationApply( + _ a: A +) -> (@escaping (A) -> R) -> R { + Continuation(a).sink // .sink here is exactly of type: (@escaping (A) -> R) -> R +} + +// Proof that apply, curriedApply, continuationApply and Continuation are the exact same thing: +apply (apply (apply (14, doubler), toString), encloseInSpaces) +curriedApply (curriedApply (curriedApply (14)(doubler))(toString))(encloseInSpaces) +continuationApply(continuationApply(continuationApply(14)(doubler))(toString))(encloseInSpaces) +Continuation (Continuation (Continuation (14)(doubler))(toString))(encloseInSpaces) + +// And again, with suitable inlining, the compiler can reduce every one of these to: +encloseInSpaces(toString(doubler(14))) + +//: [Next](@next) diff --git a/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-1.pdf b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-1.pdf new file mode 100644 index 0000000..a152694 Binary files /dev/null and b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-1.pdf differ diff --git a/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-2.pdf b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-2.pdf new file mode 100644 index 0000000..971f74a Binary files /dev/null and b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-2.pdf differ diff --git a/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-3.pdf b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-3.pdf new file mode 100644 index 0000000..5210161 Binary files /dev/null and b/Playgrounds/02 - Continuation Passing Style.playground/Resources/AkkaStreams-3.pdf differ diff --git a/Playgrounds/02 - Continuation Passing Style.playground/contents.xcplayground b/Playgrounds/02 - Continuation Passing Style.playground/contents.xcplayground index 7dad8eb..f936c5b 100644 --- a/Playgrounds/02 - Continuation Passing Style.playground/contents.xcplayground +++ b/Playgrounds/02 - Continuation Passing Style.playground/contents.xcplayground @@ -10,5 +10,6 @@ + \ No newline at end of file diff --git a/Playgrounds/04 - Even More Structured Concurrency.playground/Pages/01 - Cancellable.xcplaygroundpage/Contents.swift b/Playgrounds/04 - Even More Structured Concurrency.playground/Pages/01 - Cancellable.xcplaygroundpage/Contents.swift index 5275afb..e479413 100644 --- a/Playgrounds/04 - Even More Structured Concurrency.playground/Pages/01 - Cancellable.xcplaygroundpage/Contents.swift +++ b/Playgrounds/04 - Even More Structured Concurrency.playground/Pages/01 - Cancellable.xcplaygroundpage/Contents.swift @@ -1,7 +1,100 @@ //: [Previous](@previous) -import Foundation +import _Concurrency -var greeting = "Hello, playground" +/*: + Tasks can leak + */ + +Task { + Task { + try await Task.sleep(nanoseconds: 4_000_000_000) + print("ending inner task 1") + } + Task { + try await Task.sleep(nanoseconds: 5_000_000_000) + print("ending inner task 2") + } + Task { + try await Task.sleep(nanoseconds: 3_000_000_000) + print("ending inner task 3") + } + do { + try await Task.sleep(nanoseconds: 1_000_000_000) + } catch { + print("Cancelled the sleep") + } + print("ending outer task") +} + +/*: +A better approach + */ +public final class Cancellable: Sendable { + private let _cancel: @Sendable () -> Void + private let _isCancelled: @Sendable () -> Bool + private let _value: @Sendable () async throws -> Output + private let _result: @Sendable () async -> Result + + public var isCancelled: Bool { _isCancelled() } + public var value: Output { get async throws { try await _value() } } + public var result: Result { get async { await _result() } } + + @Sendable public func cancel() -> Void { _cancel() } + @Sendable public func cancelAndAwaitValue() async throws -> Output { + _cancel() + return try await _value() + } + @Sendable public func cancelAndAwaitResult() async throws -> Result { + _cancel() + return await _result() + } + + init( + cancel: @escaping @Sendable () -> Void, + isCancelled: @escaping @Sendable () -> Bool, + value: @escaping @Sendable () async throws -> Output, + result: @escaping @Sendable () async -> Result + ) { + _cancel = cancel + _isCancelled = isCancelled + _value = value + _result = result + } + + deinit { if !self.isCancelled { self.cancel() } } +} + +public extension Cancellable { + convenience init(task: Task) { + self.init( + cancel: { task.cancel() }, + isCancelled: { task.isCancelled }, + value: { try await task.value }, + result: { await task.result } + ) + } +} + +Cancellable(task: .init { + Cancellable(task: .init { + try await Task.sleep(nanoseconds: 4_000_000_000) + print("ending inner cancellable 1") + }) + Cancellable(task: .init { + try await Task.sleep(nanoseconds: 5_000_000_000) + print("ending inner cancellable 2") + }) + Cancellable(task: .init { + try await Task.sleep(nanoseconds: 3_000_000_000) + print("ending inner cancellable 3") + }) + do { + try await Task.sleep(nanoseconds: 1_000_000_000) + } catch { + print("Cancelled the cancellable sleep") + } + print("ending outer cancellable") +}) //: [Next](@next) diff --git a/Playgrounds/04 - Even More Structured Concurrency.playground/contents.xcplayground b/Playgrounds/04 - Even More Structured Concurrency.playground/contents.xcplayground index 7627317..bec6037 100644 --- a/Playgrounds/04 - Even More Structured Concurrency.playground/contents.xcplayground +++ b/Playgrounds/04 - Even More Structured Concurrency.playground/contents.xcplayground @@ -1,4 +1,2 @@ - - - \ No newline at end of file + \ No newline at end of file diff --git a/Sources/FreeCombine/Channel/Channel.swift b/Sources/FreeCombine/Channel/Channel.swift index 9d6c940..9cfd21b 100644 --- a/Sources/FreeCombine/Channel/Channel.swift +++ b/Sources/FreeCombine/Channel/Channel.swift @@ -49,6 +49,12 @@ public extension Channel where Element == Void { } public extension Channel { +// func consume( +// publisher: Publisher +// ) async -> Cancellable where Element == AsyncStream.Result { +// await consume(publisher: publisher, using: { ($0, $1) }) +// } + func consume( publisher: Publisher ) async -> Cancellable where Element == (AsyncStream.Result, UnsafeContinuation) { diff --git a/Sources/FreeCombine/Decombinator/DistributorState.swift b/Sources/FreeCombine/Decombinator/DistributorState.swift index 8e971d7..1a3a572 100644 --- a/Sources/FreeCombine/Decombinator/DistributorState.swift +++ b/Sources/FreeCombine/Decombinator/DistributorState.swift @@ -9,6 +9,10 @@ public struct DistributorState { var nextKey: Int var repeaters: [Int: StateTask, RepeaterState.Action>] + public enum Error: Swift.Error { + case alreadyCompleted + } + public enum Action: Sendable { case receive(AsyncStream.Result, UnsafeContinuation?) case subscribe( @@ -28,6 +32,23 @@ public struct DistributorState { self.repeaters = downstreams } + static func dispose(action: Self.Action, completion: Reducer.Completion) async -> Void { + switch action { + case let .receive(_, continuation): continuation?.resume() + case let .subscribe(downstream, continuation): + switch completion { + case let .failure(error): + _ = try? await downstream(.completion(.failure(error))) + continuation.resume(throwing: error) + default: + continuation.resume( + returning: .init(task: .init { return try await downstream(.completion(.finished))}) + ) + } + case .unsubscribe: () + } + } + static func complete(state: inout Self, completion: Reducer.Completion) async -> Void { switch completion { case .finished: @@ -44,6 +65,8 @@ public struct DistributorState { } state.repeaters.removeAll() } + + static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer.Effect { try await `self`.reduce(action: action) diff --git a/Sources/FreeCombine/Decombinator/MulticasterState.swift b/Sources/FreeCombine/Decombinator/MulticasterState.swift index 2803023..9c3681a 100644 --- a/Sources/FreeCombine/Decombinator/MulticasterState.swift +++ b/Sources/FreeCombine/Decombinator/MulticasterState.swift @@ -20,6 +20,7 @@ public struct MulticasterState { case disconnected case alreadyPaused case alreadyResumed + case alreadyCompleted case internalError } @@ -68,6 +69,32 @@ public struct MulticasterState { state.distributor.repeaters.removeAll() } + static func distributorCompletion( + _ completion: Reducer.Completion + ) -> Reducer, DistributorState.Action>.Completion { + switch completion { + case .finished: return .finished + case .exit: return .exit + case let .failure(error): return .failure(error) + case .cancel: return .cancel + } + } + + static func dispose(action: Self.Action, completion: Reducer.Completion) async -> Void { + switch action { + case let .connect(continuation): + continuation.resume(throwing: Error.alreadyCompleted) + case let .pause(continuation): + continuation.resume(throwing: Error.alreadyCompleted) + case let .resume(continuation): + continuation.resume(throwing: Error.alreadyCompleted) + case let .disconnect(continuation): + continuation.resume(throwing: Error.alreadyCompleted) + case let .distribute(distributorAction): + await DistributorState.dispose(action: distributorAction, completion: distributorCompletion(completion)) + } + } + static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer.Effect { try await `self`.reduce(action: action) } @@ -91,8 +118,8 @@ public struct MulticasterState { _ continuation: UnsafeContinuation ) async throws -> Reducer.Effect { guard case .none = cancellable else { - continuation.resume(throwing: Error.alreadyConnected) - return .completion(.failure(Error.alreadyConnected)) + continuation.resume() + return .none } cancellable = await upstream.sink(downstream) isRunning = true diff --git a/Sources/FreeCombine/Multicast/Share.swift b/Sources/FreeCombine/Multicast/Share.swift index b3520df..1835088 100644 --- a/Sources/FreeCombine/Multicast/Share.swift +++ b/Sources/FreeCombine/Multicast/Share.swift @@ -7,9 +7,19 @@ // Need join on Cancellable public extension Publisher { - func share() async -> Self { - let multicaster: ValueRef?> = ValueRef(value: .none) + let multicaster = await LazyValueRef( + creator: { await Multicaster.init( + stateTask: Channel.init().stateTask( + initialState: MulticasterState.create(upstream: self), + reducer: Reducer( + onCompletion: MulticasterState.complete, + disposer: MulticasterState.dispose, + reducer: MulticasterState.reduce + ) + ) + ) } + ) @Sendable func lift( _ downstream: @Sendable @escaping (AsyncStream.Result) async throws -> Demand ) -> @Sendable (AsyncStream.Result) async throws -> Demand { @@ -19,34 +29,18 @@ public extension Publisher { return try await downstream(r) case .completion: let finalValue = try await downstream(r) - await multicaster.set(value: .none) + try await multicaster.release() return finalValue } } } return .init { continuation, downstream in Cancellable>.join(.init { - if await multicaster.value == nil { - let m: Multicaster = await .init( - stateTask: Channel.init().stateTask( - initialState: MulticasterState.create(upstream: self), - reducer: Reducer( - onCompletion: MulticasterState.complete, - reducer: MulticasterState.reduce - ) - ) - ) - await multicaster.set(value: m) - let cancellable = await m.publisher().sink(lift(downstream)) - try await m.connect() - continuation?.resume() - return cancellable - } - guard let m = await multicaster.value else { - return await Empty(Output.self).sink(downstream) - } - continuation?.resume() + let m = try await multicaster.value() + try await multicaster.retain() let cancellable = await m.publisher().sink(lift(downstream)) + try await m.connect() + continuation?.resume() return cancellable } ) } diff --git a/Sources/FreeCombine/Publishers/Just.swift b/Sources/FreeCombine/Publishers/Just.swift index af8b67b..c8a5d72 100644 --- a/Sources/FreeCombine/Publishers/Just.swift +++ b/Sources/FreeCombine/Publishers/Just.swift @@ -19,6 +19,21 @@ public extension Publisher { } } +public func Just(_ f: @escaping () async -> Element) -> Publisher { + .init(f) +} + +public extension Publisher { + init(_ f: @escaping () async -> Output) { + self = .init { continuation, downstream in + .init { + continuation?.resume() + return try await downstream(.value(f())) == .more ? try await downstream(.completion(.finished)) : .done + } + } + } +} + public func Just(_ a: AsyncStream.Result) -> Publisher { .init(a) } @@ -33,3 +48,18 @@ public extension Publisher { } } } + +public func Just(_ f: @escaping () async -> AsyncStream.Result) -> Publisher { + .init(f) +} + +public extension Publisher { + init(_ f: @escaping () async -> AsyncStream.Result) { + self = .init { continuation, downstream in + .init { + continuation?.resume() + return try await downstream(f()) == .more ? try await downstream(.completion(.finished)) : .done + } + } + } +} diff --git a/Sources/FreeCombine/State/Reducer.swift b/Sources/FreeCombine/State/Reducer.swift index 043a153..aad114a 100644 --- a/Sources/FreeCombine/State/Reducer.swift +++ b/Sources/FreeCombine/State/Reducer.swift @@ -23,13 +23,13 @@ public struct Reducer { case cancel } - let disposer: (Action, Completion) -> Void + let disposer: (Action, Completion) async -> Void let reducer: (inout State, Action) async throws -> Effect let onCompletion: (inout State,Completion) async -> Void public init( onCompletion: @escaping (inout State, Completion) async -> Void = { _, _ in }, - disposer: @escaping (Action, Completion) -> Void = { _, _ in }, + disposer: @escaping (Action, Completion) async -> Void = { _, _ in }, reducer: @escaping (inout State, Action) async throws -> Effect ) { self.onCompletion = onCompletion @@ -41,8 +41,8 @@ public struct Reducer { try await reducer(&state, action) } - public func callAsFunction(_ action: Action, _ completion: Completion) -> Void { - disposer(action, completion) + public func callAsFunction(_ action: Action, _ completion: Completion) async -> Void { + await disposer(action, completion) } public func callAsFunction(_ state: inout State, _ completion: Completion) async -> Void { diff --git a/Sources/FreeCombine/State/StateTask.swift b/Sources/FreeCombine/State/StateTask.swift index 7582d36..f7a12b9 100644 --- a/Sources/FreeCombine/State/StateTask.swift +++ b/Sources/FreeCombine/State/StateTask.swift @@ -78,7 +78,9 @@ extension StateTask { guard !Task.isCancelled else { throw PublisherError.cancelled } switch effect { case .none: continue - case .published(_): continue // FIXME: Handle future mutation + case .published(_): + // FIXME: Need to handle the publisher channel.consume(publisher: publisher) + continue case .completion(.exit): throw PublisherError.completed case let .completion(.failure(error)): throw error case .completion(.finished): throw PublisherError.internalError @@ -88,7 +90,7 @@ extension StateTask { await reducer(&state, .finished) } catch { channel.finish() - for await action in channel { reducer(action, .failure(error)); continue } + for await action in channel { await reducer(action, .failure(error)); continue } guard let completion = error as? PublisherError else { await reducer(&state, .failure(error)) throw error diff --git a/Sources/FreeCombine/Subject/CurrentValueSubject.swift b/Sources/FreeCombine/Subject/CurrentValueSubject.swift index ea3f7a6..867bfa1 100644 --- a/Sources/FreeCombine/Subject/CurrentValueSubject.swift +++ b/Sources/FreeCombine/Subject/CurrentValueSubject.swift @@ -16,6 +16,7 @@ public func CurrentValueSubject( onStartup: onStartup, reducer: Reducer( onCompletion: DistributorState.complete, + disposer: DistributorState.dispose, reducer: DistributorState.reduce ) ) @@ -30,6 +31,7 @@ public func CurrentValueSubject( initialState: { channel in .init(currentValue: currentValue, nextKey: 0, downstreams: [:]) }, reducer: Reducer( onCompletion: DistributorState.complete, + disposer: DistributorState.dispose, reducer: DistributorState.reduce ) ) diff --git a/Sources/FreeCombine/Subject/PassthroughSubject.swift b/Sources/FreeCombine/Subject/PassthroughSubject.swift index 7cb5b0e..8b1c8a5 100644 --- a/Sources/FreeCombine/Subject/PassthroughSubject.swift +++ b/Sources/FreeCombine/Subject/PassthroughSubject.swift @@ -15,6 +15,7 @@ public func PassthroughSubject( onStartup: onStartup, reducer: Reducer( onCompletion: DistributorState.complete, + disposer: DistributorState.dispose, reducer: DistributorState.reduce ) ) @@ -29,6 +30,7 @@ public func PassthroughSubject( initialState: { channel in .init(currentValue: .none, nextKey: 0, downstreams: [:]) }, reducer: Reducer( onCompletion: DistributorState.complete, + disposer: DistributorState.dispose, reducer: DistributorState.reduce ) ) diff --git a/Sources/FreeCombine/Utils/LazyValueRef.swift b/Sources/FreeCombine/Utils/LazyValueRef.swift new file mode 100644 index 0000000..0877831 --- /dev/null +++ b/Sources/FreeCombine/Utils/LazyValueRef.swift @@ -0,0 +1,155 @@ +// +// LazyValueRef.swift +// +// +// Created by Van Simmons on 6/10/22. +// +public struct LazyValueRefState { + public enum Error: Swift.Error { + case deallocated + case finished + case dropped + } + public enum Action: Sendable { + case value(UnsafeContinuation) + case retain(UnsafeContinuation) + case release(UnsafeContinuation) + } + var value: Value? + var refCount: Int = 0 + var creator: (() async throws -> Value)? + + public init(creator: @escaping () async throws -> Value) async { + self.creator = creator + } + + static func complete(state: inout Self, completion: Reducer.Completion) async -> Void { + state.value = .none + state.creator = .none + state.refCount = 0 + } + + static func dispose(action: Self.Action, completion: Reducer.Completion) -> Void { + switch action { + case let .value(continuation): continuation.resume(throwing: Error.deallocated) + case let .retain(continuation): continuation.resume(throwing: Error.deallocated) + case let .release(continuation): continuation.resume(throwing: Error.deallocated) + } + } + + static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer.Effect { + guard !Task.isCancelled else { return .completion(.cancel) } + return try await `self`.reduce(action: action) + } + + private mutating func reduce( + action: Self.Action + ) async throws -> Reducer.Effect { + switch action { + case let .value(continuation): + guard let value = value else { + guard let creator = creator else { + continuation.resume(throwing: Error.deallocated) + return .completion(.failure(Error.deallocated)) + } + do { + value = try await creator() + continuation.resume(returning: value!) + return .none + } + } + continuation.resume(returning: value) + return .none + case let .retain(continuation): + guard let _ = value else { + continuation.resume(throwing: Error.deallocated) + return .completion(.failure(Error.deallocated)) + } + refCount += 1 + continuation.resume() + return .none + case let .release(continuation): + guard let _ = value else { + continuation.resume(throwing: Error.deallocated) + return .completion(.failure(Error.deallocated)) + } + refCount -= 1 + if refCount == 0 { + value = .none + } + continuation.resume() + return .none + } + } +} + +public extension StateTask { + static func stateTask( + creator: @escaping () async throws -> Value + ) async -> StateTask where State == LazyValueRefState, Action == LazyValueRefState.Action { + await Channel.Action>.init(buffering: .unbounded) + .stateTask( + initialState: { _ in await .init(creator: creator) }, + reducer: .init( + onCompletion: LazyValueRefState.complete, + disposer: LazyValueRefState.dispose, + reducer: LazyValueRefState.reduce + ) + ) + } +} + +public func LazyValueRef( + creator: @escaping () async throws -> Value +) async -> StateTask, LazyValueRefState.Action> { + await .stateTask(creator: creator) +} + +public extension StateTask { + func value() async throws -> Value where State == LazyValueRefState, Action == LazyValueRefState.Action { + let value: Value = try await withUnsafeThrowingContinuation({ continuation in + let queueStatus = self.channel.yield(.value(continuation)) + switch queueStatus { + case .enqueued: + () + case .terminated: + continuation.resume(throwing: LazyValueRefState.Error.finished) + case .dropped: + continuation.resume(throwing: LazyValueRefState.Error.dropped) + @unknown default: + fatalError("Handle new case") + } + }) + return value + } + func retain() async throws -> Void where State == LazyValueRefState, Action == LazyValueRefState.Action { + let _: Void = try await withUnsafeThrowingContinuation({ continuation in + let queueStatus = self.channel.yield(.retain(continuation)) + switch queueStatus { + case .enqueued: + () + case .terminated: + continuation.resume(throwing: LazyValueRefState.Error.finished) + case .dropped: + continuation.resume(throwing: LazyValueRefState.Error.dropped) + @unknown default: + fatalError("Handle new case") + } + }) + } + func release() async throws -> Void where State == LazyValueRefState, Action == LazyValueRefState.Action { + let _: Void = try await withUnsafeThrowingContinuation({ continuation in + let queueStatus = self.channel.yield(.release(continuation)) + switch queueStatus { + case .enqueued: + () + case .terminated: + continuation.resume(throwing: LazyValueRefState.Error.finished) + case .dropped: + continuation.resume(throwing: LazyValueRefState.Error.dropped) + @unknown default: + fatalError("Handle new case") + } + }) + } +} diff --git a/Sources/FreeCombine/Utils/ValueRef.swift b/Sources/FreeCombine/Utils/ValueRef.swift index 4baa95e..88db593 100644 --- a/Sources/FreeCombine/Utils/ValueRef.swift +++ b/Sources/FreeCombine/Utils/ValueRef.swift @@ -7,7 +7,7 @@ public actor ValueRef { var value: Value - + public init(value: Value) { self.value = value } @discardableResult @@ -27,4 +27,3 @@ extension ValueRef { value.next() } } - diff --git a/Tests/FreeCombineTests/ShareTests.swift b/Tests/FreeCombineTests/ShareTests.swift index cf7d7d2..d554c4e 100644 --- a/Tests/FreeCombineTests/ShareTests.swift +++ b/Tests/FreeCombineTests/ShareTests.swift @@ -13,26 +13,30 @@ class ShareTests: XCTestCase { override func tearDownWithError() throws { } - func testSimpleShare() async throws { + func xtestSimpleShare() async throws { let expectation1 = await CheckedExpectation() let expectation2 = await CheckedExpectation() - let unfolded = await Unfolded(0 ..< 100) - .map { $0 % 47 } + let n = 2 + + let unfolded = await Unfolded(0 ..< n) + .map { $0 } .share() let counter1 = Counter() + let value1 = ValueRef(value: -1) let u1 = await unfolded.sink { (result: AsyncStream.Result) in switch result { - case .value: + case let .value(value): await counter1.increment() + await value1.set(value: value) return .more case let .completion(.failure(error)): XCTFail("Got an error? \(error)") return .done case .completion(.finished): let count = await counter1.count - guard count == 100 else { + guard count == n else { XCTFail("Incorrect count: \(count) in subscription 1") return .done } @@ -47,16 +51,25 @@ class ShareTests: XCTestCase { } } + let counter2 = Counter() + let value2 = ValueRef(value: -1) let u2 = await unfolded.sink { (result: AsyncStream.Result) in switch result { - case .value: + case let .value(value): + await counter2.increment() + await value2.set(value: value) return .more case let .completion(.failure(error)): XCTFail("Got an error? \(error)") return .done case .completion(.finished): // Note number received here is unpredictable - do { try await expectation2.complete() } + let count = await counter2.count + let last = await value2.value + print("Finished 2. count = \(count), last = \(last)") + do { + try await expectation2.complete() + } catch { XCTFail("Failed to complete with error: \(error)") } return .done case .completion(.cancelled): @@ -67,8 +80,12 @@ class ShareTests: XCTestCase { do { try await FreeCombine.wait(for: expectation1, timeout: 100_000_000) + let count = await counter1.count + let last = await value1.value + print("Finished 1. count = \(count), last = \(last)") } catch { XCTFail("Timed out") + return } let d1 = try await u1.value @@ -78,73 +95,72 @@ class ShareTests: XCTestCase { try await FreeCombine.wait(for: expectation2, timeout: 100_000_000) } catch { XCTFail("Timed out") + return } let d2 = try await u2.value XCTAssert(d2 == .done, "Second chain has wrong value") } - func testSubjectMulticast() async throws { -// let subj = await PassthroughSubject(Int.self) -// -// let unfolded = await subj -// .publisher() -// .map { $0 % 47 } -// .multicast() -// -// let counter1 = Counter() -// let u1 = await unfolded.publisher().sink( { result in -// switch result { -// case .value: -// await counter1.increment() -// return .more -// case let .completion(.failure(error)): -// XCTFail("Got an error? \(error)") -// return .done -// case .completion(.finished): -// let count = await counter1.count -// if count != 100 { -// XCTFail("Incorrect count: \(count) in subscription 1") -// } -// return .done -// case .completion(.cancelled): -// XCTFail("Should not have cancelled") -// return .done -// } -// }) -// -// let counter2 = Counter() -// let u2 = await unfolded.publisher().sink { (result: AsyncStream.Result) in -// switch result { -// case .value: -// await counter2.increment() -// return .more -// case let .completion(.failure(error)): -// XCTFail("Got an error? \(error)") -// return .done -// case .completion(.finished): -// let count = await counter2.count -// if count != 100 { -// XCTFail("Incorrect count: \(count) in subscription 2") -// } -// return .done -// case .completion(.cancelled): -// XCTFail("Should not have cancelled") -// return .done -// } -// } -// -// try await unfolded.connect() -// -// for i in (0 ..< 100) { -// do { try await subj.send(i) } -// catch { XCTFail("Failed to send on \(i)") } -// } -// -// try subj.nonBlockingFinish() -// let d1 = try await u1.value -// XCTAssert(d1 == .done, "First chain has wrong value") -// let d2 = try await u2.value -// XCTAssert(d2 == .done, "Second chain has wrong value") + func testSubjectShare() async throws { + let subj = await PassthroughSubject(Int.self) + + let unfolded = await subj + .publisher() + .map { $0 % 47 } + .share() + + let counter1 = Counter() + let u1 = await unfolded.sink( { result in + switch result { + case .value: + await counter1.increment() + return .more + case let .completion(.failure(error)): + XCTFail("Got an error? \(error)") + return .done + case .completion(.finished): + let count = await counter1.count + if count != 100 { + XCTFail("Incorrect count: \(count) in subscription 1") + } + return .done + case .completion(.cancelled): + XCTFail("Should not have cancelled") + return .done + } + }) + + let counter2 = Counter() + let u2 = await unfolded.share().sink { (result: AsyncStream.Result) in + switch result { + case .value: + await counter2.increment() + return .more + case let .completion(.failure(error)): + XCTFail("Got an error? \(error)") + return .done + case .completion(.finished): + let count = await counter2.count + if count != 100 { + XCTFail("Incorrect count: \(count) in subscription 2") + } + return .done + case .completion(.cancelled): + XCTFail("Should not have cancelled") + return .done + } + } + + for i in (0 ..< 100) { + do { try await subj.send(i) } + catch { XCTFail("Failed to send on \(i)") } + } + + try subj.nonBlockingFinish() + let d1 = try await u1.value + XCTAssert(d1 == .done, "First chain has wrong value") + let d2 = try await u2.value + XCTAssert(d2 == .done, "Second chain has wrong value") } }