Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(delay-operator): support long delays by segmenting the delay (#7440) #7441

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions packages/rxjs/src/internal/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import { asyncScheduler } from '../scheduler/async.js';
import type { MonoTypeOperatorFunction, SchedulerLike } from '../types.js';
import { delayWhen } from './delayWhen.js';
import { timer } from '../observable/timer.js';
import { asyncScheduler } from '../scheduler/async';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { delayWhen } from './delayWhen';
import { timer } from '../observable/timer';
import { Observable } from '../Observable';

/**
* Delays the emission of items from the source Observable by a given timeout or
* until a given Date.
*
* <span class="informal">Time shifts each item by some specified amount of
* milliseconds.</span>
* milliseconds, even for very long periods.</span>
*
* ![](delay.svg)
*
* If the delay argument is a Number, this operator time shifts the source
* Observable by that amount of time expressed in milliseconds. The relative
* time intervals between the values are preserved.
* time intervals between the values are preserved. For delays longer than 2147483647 ms (~24.9 days),
* the delay is segmented to avoid JavaScript timer limitations.
*
* If the delay argument is a Date, this operator time shifts the start of the
* Observable execution until the given date occurs.
* Observable execution until the given date occurs. It correctly handles dates far in the future.
*
* ## Examples
*
Expand Down Expand Up @@ -55,11 +57,27 @@ import { timer } from '../observable/timer.js';
* @param due The delay duration in milliseconds (a `number`) or a `Date` until
* which the emission of the source items is delayed.
* @param scheduler The {@link SchedulerLike} to use for managing the timers
* that handle the time-shift for each item.
* that handle the time-shift for each item, with support for long delays.
* @return A function that returns an Observable that delays the emissions of
* the source Observable by the specified timeout or Date.
* the source Observable by the specified timeout or Date, correctly handling long delays.
*/
export function delay<T>(due: number | Date, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
const duration = timer(due, scheduler);
return delayWhen(() => duration);
// Helper function to handle long delays
function delaySegmented(dueTime: number, action: () => void, scheduler: SchedulerLike) {
if (dueTime <= 2_147_483_647) {
scheduler.schedule(action, dueTime);
} else {
// Schedule the first segment up to the maximum limit
scheduler.schedule(() => {
// Calculate the remaining time and apply recursively
const remainingTime = dueTime - 2_147_483_647;
delaySegmented(remainingTime, action, scheduler);
}, 2_147_483_647);
}
}

const dueTime = due instanceof Date ? due.getTime() - Date.now() : due;
return delayWhen(() => new Observable<T>((subscriber) => {
delaySegmented(dueTime, () => subscriber.complete(), scheduler);
}));
}