Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start in on Connectable support #18

Merged
merged 6 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Documentation/Bibliography.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion%20on%20Haskell%2

### Stream Libraries

[Akka Streams](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf)

### Combine

[Conversation on Combine](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623102144192300)
Expand Down
24 changes: 10 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@

# FreeCombine

## Combine. Only free. And concurrent.

- Protocol-free.
- Race-free.

- Yield-free.
- Sleep-free.

- Leak-free.

- Unbounded queue-free.

- Lock-free.
- Dependency-free.
## Like Combine. Only free. And concurrent.

* Protocol-free.
* Race-free.
* Yield-free.
* Sleep-free.
* Leak-free.
* Unbounded queue-free.
* Lock-free.
* Dependency-free.

## Salient features

Expand Down
42 changes: 42 additions & 0 deletions Sources/FreeCombine/Cancellable/Cancellable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
// Created by Van Simmons on 5/18/22.
//

// Can't be a protocol bc we have to implement deinit

public final class Cancellable<Output: Sendable>: Sendable {
private let _cancel: @Sendable () -> Void
private let _isCancelled: @Sendable () -> Bool
Expand Down Expand Up @@ -63,3 +65,43 @@ public extension Cancellable {
self.init(task: Task.init(operation: task))
}
}
public extension Cancellable {
static func join<B>(_ outer: Cancellable<Cancellable<B>>) -> Cancellable<B> {
.init(
cancel: { outer.cancel() },
isCancelled: { outer.isCancelled },
value: {
let inner = try await outer.value
return try await inner.value
},
result: {
switch await outer.result {
case let .success(value): return await value.result
case let .failure(error): return .failure(error)
}
}
)
}

func map<B>(_ f: @escaping (Output) async -> B) -> Cancellable<B> {
.init(
cancel: self.cancel,
isCancelled: { self.isCancelled },
value: { try await f(self.value) },
result: {
switch await self.result {
case let .success(value): return await .success(f(value))
case let .failure(error): return .failure(error)
}
}
)
}

func join<B>() -> Cancellable<B> where Output == Cancellable<B> {
Self.join(self)
}

func flatMap<B>(_ f: @escaping (Output) async -> Cancellable<B>) -> Cancellable<B> {
self.map(f).join()
}
}
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/CombineLatestState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct CombineLatestState<Left: Sendable, Right: Sendable> {
switch completion {
case .cancel:
_ = try? await state.downstream(.completion(.cancelled))
case .exit, .termination:
case .exit, .finished:
_ = try? await state.downstream(.completion(.finished))
case let .failure(error):
_ = try? await state.downstream(.completion(.failure(error)))
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/MergeState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct MergeState<Output: Sendable> {
guard state.mostRecentDemand != .done else { return }
do {
switch completion {
case .termination:
case .finished:
state.mostRecentDemand = try await state.downstream(.completion(.finished))
case .cancel:
state.mostRecentDemand = try await state.downstream(.completion(.cancelled))
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Combinator/ZipState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct ZipState<Left: Sendable, Right: Sendable> {
switch completion {
case .cancel:
_ = try? await state.downstream(.completion(.cancelled))
case .exit, .termination:
case .exit, .finished:
_ = try? await state.downstream(.completion(.finished))
case let .failure(error):
_ = try? await state.downstream(.completion(.failure(error)))
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Decombinator/DistributorState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public struct DistributorState<Output: Sendable> {

static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
switch completion {
case .termination:
case .finished:
await state.process(currentRepeaters: state.repeaters, with: .completion(.finished))
case .exit:
fatalError("Distributor should never exit")
Expand Down
6 changes: 3 additions & 3 deletions Sources/FreeCombine/Decombinator/MulticasterState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public struct MulticasterState<Output: Sendable> {

static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
switch completion {
case .termination:
case .finished:
await state.distributor.process(currentRepeaters: state.distributor.repeaters, with: .completion(.finished))
case .exit:
fatalError("Multicaster should never exit")
Expand Down Expand Up @@ -156,8 +156,8 @@ public struct MulticasterState<Output: Sendable> {
return .none // FIXME: Need to handle this
case let .completion(completion):
switch completion {
case .termination:
return .completion(.termination)
case .finished:
return .completion(.finished)
case .exit:
return .completion(.exit)
case let .failure(error):
Expand Down
2 changes: 1 addition & 1 deletion Sources/FreeCombine/Decombinator/RepeaterState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public struct RepeaterState<ID: Hashable & Sendable, Output: Sendable>: Identifi
static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
do {
switch completion {
case .termination:
case .finished:
_ = try await state.downstream(.completion(.finished))
case .exit:
()
Expand Down
76 changes: 46 additions & 30 deletions Sources/FreeCombine/Multicast/Share.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,51 @@
//
// Created by Van Simmons on 6/7/22.
//

// Need join on Cancellable
public extension Publisher {
// func share() async -> Self {
// let multicaster: ValueRef<Multicaster<Output>?> = ValueRef(value: .none)
// return .init { continuation, downstream in
// return self(onStartup: continuation) { r in
// if await multicaster.value == nil {
// let m: Multicaster<Output> = await .init(
// stateTask: Channel.init().stateTask(
// initialState: MulticasterState<Output>.create(upstream: self),
// reducer: Reducer(
// onCompletion: MulticasterState<Output>.complete,
// reducer: MulticasterState<Output>.reduce
// )
// )
// )
// await multicaster.set(value: m)
// try await m.connect()
// }
// guard let m = await multicaster.value else {
// fatalError("Lost connection")
// }
// switch r {
// case .value(let a):
// return try await downstream(.value(a))
// case let .completion(value):
// try await m.disconnect()
// return .done
// }
// }
// }
// }

func share() async -> Self {
let multicaster: ValueRef<Multicaster<Output>?> = ValueRef(value: .none)
@Sendable func lift(
_ downstream: @Sendable @escaping (AsyncStream<Output>.Result) async throws -> Demand
) -> @Sendable (AsyncStream<Output>.Result) async throws -> Demand {
{ r in
switch r {
case .value:
return try await downstream(r)
case .completion:
let finalValue = try await downstream(r)
await multicaster.set(value: .none)
return finalValue
}
}
}
return .init { continuation, downstream in
Cancellable<Cancellable<Demand>>.join(.init {
if await multicaster.value == nil {
let m: Multicaster<Output> = await .init(
stateTask: Channel.init().stateTask(
initialState: MulticasterState<Output>.create(upstream: self),
reducer: Reducer(
onCompletion: MulticasterState<Output>.complete,
reducer: MulticasterState<Output>.reduce
)
)
)
await multicaster.set(value: m)
let cancellable = await m.publisher().sink(lift(downstream))
try await m.connect()
continuation?.resume()
return cancellable
}
guard let m = await multicaster.value else {
return await Empty(Output.self).sink(downstream)
}
continuation?.resume()
let cancellable = await m.publisher().sink(lift(downstream))
return cancellable
} )
}
}
}
2 changes: 1 addition & 1 deletion Sources/FreeCombine/State/Reducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public struct Reducer<State, Action> {
}

public enum Completion {
case termination
case finished
case exit
case failure(Swift.Error)
case cancel
Expand Down
4 changes: 2 additions & 2 deletions Sources/FreeCombine/State/StateTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ extension StateTask {
case .published(_): continue // FIXME: Handle future mutation
case .completion(.exit): throw PublisherError.completed
case let .completion(.failure(error)): throw error
case .completion(.termination): throw PublisherError.internalError
case .completion(.finished): throw PublisherError.internalError
case .completion(.cancel): throw PublisherError.cancelled
}
}
await reducer(&state, .termination)
await reducer(&state, .finished)
} catch {
channel.finish()
for await action in channel { reducer(action, .failure(error)); continue }
Expand Down
54 changes: 54 additions & 0 deletions Tests/FreeCombineTests/CancellableTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// CancellableTests.swift
//
//
// Created by Van Simmons on 6/8/22.
//

import XCTest
@testable import FreeCombine

final class CancellableTests: XCTestCase {

override func setUpWithError() throws { }

override func tearDownWithError() throws { }

func testCancellable() async throws {
let expectation1 = await CheckedExpectation<Void>()
let expectation2 = await CheckedExpectation<Void>()
let expectation3 = await CheckedExpectation<Void>()

let expectation1a = await CheckedExpectation<Bool>()
let expectation2a = await CheckedExpectation<Bool>()
let expectation3a = await CheckedExpectation<Bool>()

_ = await Cancellable(task: Task<(Cancellable<Void>, Cancellable<Void>, Cancellable<Void>), Swift.Error> {
let t1 = Cancellable(task: Task<Void, Swift.Error> {
try await expectation1.value
try await expectation1a.complete(Task.isCancelled)
})
let t2 = Cancellable(task: Task<Void, Swift.Error> {
try await expectation2.value
try await expectation2a.complete(Task.isCancelled)
})
let t3 = Cancellable(task: Task<Void, Swift.Error> {
try await expectation3.value
try await expectation3a.complete(Task.isCancelled)
})
return (t1, t2, t3)
}).result

try await expectation1.complete()
try await expectation2.complete()
try await expectation3.complete()

let r1 = try await expectation1a.value
let r2 = try await expectation2a.value
let r3 = try await expectation3a.value

XCTAssert(r1, "Inner task 1 not cancelled")
XCTAssert(r2, "Inner task 2 not cancelled")
XCTAssert(r3, "Inner task 3 not cancelled")
}
}
4 changes: 2 additions & 2 deletions Tests/FreeCombineTests/CancellationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class CancellationTests: XCTestCase {
let count2 = await counter2.count
XCTAssertTrue(count2 == 26, "Incorrect count: \(count2)")
do { try await expectation2.complete() }
catch { XCTFail("Multiple terminations sent: \(error)") }
catch { XCTFail("Multiple finishes sent: \(error)") }
return .done
case .completion(.cancelled):
XCTFail("Should not have cancelled")
Expand Down Expand Up @@ -128,7 +128,7 @@ class CancellationTests: XCTestCase {
case .completion(.finished):
XCTFail("Got to end of task that should have been cancelled")
do { try await expectation.complete() }
catch { XCTFail("Multiple terminations sent: \(error)") }
catch { XCTFail("Multiple finishes sent: \(error)") }
return .done
case .completion(.cancelled):
try await expectation.complete()
Expand Down
2 changes: 1 addition & 1 deletion Tests/FreeCombineTests/CombineLatestTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class CombineLatestTests: XCTestCase {
let count = await counter.count
XCTAssert(count == 254, "wrong number of values sent: \(count)")
do { try await expectation.complete() }
catch { XCTFail("Multiple terminations sent: \(error)") }
catch { XCTFail("Multiple finishes sent: \(error)") }
return .done
case .completion(.cancelled):
XCTFail("Should not have cancelled")
Expand Down
2 changes: 0 additions & 2 deletions Tests/FreeCombineTests/MulticastTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,12 @@ class MulticastTests: XCTestCase {
XCTFail("Got an error? \(error)")
return .done
case .completion(.finished):
print("received: \(result)")
let count = await counter1.count
if count != 100 {
XCTFail("Incorrect count: \(count) in subscription 1")
}
return .done
case .completion(.cancelled):
print("received: \(result)")
XCTFail("Should not have cancelled")
return .done
}
Expand Down
Loading