Skip to content

Commit

Permalink
Add async helpers to Schedulers
Browse files Browse the repository at this point in the history
Credits to @pointfreeco for the work done in:
https://github.com/pointfreeco/combine-schedulers

## Changes

- Add new `async` APIs to `DateScheduler` to enable waiting for them in
asynchronous contexts:
  + `sleep(for:leeway) async`
  + `sleep(until:leeway) async`
  + `timer(interval:leeway) async`

- Add new `async` variants `TestScheduler`'s `advance` and `run` APIs,
to enable waiting for them in asynchronous contexts:
  + `advance()` async`
  + `advance(by:) async` (`DispatchTimeInterval` and `TimeInterval`)
  + `advance(to:) async`
  + `run() async`

- Refactor `TestScheduler.advance(to:)`'s loop condition to be based on
`_currentDate` instead of `scheduledActions.count` to be consistent
with the new `async` variant.

- Add `NSRecursiveLock.withLock` implementation for older OS versions.
  • Loading branch information
p4checo committed Nov 13, 2022
1 parent ade1518 commit d222a44
Show file tree
Hide file tree
Showing 6 changed files with 503 additions and 102 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# master
*Please add new entries at the top.*

1. Add `async` helpers to Schedulers (#857, kudos to @p4checo)
1. Add primary associated types to SignalProducerConvertible & SignalProducerProtocol (#855, kudos to @braker1nine)
1. Refactor Github Actions to cover more swift versions (#858, kudos to @braker1nine)
2. Refactor Github Actions to cover more swift versions (#858, kudos to @braker1nine)
1.Use `OSAllocatedUnfairLock` instead of `os_unfair_lock` on supported Apple platforms (#856, kudos to @mluisbrown)

# 7.0.0
Expand Down
113 changes: 37 additions & 76 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Sources/FoundationExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,10 @@ extension DispatchTimeInterval {
return result
}
}

extension TimeInterval {

internal var dispatchTimeInterval: DispatchTimeInterval {
.nanoseconds(Int(self * TimeInterval(NSEC_PER_SEC)))
}
}
251 changes: 226 additions & 25 deletions Sources/Scheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,120 @@ public protocol DateScheduler: Scheduler {
func schedule(after date: Date, interval: DispatchTimeInterval, leeway: DispatchTimeInterval, action: @escaping () -> Void) -> Disposable?
}

extension DateScheduler {

/// Schedules a recurring action after given delay repeated at the given,
/// interval, beginning at the given interval counted from `currentDate`.
///
/// - parameters:
/// - delay: A delay for action's dispatch.
/// - interval: A repetition interval.
/// - leeway: Some delta for repetition interval.
/// - action: A closure of the action to repeat.
///
/// - returns: Optional `Disposable` that can be used to cancel the work
/// before it begins.
@discardableResult
public func schedule(after delay: DispatchTimeInterval, interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0), action: @escaping () -> Void) -> Disposable? {
return schedule(after: currentDate.addingTimeInterval(delay), interval: interval, leeway: leeway, action: action)
}
}

#if canImport(_Concurrency) && compiler(>=5.5.2)

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension DateScheduler {

/// Suspends the current task for at least the given duration.
///
/// If the task is cancelled before the time ends, this function throws `CancellationError`.
///
/// This function doesn't block the scheduler.
///
/// ```
/// try await in scheduler.sleep(for: .seconds(1))
/// ```
///
/// - precondition: `interval` must be non-negative number.
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - duration: The time interval on which to sleep between yielding.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
public func sleep(for interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0)) async throws {
precondition(interval.timeInterval >= 0)
precondition(leeway.timeInterval >= 0)

try Task.checkCancellation()
_ = await self
.timer(interval: interval, leeway: leeway)
.first { _ in true }
try Task.checkCancellation()
}

/// Suspend task execution until a given deadline within a tolerance.
///
/// If the task is cancelled before the time ends, this function throws `CancellationError`.
///
/// This function doesn't block the scheduler.
///
/// ```
/// try await in scheduler.sleep(until: scheduler.now + .seconds(1))
/// ```
///
/// - precondition: `deadline` must be greater than the current date (i.e. in its future).
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - deadline: An instant of time to suspend until.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
public func sleep(until deadline: Date, leeway: DispatchTimeInterval = .seconds(0)) async throws {
precondition(leeway.timeInterval >= 0)
precondition(deadline > currentDate)

try await self.sleep(
for: deadline.timeIntervalSince(currentDate).dispatchTimeInterval,
leeway: leeway
)
}

/// Returns a stream that repeatedly yields the current time of the scheduler on a given interval.
///
/// If the task is cancelled, the sequence will terminate.
///
/// ```
/// for await instant in scheduler.timer(interval: .seconds(1)) {
/// print("now:", instant)
/// }
/// ```
///
/// - precondition: `interval` must be non-negative number.
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - interval: The time interval on which to sleep between yielding the current instant in
/// time. For example, a value of `0.5` yields an instant approximately every half-second.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
/// - Returns: A stream that repeatedly yields the current time.
public func timer(interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0)) -> AsyncStream<Date> {
precondition(interval.timeInterval >= 0)
precondition(leeway.timeInterval >= 0)

return .init { continuation in
let disposable = self.schedule(after: interval, interval: interval) {
continuation.yield(self.currentDate)
}
continuation.onTermination = { _ in
disposable?.dispose()
}
// NB: This explicit cast is needed to work around a compiler bug in Swift 5.5.2
as @Sendable (AsyncStream<Date>.Continuation.Termination) -> Void
}
}
}

#endif

/// A scheduler that performs all work synchronously.
public final class ImmediateScheduler: Scheduler {
public init() {}
Expand Down Expand Up @@ -510,22 +624,6 @@ public final class TestScheduler: DateScheduler {
}
}

/// Schedules a recurring action after given delay repeated at the given,
/// interval, beginning at the given interval counted from `currentDate`.
///
/// - parameters:
/// - delay: A delay for action's dispatch.
/// - interval: A repetition interval.
/// - leeway: Some delta for repetition interval.
/// - action: A closure of the action to repeat.
///
/// - returns: Optional `Disposable` that can be used to cancel the work
/// before it begins.
@discardableResult
public func schedule(after delay: DispatchTimeInterval, interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0), action: @escaping () -> Void) -> Disposable? {
return schedule(after: currentDate.addingTimeInterval(delay), interval: interval, leeway: leeway, action: action)
}

/// Schedules a recurring action at the given interval with
/// provided leeway, beginning at the given start date.
///
Expand Down Expand Up @@ -584,19 +682,20 @@ public final class TestScheduler: DateScheduler {

assert(currentDate <= newDate)

while scheduledActions.count > 0 {
if newDate < scheduledActions[0].date {
break
while _currentDate <= newDate {
guard
let next = scheduledActions.first,
newDate >= next.date
else {
_currentDate = newDate
return
}

_currentDate = scheduledActions[0].date

let scheduledAction = scheduledActions.remove(at: 0)
scheduledAction.action()
_currentDate = next.date
scheduledActions.removeFirst()
next.action()
}

_currentDate = newDate

lock.unlock()
}

Expand All @@ -621,4 +720,106 @@ public final class TestScheduler: DateScheduler {
lock.unlock()

}

#if canImport(_Concurrency) && compiler(>=5.5.2)

/// Advances the virtualized clock by an extremely tiny interval, dequeuing
/// and executing any actions along the way.
///
/// This is intended to be used as a way to execute actions that have been
/// scheduled to run as soon as possible.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance() async {
await advance(by: .nanoseconds(1))
}

/// Advances the virtualized clock by the given interval, dequeuing and
/// executing any actions along the way.
///
/// - parameters:
/// - interval: Interval by which the current date will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(by interval: DispatchTimeInterval) async {
await advance(to: lock.withLock({ currentDate.addingTimeInterval(interval) }))
}

/// Advances the virtualized clock by the given interval, dequeuing and
/// executing any actions along the way.
///
/// - parameters:
/// - interval: Interval by which the current date will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(by interval: TimeInterval) async {
await advance(to: lock.withLock({ currentDate.addingTimeInterval(interval) }))
}

/// Advances the virtualized clock to the given future date, dequeuing and
/// executing any actions up until that point.
///
/// - parameters:
/// - newDate: Future date to which the virtual clock will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(to newDate: Date) async {
assert(lock.withLock { _currentDate <= newDate })

while lock.withLock({ _currentDate }) <= newDate {
await Task.megaYield()

let `return` = lock.withLock {
guard
let next = scheduledActions.first,
newDate >= next.date
else {
_currentDate = newDate
return true
}

_currentDate = next.date
scheduledActions.removeFirst()
next.action()
return false
}

if `return` {
return
}
}
}

@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func run() async {
await Task.megaYield()
await advance(to: Date.distantFuture)
}
#endif
}

@available(macOS, obsoleted: 13, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(iOS, obsoleted: 13, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(watchOS, obsoleted: 9, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(macCatalyst, obsoleted: 16, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
extension NSRecursiveLock {
fileprivate func withLock<T>(_ body: () throws -> T) rethrows -> T {
self.lock()
defer { self.unlock() }
return try body()
}
}

// Credits to @pointfreeco
// https://github.com/pointfreeco/combine-schedulers
#if canImport(_Concurrency) && compiler(>=5.5.2)
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Task where Success == Failure, Failure == Never {
static func megaYield(count: Int = 10) async {
for _ in 1...count {
await Task<Void, Never>.detached(priority: .background) { await Task.yield() }.value
}
}
}
#endif
Loading

0 comments on commit d222a44

Please sign in to comment.