diff --git a/src/R3/Operators/AppendPrepend.cs b/src/R3/Operators/AppendPrepend.cs index 24e742e7..b6e08278 100644 --- a/src/R3/Operators/AppendPrepend.cs +++ b/src/R3/Operators/AppendPrepend.cs @@ -7,12 +7,41 @@ public static Observable Append(this Observable source, T value) return new AppendPrepend(source, value, append: true); } + public static Observable Append(this Observable source, IEnumerable values) + { + return new AppendPrependEnumerable(source, values, append: true); + } + + public static Observable Append(this Observable source, Func valueFactory) + { + return new AppendPrependFactory(source, valueFactory, append: true); + } + + public static Observable Append(this Observable source, TState state, Func valueFactory) + { + return new AppendPrependFactory(source, state, valueFactory, append: true); + } + public static Observable Prepend(this Observable source, T value) { return new AppendPrepend(source, value, append: false); } -} + public static Observable Prepend(this Observable source, IEnumerable values) + { + return new AppendPrependEnumerable(source, values, append: false); + } + + public static Observable Prepend(this Observable source, Func valueFactory) + { + return new AppendPrependFactory(source, valueFactory, append: false); + } + + public static Observable Prepend(this Observable source, TState state, Func valueFactory) + { + return new AppendPrependFactory(source, state, valueFactory, append: false); + } +} internal sealed class AppendPrepend(Observable source, T value, bool append) : Observable { @@ -53,3 +82,151 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class AppendPrependEnumerable(Observable source, IEnumerable values, bool append) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + if (!append) // prepend + { + if (values is T[] array) + { + foreach (var value in array) + { + observer.OnNext(value); + } + } + else + { + foreach (var value in values) + { + observer.OnNext(value); + } + } + + return source.Subscribe(observer.Wrap()); + } + + return source.Subscribe(new _Append(observer, values)); + } + + sealed class _Append(Observer observer, IEnumerable values) : Observer + { + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + observer.OnCompleted(result); + } + else + { + if (values is T[] array) + { + foreach (var value in array) + { + observer.OnNext(value); + } + } + else + { + foreach (var value in values) + { + observer.OnNext(value); + } + } + + observer.OnCompleted(); + } + } + } +} + +internal sealed class AppendPrependFactory(Observable source, Func valueFactory, bool append) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + if (!append) // prepend + { + observer.OnNext(valueFactory()); + return source.Subscribe(observer.Wrap()); + } + + return source.Subscribe(new _Append(observer, valueFactory)); + } + + sealed class _Append(Observer observer, Func valueFactory) : Observer + { + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + observer.OnCompleted(result); + } + else + { + observer.OnNext(valueFactory()); + observer.OnCompleted(); + } + } + } +} + +internal sealed class AppendPrependFactory(Observable source, TState state, Func valueFactory, bool append) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + if (!append) // prepend + { + observer.OnNext(valueFactory(state)); + return source.Subscribe(observer.Wrap()); + } + + return source.Subscribe(new _Append(observer, state, valueFactory)); + } + + sealed class _Append(Observer observer, TState state, Func valueFactory) : Observer + { + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + observer.OnCompleted(result); + } + else + { + observer.OnNext(valueFactory(state)); + observer.OnCompleted(); + } + } + } +} diff --git a/src/R3/Operators/Skip.cs b/src/R3/Operators/Skip.cs index 3ca6f89c..2dae31ac 100644 --- a/src/R3/Operators/Skip.cs +++ b/src/R3/Operators/Skip.cs @@ -18,7 +18,7 @@ public static Observable Skip(this Observable source, TimeSpan duration public static Observable Skip(this Observable source, TimeSpan duration, TimeProvider timeProvider) { - return new SkipTime(source, duration, timeProvider); + return new SkipTime(source, duration.Normalize(), timeProvider); } // SkipFrame @@ -30,7 +30,7 @@ public static Observable SkipFrame(this Observable source, int frameCou public static Observable SkipFrame(this Observable source, int frameCount, FrameProvider frameProvider) { - return new SkipFrame(source, frameCount, frameProvider); + return new SkipFrame(source, frameCount.NormalizeFrame(), frameProvider); } } diff --git a/src/R3/Operators/SkipLast.cs b/src/R3/Operators/SkipLast.cs index 8d5e01a6..aebfd06c 100644 --- a/src/R3/Operators/SkipLast.cs +++ b/src/R3/Operators/SkipLast.cs @@ -17,7 +17,7 @@ public static Observable SkipLast(this Observable source, TimeSpan dura public static Observable SkipLast(this Observable source, TimeSpan duration, TimeProvider timeProvider) { - return new SkipLastTime(source, duration, timeProvider); + return new SkipLastTime(source, duration.Normalize(), timeProvider); } // SkipLastFrame @@ -29,7 +29,7 @@ public static Observable SkipLastFrame(this Observable source, int fram public static Observable SkipLastFrame(this Observable source, int frameCount, FrameProvider frameProvider) { - return new SkipLastFrame(source, frameCount, frameProvider); + return new SkipLastFrame(source, frameCount.NormalizeFrame(), frameProvider); } } diff --git a/src/R3/Operators/SkipUntil.cs b/src/R3/Operators/SkipUntil.cs index 91487ac2..4fcee41d 100644 --- a/src/R3/Operators/SkipUntil.cs +++ b/src/R3/Operators/SkipUntil.cs @@ -17,6 +17,11 @@ public static Observable SkipUntil(this Observable source, Task task) { return new SkipUntilT(source, task); } + + public static Observable SkipUntil(this Observable source, Func asyncFunc, bool configureAwait = true) + { + return new SkipUntilAsync(source, asyncFunc, configureAwait); + } } internal sealed class SkipUntil(Observable source, Observable other) : Observable @@ -195,3 +200,70 @@ async void TaskAwait(Task task) } } } + +internal sealed class SkipUntilAsync(Observable source, Func asyncFunc, bool configureAwait) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _SkipUntil(observer, asyncFunc, configureAwait)); + } + + sealed class _SkipUntil(Observer observer, Func asyncFunc, bool configureAwait) : Observer, IDisposable + { + readonly CancellationTokenSource cancellationTokenSource = new(); + int isTaskRunning; + bool open; + + protected override void OnNextCore(T value) + { + var isFirstValue = (Interlocked.Exchange(ref isTaskRunning, 1) == 0); + if (isFirstValue) + { + TaskStart(value); + } + + if (Volatile.Read(ref open)) + { + observer.OnNext(value); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + cancellationTokenSource.Cancel(); // cancel executing async process first + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + cancellationTokenSource.Cancel(); + } + + async void TaskStart(T value) + { + try + { + await asyncFunc(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token) + { + return; + } + + // error is Stop + observer.OnCompleted(Result.Failure(ex)); + return; + } + + Volatile.Write(ref open, true); + } + } +} + diff --git a/src/R3/Operators/Take.cs b/src/R3/Operators/Take.cs index fc5a2dac..468ece45 100644 --- a/src/R3/Operators/Take.cs +++ b/src/R3/Operators/Take.cs @@ -23,7 +23,7 @@ public static Observable Take(this Observable source, TimeSpan duration public static Observable Take(this Observable source, TimeSpan duration, TimeProvider timeProvider) { - return new TakeTime(source, duration, timeProvider); + return new TakeTime(source, duration.Normalize(), timeProvider); } // TakeFrame @@ -35,7 +35,7 @@ public static Observable TakeFrame(this Observable source, int frameCou public static Observable TakeFrame(this Observable source, int frameCount, FrameProvider frameProvider) { - return new TakeFrame(source, frameCount, frameProvider); + return new TakeFrame(source, frameCount.NormalizeFrame(), frameProvider); } } diff --git a/src/R3/Operators/TakeLast.cs b/src/R3/Operators/TakeLast.cs index 53e86551..2206b255 100644 --- a/src/R3/Operators/TakeLast.cs +++ b/src/R3/Operators/TakeLast.cs @@ -17,7 +17,7 @@ public static Observable TakeLast(this Observable source, TimeSpan dura public static Observable TakeLast(this Observable source, TimeSpan duration, TimeProvider timeProvider) { - return new TakeLastTime(source, duration, timeProvider); + return new TakeLastTime(source, duration.Normalize(), timeProvider); } // TakeLastFrame @@ -29,7 +29,7 @@ public static Observable TakeLastFrame(this Observable source, int fram public static Observable TakeLastFrame(this Observable source, int frameCount, FrameProvider frameProvider) { - return new TakeLastFrame(source, frameCount, frameProvider); + return new TakeLastFrame(source, frameCount.NormalizeFrame(), frameProvider); } } diff --git a/src/R3/Operators/TakeUntil.cs b/src/R3/Operators/TakeUntil.cs index a4a77ec7..3002762e 100644 --- a/src/R3/Operators/TakeUntil.cs +++ b/src/R3/Operators/TakeUntil.cs @@ -25,6 +25,11 @@ public static Observable TakeUntil(this Observable source, Task task) { return new TakeUntilT(source, task); } + + public static Observable TakeUntil(this Observable source, Func asyncFunc, bool configureAwait = true) + { + return new TakeUntilAsync(source, asyncFunc, configureAwait); + } } internal sealed class TakeUntil(Observable source, Observable other) : Observable @@ -190,3 +195,66 @@ async void TaskAwait(Task task) } } } + +internal sealed class TakeUntilAsync(Observable source, Func asyncFunc, bool configureAwait) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _TakeUntil(observer, asyncFunc, configureAwait)); + } + + sealed class _TakeUntil(Observer observer, Func asyncFunc, bool configureAwait) : Observer, IDisposable + { + readonly CancellationTokenSource cancellationTokenSource = new(); + int isTaskRunning; + + protected override void OnNextCore(T value) + { + var isFirstValue = (Interlocked.Exchange(ref isTaskRunning, 1) == 0); + if (isFirstValue) + { + TaskStart(value); + } + + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + cancellationTokenSource.Cancel(); // cancel executing async process first + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + cancellationTokenSource.Cancel(); + } + + async void TaskStart(T value) + { + + try + { + await asyncFunc(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token) + { + return; + } + + // error is Stop + observer.OnCompleted(Result.Failure(ex)); + return; + } + + observer.OnCompleted(); + } + } +} diff --git a/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs b/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs index 78f46fa4..1def5a86 100644 --- a/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs +++ b/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs @@ -180,4 +180,72 @@ public void ConcatNestedSources_Empty() list.AssertIsCompleted(); list.AssertEqual([]); } + + // Prepend factory + [Fact] + public void PrependFactory() + { + { + using var list = Observable.Range(1, 3).Prepend(() => 10).ToLiveList(); + list.AssertEqual([10, 1, 2, 3]); + } + // with state + { + var o = new { V = 20 }; + using var list = Observable.Range(1, 3).Prepend(o, static x => x.V).ToLiveList(); + list.AssertEqual([20, 1, 2, 3]); + } + } + + [Fact] + public void PrependEnumerable() + { + // Array + { + using var list = Observable.Range(1, 3).Prepend([10, 11, 12]).ToLiveList(); + list.AssertEqual([10, 11, 12, 1, 2, 3]); + } + // Pure Enumerable + { + using var list = Observable.Range(1, 3).Prepend(Iterate()).ToLiveList(); + list.AssertEqual([100, 200, 300, 1, 2, 3]); + } + } + + [Fact] + public void AppendFactory() + { + { + using var list = Observable.Range(1, 3).Append(() => 10).ToLiveList(); + list.AssertEqual([1, 2, 3, 10]); + } + // with state + { + var o = new { V = 20 }; + using var list = Observable.Range(1, 3).Append(o, static x => x.V).ToLiveList(); + list.AssertEqual([1, 2, 3, 20]); + } + } + + [Fact] + public void AppendEnumerable() + { + // Array + { + using var list = Observable.Range(1, 3).Append([10, 11, 12]).ToLiveList(); + list.AssertEqual([1, 2, 3, 10, 11, 12]); + } + // Pure Enumerable + { + using var list = Observable.Range(1, 3).Append(Iterate()).ToLiveList(); + list.AssertEqual([1, 2, 3, 100, 200, 300]); + } + } + + static IEnumerable Iterate() + { + yield return 100; + yield return 200; + yield return 300; + } } diff --git a/tests/R3.Tests/OperatorTests/SkipUntilTest.cs b/tests/R3.Tests/OperatorTests/SkipUntilTest.cs index bdce6495..c0432f87 100644 --- a/tests/R3.Tests/OperatorTests/SkipUntilTest.cs +++ b/tests/R3.Tests/OperatorTests/SkipUntilTest.cs @@ -66,6 +66,30 @@ public async Task TaskT() await Task.Delay(100); // wait for completion + publisher1.OnNext(999999); + publisher1.OnNext(9999990); + + list.AssertEqual([999999, 9999990]); + publisher1.OnCompleted(); + list.AssertIsCompleted(); + } + + [Fact] + public void Async() + { + SynchronizationContext.SetSynchronizationContext(null); + + var publisher1 = new Subject(); + var tcs = new TaskCompletionSource(); + var list = publisher1.SkipUntil(async (x,ct) => await tcs.Task).ToLiveList(); + + publisher1.OnNext(1); + publisher1.OnNext(2); + publisher1.OnNext(3); + list.AssertEqual([]); + + tcs.TrySetResult(); + publisher1.OnNext(999999); publisher1.OnNext(9999990); diff --git a/tests/R3.Tests/OperatorTests/TakeUntilTest.cs b/tests/R3.Tests/OperatorTests/TakeUntilTest.cs index e9d16087..e8d08b93 100644 --- a/tests/R3.Tests/OperatorTests/TakeUntilTest.cs +++ b/tests/R3.Tests/OperatorTests/TakeUntilTest.cs @@ -61,4 +61,25 @@ public async Task TaskT() list.AssertEqual([1, 2, 3]); list.AssertIsCompleted(); } + + [Fact] + public void Async() + { + SynchronizationContext.SetSynchronizationContext(null); + + var publisher1 = new Subject(); + var tcs = new TaskCompletionSource(); + var list = publisher1.TakeUntil(async (x,ct) => await tcs.Task).ToLiveList(); + + publisher1.OnNext(1); + publisher1.OnNext(2); + publisher1.OnNext(3); + list.AssertEqual([1, 2, 3]); + + tcs.TrySetResult(); + + list.AssertEqual([1, 2, 3]); + list.AssertIsCompleted(); + + } }