Skip to content

Commit

Permalink
Continue Connectable work, Add leak-proofing (#20)
Browse files Browse the repository at this point in the history
* Working at 1000 Test repetitions

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Move to swift-atomics version of expectation

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Checkpointing

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Rename Checked Expecation/Start Resumption

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Revamp Expectation

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* More cleanup

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Checkpointing

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Checkpointing

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

* Pass all tests @1000 on phone and mac.

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Jun 22, 2022
1 parent efea6dc commit 7342b1a
Show file tree
Hide file tree
Showing 68 changed files with 1,209 additions and 750 deletions.
4 changes: 4 additions & 0 deletions Documentation/Bibliography.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion%20on%20Haskell%2

[Conversation on Combine](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623102144192300)

### Concepts

[Resource Acquisition is Initialization](https://en.wikipedia.org/wiki/Resource_acquisition_is_initialization)

14 changes: 14 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"pins" : [
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "919eb1d83e02121cdb434c7bfc1f0c66ef17febe",
"version" : "1.0.2"
}
}
],
"version" : 2
}
5 changes: 4 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ let package = Package(
),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.0.0")),
],
targets: [
.target(
name: "FreeCombine",
dependencies: []
dependencies: [
.product(name: "Atomics", package: "swift-atomics")
]
),
.testTarget(
name: "FreeCombineTests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,52 @@ Task {
try await Task.sleep(nanoseconds: 3_000_000_000)
print("ending inner task 3")
}
do {
try await Task.sleep(nanoseconds: 1_000_000_000)
} catch {
print("Cancelled the sleep")
}
print("ending outer task")
await withTaskCancellationHandler(handler: {
// s1.cancel()
// s2.cancel()
// s3.cancel()
}, operation: {
do {
try await Task.sleep(nanoseconds: 1_000_000_000)
} catch {
print("Cancelled the sleep")
}
print("ending outer task")
} )
}
//t.cancel()

/*:
A better approach
Intuitively, this is what `async let` is for but this does not compile bc async let
variables cannot escape their parent task.
*/

//Task<(Void, Void, Void), Swift.Error> {
// async let t1: Void = {
// try await Task.sleep(nanoseconds: 4_000_000_000)
// print("ending inner task 1")
// }()
//
// async let t2: Void = {
// try await Task.sleep(nanoseconds: 5_000_000_000)
// print("ending inner task 2")
// }()
//
// async let t3: Void = {
// try await Task.sleep(nanoseconds: 3_000_000_000)
// print("ending inner task 3")
// }()
//
// do { try await Task.sleep(nanoseconds: 1_000_000_000) }
// catch { print("Cancelled the sleep") }
//
// print("ending outer task")
// return try await (t1, t2, t3)
//}

/*:
An additional approach, scope Tasks to the lifetime of an object rather than to a parent.
Then if you ever leak a reference to the task, it cancels.
*/
public final class Cancellable<Output: Sendable>: Sendable {
private let _cancel: @Sendable () -> Void
Expand Down Expand Up @@ -76,6 +112,14 @@ public extension Cancellable {
}
}

Task {
let _: Void = await withUnsafeContinuation { continuation in
return
}
print("Exiting continuation task")
}


Cancellable(task: .init {
Cancellable(task: .init {
try await Task.sleep(nanoseconds: 4_000_000_000)
Expand Down
39 changes: 27 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,52 @@
## Like Combine. Only free. And concurrent.

* Protocol-free.
* No protocols, only concrete types
* Eager type erasure
* Continuation-passing style
* Race-free.
* Yield-free.
* Sleep-free.
* Leak-free.
* Memory-bound Task cancellation
* Memory-bound UnsafeContinuation resolution
* Unbounded queue-free.
* Lock-free.
* Queueing channel instead of locking channel
* Dependency-free.
* depends only on Swift std lib

## Salient features

1. "Small things that compose"
2. Implement all operations supported by Combine, but some require modification
3. Uses "imprecise" errors throughout in the manner of Swift concurrency
4. Futures _AND_ Streams
1. Implement all operations supported by Combine, but some require modification
1. Uses "imprecise" errors throughout in the manner of Swift concurrency.
1. Tasks and Continuations can _always_ fail due to cancellation
1. Principled handling of cancellation throughout
1. Futures _AND_ Streams

## Todo

1. ~~Implement leak prevention on UnsafeContinuation, Task, and AsyncStream.Continuation~~
1. maybe add an additional repo (FreeCombineDispatch) that depends on libdispatch to get delay, debounce, throttle
2. revamp StateThread to be exactly a concurrency aware version of TCA's store
3. Add support for Promise/Future
4. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher`
5. fully implement all Combine operators
6. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream
7. Get to 100% test coverage
8. Document this at the level of writing a book in the form of playgrounds
1. revamp StateThread to be exactly a concurrency aware version of TCA's store
1. Add support for Promise/Future
1. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher`
1. fully implement all Combine operators
1. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream
1. Get to 100% test coverage
1. Document this at the level of writing a book in the form of playgrounds

## Introduction

For a long time I've been exploring the idea of what Apple's Swift Combine framework would look like without using protocols. The advent of Concurrency support in Swift 5.5 provided an impetus to complete that exploration. This repository represents the current state of that effort and consists of material that I intend to incorporate into classes I teach on iOS development at [Harvard](https://courses.dce.harvard.edu/?details&srcdb=202203&crn=33540) and at [Tufts](https://www.cs.tufts.edu/t/courses/description/fall2021/CS/151-02).
For a long time I've been exploring the idea of what Apple's Swift Combine framework would look like if written without using protocols. The advent of Concurrency support in Swift 5.5 provided an impetus to complete that exploration. This repository represents the current state of that effort and consists of material that I intend to incorporate into classes I teach on iOS development at [Harvard](https://courses.dce.harvard.edu/?details&srcdb=202203&crn=33540) and at [Tufts](https://www.cs.tufts.edu/t/courses/description/fall2021/CS/151-02).

Ideally, this material would become the core of an expanded course on Functional Concurrent Programming using Swift, but that course is still fairly far off.
Ideally, this material would become the core of an expanded course on Functional Concurrent Programming using Swift, but that course is still fairly far off.

Secondarily, this repo is my own feeble attempt to answer the following:

1. Why does Swift's Structured Concurrency, not have the same set of primitives as Concurrent Haskell?
2. Are there differences between what we mean when we refer to Structured Concurrency and what we mean when we refer to Functional Concurrency and precisely what would those differences be?

### Functional Requirements

Expand Down
178 changes: 104 additions & 74 deletions Sources/FreeCombine/Cancellable/Cancellable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,104 +4,134 @@
//
// Created by Van Simmons on 5/18/22.
//
@preconcurrency import Atomics

// Can't be a protocol bc we have to implement deinit
public enum DeinitBehavior: Sendable {
case assert
case log
case silent
}

// Can't be a protocol bc we have to implement deinit
public final class Cancellable<Output: Sendable>: Sendable {
private let _cancel: @Sendable () -> Void
private let _isCancelled: @Sendable () -> Bool
private let _value: @Sendable () async throws -> Output
private let _result: @Sendable () async -> Result<Output, Swift.Error>
private let task: Task<Output, Swift.Error>
private let deallocGuard: ManagedAtomic<Bool>

public let file: StaticString
public let line: UInt
public let deinitBehavior: DeinitBehavior

public var isCancelled: Bool { _isCancelled() }
public var value: Output { get async throws { try await _value() } }
public var result: Result<Output, Swift.Error> { get async { await _result() } }
public var isCancelled: Bool { task.isCancelled }
public var isCompleting: Bool { deallocGuard.load(ordering: .sequentiallyConsistent) }
public var value: Output { get async throws { try await task.value } }
public var result: Result<Output, Swift.Error> { get async { await task.result } }

@Sendable public func cancel() -> Void { _cancel() }
@Sendable public func cancel() -> Void {
guard !isCompleting else { return }
task.cancel()
}
@Sendable public func cancelAndAwaitValue() async throws -> Output {
_cancel()
return try await _value()
cancel()
return try await task.value
}
@Sendable public func cancelAndAwaitResult() async throws -> Result<Output, Swift.Error> {
_cancel()
return await _result()
@Sendable public func cancelAndAwaitResult() async -> Result<Output, Swift.Error> {
cancel()
return await task.result
}

init(
cancel: @escaping @Sendable () -> Void,
isCancelled: @escaping @Sendable () -> Bool,
value: @escaping @Sendable () async throws -> Output,
result: @escaping @Sendable () async -> Result<Output, Swift.Error>
file: StaticString = #file,
line: UInt = #line,
deinitBehavior: DeinitBehavior = .assert,
operation: @Sendable @escaping () async throws -> Output
) {
_cancel = cancel
_isCancelled = isCancelled
_value = value
_result = result
let atomic = ManagedAtomic<Bool>(false)
self.deallocGuard = atomic
self.file = file
self.line = line
self.deinitBehavior = deinitBehavior
self.task = .init {
do {
let retVal = try await operation()
atomic.store(true, ordering: .sequentiallyConsistent)
return retVal
}
catch {
atomic.store(true, ordering: .sequentiallyConsistent)
throw error
}
}
}

deinit { if !self.isCancelled { self.cancel() } }
deinit {
let shouldCancel = !(isCompleting || task.isCancelled)
switch deinitBehavior {
case .assert:
assert(!shouldCancel, "ABORTING DUE TO LEAKED \(type(of: Self.self)) CREATED @ \(file): \(line)")
case .log:
if shouldCancel { print("CANCELLING LEAKED \(type(of: Self.self)) CREATED @ \(file): \(line)") }
case .silent:
()
}
if shouldCancel { task.cancel() }
}
}

public extension Cancellable {
convenience init(task: Task<Output, Swift.Error>) {
self.init(
cancel: { task.cancel() },
isCancelled: { task.isCancelled },
value: { try await task.value },
result: { await task.result }
)
}

convenience init<Action: Sendable>(stateTask: StateTask<Output, Action>) {
self.init(
cancel: { stateTask.cancel() },
isCancelled: { stateTask.isCancelled },
value: { try await stateTask.value },
result: { await stateTask.result }
)
static func join<B>(
file: StaticString = #file,
line: UInt = #line,
_ outer: Cancellable<Cancellable<B>>
) -> Cancellable<B> {
.init(file: file, line: line, operation: {
try await withTaskCancellationHandler(handler: {
Task<Void, Swift.Error> { try! await outer.value.cancel() }
}, operation: {
return try await outer.value.value
})
})
}

convenience init(task: @Sendable @escaping () async throws -> Output) {
self.init(task: Task.init(operation: task))
}
}
public extension Cancellable {
static func join<B>(_ outer: Cancellable<Cancellable<B>>) -> Cancellable<B> {
.init(
cancel: { outer.cancel() },
isCancelled: { outer.isCancelled },
value: {
let inner = try await outer.value
return try await inner.value
},
result: {
switch await outer.result {
case let .success(value): return await value.result
case let .failure(error): return .failure(error)
}
}
)
static func join(
file: StaticString = #file,
line: UInt = #line,
_ generator: @escaping () async throws -> Cancellable<Output>
) -> Cancellable<Output> {
.init(file: file, line: line, operation: {
let outer = try await generator()
return try await withTaskCancellationHandler(handler: {
Task<Void, Swift.Error> { outer.cancel() }
}, operation: {
return try await outer.value
})
})
}

func map<B>(_ f: @escaping (Output) async -> B) -> Cancellable<B> {
.init(
cancel: self.cancel,
isCancelled: { self.isCancelled },
value: { try await f(self.value) },
result: {
switch await self.result {
case let .success(value): return await .success(f(value))
case let .failure(error): return .failure(error)
}
func map<B>(
file: StaticString = #file,
line: UInt = #line,
_ f: @escaping (Output) async -> B
) -> Cancellable<B> {
let inner = self
return .init(file: file, line: line) {
try await withTaskCancellationHandler(handler: { Task { inner.cancel() } }) {
try await f(inner.value)
}
)
}
}

func join<B>() -> Cancellable<B> where Output == Cancellable<B> {
Self.join(self)
func join<B>(
file: StaticString = #file,
line: UInt = #line
) -> Cancellable<B> where Output == Cancellable<B> {
Self.join(file: file, line: line, self)
}

func flatMap<B>(_ f: @escaping (Output) async -> Cancellable<B>) -> Cancellable<B> {
self.map(f).join()
func flatMap<B>(
_ f: @escaping (Output) async -> Cancellable<B>,
file: StaticString = #file,
line: UInt = #line
) -> Cancellable<B> {
self.map(file: file, line: line, f).join(file: file, line: line)
}
}
Loading

0 comments on commit 7342b1a

Please sign in to comment.