From 540f9a77ce221863a9a158c8ac11b6bd84791523 Mon Sep 17 00:00:00 2001 From: hadashiA Date: Fri, 12 Jan 2024 10:43:54 +0900 Subject: [PATCH 1/2] Add dedicated impl Min/Max independent of Aggregate --- src/R3/Operators/AggregateOperators.cs | 32 ---------- src/R3/Operators/MaxAsync.cs | 62 +++++++++++++++++++ src/R3/Operators/MinAsync.cs | 62 +++++++++++++++++++ tests/R3.Tests/OperatorTests/AggregateTest.cs | 44 ------------- tests/R3.Tests/OperatorTests/MaxTest.cs | 49 +++++++++++++++ tests/R3.Tests/OperatorTests/MinTest.cs | 45 ++++++++++++++ 6 files changed, 218 insertions(+), 76 deletions(-) create mode 100644 src/R3/Operators/MaxAsync.cs create mode 100644 src/R3/Operators/MinAsync.cs create mode 100644 tests/R3.Tests/OperatorTests/MaxTest.cs create mode 100644 tests/R3.Tests/OperatorTests/MinTest.cs diff --git a/src/R3/Operators/AggregateOperators.cs b/src/R3/Operators/AggregateOperators.cs index dc6ef6c0..733a2a71 100644 --- a/src/R3/Operators/AggregateOperators.cs +++ b/src/R3/Operators/AggregateOperators.cs @@ -51,38 +51,6 @@ public static Task LongCountAsync(this Observable source, Cancellati return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), Stubs.ReturnSelf, cancellationToken); // ignore complete } - public static Task MinAsync(this Observable source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(T)!, hasValue: false), - static (min, message) => - { - if (!min.hasValue) return (message, true); // first - return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); - }, - static (min) => - { - if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return min.Item1; - }, cancellationToken); - } - - - public static Task MaxAsync(this Observable source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(T)!, hasValue: false), - static (max, message) => - { - if (!max.hasValue) return (message, true); // first - return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); - }, - static (max) => - { - if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return max.Item1; - }, cancellationToken); - } - - public static Task<(T Min, T Max)> MinMaxAsync(this Observable source, CancellationToken cancellationToken = default) { return AggregateAsync(source, diff --git a/src/R3/Operators/MaxAsync.cs b/src/R3/Operators/MaxAsync.cs new file mode 100644 index 00000000..28c81ae9 --- /dev/null +++ b/src/R3/Operators/MaxAsync.cs @@ -0,0 +1,62 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Task MaxAsync(this Observable source, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MaxAsync(this Observable source, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } +} + +internal sealed class MaxAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + T current = default!; + bool hasValue; + + protected override void OnNextCore(T value) + { + if (!hasValue) + { + hasValue = true; + current = value; + return; + } + + if (comparer.Compare(value, current) > 0) + { + current = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/src/R3/Operators/MinAsync.cs b/src/R3/Operators/MinAsync.cs new file mode 100644 index 00000000..dd3b29aa --- /dev/null +++ b/src/R3/Operators/MinAsync.cs @@ -0,0 +1,62 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Task MinAsync(this Observable source, CancellationToken cancellationToken = default) + { + var method = new MinAsync(Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MinAsync(this Observable source, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MinAsync(comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } +} + +internal sealed class MinAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + T current = default!; + bool hasValue; + + protected override void OnNextCore(T value) + { + if (!hasValue) + { + hasValue = true; + current = value; + return; + } + + if (comparer.Compare(value, current) < 0) + { + current = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs index 72a41b4d..cfdd0d82 100644 --- a/tests/R3.Tests/OperatorTests/AggregateTest.cs +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -90,50 +90,6 @@ public async Task LongCount() await Assert.ThrowsAsync(async () => await error.LongCountAsync()); } - [Fact] - public async Task Min() - { - var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); - var min = await source.MinAsync(); - - min.Should().Be(1); - - (await Observable.Return(999).MinAsync()).Should().Be(999); - - var task = Observable.Empty().MinAsync(); - - await Assert.ThrowsAsync(async () => await task); - - var error = Observable.Range(1, 10).Select(x => - { - if (x == 3) throw new Exception("foo"); - return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MinAsync()); - } - - [Fact] - public async Task Max() - { - var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); - var min = await source.MaxAsync(); - - min.Should().Be(10); - - (await Observable.Return(999).MaxAsync()).Should().Be(999); - - var task = Observable.Empty().MaxAsync(); - - await Assert.ThrowsAsync(async () => await task); - - var error = Observable.Range(1, 10).Select(x => - { - if (x == 3) throw new Exception("foo"); - return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MaxAsync()); - } - [Fact] public async Task MinMax() { diff --git a/tests/R3.Tests/OperatorTests/MaxTest.cs b/tests/R3.Tests/OperatorTests/MaxTest.cs new file mode 100644 index 00000000..b0317576 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/MaxTest.cs @@ -0,0 +1,49 @@ +namespace R3.Tests.OperatorTests; + +public class MaxTest +{ + [Fact] + public async Task One() + { + (await Observable.Return(999).MaxAsync()).Should().Be(999); + } + + [Fact] + public async Task MultipleValue() + { + var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MaxAsync(); + min.Should().Be(10); + } + + [Fact] + public async Task Empty() + { + var task = Observable.Empty().MaxAsync(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task WithError() + { + var error = Observable.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorResumeAsFailure(); + await Assert.ThrowsAsync(async () => await error.MaxAsync()); + } + + [Fact] + public async Task WithComparer() + { + var result = await new[] { new TestData(100), new TestData(200) }.ToObservable().MaxAsync(new TestComparer()); + result.Value.Should().Be(200); + } + + record struct TestData(int Value); + + class TestComparer : IComparer + { + public int Compare(TestData x, TestData y) => x.Value.CompareTo(y.Value); + } +} diff --git a/tests/R3.Tests/OperatorTests/MinTest.cs b/tests/R3.Tests/OperatorTests/MinTest.cs new file mode 100644 index 00000000..ffdfa5eb --- /dev/null +++ b/tests/R3.Tests/OperatorTests/MinTest.cs @@ -0,0 +1,45 @@ +namespace R3.Tests.OperatorTests; + +public class MinTest +{ + [Fact] + public async Task ValidValue() + { + var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MinAsync(); + min.Should().Be(1); + + (await Observable.Return(999).MinAsync()).Should().Be(999); + } + + [Fact] + public async Task Empty() + { + var task = Observable.Empty().MinAsync(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task WithError() + { + var error = Observable.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorResumeAsFailure(); + await Assert.ThrowsAsync(async () => await error.MinAsync()); + } + + [Fact] + public async Task WithComparer() + { + var result = await new[] { new TestData(100), new TestData(200) }.ToObservable().MinAsync(new TestComparer()); + result.Value.Should().Be(100); + } + + record struct TestData(int Value); + + class TestComparer : IComparer + { + public int Compare(TestData x, TestData y) => x.Value.CompareTo(y.Value); + } +} From 6c496c3f96bcdc1513e0aeb9a1accab98ed7801c Mon Sep 17 00:00:00 2001 From: hadashiA Date: Fri, 12 Jan 2024 12:36:25 +0900 Subject: [PATCH 2/2] Add overload for Min/Max --- src/R3/Operators/MaxAsync.cs | 59 +++++++++++++++ src/R3/Operators/MinAsync.cs | 59 +++++++++++++++ tests/R3.Tests/OperatorTests/AggregateTest.cs | 2 +- tests/R3.Tests/OperatorTests/MaxTest.cs | 55 +++++++++++++- tests/R3.Tests/OperatorTests/MinTest.cs | 71 ++++++++++++++++--- 5 files changed, 233 insertions(+), 13 deletions(-) diff --git a/src/R3/Operators/MaxAsync.cs b/src/R3/Operators/MaxAsync.cs index 28c81ae9..3dfabd8b 100644 --- a/src/R3/Operators/MaxAsync.cs +++ b/src/R3/Operators/MaxAsync.cs @@ -15,6 +15,20 @@ public static Task MaxAsync(this Observable source, IComparer compar source.Subscribe(method); return method.Task; } + + public static Task MaxAsync(this Observable source, Func selector, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(selector, Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MaxAsync(this Observable source, Func selector, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(selector, comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } } internal sealed class MaxAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) @@ -60,3 +74,48 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class MaxAsync(Func selector, IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + TResult current = default!; + bool hasValue; + + protected override void OnNextCore(TSource value) + { + var nextValue = selector(value); + if (!hasValue) + { + hasValue = true; + current = nextValue; + return; + } + + if (comparer.Compare(nextValue, current) > 0) + { + current = nextValue; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/src/R3/Operators/MinAsync.cs b/src/R3/Operators/MinAsync.cs index dd3b29aa..0299f8cf 100644 --- a/src/R3/Operators/MinAsync.cs +++ b/src/R3/Operators/MinAsync.cs @@ -15,6 +15,20 @@ public static Task MinAsync(this Observable source, IComparer compar source.Subscribe(method); return method.Task; } + + public static Task MinAsync(this Observable source, Func selector, CancellationToken cancellationToken = default) + { + var method = new MinAsync(selector, Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MinAsync(this Observable source, Func selector, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MinAsync(selector, comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } } internal sealed class MinAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) @@ -60,3 +74,48 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class MinAsync(Func selector, IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + TResult current = default!; + bool hasValue; + + protected override void OnNextCore(TSource value) + { + var nextValue = selector(value); + if (!hasValue) + { + hasValue = true; + current = nextValue; + return; + } + + if (comparer.Compare(nextValue, current) < 0) + { + current = nextValue; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs index cfdd0d82..251f77aa 100644 --- a/tests/R3.Tests/OperatorTests/AggregateTest.cs +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -5,7 +5,7 @@ namespace R3.Tests.OperatorTests; public class AggregateTest { [Fact] - public async Task Aggreagte() + public async Task Aggregate() { var publisher = new Subject(); diff --git a/tests/R3.Tests/OperatorTests/MaxTest.cs b/tests/R3.Tests/OperatorTests/MaxTest.cs index b0317576..e44c8d64 100644 --- a/tests/R3.Tests/OperatorTests/MaxTest.cs +++ b/tests/R3.Tests/OperatorTests/MaxTest.cs @@ -23,14 +23,23 @@ public async Task Empty() } [Fact] - public async Task WithError() + public async Task Error() { var error = Observable.Range(1, 10).Select(x => { if (x == 3) throw new Exception("foo"); return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MaxAsync()); + }); + + await Assert.ThrowsAsync(async () => + { + await error.MaxAsync(); + }); + + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MaxAsync(); + }); } [Fact] @@ -40,6 +49,46 @@ public async Task WithComparer() result.Value.Should().Be(200); } + [Fact] + public async Task WithSelector() + { + var source = new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); + + (await source.MaxAsync(x => x == 7 ? 777 : x)).Should().Be(777); + (await source.MaxAsync(x => new TestData(x), new TestComparer())).Value.Should().Be(1); + } + + [Fact] + public async Task WithSelectorError() + { + await Assert.ThrowsAsync(async () => + { + await Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }) + .MaxAsync(x => x); + }); + + var error = Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MaxAsync(x => x); + }); + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MaxAsync(x => x); + }); + } + record struct TestData(int Value); class TestComparer : IComparer diff --git a/tests/R3.Tests/OperatorTests/MinTest.cs b/tests/R3.Tests/OperatorTests/MinTest.cs index ffdfa5eb..716d970f 100644 --- a/tests/R3.Tests/OperatorTests/MinTest.cs +++ b/tests/R3.Tests/OperatorTests/MinTest.cs @@ -3,30 +3,43 @@ namespace R3.Tests.OperatorTests; public class MinTest { [Fact] - public async Task ValidValue() + public async Task Empty() { - var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MinAsync(); - min.Should().Be(1); + var task = Observable.Empty().MinAsync(); + await Assert.ThrowsAsync(async () => await task); + } + [Fact] + public async Task One() + { (await Observable.Return(999).MinAsync()).Should().Be(999); } [Fact] - public async Task Empty() + public async Task MultipleValue() { - var task = Observable.Empty().MinAsync(); - await Assert.ThrowsAsync(async () => await task); + var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MinAsync(); + min.Should().Be(1); } [Fact] - public async Task WithError() + public async Task Error() { var error = Observable.Range(1, 10).Select(x => { if (x == 3) throw new Exception("foo"); return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MinAsync()); + }); + + await Assert.ThrowsAsync(async () => + { + await error.MinAsync(); + }); + + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MinAsync(); + }); } [Fact] @@ -36,6 +49,46 @@ public async Task WithComparer() result.Value.Should().Be(100); } + [Fact] + public async Task WithSelector() + { + var source = new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); + + (await source.MinAsync(x => x == 7 ? -1 : x)).Should().Be(-); + (await source.MinAsync(x => new TestData(x), new TestComparer())).Value.Should().Be(10); + } + + [Fact] + public async Task WithSelectorError() + { + await Assert.ThrowsAsync(async () => + { + await Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }) + .MinAsync(x => x); + }); + + var error = Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MinAsync(x => x); + }); + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MinAsync(x => x); + }); + } + record struct TestData(int Value); class TestComparer : IComparer