Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async helpers to Schedulers #857

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# master
*Please add new entries at the top.*

1. Add `async` helpers to Schedulers (#857, kudos to @p4checo)
1. Add primary associated types to SignalProducerConvertible & SignalProducerProtocol (#855, kudos to @braker1nine)
1. Refactor Github Actions to cover more swift versions (#858, kudos to @braker1nine)
2. Refactor Github Actions to cover more swift versions (#858, kudos to @braker1nine)
1.Use `OSAllocatedUnfairLock` instead of `os_unfair_lock` on supported Apple platforms (#856, kudos to @mluisbrown)

# 7.0.0
Expand Down
113 changes: 37 additions & 76 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Sources/FoundationExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,10 @@ extension DispatchTimeInterval {
return result
}
}

extension TimeInterval {

internal var dispatchTimeInterval: DispatchTimeInterval {
.nanoseconds(Int(self * TimeInterval(NSEC_PER_SEC)))
}
}
251 changes: 226 additions & 25 deletions Sources/Scheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,120 @@ public protocol DateScheduler: Scheduler {
func schedule(after date: Date, interval: DispatchTimeInterval, leeway: DispatchTimeInterval, action: @escaping () -> Void) -> Disposable?
}

extension DateScheduler {

/// Schedules a recurring action after given delay repeated at the given,
/// interval, beginning at the given interval counted from `currentDate`.
///
/// - parameters:
/// - delay: A delay for action's dispatch.
/// - interval: A repetition interval.
/// - leeway: Some delta for repetition interval.
/// - action: A closure of the action to repeat.
///
/// - returns: Optional `Disposable` that can be used to cancel the work
/// before it begins.
@discardableResult
public func schedule(after delay: DispatchTimeInterval, interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0), action: @escaping () -> Void) -> Disposable? {
return schedule(after: currentDate.addingTimeInterval(delay), interval: interval, leeway: leeway, action: action)
}
}

#if canImport(_Concurrency) && compiler(>=5.5.2)

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension DateScheduler {

/// Suspends the current task for at least the given duration.
///
/// If the task is cancelled before the time ends, this function throws `CancellationError`.
///
/// This function doesn't block the scheduler.
///
/// ```
/// try await in scheduler.sleep(for: .seconds(1))
/// ```
///
/// - precondition: `interval` must be non-negative number.
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - duration: The time interval on which to sleep between yielding.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
public func sleep(for interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0)) async throws {
precondition(interval.timeInterval >= 0)
precondition(leeway.timeInterval >= 0)

try Task.checkCancellation()
_ = await self
.timer(interval: interval, leeway: leeway)
.first { _ in true }
try Task.checkCancellation()
}

/// Suspend task execution until a given deadline within a tolerance.
///
/// If the task is cancelled before the time ends, this function throws `CancellationError`.
///
/// This function doesn't block the scheduler.
///
/// ```
/// try await in scheduler.sleep(until: scheduler.now + .seconds(1))
/// ```
///
/// - precondition: `deadline` must be greater than the current date (i.e. in its future).
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - deadline: An instant of time to suspend until.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
public func sleep(until deadline: Date, leeway: DispatchTimeInterval = .seconds(0)) async throws {
precondition(leeway.timeInterval >= 0)
precondition(deadline > currentDate)

try await self.sleep(
for: deadline.timeIntervalSince(currentDate).dispatchTimeInterval,
leeway: leeway
)
}

/// Returns a stream that repeatedly yields the current time of the scheduler on a given interval.
///
/// If the task is cancelled, the sequence will terminate.
///
/// ```
/// for await instant in scheduler.timer(interval: .seconds(1)) {
/// print("now:", instant)
/// }
/// ```
///
/// - precondition: `interval` must be non-negative number.
/// - precondition: `leeway` must be non-negative number.
///
/// - Parameters:
/// - interval: The time interval on which to sleep between yielding the current instant in
/// time. For example, a value of `0.5` yields an instant approximately every half-second.
/// - leeway: The allowed timing variance when emitting events. Defaults to `.seconds(0)`.
/// - Returns: A stream that repeatedly yields the current time.
public func timer(interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0)) -> AsyncStream<Date> {
precondition(interval.timeInterval >= 0)
precondition(leeway.timeInterval >= 0)

return .init { continuation in
let disposable = self.schedule(after: interval, interval: interval) {
continuation.yield(self.currentDate)
}
continuation.onTermination = { _ in
disposable?.dispose()
}
// NB: This explicit cast is needed to work around a compiler bug in Swift 5.5.2
as @Sendable (AsyncStream<Date>.Continuation.Termination) -> Void
}
}
}

#endif

/// A scheduler that performs all work synchronously.
public final class ImmediateScheduler: Scheduler {
public init() {}
Expand Down Expand Up @@ -510,22 +624,6 @@ public final class TestScheduler: DateScheduler {
}
}

/// Schedules a recurring action after given delay repeated at the given,
/// interval, beginning at the given interval counted from `currentDate`.
///
/// - parameters:
/// - delay: A delay for action's dispatch.
/// - interval: A repetition interval.
/// - leeway: Some delta for repetition interval.
/// - action: A closure of the action to repeat.
///
/// - returns: Optional `Disposable` that can be used to cancel the work
/// before it begins.
@discardableResult
public func schedule(after delay: DispatchTimeInterval, interval: DispatchTimeInterval, leeway: DispatchTimeInterval = .seconds(0), action: @escaping () -> Void) -> Disposable? {
return schedule(after: currentDate.addingTimeInterval(delay), interval: interval, leeway: leeway, action: action)
}

/// Schedules a recurring action at the given interval with
/// provided leeway, beginning at the given start date.
///
Expand Down Expand Up @@ -584,19 +682,20 @@ public final class TestScheduler: DateScheduler {

assert(currentDate <= newDate)

while scheduledActions.count > 0 {
if newDate < scheduledActions[0].date {
break
while _currentDate <= newDate {
guard
let next = scheduledActions.first,
newDate >= next.date
else {
_currentDate = newDate
return
}

_currentDate = scheduledActions[0].date

let scheduledAction = scheduledActions.remove(at: 0)
scheduledAction.action()
_currentDate = next.date
scheduledActions.removeFirst()
next.action()
}

_currentDate = newDate

lock.unlock()
}

Expand All @@ -621,4 +720,106 @@ public final class TestScheduler: DateScheduler {
lock.unlock()

}

#if canImport(_Concurrency) && compiler(>=5.5.2)

/// Advances the virtualized clock by an extremely tiny interval, dequeuing
/// and executing any actions along the way.
///
/// This is intended to be used as a way to execute actions that have been
/// scheduled to run as soon as possible.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance() async {
await advance(by: .nanoseconds(1))
}

/// Advances the virtualized clock by the given interval, dequeuing and
/// executing any actions along the way.
///
/// - parameters:
/// - interval: Interval by which the current date will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(by interval: DispatchTimeInterval) async {
await advance(to: lock.withLock({ currentDate.addingTimeInterval(interval) }))
}

/// Advances the virtualized clock by the given interval, dequeuing and
/// executing any actions along the way.
///
/// - parameters:
/// - interval: Interval by which the current date will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(by interval: TimeInterval) async {
await advance(to: lock.withLock({ currentDate.addingTimeInterval(interval) }))
}

/// Advances the virtualized clock to the given future date, dequeuing and
/// executing any actions up until that point.
///
/// - parameters:
/// - newDate: Future date to which the virtual clock will be advanced.
@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func advance(to newDate: Date) async {
assert(lock.withLock { _currentDate <= newDate })

while lock.withLock({ _currentDate }) <= newDate {
await Task.megaYield()

let `return` = lock.withLock {
guard
let next = scheduledActions.first,
newDate >= next.date
else {
_currentDate = newDate
return true
}

_currentDate = next.date
scheduledActions.removeFirst()
next.action()
return false
}

if `return` {
return
}
}
}

@MainActor
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
public func run() async {
await Task.megaYield()
await advance(to: Date.distantFuture)
}
#endif
}

@available(macOS, obsoleted: 13, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(iOS, obsoleted: 13, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(watchOS, obsoleted: 9, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
@available(macCatalyst, obsoleted: 16, message: "`NSLocking` now provides `withLocking`, so this is no longer needed")
extension NSRecursiveLock {
fileprivate func withLock<T>(_ body: () throws -> T) rethrows -> T {
self.lock()
defer { self.unlock() }
return try body()
}
}

// Credits to @pointfreeco
// https://github.com/pointfreeco/combine-schedulers
#if canImport(_Concurrency) && compiler(>=5.5.2)
@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Task where Success == Failure, Failure == Never {
static func megaYield(count: Int = 10) async {
for _ in 1...count {
await Task<Void, Never>.detached(priority: .background) { await Task.yield() }.value
}
}
}
#endif
Loading