From 9433b4d3b42bb5e431c0c4810e93b492779fbf70 Mon Sep 17 00:00:00 2001 From: neuecc Date: Tue, 20 Feb 2024 17:10:24 +0900 Subject: [PATCH] Add ThrottleLatest --- src/R3/Operators/ThrottleLastFrame.cs | 28 +- src/R3/Operators/ThrottleLatest.cs | 269 ++++++++++++++++++ src/R3/Operators/ThrottleLatestFrame.cs | 91 ++++++ .../OperatorTests/ThrottleLatestTest.cs | 171 +++++++++++ 4 files changed, 548 insertions(+), 11 deletions(-) create mode 100644 src/R3/Operators/ThrottleLatest.cs create mode 100644 src/R3/Operators/ThrottleLatestFrame.cs create mode 100644 tests/R3.Tests/OperatorTests/ThrottleLatestTest.cs diff --git a/src/R3/Operators/ThrottleLastFrame.cs b/src/R3/Operators/ThrottleLastFrame.cs index 9b1e3cfe..7ec9b658 100644 --- a/src/R3/Operators/ThrottleLastFrame.cs +++ b/src/R3/Operators/ThrottleLastFrame.cs @@ -1,4 +1,6 @@ -namespace R3; +using System.Runtime.InteropServices; + +namespace R3; public static partial class ObservableExtensions { @@ -23,24 +25,31 @@ protected override IDisposable SubscribeCore(Observer observer) sealed class _ThrottleLastFrame : Observer, IFrameRunnerWorkItem { readonly Observer observer; + readonly FrameProvider frameProvider; readonly int frameCount; readonly object gate = new object(); T? lastValue; - bool hasValue; int currentFrame; + bool running; public _ThrottleLastFrame(Observer observer, int frameCount, FrameProvider frameProvider) { this.observer = observer; this.frameCount = frameCount; - frameProvider.Register(this); + this.frameProvider = frameProvider; } protected override void OnNextCore(T value) { lock (gate) { - hasValue = true; + if (!running) + { + running = true; + currentFrame = 0; + frameProvider.Register(this); + } + lastValue = value; } } @@ -63,13 +72,10 @@ bool IFrameRunnerWorkItem.MoveNext(long _) { if (++currentFrame == frameCount) { - if (hasValue) - { - observer.OnNext(lastValue!); - hasValue = false; - lastValue = default; - currentFrame = 0; - } + observer.OnNext(lastValue!); + lastValue = default; + running = false; + return false; } } diff --git a/src/R3/Operators/ThrottleLatest.cs b/src/R3/Operators/ThrottleLatest.cs new file mode 100644 index 00000000..936b3b15 --- /dev/null +++ b/src/R3/Operators/ThrottleLatest.cs @@ -0,0 +1,269 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable ThrottleLatest(this Observable source, TimeSpan timeSpan) + { + return new ThrottleLatest(source, timeSpan, ObservableSystem.DefaultTimeProvider); + } + + public static Observable ThrottleLatest(this Observable source, TimeSpan timeSpan, TimeProvider timeProvider) + { + return new ThrottleLatest(source, timeSpan, timeProvider); + } + + public static Observable ThrottleLatest(this Observable source, Observable sampler) + { + return new ThrottleLatestObservableSampler(source, sampler); + } + + public static Observable ThrottleLatest(this Observable source, Func sampler, bool configureAwait = true) + { + return new ThrottleLatestAsyncSampler(source, sampler, configureAwait); + } +} + +internal sealed class ThrottleLatest(Observable source, TimeSpan interval, TimeProvider timeProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleLatest(observer, interval.Normalize(), timeProvider)); + } + + sealed class _ThrottleLatest : Observer + { + static readonly TimerCallback timerCallback = RaiseOnNext; + + readonly Observer observer; + readonly TimeSpan interval; + readonly ITimer timer; + readonly object gate = new object(); + T? lastValue; + bool hasValue; + bool timerIsRunning; + + public _ThrottleLatest(Observer observer, TimeSpan interval, TimeProvider timeProvider) + { + this.observer = observer; + this.interval = interval; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (!timerIsRunning) // timer is stopping + { + timerIsRunning = true; + timer.InvokeOnce(interval); // timer start before OnNext + observer.OnNext(value); // call OnNext in lock + return; + } + else + { + hasValue = true; + lastValue = value; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + + static void RaiseOnNext(object? state) + { + var self = (_ThrottleLatest)state!; + lock (self.gate) + { + self.timerIsRunning = false; + if (self.hasValue) + { + self.observer.OnNext(self.lastValue!); + self.hasValue = false; + self.lastValue = default; + } + } + } + } +} + +internal sealed class ThrottleLatestAsyncSampler(Observable source, Func sampler, bool configureAwait) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleLatest(observer, sampler, configureAwait)); + } + + sealed class _ThrottleLatest(Observer observer, Func sampler, bool configureAwait) : Observer + { + readonly object gate = new object(); + readonly CancellationTokenSource cancellationTokenSource = new(); + T? lastValue; + bool hasValue; + Task? taskRunner; + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (taskRunner == null) + { + taskRunner = RaiseOnNextAsync(value); + observer.OnNext(value); + } + else + { + hasValue = true; + lastValue = value; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + cancellationTokenSource.Cancel(); + cancellationTokenSource.Dispose(); + } + + async Task RaiseOnNextAsync(T value) + { + try + { + await sampler(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token) + { + return; + } + OnErrorResume(ex); + } + finally + { + lock (gate) + { + if (hasValue) + { + observer.OnNext(lastValue!); + lastValue = default; + hasValue = false; + taskRunner = null; + } + } + } + } + } +} + +internal sealed class ThrottleLatestObservableSampler(Observable source, Observable sampler) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleLatest(observer, sampler)); + } + + sealed class _ThrottleLatest : Observer + { + readonly Observer observer; + readonly object gate = new object(); + readonly IDisposable samplerSubscription; + T? lastValue; + bool hasValue; + bool closing; + + public _ThrottleLatest(Observer observer, Observable sampler) + { + this.observer = observer; + var sampleObserver = new SamplerObserver(this); + this.samplerSubscription = sampler.Subscribe(sampleObserver); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (!closing) + { + closing = true; + observer.OnNext(value); + } + else + { + lastValue = value; + hasValue = true; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + samplerSubscription.Dispose(); + } + + void PublishOnNext() + { + lock (gate) + { + closing = false; + if (hasValue) + { + observer.OnNext(lastValue!); + hasValue = false; + lastValue = default; + } + } + } + + sealed class SamplerObserver(_ThrottleLatest parent) : Observer + { + protected override void OnNextCore(TSample value) + { + parent.PublishOnNext(); + } + + protected override void OnErrorResumeCore(Exception error) + { + parent.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + parent.OnCompleted(result); + } + } + } +} diff --git a/src/R3/Operators/ThrottleLatestFrame.cs b/src/R3/Operators/ThrottleLatestFrame.cs new file mode 100644 index 00000000..f7b0b56b --- /dev/null +++ b/src/R3/Operators/ThrottleLatestFrame.cs @@ -0,0 +1,91 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable ThrottleLatestFrame(this Observable source, int frameCount) + { + return new ThrottleLatestFrame(source, frameCount, ObservableSystem.DefaultFrameProvider); + } + + public static Observable ThrottleLatestFrame(this Observable source, int frameCount, FrameProvider frameProvider) + { + return new ThrottleLatestFrame(source, frameCount, frameProvider); + } +} + +internal sealed class ThrottleLatestFrame(Observable source, int frameCount, FrameProvider frameProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleLatestFrame(observer, frameCount.NormalizeFrame(), frameProvider)); + } + + sealed class _ThrottleLatestFrame : Observer, IFrameRunnerWorkItem + { + readonly Observer observer; + readonly FrameProvider frameProvider; + readonly int frameCount; + readonly object gate = new object(); + T? lastValue; + bool hasValue; + int currentFrame; + bool running; + + public _ThrottleLatestFrame(Observer observer, int frameCount, FrameProvider frameProvider) + { + this.observer = observer; + this.frameCount = frameCount; + this.frameProvider = frameProvider; + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (!running) + { + running = true; + currentFrame = 0; + frameProvider.Register(this); + observer.OnNext(value); + } + else + { + hasValue = true; + lastValue = value; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (this.IsDisposed) return false; + + lock (gate) + { + if (++currentFrame == frameCount) + { + if (hasValue) + { + observer.OnNext(lastValue!); + lastValue = default; + } + running = false; + return false; + } + } + + return true; + } + } +} diff --git a/tests/R3.Tests/OperatorTests/ThrottleLatestTest.cs b/tests/R3.Tests/OperatorTests/ThrottleLatestTest.cs new file mode 100644 index 00000000..81647569 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/ThrottleLatestTest.cs @@ -0,0 +1,171 @@ +namespace R3.Tests.OperatorTests; + +public class ThrottleLatestTest +{ + // ThrottleLatest(TimeSpan) + [Fact] + public void ThrottleLatest() + { + var timeProvider = new FakeTimeProvider(); + + var publisher = new Subject(); + var list = publisher.ThrottleLatest(TimeSpan.FromSeconds(3), timeProvider).ToLiveList(); + + publisher.OnNext(1); + list.AssertEqual([1]); + + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([1]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([1]); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + list.AssertEqual([1, 10000]); + publisher.OnNext(2); + list.AssertEqual([1, 10000, 2]); + publisher.OnNext(20); + publisher.OnNext(200); + + timeProvider.Advance(TimeSpan.FromSeconds(3)); + list.AssertEqual([1, 10000, 2, 200]); + publisher.OnNext(3); + list.AssertEqual([1, 10000, 2, 200, 3]); + + publisher.OnCompleted(); + + list.AssertEqual([1, 10000, 2, 200, 3]); + list.AssertIsCompleted(); + } + + // ThrottleLatest(async) + [Fact] + public void ThrottleLatestAsyncSampler() + { + SynchronizationContext.SetSynchronizationContext(null); + + var publisher = new Subject(); + var fakeTime = new FakeTimeProvider(); + var list = publisher.ThrottleLatest(async (x, ct) => + { + await fakeTime.Delay(TimeSpan.FromSeconds(x), ct); + }).ToLiveList(); + + publisher.OnNext(1); // gate close + list.AssertEqual([1]); + publisher.OnNext(2); + publisher.OnNext(3); + + list.AssertEqual([1]); + + fakeTime.Advance(1); // gate open + list.AssertEqual([1, 3]); + + publisher.OnNext(5); + list.AssertEqual([1, 3, 5]); + + publisher.OnNext(6); + publisher.OnNext(7); + + fakeTime.Advance(4); + list.AssertEqual([1, 3, 5]); + + publisher.OnNext(8); + + list.AssertEqual([1, 3, 5]); + + fakeTime.Advance(1); + list.AssertEqual([1, 3, 5, 8]); + + publisher.OnCompleted(); + + list.AssertIsCompleted(); + } + + // ThrottleLatest(Observable) + [Fact] + public void ThrottleLatestObservableSampler() + { + SynchronizationContext.SetSynchronizationContext(null); + + var publisher = new Subject(); + var sampler = new Subject(); + var list = publisher.ThrottleLatest(sampler).ToLiveList(); + + publisher.OnNext(1); // gate close + list.AssertEqual([1]); + publisher.OnNext(2); + publisher.OnNext(3); + + list.AssertEqual([1]); + + sampler.OnNext(Unit.Default); + list.AssertEqual([1, 3]); + + publisher.OnNext(5); + list.AssertEqual([1, 3, 5]); + publisher.OnNext(6); + publisher.OnNext(7); + + sampler.OnNext(Unit.Default); + list.AssertEqual([1, 3, 5, 7]); + + + publisher.OnNext(8); + list.AssertEqual([1, 3, 5, 7, 8]); + + sampler.OnNext(Unit.Default); + list.AssertEqual([1, 3, 5, 7, 8]); + sampler.OnNext(Unit.Default); + sampler.OnNext(Unit.Default); + list.AssertEqual([1, 3, 5, 7, 8]); + + publisher.OnCompleted(); + + list.AssertIsCompleted(); + } + + + // ThrottleLatestFrame + + [Fact] + public void ThrottleLatestFrame() + { + var frameProvider = new FakeFrameProvider(); + + var publisher = new Subject(); + var list = publisher.ThrottleLatestFrame(3, frameProvider).ToLiveList(); + + publisher.OnNext(1); + list.AssertEqual([1]); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([1]); + + frameProvider.Advance(2); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([1]); + + frameProvider.Advance(1); + list.AssertEqual([1, 10000]); + publisher.OnNext(2); + list.AssertEqual([1, 10000, 2]); + publisher.OnNext(20); + publisher.OnNext(200); + + frameProvider.Advance(3); + list.AssertEqual([1, 10000, 2, 200]); + publisher.OnNext(3); + + publisher.OnCompleted(); + + list.AssertEqual([1, 10000, 2, 200, 3]); + list.AssertIsCompleted(); + } +}