Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CombineLatest, remove unnecessary side effects #9

Merged
merged 4 commits into from
Jun 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<playground version='5.0' target-platform='ios' buildActiveScheme='true' executeOnSourceChanges='false' importAppTypes='true'>
<timeline fileName='timeline.xctimeline'/>
</playground>
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ import FreeCombine
import _Concurrency

Task {
let fsubject1 = await FreeCombine.PassthroughSubject(Int.self)
let fsubject2 = await FreeCombine.PassthroughSubject(String.self)
let fsubject1 = await PassthroughSubject(Int.self)
let fsubject2 = await PassthroughSubject(String.self)

let fseq1 = "abcdefghijklmnopqrstuvwxyz".asyncPublisher
let fseq2 = (1 ... 100).asyncPublisher
Expand Down
3 changes: 3 additions & 0 deletions Playgrounds/04 - Publishers.playground/Contents.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import UIKit

var greeting = "Hello, playground"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Playgrounds/04 - Publishers.playground/timeline.xctimeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Timeline
version = "3.0">
<TimelineItems>
</TimelineItems>
</Timeline>
3 changes: 3 additions & 0 deletions Playgrounds/05 - Operators.playground/Contents.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import UIKit

var greeting = "Hello, playground"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//: [Previous](@previous)

import Foundation

var greeting = "Hello, playground"

//: [Next](@next)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//: [Previous](@previous)

import Foundation

var greeting = "Hello, playground"

//: [Next](@next)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//: [Previous](@previous)

import Foundation

var greeting = "Hello, playground"

//: [Next](@next)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//: [Previous](@previous)

import Foundation

var greeting = "Hello, playground"

//: [Next](@next)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//: [Previous](@previous)

import Foundation

var greeting = "Hello, playground"

//: [Next](@next)
4 changes: 4 additions & 0 deletions Playgrounds/05 - Operators.playground/contents.xcplayground
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<playground version='5.0' target-platform='ios' buildActiveScheme='true' executeOnSourceChanges='false' importAppTypes='true'>
<timeline fileName='timeline.xctimeline'/>
</playground>

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Playgrounds/05 - Operators.playground/timeline.xctimeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Timeline
version = "3.0">
<TimelineItems>
</TimelineItems>
</Timeline>
8 changes: 1 addition & 7 deletions Sources/FreeCombine/Channel/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ public struct Channel<Element>: AsyncSequence {

public init(
_: Element.Type = Element.self,
buffering: AsyncStream<Element>.Continuation.BufferingPolicy = .bufferingOldest(1),
onTermination: (@Sendable (AsyncStream<Element>.Continuation.Termination) -> Void)? = .none
buffering: AsyncStream<Element>.Continuation.BufferingPolicy = .bufferingOldest(1)
) {
var localContinuation: AsyncStream<Element>.Continuation! = .none
stream = .init(bufferingPolicy: buffering) { continuation in
localContinuation = continuation
localContinuation.onTermination = onTermination
}
continuation = localContinuation
}
Expand Down Expand Up @@ -79,7 +77,6 @@ public extension Channel {

func stateTask<State>(
initialState: @escaping (Self) async -> State,
onCancel: @Sendable @escaping () -> Void = { },
reducer: Reducer<State, Self.Element>
) async -> StateTask<State, Self.Element> {
var stateTask: StateTask<State, Self.Element>!
Expand All @@ -88,7 +85,6 @@ public extension Channel {
channel: self,
initialState: initialState,
onStartup: stateTaskContinuation,
onCancel: onCancel,
reducer: reducer
)
}
Expand All @@ -98,14 +94,12 @@ public extension Channel {
func stateTask<State>(
initialState: @escaping (Self) async -> State,
onStartup: UnsafeContinuation<Void, Never>? = .none,
onCancel: @Sendable @escaping () -> Void = { },
reducer: Reducer<State, Self.Element>
) -> StateTask<State, Self.Element> {
.init(
channel: self,
initialState: initialState,
onStartup: onStartup,
onCancel: onCancel,
reducer: reducer
)
}
Expand Down
120 changes: 120 additions & 0 deletions Sources/FreeCombine/Combinator/CombineLatestState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//
// CombineLatestState.swift
//
//
// Created by Van Simmons on 6/4/22.
//
struct CombineLatestState<Left: Sendable, Right: Sendable>: CombinatorState {
typealias CombinatorAction = Self.Action
enum Action {
case setLeft(AsyncStream<Left>.Result, UnsafeContinuation<Demand, Swift.Error>)
case setRight(AsyncStream<Right>.Result, UnsafeContinuation<Demand, Swift.Error>)
}

let downstream: (AsyncStream<(Left?, Right?)>.Result) async throws -> Demand
let leftCancellable: Cancellable<Demand>
let rightCancellable: Cancellable<Demand>

var mostRecentDemand: Demand
var left: (value: Left?, continuation: UnsafeContinuation<Demand, Swift.Error>)? = .none
var right: (value: Right?, continuation: UnsafeContinuation<Demand, Swift.Error>)? = .none
var leftComplete = false
var rightComplete = false

init(
channel: Channel<CombineLatestState<Left, Right>.Action>,
downstream: @escaping (AsyncStream<(Left?, Right?)>.Result) async throws -> Demand,
mostRecentDemand: Demand = .more,
left: Publisher<Left>,
right: Publisher<Right>
) async {
self.downstream = downstream
self.mostRecentDemand = mostRecentDemand
self.leftCancellable = await channel.consume(publisher: left, using: CombineLatestState<Left, Right>.Action.setLeft)
self.rightCancellable = await channel.consume(publisher: right, using: CombineLatestState<Left, Right>.Action.setRight)
}

static func create(
mostRecentDemand: Demand = .more,
left: Publisher<Left>,
right: Publisher<Right>
) -> (@escaping (AsyncStream<(Left?, Right?)>.Result) async throws -> Demand) -> (Channel<CombineLatestState<Left, Right>.Action>) async -> Self {
{ downstream in { channel in
await .init(channel: channel, downstream: downstream, left: left, right: right)
} }
}

static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
state.leftCancellable.cancel()
state.left?.continuation.resume(returning: .done)
state.rightCancellable.cancel()
state.right?.continuation.resume(returning: .done)
switch completion {
case .cancel:
_ = try? await state.downstream(.completion(.cancelled))
case .exit, .termination:
_ = try? await state.downstream(.completion(.finished))
case let .failure(error):
_ = try? await state.downstream(.completion(.failure(error)))
}
}

static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer<Self, Action>.Effect {
guard !Task.isCancelled else { return .completion(.cancel) }
return try await `self`.reduce(action: action)
}

private mutating func reduce(
action: Self.Action
) async throws -> Reducer<Self, Action>.Effect {
switch action {
case let .setLeft(leftResult, leftContinuation):
if mostRecentDemand == .done { leftContinuation.resume(returning: .done) }
else { return try await handleLeft(leftResult, leftContinuation) }
case let .setRight(rightResult, rightContinuation):
if mostRecentDemand == .done { rightContinuation.resume(returning: .done) }
else { return try await handleRight(rightResult, rightContinuation) }
}
return .none
}

private mutating func handleLeft(
_ leftResult: AsyncStream<Left>.Result,
_ leftContinuation: UnsafeContinuation<Demand, Error>
) async throws -> Reducer<Self, Action>.Effect {
switch leftResult {
case let .value((value)):
left = (value, leftContinuation)
guard !Task.isCancelled else {
leftContinuation.resume(returning: .done)
return .completion(.cancel)
}
mostRecentDemand = try await downstream(.value((value, right?.value)))
leftContinuation.resume(returning: mostRecentDemand)
return .none
case .completion(_):
leftComplete = true
return leftComplete && rightComplete ? .completion(.exit) : .none
}
}

private mutating func handleRight(
_ rightResult: AsyncStream<Right>.Result,
_ rightContinuation: UnsafeContinuation<Demand, Error>
) async throws -> Reducer<Self, Self.Action>.Effect {
switch rightResult {
case let .value((value)):
right = (value, rightContinuation)
guard !Task.isCancelled else {
rightContinuation.resume(returning: .done)
return .completion(.cancel)
}
mostRecentDemand = try await downstream(.value((left?.value, value)))
rightContinuation.resume(returning: mostRecentDemand)
return .none
case .completion(_) :
rightComplete = true
return leftComplete && rightComplete ? .completion(.exit) : .none
}
}
}
25 changes: 25 additions & 0 deletions Sources/FreeCombine/Filter/DropFirst.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// File.swift
//
//
// Created by Van Simmons on 6/4/22.
//
public extension Publisher {
func dropFirst(
_ count: Int = 1
) -> Self {
.init { continuation, downstream in
let currentValue: ValueRef<Int> = ValueRef(value: count + 1)
return self(onStartup: continuation) { r in
let current = await currentValue.value - 1
await currentValue.set(value: max(0, current))
switch r {
case .value:
guard current <= 0 else { return .more }
return try await downstream(r)
case let .completion(value):
return try await downstream(.completion(value))
} }
}
}
}
8 changes: 1 addition & 7 deletions Sources/FreeCombine/Publishers/Combinator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ public protocol CombinatorState {
public func Combinator<Output: Sendable, State: CombinatorState, Action>(
initialState: @escaping (@escaping (AsyncStream<Output>.Result) async throws -> Demand) -> (Channel<Action>) async -> State,
buffering: AsyncStream<Action>.Continuation.BufferingPolicy,
onCancel: @escaping () -> Void,
reducer: Reducer<State, Action>
) -> Publisher<Output> where State.CombinatorAction == Action {
.init(
initialState: initialState,
buffering: buffering,
onCancel: onCancel,
reducer: reducer
)
}
Expand All @@ -27,7 +25,6 @@ public extension Publisher {
init<State: CombinatorState, Action>(
initialState: @escaping (@escaping (AsyncStream<Output>.Result) async throws -> Demand) -> (Channel<Action>) async -> State,
buffering: AsyncStream<Action>.Continuation.BufferingPolicy,
onCancel: @escaping () -> Void,
reducer: Reducer<State, Action>
) where State.CombinatorAction == Action {
self = .init { continuation, downstream in
Expand All @@ -37,10 +34,7 @@ public extension Publisher {
reducer: reducer
)

return try await withTaskCancellationHandler(handler: {
stateTask.cancel()
onCancel()
}) {
return try await withTaskCancellationHandler(handler: stateTask.cancel) {
continuation?.resume()
guard !Task.isCancelled else {
throw PublisherError.cancelled
Expand Down
Loading