Skip to content

Commit

Permalink
First real commit with Promises and Futures (#47)
Browse files Browse the repository at this point in the history
* Another README change

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Updates to bibliography and readme, improvement of Promise’s state types

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Initial Promise/Future functionality… Needs cleanup

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Jul 27, 2022
1 parent 7011d41 commit f7fd397
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 108 deletions.
139 changes: 89 additions & 50 deletions Documentation/Bibliography.md

Large diffs are not rendered by default.

33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,17 @@ FreeCombine received: z26

FreeCombine is a functional streaming library for the Swift language.

Functional streaming comes in two forms: push and pull. FreeCombine is pull. RxSwift and ReactiveSwift are push. Combine is both, but primarily pull, in that the vast majority of use cases utilize only the push mode. (If you are curious about the differences between the two, a good introduction is this one on [the Akka Streams library](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) which is both push and pull and can change between the two dynamically).
Functional streaming comes in two forms: push and pull. FreeCombine is pull. RxSwift and ReactiveSwift are push. Combine is both, but primarily pull, in that the vast majority of use cases utilize only the push mode. If you are curious about the differences between the two, a good introduction is this one on [the Akka Streams library](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) which is both push and pull and can change between the two dynamically (Pages 29-33 are especially informative).

As an aside, if you have ever wondered what a Subscription is in Combine, it's the implementation of pull semantics. Any use of `sink` or `assign` puts the stream into push mode and ignores Demand. If you've never used `AnySubscriber` and have never written your own `Subscriber` implementation, then you've only been using Combine in push mode. My experience is that this is the vast majority of Combine users.

AsyncStream in Apple's Swift standard library is a _pull_ stream. Accordingly several things that seem natural in Combine turn out to have different meaningins in AsyncStream (and are much more difficult to implement). In particular, having several downstream subscribers to the same stream is very complicated when compared to doing the same thing in a push environment. AsyncStream conforms to AsyncSequence and all of the other conforming types to AsyncSequence are also pull-mode streams and share the same semantics.
AsyncStream in Apple's Swift standard library is a _pull_ stream. Accordingly several things that seem natural in Combine turn out to have different meanings in AsyncStream (and are much more difficult to implement). In particular, having several downstream subscribers to the same stream is very complicated when compared to doing the same thing in a push environment. AsyncStream conforms to AsyncSequence and all of the other conforming types to AsyncSequence are also pull-mode streams and therefore share the same semantics.

The difference between push and pull is really fundamental, yet in my experience, most users of Combine are surprised to learn that it exists. It explains why, as of this writing in July '22, Swift Async Algorithms still lacks a `Subject`-like type. It's because `Subject`, `ConnectablePublisher` and operations like `throttle`, `delay` and `debounce` are really hard to get right in a pull system and they are much easier to implement in push systems. OTOH, operations like `zip` are really hard to get right in a push system because they require the introduction of unbounded queues upstream and this is more than a little problematic if the user has not explicitly accounted for this.
The difference between push and pull is really fundamental, yet in my experience, most users of Combine are surprised to learn that it exists. It explains why, as of this writing in July '22, Swift Async Algorithms still lacks a `Subject`-like type. It's because `Subject`, `ConnectablePublisher` and operations like `throttle`, `delay` and `debounce` are really hard to get right in a pull system and they are much easier to implement in push systems. OTOH, operations like `zip` are really hard to get right in a push system because they require the introduction of unbounded queues upstream. Unbounded queues are more than a little problematic if the user has not explicitly accounted for their presence.

While there are exceptions, streams in synchronous systems tend to be push, in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two modes treat combinators like zip or decombinators like Combine's Subject. A good summary of the differences is found in this presentation: [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - especially the table on page 21. One interesting area of future development for FreeCombine is at the interface between synchronous and asynchronous context, for example, you would like your SwiftUI code to be only synchronous - a button tap should not (and really cannot) hang the UI, but you would like your application state to be maintained in async context. More on this below.
While there are exceptions (Combine for example), streams in synchronous systems tend to be push, in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two modes treat combinators like zip or decombinators like Combine's Subject. A good summary of the differences is found in this presentation: [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - especially the table on page 21. One interesting area of future development for FreeCombine is at the interface between synchronous and asynchronous context, for example, you would like your SwiftUI code to be only synchronous - a button tap should not (and really cannot) hang the UI, but you would like your application state to be maintained in async context. More on this below.

All streaming libraries are written in the [Continuation Passing Style (CPS)](https://en.wikipedia.org/wiki/Continuation-passing_style). Because of this they share certain operations for the Continuation type: map, flatMap, join, filter, reduce, et al. (FWIW, everything you know Object-Oriented notation is also CPS just slightly disguised and this is shown Playground 2 in the Playgrounds directory).
All streaming libraries are written in the [Continuation Passing Style (CPS)](https://en.wikipedia.org/wiki/Continuation-passing_style). Because of this they share certain operations for the Continuation type: map, flatMap, join, filter, reduce, et al. (FWIW, everything you know Object-Oriented notation is also CPS just slightly disguised. This is shown Playground 2 in the Playgrounds directory).

Promise/Future systems are also written in CPS and as a result share many of the same operations. FreeCombine incorporates NIO-style Promises and Futures almost by default as a result of FreeCombine's direct implemenation of CPS. In FreeCombine's implementations of Publisher and Future, it is easy to read the relationship between the two directly from the type signatures. Futures can be thought of as "one-shot" streams, i.e. a stream which will only ever send exactly one element downstream, no more, no less. In this paradigm, Promises can be seen to be the exact one-shot representation of Subject from the "normal" streaming world. If you find the concept of a "one-shot" stream odd, it is worth noting that the Swift Standard Library already has an exactly analogous notion in the type [CollectionOfOne](https://developer.apple.com/documentation/swift/collectionofone).

Expand Down Expand Up @@ -243,20 +243,35 @@ Ideally, this material would become the core of an expanded course on Functional
Secondarily, this repo is my own feeble attempt to answer the following questions:

1. Why does the use of protocols in things like Combine and AsyncSequence seem to produce much more complicated APIs than if the same APIs had been implemented with concrete types instead?
1. Why does Swift Concurrency seem to avoid use of functional constructs like `map`, `flatMap`, and `zip` when dealing with generic types like Tasks, but to embrance them fully when dealing with generic types like `AsyncStream`? (not to mention more run-of-the-mill types like `Optional`, `Result`, and `Sequence`)
1. Why does Swift Concurrency seem to avoid the use of functional constructs like `map`, `flatMap`, and `zip` when dealing with generic types like Tasks, but to embrance them fully when dealing with generic types like `AsyncStream`? (not to mention more run-of-the-mill types like `Optional`, `Result`, and `Sequence`)
1. Which elements of Swift Concurrency should be regarded as `primitive` and which are `compositional`.
1. If, in Swift, we decorate "effectful" functions with keywords like `throws` and `async`, does that mean we can expect other kinds of effects to introduce additional keywords on function declaration?
1. Is there a general way of dealing with effects in Swift and what might such a mechanism look like?
1. Why does Swift's Structured Concurrency not have a set of primitives similar to (say) Haskell or Java? In particular, why does it seem so difficult to use Structured Concurrency to write constructs like Haskell's [ST monad](https://hackage.haskell.org/package/base-4.3.1.0/docs/Control-Monad-ST.html), [MVar](https://hackage.haskell.org/package/base-4.16.2.0/docs/Control-Concurrent-MVar.html), or [TVar](https://hackage.haskell.org/package/stm-2.5.0.2/docs/Control-Concurrent-STM-TVar.html) or to implement the common [Producer/Consumer pattern](https://www.baeldung.com/java-producer-consumer-problem) seen ubiquitously in Java concurrency?
1. Why does Swift's Structured Concurrency not have a set of primitives similar to (say) Haskell or Java? In particular, why does it seem so difficult to use Structured Concurrency to write constructs like Haskell's [ST monad](https://hackage.haskell.org/package/base-4.3.1.0/docs/Control-Monad-ST.html), [MVar](https://hackage.haskell.org/package/base-4.16.2.0/docs/Control-Concurrent-MVar.html), or [TVar](https://hackage.haskell.org/package/stm-2.5.0.2/docs/Control-Concurrent-STM-TVar.html) or to implement the [Producer/Consumer pattern](https://www.baeldung.com/java-producer-consumer-problem) seen ubiquitously in Java concurrency?
1. Why, when I start out using TaskGroup and `async let` in my designs do I eventually find myself ending up discarding them and using their unstructured counterparts?
1. Why is that whenever I ask someone: "Do you use TaskGroup or `async let` and if so, how?", they (to date) have invariably responded, "I don't but I'm sure that there are many other people who do because they are really great."?
1. Why is it that in Structured Concurrency, Task lifetimes must align with the lifetime of the object that created them, but that all of my other objects which are in a parent-child relationship have no such lifetime restriction and can instead be shared or have ownership transferred and, in the end, just be managed by ARC?
1. Why is it that in Structured Concurrency, Task lifetimes must align with the lifetime of the Task that created them, but that all of my other objects which are in a parent-child relationship have no such lifetime restriction and can instead be shared or have ownership transferred and, in the end, just be managed by ARC?
1. Why is that alone of all the objects I write, ARC does not seem to apply to Task?
1. What are the differences between `actor` and `AtomicReference` (from swift-atomics) and when would I use one over the other?
1. In what cases would I use an AsyncStream to manage mutable state and in what cases would I use an `actor`?
1. Why, when I start out using actors in my design do I always end up using an AsyncStream or an AtomicReference instead?
1. Are there differences between what we mean when we refer to Structured Concurrency and what we mean when we refer to Functional Concurrency and precisely what would those differences be?

These questions have been nagging at me since early 2021 as Swift Concurrency was introduced. Developing this repository has helped me answer them to my own satisfaction, though of course, YMMV. My answers to these questions are in the Playgrounds section of this repository.
These questions have been nagging at me since early 2021 as Swift Concurrency was introduced. Developing this repository has helped me answer them to my own satisfaction, though of course, YMMV. My answers to all of these questions are explored in the Playgrounds section of this repository. The major design choices made in FreeCombine should all be plain there.


### Protocol-free

### Race-free

### Leak-free

In essence, my answers to the questions above lead to organizing concurrent Swift applications along different lines than those promulgated by [Swift Evolution Proposal 304](https://github.com/apple/swift-evolution/blob/main/proposals/0304-structured-concurrency.md#proposed-solution). SE-304 advocates that every task must not outlive the task that created it. "Not outlive" in this context means that every task should either complete or be in the cancelled state before its creator completes.

FreeCombine changes that restriction to be: every task must "owned" and may not outlive its owner(s). If that restriction reminds you of ARC, that's because that's exactly what it is. Instead of Structure Concurrency

### Lock-free

## Additional notes to be organized in the future

### Functional Requirements
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//
// RepeatReceiveState.swift
// DistributorReceiveState.swift
//
//
// Created by Van Simmons on 7/2/22.
//

public struct RepeatReceiveState<Output: Sendable> {
public struct DistributorReceiveState<Output: Sendable> {
var distributorChannel: Channel<DistributorState<Output>.Action>

public enum Error: Swift.Error {
Expand All @@ -25,7 +25,7 @@ public struct RepeatReceiveState<Output: Sendable> {

static func create(
distributorChannel: Channel<DistributorState<Output>.Action>
) -> (Channel<RepeatReceiveState<Output>.Action>) -> Self {
) -> (Channel<DistributorReceiveState<Output>.Action>) -> Self {
{ channel in .init(channel: distributorChannel) }
}

Expand Down
89 changes: 89 additions & 0 deletions Sources/FreeCombine/Decombinator/PromiseReceiveState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// PromiseReceiveState.swift
//
//
// Created by Van Simmons on 7/27/22.
//
public struct PromiseReceiveState<Output: Sendable> {
var promiseChannel: Channel<PromiseState<Output>.Action>

public enum Error: Swift.Error {
case alreadyCompleted
}

public enum Action: Sendable {
case receive(Result<Output, Swift.Error>, Resumption<Int>)
case nonBlockingReceive(Result<Output, Swift.Error>)
}

public init(
channel: Channel<PromiseState<Output>.Action>
) {
self.promiseChannel = channel
}

static func create(
promiseChannel: Channel<PromiseState<Output>.Action>
) -> (Channel<PromiseReceiveState<Output>.Action>) -> Self {
{ channel in .init(channel: promiseChannel) }
}

static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void { }

static func dispose(action: Self.Action, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
switch action {
case let .receive(_, continuation):
switch completion {
case .failure(let error):
continuation.resume(throwing: error)
default:
continuation.resume(throwing: PublisherError.completed)
}
case .nonBlockingReceive(_):
()
}
}

static func reduce(state: inout Self, action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
if Task.isCancelled {
switch action {
case .receive(_, let resumption):
resumption.resume(throwing: PublisherError.cancelled)
case .nonBlockingReceive(_):
()
}
return .completion(.cancel)
}
return try await state.reduce(action: action)
}

mutating func reduce(action: Action) async throws -> Reducer<Self, Action>.Effect {
var result: Result<Output, Swift.Error>! = .none
var resumption: Resumption<Int>! = .none
if case let .receive(r1, r3) = action {
result = r1
resumption = r3
} else if case let .nonBlockingReceive(r2) = action {
result = r2
}
do {
let subscribers: Int = try await withResumption { innerResumption in
switch promiseChannel.yield(.receive(result, innerResumption)) {
case .enqueued:
()
case .dropped:
innerResumption.resume(throwing: PublisherError.enqueueError)
case .terminated:
innerResumption.resume(throwing: PublisherError.cancelled)
@unknown default:
fatalError("Unhandled continuation value")
}
}
resumption?.resume(returning: subscribers)
return .none
} catch {
resumption?.resume(throwing: error)
return .completion(.failure(error))
}
}
}
4 changes: 2 additions & 2 deletions Sources/FreeCombine/Decombinator/PromiseRepeaterState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public struct PromiseRepeaterState<ID: Hashable & Sendable, Output: Sendable>: Identifiable, Sendable {
public enum Action {
case `repeat`(Result<Output, Swift.Error>, Semaphore<[ID], RepeatedAction<ID>>)
case complete(Result<Output, Swift.Error>, Semaphore<[ID], RepeatedAction<ID>>)
}

public let id: ID
Expand Down Expand Up @@ -42,7 +42,7 @@ public struct PromiseRepeaterState<ID: Hashable & Sendable, Output: Sendable>: I

mutating func reduce(action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
switch action {
case let .repeat(output, semaphore):
case let .complete(output, semaphore):
try? await downstream(output)
await semaphore.decrement(with: .repeated(id, .done))
return .completion(.exit)
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Decombinator/PromiseState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public struct PromiseState<Output: Sendable> {
)

currentRepeaters.forEach { key, downstreamTask in
let queueStatus = downstreamTask.send(.repeat(result, semaphore))
let queueStatus = downstreamTask.send(.complete(result, semaphore))
switch queueStatus {
case .enqueued:
()
Expand Down
Loading

0 comments on commit f7fd397

Please sign in to comment.