Skip to content

Commit

Permalink
Doc updates and initial promise tests (#49)
Browse files Browse the repository at this point in the history
* Fix failure to compile

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* rename function arguments appropriately

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Documentation udpates

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Initial Promise test working

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Aug 6, 2022
1 parent 7c36148 commit 0b79e1a
Show file tree
Hide file tree
Showing 24 changed files with 238 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,40 @@
//: [Previous](@previous)

/*:
## The "Crusty" Talk
![The Crusty Talk](POP-1.pdf)

## The 3 Beefs
![The Three Beefs](POP-2.pdf)

## Concurrency
![The Crusty Talk](POP-3.pdf)

## Single Inheritance
![The Crusty Talk](POP-4.pdf)

## Lost Type Relationships
![The Crusty Talk](POP-5.pdf)

## The Tell-tale Sign
![The Crusty Talk](POP-6.pdf)

## Protocol Oriented Programming
![The Crusty Talk](POP-7.pdf)

## The Challenge: Prove It!
![The Crusty Talk](POP-8.pdf)

## Demand only comes in More and All
![Demand](Demand.png)

## The Tell-tale Sign: Protocol Edition
![Return Types in Combine](CombineReturnTypes.png)

## Protocols have problems too
![Swift Evolution Proposal 335](SE-335.png)


*/

//: [Next](@next)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<playground version='6.0' target-platform='ios' display-mode='raw' buildActiveScheme='true' importAppTypes='true'>
<playground version='6.0' target-platform='ios' display-mode='rendered' buildActiveScheme='true' importAppTypes='true'>
<pages>
<page name='01 - Lost Type Relationships'/>
<page name='02 - Type Level vs Value Level'/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
//: [Previous](@previous)

import FreeCombine
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

var count = 0
import FreeCombine

let counter = Counter()
var publisher1 = (0 ... 100).asyncPublisher
var publisher2 = "abcdefghijklmnopqrstuvwxyz".asyncPublisher

let cancellable = Zipped(publisher1, publisher2)
.map { ($0.0 + 100, $0.1.uppercased()) }
.sink(onStartup: .none) { result in
switch result {
case let .value(value):
print(value)
count += 1
case let .completion(.failure(error)):
print("WTF? \(error)")
case .completion(.finished):
print("Done")
case .completion(.cancelled):
print("Cancelled")
}
return .more
@Sendable func zipSink(_ result: AsyncStream<(Int, String)>.Result) async -> Demand {
switch result {
case let .value(value):
print(value)
await counter.increment()
case let .completion(.failure(error)):
print("WTF? \(error)")
case .completion(.finished):
print("Done")
case .completion(.cancelled):
print("Cancelled")
}
return .more
}

let cancellable = await Zipped(publisher1, publisher2)
.map { ($0.0 + 100, $0.1.uppercased()) }
.sink(zipSink)

cancellable
let count = await counter.count
await cancellable.result

//: [Next](@next)
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,21 @@ FreeCombine received: z26
```
# The Long Version

## Like Combine. Only free. And concurrent.
## Like Combine. Only free. And functionally concurrent.

FreeCombine is a functional streaming library for the Swift language.

Functional streaming comes in two forms: push and pull. FreeCombine is pull. RxSwift and ReactiveSwift are push. Combine is both, but primarily pull, in that the vast majority of use cases utilize only the push mode. If you are curious about the differences between the two, a good introduction is this one on [the Akka Streams library](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) which is both push and pull and can change between the two dynamically (Pages 29-33 are especially informative).

As an aside, if you have ever wondered what a Subscription is in Combine, it's the implementation of pull semantics. Any use of `sink` or `assign` puts the stream into push mode and ignores Demand. If you've never used `AnySubscriber` and have never written your own `Subscriber` implementation, then you've only been using Combine in push mode. My experience is that this is the vast majority of Combine users.
As an aside, if you have ever wondered what a Subscription is in Combine, it's the implementation of pull semantics. Any use of `sink` or `assign` puts the stream into push mode and ignores Demand. If you've never used `AnySubscriber` and have never written your own `Subscriber` implementation, then you've only been using Combine in push mode. My experience is that this is the majority of Combine users.

AsyncStream in Apple's Swift standard library is a _pull_ stream. Accordingly several things that seem natural in Combine turn out to have different meanings in AsyncStream (and are much more difficult to implement). In particular, having several downstream subscribers to the same stream is very complicated when compared to doing the same thing in a push environment. AsyncStream conforms to AsyncSequence and all of the other conforming types to AsyncSequence are also pull-mode streams and therefore share the same semantics.

The difference between push and pull is really fundamental, yet in my experience, most users of Combine are surprised to learn that it exists. It explains why, as of this writing in July '22, Swift Async Algorithms still lacks a `Subject`-like type. It's because `Subject`, `ConnectablePublisher` and operations like `throttle`, `delay` and `debounce` are really hard to get right in a pull system and they are much easier to implement in push systems. OTOH, operations like `zip` are really hard to get right in a push system because they require the introduction of unbounded queues upstream. Unbounded queues are more than a little problematic if the user has not explicitly accounted for their presence.

While there are exceptions (Combine for example), streams in synchronous systems tend to be push, in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two modes treat combinators like zip or decombinators like Combine's Subject. A good summary of the differences is found in this presentation: [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - especially the table on page 21. One interesting area of future development for FreeCombine is at the interface between synchronous and asynchronous context, for example, you would like your SwiftUI code to be only synchronous - a button tap should not (and really cannot) hang the UI, but you would like your application state to be maintained in async context. More on this below.
While there are exceptions (Combine for example), streams in synchronous systems tend to be push, while in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two modes treat combinators like zip or decombinators like Combine's Subject. A good summary of the differences is found in this presentation: [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - especially the table on page 21. One interesting area of future development for FreeCombine is at the interface between synchronous and asynchronous context, for example, you would like your SwiftUI code to be only synchronous - a button tap should not (and really cannot) hang the UI, but you would like your application state to be maintained in async context. More on this below.

All streaming libraries are written in the [Continuation Passing Style (CPS)](https://en.wikipedia.org/wiki/Continuation-passing_style). Because of this they share certain operations for the Continuation type: map, flatMap, join, filter, reduce, et al. (FWIW, everything you know Object-Oriented notation is also CPS just slightly disguised. This is shown Playground 2 in the Playgrounds directory).
All streaming libraries are written in the [Continuation Passing Style (CPS)](https://en.wikipedia.org/wiki/Continuation-passing_style). Because of this, they share certain operations for the Continuation type: map, flatMap, join, filter, reduce, et al. (FWIW, everything you know Object-Oriented notation is also CPS just slightly disguised. This is shown Playground 2 in the Playgrounds directory).

Promise/Future systems are also written in CPS and as a result share many of the same operations. FreeCombine incorporates NIO-style Promises and Futures almost by default as a result of FreeCombine's direct implemenation of CPS. In FreeCombine's implementations of Publisher and Future, it is easy to read the relationship between the two directly from the type signatures. Futures can be thought of as "one-shot" streams, i.e. a stream which will only ever send exactly one element downstream, no more, no less. In this paradigm, Promises can be seen to be the exact one-shot representation of Subject from the "normal" streaming world. If you find the concept of a "one-shot" stream odd, it is worth noting that the Swift Standard Library already has an exactly analogous notion in the type [CollectionOfOne](https://developer.apple.com/documentation/swift/collectionofone).

Expand All @@ -198,11 +198,11 @@ FreeCombine is "free" in the sense that it is:
* Protocol-free.
* No use of protocols, only concrete types
* Eager type erasure, no long nested types as seen in Combine.
* Explicit implemenation of the continuation-passing style via the Publisher and StateTask types.
* Explicit implemenation of the continuation-passing style via the Publisher and StateTask types which are _not_ protocols.
* Race-free.
* Yield-free.
* Sleep-free.
* subscribeOn-free. `subscribeOn`-like functionality is inherent in FreeCombine. The race conditions it creates are prevented because continuations are only created after upstream continuations are guaranteed to exist and have started. `subscribeOn` is guaranteed to be in the right async context.
* subscribeOn-free. `subscribeOn`-like functionality is inherent in FreeCombine. `subscribeOn` is one of the Combine operators that has no exact analog in FreeCombine. But `subscribeOn` is subject to race conditions. The race conditions it creates are prevented because all FreeCombine continuations are only created after their upstream continuations are guaranteed to exist and have started. This means that `subscribeOn` is guaranteed to be in the right async context.
* All tests must demonstrate race-free operation by executing successfully for 10,000 repetitions under Xcode's `Run [Tests] Repeatedly` option.
* Leak-free.
* ARC-like Task lifetimes
Expand Down
8 changes: 4 additions & 4 deletions Sources/FreeCombine/Cancellable/Cancellable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public extension Cancellable {
func map<B>(
file: StaticString = #file,
line: UInt = #line,
_ f: @escaping (Output) async -> B
_ transform: @escaping (Output) async -> B
) -> Cancellable<B> {
let inner = self
return .init(file: file, line: line) {
try await withTaskCancellationHandler(handler: { Task { inner.cancel() } }) {
try await f(inner.value)
try await transform(inner.value)
}
}
}
Expand All @@ -134,10 +134,10 @@ public extension Cancellable {
}

func flatMap<B>(
_ f: @escaping (Output) async -> Cancellable<B>,
_ transform: @escaping (Output) async -> Cancellable<B>,
file: StaticString = #file,
line: UInt = #line
) -> Cancellable<B> {
self.map(file: file, line: line, f).join(file: file, line: line)
self.map(file: file, line: line, transform).join(file: file, line: line)
}
}
4 changes: 2 additions & 2 deletions Sources/FreeCombine/Cancellable/Resumption.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public func withResumption<Output>(
file: StaticString = #file,
line: UInt = #line,
deinitBehavior: DeinitBehavior = .assert,
_ f: (Resumption<Output>) -> Void
_ resume: (Resumption<Output>) -> Void
) async throws -> Output {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Output, Swift.Error>) -> Void in
f(.init(
resume(.init(
file: file,
line: line,
deinitBehavior: deinitBehavior,
Expand Down
10 changes: 3 additions & 7 deletions Sources/FreeCombine/Decombinator/PromiseState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ public struct PromiseState<Output: Sendable> {
resumption.resume(throwing: error)
throw error
}
return isComplete
? .completion(.exit)
: .none
return isComplete ? .completion(.exit) : .none
case let .subscribe(downstream, resumption):
var repeater: Cancellable<Void>!
if let currentValue = currentValue {
Expand Down Expand Up @@ -170,9 +168,7 @@ public struct PromiseState<Output: Sendable> {
}
}
}
.forEach { key in
repeaters.removeValue(forKey: key)
}
.forEach { repeaters.removeValue(forKey: $0) }
}

mutating func process(
Expand All @@ -191,7 +187,7 @@ public struct PromiseState<Output: Sendable> {
)
)
repeaters[nextKey] = repeater
return .init { try await repeater.value }
return .init { _ = try await repeater.value; return }
}
}

4 changes: 2 additions & 2 deletions Sources/FreeCombine/FlatMap/Catch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// Created by Van Simmons on 6/7/22.
//
public extension Publisher {
func `catch`(_ f: @escaping (Swift.Error) async -> Publisher<Output>) -> Publisher<Output> {
flatMapError(f)
func `catch`(_ transform: @escaping (Swift.Error) async -> Publisher<Output>) -> Publisher<Output> {
flatMapError(transform)
}
}
4 changes: 2 additions & 2 deletions Sources/FreeCombine/FlatMap/FlatMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
//
public extension Publisher {
func flatMap<B>(
_ f: @escaping (Output) async -> Publisher<B>
_ transform: @escaping (Output) async -> Publisher<B>
) -> Publisher<B> {
.init { continuation, downstream in self(onStartup: continuation) { r in switch r {
case .value(let a):
return try await f(a)(flattener(downstream)).value
return try await transform(a)(flattener(downstream)).value
case let .completion(value):
return try await downstream(.completion(value))
} } }
Expand Down
6 changes: 4 additions & 2 deletions Sources/FreeCombine/FlatMap/FlatMapError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
// Created by Van Simmons on 5/18/22.
//
public extension Publisher {
func flatMapError(_ f: @escaping (Swift.Error) async -> Publisher<Output>) -> Publisher<Output> {
func flatMapError(
_ transform: @escaping (Swift.Error) async -> Publisher<Output>
) -> Publisher<Output> {
.init { continuation, downstream in
self(onStartup: continuation) { r in switch r {
case .value(let a):
return try await downstream(.value(a))
case .completion(.failure(let e)):
return try await f(e)(flattener(downstream)).value
return try await transform(e)(flattener(downstream)).value
case .completion(.finished):
return try await downstream(.completion(.finished))
case .completion(.cancelled):
Expand Down
4 changes: 2 additions & 2 deletions Sources/FreeCombine/FlatMap/TryFlatMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

public extension Publisher {
func tryFlatMap<B>(
_ f: @escaping (Output) async throws -> Publisher<B>
_ transform: @escaping (Output) async throws -> Publisher<B>
) -> Publisher<B> {
.init { continuation, downstream in
self(onStartup: continuation) { r in
switch r {
case .value(let a):
var c: Publisher<B>!
do { c = try await f(a) }
do { c = try await transform(a) }
catch { return try await downstream(.completion(.failure(error))) }
return try await c(flattener(downstream)).value
case let .completion(value):
Expand Down
6 changes: 4 additions & 2 deletions Sources/FreeCombine/FlatMap/TryFlatMapError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
// Created by Van Simmons on 6/7/22.
//
public extension Publisher {
func tryFlatMapError(_ f: @escaping (Swift.Error) async throws -> Publisher<Output>) -> Publisher<Output> {
func tryFlatMapError(
_ transform: @escaping (Swift.Error) async throws -> Publisher<Output>
) -> Publisher<Output> {
.init { continuation, downstream in
self(onStartup: continuation) { r in switch r {
case .value(let a):
return try await downstream(.value(a))
case .completion(.failure(let e)):
return try await f(e)(flattener(downstream)).value
return try await transform(e)(flattener(downstream)).value
case .completion(.finished):
return try await downstream(.completion(.finished))
case .completion(.cancelled):
Expand Down
16 changes: 8 additions & 8 deletions Sources/FreeCombine/Future.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public extension Future {
@discardableResult
func sink(
onStartup: Resumption<Void>,
_ f: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
_ downstream: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
) -> Cancellable<Void> {
self(onStartup: onStartup, f)
self(onStartup: onStartup, downstream)
}

@discardableResult
Expand All @@ -52,21 +52,21 @@ public extension Future {
file: StaticString = #file,
line: UInt = #line,
deinitBehavior: DeinitBehavior = .assert,
_ f: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
_ downstream: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
) async -> Cancellable<Void> {
await self(file: file, line: line, deinitBehavior: deinitBehavior, f)
await self(file: file, line: line, deinitBehavior: deinitBehavior, downstream)
}

@discardableResult
func callAsFunction(
file: StaticString = #file,
line: UInt = #line,
deinitBehavior: DeinitBehavior = .assert,
_ f: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
_ downstream: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
) async -> Cancellable<Void> {
var cancellable: Cancellable<Void>!
let _: Void = try! await withResumption(file: file, line: line, deinitBehavior: deinitBehavior) { continuation in
cancellable = self(onStartup: continuation, f)
cancellable = self(onStartup: continuation, downstream)
}
return cancellable
}
Expand Down Expand Up @@ -116,7 +116,7 @@ extension Future {
}

func handleFutureCancellation<Output>(
of f: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
of downstream: @Sendable @escaping (Result<Output, Swift.Error>) async throws -> Void
) async throws -> Void {
_ = try await f(.failure(FutureError.cancelled))
_ = try await downstream(.failure(FutureError.cancelled))
}
Loading

0 comments on commit 0b79e1a

Please sign in to comment.