Skip to content

Commit

Permalink
Fix Heartbeat, Throttle and add Debounce (#34)
Browse files Browse the repository at this point in the history
* Working (sorta) throttle

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Add Heartbeat tests, attempt to make heartbeat skip

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Fix problems in Heartbeat and throttle

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Add Debounce and DelayEachDemand.  Improve throttle to allow buffering

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Undo package version change mistake

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Fix inadvertant changes

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Undo inadvertant change

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Jul 9, 2022
1 parent fda4d6c commit 27fe211
Show file tree
Hide file tree
Showing 16 changed files with 420 additions and 97 deletions.
33 changes: 24 additions & 9 deletions Documentation/Bibliography.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,56 @@ https://youtu.be/T5oB8PZQNvY

https://leanpub.com/sofp

### Swift's Structure Concurrency

[SE-304 Structured Concurrency](https://github.com/apple/swift-evolution/blob/main/proposals/0304-structured-concurrency.md#structured-concurrency-1) The original structured concurrency proposal from Apple.

[Sources for a lot of the design](https://forums.swift.org/t/concurrency-designs-from-other-communities/32389/16)

[Notes on structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) the original post that sparked Apple's approach of structured concurrency, in particular the idea that Task lifetimes should be organized hierarchically.

[Timeouts and Cancellation for Humans](https://vorpus.org/blog/timeouts-and-cancellation-for-humans/) - influential on Apple's thinking

[Guidelines on NIO](https://github.com/swift-server/guides/blob/main/docs/concurrency-adoption-guidelines.md). The official guidelines that implementors of server-side code are supposed to consider.

[NIO Roadmap](https://forums.swift.org/t/future-of-swift-nio-in-light-of-concurrency-roadmap/41633/4). Basically that NIO needs custom executors.

### Theory of Streams
* [Stream Fusion: From Lists to Streams to Nothing At All](https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion:%20From%20Lists%20to%20Streams%20to%20Nothing%20At%20All.pdf)
* [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - see especially page 21 comparing push and pull strategies

[History of Haskell](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/07/history.pdf) - see in particular Section 7.1 on Stream and Continuation-based I/O

* [Oleg Kiselyov's Stream Page](https://okmij.org/ftp/Streams.html)

* [Stream Fusion: From Lists to Streams to Nothing At All](https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion:%20From%20Lists%20to%20Streams%20to%20Nothing%20At%20All.pdf)

* [All Things Flow: A History of Streams](https://okmij.org/ftp/Computation/streams-hapoc2021.pdf)

* [Exploiting Vector Instructions with Generalized Stream Fusion](https://cacm.acm.org/magazines/2017/5/216312-exploiting-vector-instructions-with-generalized-stream-fusion/fulltext)

* [Functional Stream Libraries and Fusion: What's Next?](https://okmij.org/ftp/meta-programming/shonan-streams.pdf)

### Theory of State
[Lazy Functional State Threads](https://github.com/bitemyapp/papers/blob/master/Lazy%20Functional%20State%20Threads.pdf) - the original paper on the ST monad

### Generalized Functional Concurrency
https://github.com/bitemyapp/papers/blob/master/A%20Poor%20Man's%20Concurrency%20Monad.pdf
https://github.com/bitemyapp/papers/blob/master/Cheap%20(But%20Functional)%20Threads.pdf
https://github.com/bitemyapp/papers/blob/master/Combining%20Events%20and%20Threads%20for%20Scalable%20Network%20Services:%20Implementation%20and%20Evaluation%20of%20Monadic%2C%20Application-Level%20Concurrency%20Primitives.pdf
https://github.com/bitemyapp/papers/blob/master/Compiling%20with%20Continuations%2C%20Continued.pdf
https://github.com/bitemyapp/papers/blob/master/Functional%20Reactive%20Programming%20from%20First%20Principles.pdf
https://github.com/bitemyapp/papers/blob/master/Functional%20Reactive%20Programming%2C%20Continued.pdf
https://github.com/bitemyapp/papers/blob/master/Higher-Order%20Functional%20Reactive%20Programming%20without%20Spacetime%20Leaks.pdf
https://github.com/bitemyapp/papers/blob/master/Lazy%20Functional%20State%20Threads.pdf
https://github.com/bitemyapp/papers/blob/master/Push-pull%20functional%20reactive%20programming.pdf
https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion%20on%20Haskell%20Unicode%20Strings.pdf


### Theory of State



### Stream Libraries

[Akka Streams](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf)

### Combine
[Akka Streams](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) - important for the idea of dynamic push/pull mode. See especially starting on page 29.

[Conversation on Combine](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623102144192300)
[And](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623177619245300?thread_ts=1623102144.192300&cid=C0AET0JQ5)

### Concepts

Expand Down
14 changes: 14 additions & 0 deletions FreeCombine.xcworkspace/xcshareddata/swiftpm/Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"pins" : [
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "919eb1d83e02121cdb434c7bfc1f0c66ef17febe",
"version" : "1.0.2"
}
}
],
"version" : 2
}
3 changes: 1 addition & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// swift-tools-version: 5.6

import PackageDescription

let package = Package(
Expand All @@ -25,7 +24,7 @@ let package = Package(
),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.0.2")),
],
targets: [
.target(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
observe how the zip blocks of value `Int(14)` and `String("hello, combined world!")` are all emitted at the very end.
*/
import Combine
import Atomics


func combineVersion() {
let subject1 = Combine.PassthroughSubject<Int, Error>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<playground version='6.0' target-platform='macos' display-mode='raw' buildActiveScheme='true' importAppTypes='true'>
<playground version='6.0' target-platform='ios' display-mode='rendered' buildActiveScheme='true' importAppTypes='true'>
<pages>
<page name='00a-Introduction (Examples)'/>
<page name='00b-Introduction (Requirements)'/>
Expand Down
57 changes: 34 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,33 @@

## Like Combine. Only free. And concurrent.

FreeCombine is a functional streaming library for the Swift language.

Functional streaming comes in two forms: push and pull. FreeCombine is pull. RxSwift and ReactiveSwift are push. Combine is both, but primarily pull. (If you have ever wondered what a Subscription is in Combine, it's the implementation of pull semantics). AsyncSequence in Apple's Swift standard library is pull-only. While there are exceptions, streams in synchronous systems tend to be push, in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two treat combinators like zip or decombinators like Combine's Subject.

All streaming libraries are written in the Continuation Passing Style (CPS). Because of this they share certain operations for the Continuation type: map, flatMap, filter, reduce, et al.

Promise/Future systems are also written in CPS and as a result share many of the same operations. FreeCombine incorporates NIO-style Promises and Futures as a result.

FreeCombine differs from AsyncSequence (and its support in Apple's swift-async-algorithms package) in the following key ways. FreeCombine is:

* Protocol-free.
* No protocols, only concrete types
* Eager type erasure
* Continuation-passing style
* Explicit implemenation of the continuation-passing style via Continuation type.
* Race-free.
* Yield-free.
* Sleep-free.
* Continuations are only created after upstream continuations are guaranteed to exist
* subscribeOn-race-free. (Continuations are only created after upstream continuations are guaranteed to exist)
* Leak-free.
* Memory-bound Task resolution
* Memory-bound Continuation resolution
* Unbounded queue-free.
* ARC-like Task lifetimes
* ARC-like Continuation lifetimes
* Lock-free.
* Queueing channel instead of locking channel
* Blocking, not locking
* No use of `os_unfair_lock` or equivalent constructs in other operating system
* Foundation-free.
* depends only on Swift std lib (and swift-atomics)

Implies the following Do's and Dont's
These "freedoms" imply the following specific restrictions:

Don'ts:
* No use of `protocol`
Expand All @@ -35,38 +43,31 @@ Sort of Don'ts:
* Use of `Task.init` only in Cancellable
* Use of `[Checked|Unsafe]Continuation` only in Resumption
* Use of `AsyncStream.init` only in Channel
* Use of .unbounded as BufferingPolicy only in Channel's which accept subscribe operations

## Salient features

1. "Small things that compose"
1. Implement all operations supported by Combine, but some require modification
1. Uses "imprecise" errors throughout in the manner of Swift NIO.
1. Tasks and Continuations can _always_ fail due to cancellation
1. Tasks and Continuations can _always_ fail due to cancellation so no Failure type on Continuations
1. Principled handling of cancellation throughout
1. Futures _AND_ Streams, chocolate _AND_ peanut butter
1. No dependency on Foundation
1. Finally and very importantly, ownership of Tasks/Cancellables can be transferred or even shared.

## Todo

1. ~~Implement leak prevention on UnsafeContinuation, Task, and AsyncStream.Continuation~~
1. maybe add an additional repo (FreeCombineDispatch) that depends on libdispatch to get delay, debounce, throttle
1. revamp StateThread to be exactly a concurrency aware version of TCA's store
1. Add support for Promise/Future
1. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher`
1. fully implement all Combine operators
1. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream
1. Get to 100% test coverage
1. Document this at the level of writing a book in the form of playgrounds

## Introduction
## Introduction

For a long time I've been exploring the idea of what Apple's Swift Combine framework would look like if written without using protocols. The advent of Concurrency support in Swift 5.5 provided an impetus to complete that exploration. This repository represents the current state of that effort and consists of material that I intend to incorporate into classes I teach on iOS development at [Harvard](https://courses.dce.harvard.edu/?details&srcdb=202203&crn=33540) and at [Tufts](https://www.cs.tufts.edu/t/courses/description/fall2021/CS/151-02).

Ideally, this material would become the core of an expanded course on Functional Concurrent Programming using Swift, but that course is still fairly far off.

Secondarily, this repo is my own feeble attempt to answer the following questions:

1. Why does Swift's Structured Concurrency, not have the same set of primitives as Concurrent Haskell?
1. Why is that whenever I ask someone: do you use TaskGroup or `async let`, they respond, I don't but I'm sure that there are other people who do?
1. Why does Swift's Structured Concurrency, not have the same set of primitives as (say) Concurrent Haskell?
1. Why is that whenever I ask someone: do you use TaskGroup or `async let`, they respond, "I don't but I'm sure that there are other people who do." ?
1. Why is it that Task lifetimes much align with their parent's lifetime, but that other objects which are in a parent-child relationship have no such restriction?
1. Are there differences between what we mean when we refer to Structured Concurrency and what we mean when we refer to Functional Concurrency and precisely what would those differences be?

### Functional Requirements
Expand Down Expand Up @@ -131,4 +132,14 @@ In my opinion, what SE-335 is saying applies to Combine (and frankly to AsyncSeq
* Sending .completion(.finished|.cancelled|.failure(Error)) means that the value returned is ignored (proactive)
* External cancellation causes .completion(.cancelled) to be sent as the next demanded value (external)

## Todo

1. ~~Implement leak prevention on UnsafeContinuation, Task, and AsyncStream.Continuation~~
1. ~~maybe add an additional repo (FreeCombineDispatch) that depends on libdispatch to get delay, debounce, throttle~~
1. revamp StateThread to be exactly a concurrency aware version of TCA's store
1. Add support for Promise/Future
1. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher`
1. ~~fully implement all Combine operators~~
1. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream
1. Get to 100% test coverage
1. Document this at the level of writing a book in the form of playgrounds
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Cancellable/Cancellable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public final class Cancellable<Output: Sendable>: Sendable {

public var isCancelled: Bool { task.isCancelled }
public var isCompleting: Bool { deallocGuard.load(ordering: .sequentiallyConsistent) }
public var value: Output { get async throws { try await task.value } }
public var value: Output { get async throws { try await task.value } }
public var result: Result<Output, Swift.Error> { get async { await task.result } }

@Sendable public func cancel() -> Void {
Expand Down
23 changes: 17 additions & 6 deletions Sources/FreeCombine/Decombinator/RepeatReceiveState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//
// Created by Van Simmons on 7/2/22.
//

public struct RepeatReceiveState<Output: Sendable> {
var distributorChannel: Channel<DistributorState<Output>.Action>

Expand All @@ -13,6 +14,7 @@ public struct RepeatReceiveState<Output: Sendable> {

public enum Action: Sendable {
case receive(AsyncStream<Output>.Result, Resumption<Int>)
case nonBlockingReceive(AsyncStream<Output>.Result)
}

public init(
Expand All @@ -38,23 +40,32 @@ public struct RepeatReceiveState<Output: Sendable> {
default:
continuation.resume(throwing: PublisherError.completed)
}
case .nonBlockingReceive(_):
()
}
}

static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
static func reduce(state: inout Self, action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
if Task.isCancelled {
switch action {
case .receive(_, let resumption):
resumption.resume(throwing: PublisherError.cancelled)
case .nonBlockingReceive(_):
()
}
return .completion(.cancel)
}
return try await `self`.reduce(action: action)
return try await state.reduce(action: action)
}

mutating func reduce(action: Action) async throws -> Reducer<Self, Action>.Effect {
guard case let .receive(result, resumption) = action else {
fatalError("Unknown action")
var result: AsyncStream<Output>.Result! = .none
var resumption: Resumption<Int>! = .none
if case let .receive(r1, r3) = action {
result = r1
resumption = r3
} else if case let .nonBlockingReceive(r2) = action {
result = r2
}
do {
let subscribers: Int = try await withResumption { innerResumption in
Expand All @@ -69,10 +80,10 @@ public struct RepeatReceiveState<Output: Sendable> {
fatalError("Unhandled continuation value")
}
}
resumption.resume(returning: subscribers)
resumption?.resume(returning: subscribers)
return .none
} catch {
resumption.resume(throwing: error)
resumption?.resume(throwing: error)
return .completion(.failure(error))
}
}
Expand Down
Loading

0 comments on commit 27fe211

Please sign in to comment.