From 27fe21131f245a25405097a23f372c8faf8916b1 Mon Sep 17 00:00:00 2001 From: "Ronald V. Simmons" <789213+rvsrvs@users.noreply.github.com> Date: Sat, 9 Jul 2022 18:34:56 -0400 Subject: [PATCH] Fix Heartbeat, Throttle and add Debounce (#34) * Working (sorta) throttle Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Add Heartbeat tests, attempt to make heartbeat skip Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Fix problems in Heartbeat and throttle Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Add Debounce and DelayEachDemand. Improve throttle to allow buffering Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Undo package version change mistake Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Fix inadvertant changes Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> * Undo inadvertant change Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com> --- Documentation/Bibliography.md | 33 ++++-- .../xcshareddata/swiftpm/Package.resolved | 14 +++ Package.swift | 3 +- .../Contents.swift | 2 +- .../contents.xcplayground | 2 +- README.md | 57 ++++++---- .../FreeCombine/Cancellable/Cancellable.swift | 2 +- .../Decombinator/RepeatReceiveState.swift | 23 ++-- .../FreeCombine/Publishers/Heartbeat.swift | 79 +++++++++----- Sources/FreeCombine/Subject/Subject.swift | 22 ++++ Sources/FreeCombine/Time/Debounce.swift | 57 ++++++++++ .../FreeCombine/Time/DelayEachDemand.swift | 21 ++++ Sources/FreeCombine/Time/Throttle.swift | 47 ++++---- Tests/TimeTests/HeartbeatTests.swift | 46 ++++++++ Tests/TimeTests/ThrottleDemandTests.swift | 9 +- Tests/TimeTests/ThrottleTests.swift | 100 +++++++++++++++++- 16 files changed, 420 insertions(+), 97 deletions(-) create mode 100644 FreeCombine.xcworkspace/xcshareddata/swiftpm/Package.resolved create mode 100644 Sources/FreeCombine/Time/Debounce.swift create mode 100644 Sources/FreeCombine/Time/DelayEachDemand.swift create mode 100644 Tests/TimeTests/HeartbeatTests.swift diff --git a/Documentation/Bibliography.md b/Documentation/Bibliography.md index e83bf16..6641c78 100644 --- a/Documentation/Bibliography.md +++ b/Documentation/Bibliography.md @@ -4,18 +4,39 @@ https://youtu.be/T5oB8PZQNvY https://leanpub.com/sofp +### Swift's Structure Concurrency + +[SE-304 Structured Concurrency](https://github.com/apple/swift-evolution/blob/main/proposals/0304-structured-concurrency.md#structured-concurrency-1) The original structured concurrency proposal from Apple. + +[Sources for a lot of the design](https://forums.swift.org/t/concurrency-designs-from-other-communities/32389/16) + +[Notes on structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) the original post that sparked Apple's approach of structured concurrency, in particular the idea that Task lifetimes should be organized hierarchically. + +[Timeouts and Cancellation for Humans](https://vorpus.org/blog/timeouts-and-cancellation-for-humans/) - influential on Apple's thinking + +[Guidelines on NIO](https://github.com/swift-server/guides/blob/main/docs/concurrency-adoption-guidelines.md). The official guidelines that implementors of server-side code are supposed to consider. + +[NIO Roadmap](https://forums.swift.org/t/future-of-swift-nio-in-light-of-concurrency-roadmap/41633/4). Basically that NIO needs custom executors. ### Theory of Streams -* [Stream Fusion: From Lists to Streams to Nothing At All](https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion:%20From%20Lists%20to%20Streams%20to%20Nothing%20At%20All.pdf) +* [A Brief History of Streams](https://shonan.nii.ac.jp/archives/seminar/136/wp-content/uploads/sites/172/2018/09/a-brief-history-of-streams.pdf) - see especially page 21 comparing push and pull strategies + +[History of Haskell](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/07/history.pdf) - see in particular Section 7.1 on Stream and Continuation-based I/O * [Oleg Kiselyov's Stream Page](https://okmij.org/ftp/Streams.html) +* [Stream Fusion: From Lists to Streams to Nothing At All](https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion:%20From%20Lists%20to%20Streams%20to%20Nothing%20At%20All.pdf) + * [All Things Flow: A History of Streams](https://okmij.org/ftp/Computation/streams-hapoc2021.pdf) * [Exploiting Vector Instructions with Generalized Stream Fusion](https://cacm.acm.org/magazines/2017/5/216312-exploiting-vector-instructions-with-generalized-stream-fusion/fulltext) * [Functional Stream Libraries and Fusion: What's Next?](https://okmij.org/ftp/meta-programming/shonan-streams.pdf) +### Theory of State +[Lazy Functional State Threads](https://github.com/bitemyapp/papers/blob/master/Lazy%20Functional%20State%20Threads.pdf) - the original paper on the ST monad + +### Generalized Functional Concurrency https://github.com/bitemyapp/papers/blob/master/A%20Poor%20Man's%20Concurrency%20Monad.pdf https://github.com/bitemyapp/papers/blob/master/Cheap%20(But%20Functional)%20Threads.pdf https://github.com/bitemyapp/papers/blob/master/Combining%20Events%20and%20Threads%20for%20Scalable%20Network%20Services:%20Implementation%20and%20Evaluation%20of%20Monadic%2C%20Application-Level%20Concurrency%20Primitives.pdf @@ -23,22 +44,16 @@ https://github.com/bitemyapp/papers/blob/master/Compiling%20with%20Continuations https://github.com/bitemyapp/papers/blob/master/Functional%20Reactive%20Programming%20from%20First%20Principles.pdf https://github.com/bitemyapp/papers/blob/master/Functional%20Reactive%20Programming%2C%20Continued.pdf https://github.com/bitemyapp/papers/blob/master/Higher-Order%20Functional%20Reactive%20Programming%20without%20Spacetime%20Leaks.pdf -https://github.com/bitemyapp/papers/blob/master/Lazy%20Functional%20State%20Threads.pdf https://github.com/bitemyapp/papers/blob/master/Push-pull%20functional%20reactive%20programming.pdf https://github.com/bitemyapp/papers/blob/master/Stream%20Fusion%20on%20Haskell%20Unicode%20Strings.pdf -### Theory of State - - - ### Stream Libraries -[Akka Streams](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) - -### Combine +[Akka Streams](https://qconnewyork.com/ny2015/system/files/presentation-slides/AkkaStreamsQconNY.pdf) - important for the idea of dynamic push/pull mode. See especially starting on page 29. [Conversation on Combine](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623102144192300) +[And](https://iosdevelopers.slack.com/archives/C0AET0JQ5/p1623177619245300?thread_ts=1623102144.192300&cid=C0AET0JQ5) ### Concepts diff --git a/FreeCombine.xcworkspace/xcshareddata/swiftpm/Package.resolved b/FreeCombine.xcworkspace/xcshareddata/swiftpm/Package.resolved new file mode 100644 index 0000000..b3b6338 --- /dev/null +++ b/FreeCombine.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -0,0 +1,14 @@ +{ + "pins" : [ + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "919eb1d83e02121cdb434c7bfc1f0c66ef17febe", + "version" : "1.0.2" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index eb4b8cf..c6e5a12 100644 --- a/Package.swift +++ b/Package.swift @@ -1,5 +1,4 @@ // swift-tools-version: 5.6 - import PackageDescription let package = Package( @@ -25,7 +24,7 @@ let package = Package( ), ], dependencies: [ - .package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.0.0")), + .package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.0.2")), ], targets: [ .target( diff --git a/Playgrounds/05 - FreeCombine.playground/Pages/00a-Introduction (Examples).xcplaygroundpage/Contents.swift b/Playgrounds/05 - FreeCombine.playground/Pages/00a-Introduction (Examples).xcplaygroundpage/Contents.swift index 573fd2e..7f5cf7c 100644 --- a/Playgrounds/05 - FreeCombine.playground/Pages/00a-Introduction (Examples).xcplaygroundpage/Contents.swift +++ b/Playgrounds/05 - FreeCombine.playground/Pages/00a-Introduction (Examples).xcplaygroundpage/Contents.swift @@ -27,7 +27,7 @@ observe how the zip blocks of value `Int(14)` and `String("hello, combined world!")` are all emitted at the very end. */ import Combine -import Atomics + func combineVersion() { let subject1 = Combine.PassthroughSubject() diff --git a/Playgrounds/05 - FreeCombine.playground/contents.xcplayground b/Playgrounds/05 - FreeCombine.playground/contents.xcplayground index cc9659f..ae8489f 100644 --- a/Playgrounds/05 - FreeCombine.playground/contents.xcplayground +++ b/Playgrounds/05 - FreeCombine.playground/contents.xcplayground @@ -1,5 +1,5 @@ - + diff --git a/README.md b/README.md index bb71cf6..ce1d315 100644 --- a/README.md +++ b/README.md @@ -4,25 +4,33 @@ ## Like Combine. Only free. And concurrent. +FreeCombine is a functional streaming library for the Swift language. + +Functional streaming comes in two forms: push and pull. FreeCombine is pull. RxSwift and ReactiveSwift are push. Combine is both, but primarily pull. (If you have ever wondered what a Subscription is in Combine, it's the implementation of pull semantics). AsyncSequence in Apple's Swift standard library is pull-only. While there are exceptions, streams in synchronous systems tend to be push, in asynchronous systems they tend to be pull. Different applications are better suited to one form of streaming than the other. The main differences lie in how the two treat combinators like zip or decombinators like Combine's Subject. + +All streaming libraries are written in the Continuation Passing Style (CPS). Because of this they share certain operations for the Continuation type: map, flatMap, filter, reduce, et al. + +Promise/Future systems are also written in CPS and as a result share many of the same operations. FreeCombine incorporates NIO-style Promises and Futures as a result. + +FreeCombine differs from AsyncSequence (and its support in Apple's swift-async-algorithms package) in the following key ways. FreeCombine is: + * Protocol-free. * No protocols, only concrete types * Eager type erasure - * Continuation-passing style + * Explicit implemenation of the continuation-passing style via Continuation type. * Race-free. * Yield-free. * Sleep-free. - * Continuations are only created after upstream continuations are guaranteed to exist + * subscribeOn-race-free. (Continuations are only created after upstream continuations are guaranteed to exist) * Leak-free. - * Memory-bound Task resolution - * Memory-bound Continuation resolution - * Unbounded queue-free. + * ARC-like Task lifetimes + * ARC-like Continuation lifetimes * Lock-free. * Queueing channel instead of locking channel + * Blocking, not locking * No use of `os_unfair_lock` or equivalent constructs in other operating system -* Foundation-free. - * depends only on Swift std lib (and swift-atomics) -Implies the following Do's and Dont's +These "freedoms" imply the following specific restrictions: Don'ts: * No use of `protocol` @@ -35,29 +43,21 @@ Sort of Don'ts: * Use of `Task.init` only in Cancellable * Use of `[Checked|Unsafe]Continuation` only in Resumption * Use of `AsyncStream.init` only in Channel +* Use of .unbounded as BufferingPolicy only in Channel's which accept subscribe operations ## Salient features 1. "Small things that compose" 1. Implement all operations supported by Combine, but some require modification 1. Uses "imprecise" errors throughout in the manner of Swift NIO. -1. Tasks and Continuations can _always_ fail due to cancellation +1. Tasks and Continuations can _always_ fail due to cancellation so no Failure type on Continuations 1. Principled handling of cancellation throughout 1. Futures _AND_ Streams, chocolate _AND_ peanut butter +1. No dependency on Foundation +1. Finally and very importantly, ownership of Tasks/Cancellables can be transferred or even shared. -## Todo - -1. ~~Implement leak prevention on UnsafeContinuation, Task, and AsyncStream.Continuation~~ -1. maybe add an additional repo (FreeCombineDispatch) that depends on libdispatch to get delay, debounce, throttle -1. revamp StateThread to be exactly a concurrency aware version of TCA's store -1. Add support for Promise/Future -1. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher` -1. fully implement all Combine operators -1. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream -1. Get to 100% test coverage -1. Document this at the level of writing a book in the form of playgrounds - ## Introduction +## Introduction For a long time I've been exploring the idea of what Apple's Swift Combine framework would look like if written without using protocols. The advent of Concurrency support in Swift 5.5 provided an impetus to complete that exploration. This repository represents the current state of that effort and consists of material that I intend to incorporate into classes I teach on iOS development at [Harvard](https://courses.dce.harvard.edu/?details&srcdb=202203&crn=33540) and at [Tufts](https://www.cs.tufts.edu/t/courses/description/fall2021/CS/151-02). @@ -65,8 +65,9 @@ Sort of Don'ts: Secondarily, this repo is my own feeble attempt to answer the following questions: - 1. Why does Swift's Structured Concurrency, not have the same set of primitives as Concurrent Haskell? - 1. Why is that whenever I ask someone: do you use TaskGroup or `async let`, they respond, I don't but I'm sure that there are other people who do? + 1. Why does Swift's Structured Concurrency, not have the same set of primitives as (say) Concurrent Haskell? + 1. Why is that whenever I ask someone: do you use TaskGroup or `async let`, they respond, "I don't but I'm sure that there are other people who do." ? + 1. Why is it that Task lifetimes much align with their parent's lifetime, but that other objects which are in a parent-child relationship have no such restriction? 1. Are there differences between what we mean when we refer to Structured Concurrency and what we mean when we refer to Functional Concurrency and precisely what would those differences be? ### Functional Requirements @@ -131,4 +132,14 @@ In my opinion, what SE-335 is saying applies to Combine (and frankly to AsyncSeq * Sending .completion(.finished|.cancelled|.failure(Error)) means that the value returned is ignored (proactive) * External cancellation causes .completion(.cancelled) to be sent as the next demanded value (external) +## Todo +1. ~~Implement leak prevention on UnsafeContinuation, Task, and AsyncStream.Continuation~~ +1. ~~maybe add an additional repo (FreeCombineDispatch) that depends on libdispatch to get delay, debounce, throttle~~ +1. revamp StateThread to be exactly a concurrency aware version of TCA's store +1. Add support for Promise/Future +1. Add a repo which implements asyncPublishers for everything in Foundation that currently has a `publisher` +1. ~~fully implement all Combine operators~~ +1. Add a Buffer publisher/operator to reintroduce a push model via an independent source of demand upstream +1. Get to 100% test coverage +1. Document this at the level of writing a book in the form of playgrounds diff --git a/Sources/FreeCombine/Cancellable/Cancellable.swift b/Sources/FreeCombine/Cancellable/Cancellable.swift index 74157c9..3d43e1e 100644 --- a/Sources/FreeCombine/Cancellable/Cancellable.swift +++ b/Sources/FreeCombine/Cancellable/Cancellable.swift @@ -23,7 +23,7 @@ public final class Cancellable: Sendable { public var isCancelled: Bool { task.isCancelled } public var isCompleting: Bool { deallocGuard.load(ordering: .sequentiallyConsistent) } - public var value: Output { get async throws { try await task.value } } + public var value: Output { get async throws { try await task.value } } public var result: Result { get async { await task.result } } @Sendable public func cancel() -> Void { diff --git a/Sources/FreeCombine/Decombinator/RepeatReceiveState.swift b/Sources/FreeCombine/Decombinator/RepeatReceiveState.swift index 131d85f..133753e 100644 --- a/Sources/FreeCombine/Decombinator/RepeatReceiveState.swift +++ b/Sources/FreeCombine/Decombinator/RepeatReceiveState.swift @@ -4,6 +4,7 @@ // // Created by Van Simmons on 7/2/22. // + public struct RepeatReceiveState { var distributorChannel: Channel.Action> @@ -13,6 +14,7 @@ public struct RepeatReceiveState { public enum Action: Sendable { case receive(AsyncStream.Result, Resumption) + case nonBlockingReceive(AsyncStream.Result) } public init( @@ -38,23 +40,32 @@ public struct RepeatReceiveState { default: continuation.resume(throwing: PublisherError.completed) } + case .nonBlockingReceive(_): + () } } - static func reduce(`self`: inout Self, action: Self.Action) async throws -> Reducer.Effect { + static func reduce(state: inout Self, action: Self.Action) async throws -> Reducer.Effect { if Task.isCancelled { switch action { case .receive(_, let resumption): resumption.resume(throwing: PublisherError.cancelled) + case .nonBlockingReceive(_): + () } return .completion(.cancel) } - return try await `self`.reduce(action: action) + return try await state.reduce(action: action) } mutating func reduce(action: Action) async throws -> Reducer.Effect { - guard case let .receive(result, resumption) = action else { - fatalError("Unknown action") + var result: AsyncStream.Result! = .none + var resumption: Resumption! = .none + if case let .receive(r1, r3) = action { + result = r1 + resumption = r3 + } else if case let .nonBlockingReceive(r2) = action { + result = r2 } do { let subscribers: Int = try await withResumption { innerResumption in @@ -69,10 +80,10 @@ public struct RepeatReceiveState { fatalError("Unhandled continuation value") } } - resumption.resume(returning: subscribers) + resumption?.resume(returning: subscribers) return .none } catch { - resumption.resume(throwing: error) + resumption?.resume(throwing: error) return .completion(.failure(error)) } } diff --git a/Sources/FreeCombine/Publishers/Heartbeat.swift b/Sources/FreeCombine/Publishers/Heartbeat.swift index 23c7c64..4e4a0e1 100644 --- a/Sources/FreeCombine/Publishers/Heartbeat.swift +++ b/Sources/FreeCombine/Publishers/Heartbeat.swift @@ -10,26 +10,39 @@ public func Heartbeat(interval: Duration) -> Publisher { } public extension Publisher where Output == UInt64 { - init(interval: Duration, maxTicks: Int = Int.max) { + init(interval: Duration, maxTicks: Int = Int.max, tickAtStart: Bool = false) { self = Publisher { continuation, downstream in .init { - let startTime = DispatchTime.now().uptimeNanoseconds var ticks: UInt64 = 0 continuation.resume() - while ticks < maxTicks { - guard !Task.isCancelled else { - return try await handleCancellation(of: downstream) + var maxTicks = maxTicks + do { + let start = DispatchTime.now().uptimeNanoseconds + var current = start + if tickAtStart { + maxTicks -= 1 + guard try await downstream(.value(current)) != .done else { + return .done + } } - ticks += 1 - let nextTime = startTime + (ticks * interval.inNanoseconds) - let currentTime = DispatchTime.now().uptimeNanoseconds - if currentTime > nextTime { continue } - switch try await downstream(.value(currentTime)) { - case .done: return .done - case .more: try? await Task.sleep(nanoseconds: nextTime - currentTime) + while ticks < maxTicks { + guard !Task.isCancelled else { + return try await handleCancellation(of: downstream) + } + ticks += 1 + let next = start + (ticks * interval.inNanoseconds) + current = DispatchTime.now().uptimeNanoseconds + if current > next { continue } + try? await Task.sleep(nanoseconds: next - current) + current = DispatchTime.now().uptimeNanoseconds + guard try await downstream(.value(current)) != .done else { + return .done + } } + _ = try await downstream(.completion(.finished)) + } catch { + throw error } - _ = try await downstream(.completion(.finished)) return .done } } @@ -52,27 +65,41 @@ public extension Publisher { clock: C, interval: C.Instant.Duration, tolerance: C.Instant.Duration? = .none, - maxTicks: Int = Int.max + maxTicks: Int = Int.max, + tickAtStart: Bool = false ) where Output == C.Instant { self = Publisher { continuation, downstream in .init { - let startTime = clock.now + let start = clock.now var ticks: Int = .zero continuation.resume() - while ticks < maxTicks { - guard !Task.isCancelled else { - return try await handleCancellation(of: downstream) + var maxTicks = maxTicks + do { + var current = start + if tickAtStart { + maxTicks -= 1 + guard try await downstream(.value(current)) != .done else { + return .done + } } - ticks += 1 - let nextTime = startTime.advanced(by: interval * ticks) - let currentTime = clock.now - if currentTime > nextTime { continue } - switch try await downstream(.value(currentTime)) { - case .done: return .done - case .more: try await clock.sleep(until: nextTime, tolerance: tolerance) + while ticks < maxTicks { + guard !Task.isCancelled else { + return try await handleCancellation(of: downstream) + } + ticks += 1 + let next = start.advanced(by: interval * ticks) + current = clock.now + if current > next { continue } + try await clock.sleep(until: next, tolerance: tolerance) + current = clock.now + guard try await downstream(.value(current)) != .done else { + return .done + } } + _ = try await downstream(.completion(.finished)) + } catch { + throw error } - _ = try await downstream(.completion(.finished)) return .done } } diff --git a/Sources/FreeCombine/Subject/Subject.swift b/Sources/FreeCombine/Subject/Subject.swift index b02f854..e0189c1 100644 --- a/Sources/FreeCombine/Subject/Subject.swift +++ b/Sources/FreeCombine/Subject/Subject.swift @@ -149,6 +149,22 @@ public extension Subject { return count } + func receive( + _ result: AsyncStream.Result + ) throws -> Void { + let queueStatus = receiveStateTask.send(.nonBlockingReceive(result)) + switch queueStatus { + case .enqueued: + () + case .terminated: + throw PublisherError.completed + case .dropped: + throw PublisherError.enqueueError + @unknown default: + throw PublisherError.enqueueError + } + } + @discardableResult @Sendable func send(_ result: AsyncStream.Result) async throws -> Int { try await receive(result) @@ -157,4 +173,10 @@ public extension Subject { @Sendable func send(_ value: Output) async throws -> Int { try await receive(.value(value)) } + @Sendable func send(_ result: AsyncStream.Result) throws -> Void { + try receive(result) + } + @Sendable func send(_ value: Output) throws -> Void { + try receive(.value(value)) + } } diff --git a/Sources/FreeCombine/Time/Debounce.swift b/Sources/FreeCombine/Time/Debounce.swift new file mode 100644 index 0000000..5c21021 --- /dev/null +++ b/Sources/FreeCombine/Time/Debounce.swift @@ -0,0 +1,57 @@ +// +// Debounce.swift +// +// +// Created by Van Simmons on 7/8/22. +// +extension Publisher { + private func cleanup( + _ subject: Subject, + _ subjectRef: ValueRef?>, + _ cancellable: Cancellable, + _ cancellableRef: ValueRef?> + ) async throws -> Void { + try await subject.finish() + _ = await subject.result + await subjectRef.set(value: .none) + _ = await cancellable.result + await cancellableRef.set(value: .none) + } + + func debounce( + interval: Duration + ) -> Self { + .init { continuation, downstream in + let subjectRef = ValueRef?>(value: .none) + let cancellableRef = ValueRef?>(value: .none) + return self(onStartup: continuation) { r in + var subject: Subject! = await subjectRef.value + var cancellable: Cancellable! = await cancellableRef.value + if subject == nil { + subject = try await PassthroughSubject() + await subjectRef.set(value: subject) + cancellable = await subject.publisher() + .delayEachDemand(interval: interval) + .sink(downstream) + await cancellableRef.set(value: cancellable) + } + guard !Task.isCancelled && !cancellable.isCancelled else { + try await cleanup(subject, subjectRef, cancellable, cancellableRef) + return try await handleCancellation(of: downstream) + } + do { let _: Void = try subject.send(r) } + catch { /* ignore failure to enqueue, that's the entire point */ } + switch r { + case .value: + return .more + case .completion(.finished), .completion(.cancelled): + try await cleanup(subject, subjectRef, cancellable, cancellableRef) + return .done + case .completion(.failure(let error)): + try await cleanup(subject, subjectRef, cancellable, cancellableRef) + throw error + } + } + } + } +} diff --git a/Sources/FreeCombine/Time/DelayEachDemand.swift b/Sources/FreeCombine/Time/DelayEachDemand.swift new file mode 100644 index 0000000..c23ddec --- /dev/null +++ b/Sources/FreeCombine/Time/DelayEachDemand.swift @@ -0,0 +1,21 @@ +// +// DelayEachDemand.swift +// +// +// Created by Van Simmons on 7/9/22. +// + +extension Publisher { + func delayEachDemand( + interval: Duration + ) -> Self { + .init { resumption, downstream in + self(onStartup: resumption) { r in + let demand = try await downstream(r) + guard demand != .done else { return demand } + try await Task.sleep(nanoseconds: interval.inNanoseconds) + return demand + } + } + } +} diff --git a/Sources/FreeCombine/Time/Throttle.swift b/Sources/FreeCombine/Time/Throttle.swift index 17b42ee..a889954 100644 --- a/Sources/FreeCombine/Time/Throttle.swift +++ b/Sources/FreeCombine/Time/Throttle.swift @@ -4,13 +4,25 @@ // // Created by Van Simmons on 7/4/22. // -//func throttle(for interval: S.SchedulerTimeType.Stride, scheduler: S, -// latest: Bool) -> Publishers.Throttle -import Atomics + extension Publisher { + private func cleanup( + _ subject: Subject, + _ subjectRef: ValueRef?>, + _ cancellable: Cancellable, + _ cancellableRef: ValueRef?> + ) async throws -> Void { + try await subject.finish() + _ = await subject.result + await subjectRef.set(value: .none) + _ = await cancellable.result + await cancellableRef.set(value: .none) + } + func throttle( interval: Duration, - latest: Bool + latest: Bool = false, + bufferSize: Int = 1 ) -> Self { .init { continuation, downstream in let subjectRef = ValueRef?>(value: .none) @@ -20,37 +32,28 @@ extension Publisher { var cancellable: Cancellable! = await cancellableRef.value if subject == nil { subject = try await PassthroughSubject( - buffering: latest ? .bufferingNewest(1) : .bufferingOldest(1) + buffering: latest ? .bufferingNewest(bufferSize) : .bufferingOldest(bufferSize) ) await subjectRef.set(value: subject) - cancellable = await subject.publisher().throttleDemand(interval: interval).sink(downstream) + cancellable = await subject.publisher() + .throttleDemand(interval: interval) + .sink(downstream) await cancellableRef.set(value: cancellable) } guard !Task.isCancelled && !cancellable.isCancelled else { - try await subject.finish() - await subjectRef.set(value: .none) - _ = await subject.result - await cancellableRef.set(value: cancellable) - _ = await cancellable.result + try await cleanup(subject, subjectRef, cancellable, cancellableRef) return try await handleCancellation(of: downstream) } - _ = try await subject.send(r) + do { let _: Void = try subject.send(r) } + catch { /* ignore failure to enqueue */ } switch r { case .value: return .more case .completion(.finished), .completion(.cancelled): - try await subject.finish() - await subjectRef.set(value: .none) - _ = await subject.result - await cancellableRef.set(value: cancellable) - _ = await cancellable.result + try await cleanup(subject, subjectRef, cancellable, cancellableRef) return .done case .completion(.failure(let error)): - try await subject.finish() - await subjectRef.set(value: .none) - _ = await subject.result - await cancellableRef.set(value: cancellable) - _ = await cancellable.result + try await cleanup(subject, subjectRef, cancellable, cancellableRef) throw error } } diff --git a/Tests/TimeTests/HeartbeatTests.swift b/Tests/TimeTests/HeartbeatTests.swift new file mode 100644 index 0000000..136e3d4 --- /dev/null +++ b/Tests/TimeTests/HeartbeatTests.swift @@ -0,0 +1,46 @@ +// +// HeartbeatTests.swift +// +// +// Created by Van Simmons on 7/6/22. +// + +import XCTest +@testable import FreeCombine +@testable import Time + +class HeartbeatTests: XCTestCase { + + override func setUpWithError() throws { } + + override func tearDownWithError() throws { } + + func testSimpleHeartbeat() async throws { + let inputCounter = Counter() + let counter = Counter() + var t: Cancellable! + t = await Heartbeat(interval: .milliseconds(500)) + .handleEvents(receiveOutput: { _ in await inputCounter.increment() }) + .sink({ value in + switch value { + case .value: + let count = await counter.increment() + return count >= 10 ? .done : .more + case let .completion(.failure(error)): + XCTFail("Got unexpected failure: \(error)") + return .done + case .completion(.finished): + return .done + case .completion(.cancelled): + return .done + } + }) + + let r = await t.result + print(r) + let count = await counter.count + XCTAssert(count == 10, "Got wrong count = \(count)") + let inputCount = await inputCounter.count + XCTAssert(inputCount == 10, "Got wrong input count = \(inputCount)") + } +} diff --git a/Tests/TimeTests/ThrottleDemandTests.swift b/Tests/TimeTests/ThrottleDemandTests.swift index 6dedad0..aa98a2a 100644 --- a/Tests/TimeTests/ThrottleDemandTests.swift +++ b/Tests/TimeTests/ThrottleDemandTests.swift @@ -19,9 +19,10 @@ class ThrottleDemandTests: XCTestCase { let expectation = await Expectation() let counter = Counter() - let t = await (1 ... 15).asyncPublisher - .throttleDemand(interval: .milliseconds(100)) + let t = await (0 ... 15).asyncPublisher + .throttleDemand(interval: .seconds(1)) .sink({ value in + print(value) switch value { case .value(_): await counter.increment() @@ -38,11 +39,11 @@ class ThrottleDemandTests: XCTestCase { }) do { - try await FreeCombine.wait(for: expectation, timeout: 1_000_000_000) + try await FreeCombine.wait(for: expectation, timeout: 10_000_000_000) } catch { t.cancel() let count = await counter.count - XCTAssert(count == 11, "Got wrong count = \(count)") + XCTAssert(count == 10, "Got wrong count = \(count)") } _ = await t.result diff --git a/Tests/TimeTests/ThrottleTests.swift b/Tests/TimeTests/ThrottleTests.swift index 4596232..916aa39 100644 --- a/Tests/TimeTests/ThrottleTests.swift +++ b/Tests/TimeTests/ThrottleTests.swift @@ -18,8 +18,10 @@ class ThrottleTests: XCTestCase { func testSimpleThrottle() async throws { let expectation = await Expectation() + let inputCounter = Counter() let counter = Counter() let t = await (1 ... 15).asyncPublisher + .handleEvents(receiveOutput: { _ in await inputCounter.increment() }) .throttle(interval: .milliseconds(1000), latest: false) .sink({ value in switch value { @@ -34,16 +36,110 @@ class ThrottleTests: XCTestCase { case .completion(.cancelled): return .done } - }) + }) do { try await FreeCombine.wait(for: expectation, timeout: 1_000_000_000) } catch { t.cancel() let count = await counter.count - XCTAssert(count == 2, "Got wrong count = \(count)") + let inputCount = await inputCounter.count + XCTAssert(count == 1, "Got wrong count = \(count)") + XCTAssert(inputCount == 15, "Got wrong count = \(inputCount)") } _ = await t.result } + + func testSimpleSubjectThrottle() async throws { + let values = ValueRef<[Int]>.init(value: []) + let inputCounter = Counter() + let counter = Counter() + let subject = try await PassthroughSubject(Int.self) + let t = await subject.publisher() + .handleEvents(receiveOutput: { _ in await inputCounter.increment() }) + .throttle(interval: .milliseconds(1000), latest: false) + .sink({ value in + switch value { + case .value(let value): + let vals = await values.value + await values.set(value: vals + [value]) + await counter.increment() + return .more + case let .completion(.failure(error)): + XCTFail("Got unexpected failure: \(error)") + return .done + case .completion(.finished): + return .done + case .completion(.cancelled): + XCTFail("Should not have cancelled") + return .done + } + }) + + for i in (0 ..< 15) { + try await subject.send(i) + try await Task.sleep(nanoseconds: 100_000_000) + } + + try await subject.finish() + _ = await subject.result + + let count = await counter.count + XCTAssert(count == 2, "Got wrong count = \(count)") + + let inputCount = await inputCounter.count + XCTAssert(inputCount == 15, "Got wrong count = \(inputCount)") + + let vals = await values.value + XCTAssert(vals == [0, 1], "Incorrect values") + + _ = await t.result + } + + func testSimpleSubjectThrottleLatest() async throws { + let values = ValueRef<[Int]>.init(value: []) + let inputCounter = Counter() + let counter = Counter() + let subject = try await PassthroughSubject(Int.self) + let t = await subject.publisher() + .handleEvents(receiveOutput: { _ in await inputCounter.increment() }) + .throttle(interval: .milliseconds(1000), latest: true) + .sink({ value in + switch value { + case .value(let value): + let vals = await values.value + await values.set(value: vals + [value]) + await counter.increment() + return .more + case let .completion(.failure(error)): + XCTFail("Got unexpected failure: \(error)") + return .done + case .completion(.finished): + return .done + case .completion(.cancelled): + XCTFail("Should not have cancelled") + return .done + } + }) + + for i in (0 ..< 15) { + try await subject.send(i) + try await Task.sleep(nanoseconds: 100_000_000) + } + + try await subject.finish() + _ = await subject.result + + let count = await counter.count + XCTAssert(count == 2, "Got wrong count = \(count)") + + let inputCount = await inputCounter.count + XCTAssert(inputCount == 15, "Got wrong count = \(inputCount)") + + let vals = await values.value + XCTAssert(vals == [0, 9], "Incorrect values") + + _ = await t.result + } }