diff --git a/src/R3/Operators/AggregateOperators.cs b/src/R3/Operators/AggregateOperators.cs index dc6ef6c0..733a2a71 100644 --- a/src/R3/Operators/AggregateOperators.cs +++ b/src/R3/Operators/AggregateOperators.cs @@ -51,38 +51,6 @@ public static Task LongCountAsync(this Observable source, Cancellati return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), Stubs.ReturnSelf, cancellationToken); // ignore complete } - public static Task MinAsync(this Observable source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(T)!, hasValue: false), - static (min, message) => - { - if (!min.hasValue) return (message, true); // first - return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); - }, - static (min) => - { - if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return min.Item1; - }, cancellationToken); - } - - - public static Task MaxAsync(this Observable source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(T)!, hasValue: false), - static (max, message) => - { - if (!max.hasValue) return (message, true); // first - return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); - }, - static (max) => - { - if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return max.Item1; - }, cancellationToken); - } - - public static Task<(T Min, T Max)> MinMaxAsync(this Observable source, CancellationToken cancellationToken = default) { return AggregateAsync(source, diff --git a/src/R3/Operators/MaxAsync.cs b/src/R3/Operators/MaxAsync.cs new file mode 100644 index 00000000..3dfabd8b --- /dev/null +++ b/src/R3/Operators/MaxAsync.cs @@ -0,0 +1,121 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Task MaxAsync(this Observable source, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MaxAsync(this Observable source, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MaxAsync(this Observable source, Func selector, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(selector, Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MaxAsync(this Observable source, Func selector, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MaxAsync(selector, comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } +} + +internal sealed class MaxAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + T current = default!; + bool hasValue; + + protected override void OnNextCore(T value) + { + if (!hasValue) + { + hasValue = true; + current = value; + return; + } + + if (comparer.Compare(value, current) > 0) + { + current = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} + +internal sealed class MaxAsync(Func selector, IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + TResult current = default!; + bool hasValue; + + protected override void OnNextCore(TSource value) + { + var nextValue = selector(value); + if (!hasValue) + { + hasValue = true; + current = nextValue; + return; + } + + if (comparer.Compare(nextValue, current) > 0) + { + current = nextValue; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/src/R3/Operators/MinAsync.cs b/src/R3/Operators/MinAsync.cs new file mode 100644 index 00000000..0299f8cf --- /dev/null +++ b/src/R3/Operators/MinAsync.cs @@ -0,0 +1,121 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Task MinAsync(this Observable source, CancellationToken cancellationToken = default) + { + var method = new MinAsync(Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MinAsync(this Observable source, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MinAsync(comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MinAsync(this Observable source, Func selector, CancellationToken cancellationToken = default) + { + var method = new MinAsync(selector, Comparer.Default, cancellationToken); + source.Subscribe(method); + return method.Task; + } + + public static Task MinAsync(this Observable source, Func selector, IComparer comparer, CancellationToken cancellationToken = default) + { + var method = new MinAsync(selector, comparer, cancellationToken); + source.Subscribe(method); + return method.Task; + } +} + +internal sealed class MinAsync(IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + T current = default!; + bool hasValue; + + protected override void OnNextCore(T value) + { + if (!hasValue) + { + hasValue = true; + current = value; + return; + } + + if (comparer.Compare(value, current) < 0) + { + current = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} + +internal sealed class MinAsync(Func selector, IComparer comparer, CancellationToken cancellation) : TaskObserverBase(cancellation) +{ + TResult current = default!; + bool hasValue; + + protected override void OnNextCore(TSource value) + { + var nextValue = selector(value); + if (!hasValue) + { + hasValue = true; + current = nextValue; + return; + } + + if (comparer.Compare(nextValue, current) < 0) + { + current = nextValue; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (hasValue) + { + TrySetResult(current); + } + else + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + } +} diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs index 72a41b4d..251f77aa 100644 --- a/tests/R3.Tests/OperatorTests/AggregateTest.cs +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -5,7 +5,7 @@ namespace R3.Tests.OperatorTests; public class AggregateTest { [Fact] - public async Task Aggreagte() + public async Task Aggregate() { var publisher = new Subject(); @@ -90,50 +90,6 @@ public async Task LongCount() await Assert.ThrowsAsync(async () => await error.LongCountAsync()); } - [Fact] - public async Task Min() - { - var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); - var min = await source.MinAsync(); - - min.Should().Be(1); - - (await Observable.Return(999).MinAsync()).Should().Be(999); - - var task = Observable.Empty().MinAsync(); - - await Assert.ThrowsAsync(async () => await task); - - var error = Observable.Range(1, 10).Select(x => - { - if (x == 3) throw new Exception("foo"); - return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MinAsync()); - } - - [Fact] - public async Task Max() - { - var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); - var min = await source.MaxAsync(); - - min.Should().Be(10); - - (await Observable.Return(999).MaxAsync()).Should().Be(999); - - var task = Observable.Empty().MaxAsync(); - - await Assert.ThrowsAsync(async () => await task); - - var error = Observable.Range(1, 10).Select(x => - { - if (x == 3) throw new Exception("foo"); - return x; - }).OnErrorResumeAsFailure(); - await Assert.ThrowsAsync(async () => await error.MaxAsync()); - } - [Fact] public async Task MinMax() { diff --git a/tests/R3.Tests/OperatorTests/MaxTest.cs b/tests/R3.Tests/OperatorTests/MaxTest.cs new file mode 100644 index 00000000..e44c8d64 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/MaxTest.cs @@ -0,0 +1,98 @@ +namespace R3.Tests.OperatorTests; + +public class MaxTest +{ + [Fact] + public async Task One() + { + (await Observable.Return(999).MaxAsync()).Should().Be(999); + } + + [Fact] + public async Task MultipleValue() + { + var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MaxAsync(); + min.Should().Be(10); + } + + [Fact] + public async Task Empty() + { + var task = Observable.Empty().MaxAsync(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task Error() + { + var error = Observable.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MaxAsync(); + }); + + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MaxAsync(); + }); + } + + [Fact] + public async Task WithComparer() + { + var result = await new[] { new TestData(100), new TestData(200) }.ToObservable().MaxAsync(new TestComparer()); + result.Value.Should().Be(200); + } + + [Fact] + public async Task WithSelector() + { + var source = new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); + + (await source.MaxAsync(x => x == 7 ? 777 : x)).Should().Be(777); + (await source.MaxAsync(x => new TestData(x), new TestComparer())).Value.Should().Be(1); + } + + [Fact] + public async Task WithSelectorError() + { + await Assert.ThrowsAsync(async () => + { + await Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }) + .MaxAsync(x => x); + }); + + var error = Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MaxAsync(x => x); + }); + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MaxAsync(x => x); + }); + } + + record struct TestData(int Value); + + class TestComparer : IComparer + { + public int Compare(TestData x, TestData y) => x.Value.CompareTo(y.Value); + } +} diff --git a/tests/R3.Tests/OperatorTests/MinTest.cs b/tests/R3.Tests/OperatorTests/MinTest.cs new file mode 100644 index 00000000..716d970f --- /dev/null +++ b/tests/R3.Tests/OperatorTests/MinTest.cs @@ -0,0 +1,98 @@ +namespace R3.Tests.OperatorTests; + +public class MinTest +{ + [Fact] + public async Task Empty() + { + var task = Observable.Empty().MinAsync(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task One() + { + (await Observable.Return(999).MinAsync()).Should().Be(999); + } + + [Fact] + public async Task MultipleValue() + { + var min = await new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable().MinAsync(); + min.Should().Be(1); + } + + [Fact] + public async Task Error() + { + var error = Observable.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MinAsync(); + }); + + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MinAsync(); + }); + } + + [Fact] + public async Task WithComparer() + { + var result = await new[] { new TestData(100), new TestData(200) }.ToObservable().MinAsync(new TestComparer()); + result.Value.Should().Be(100); + } + + [Fact] + public async Task WithSelector() + { + var source = new[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToObservable(); + + (await source.MinAsync(x => x == 7 ? -1 : x)).Should().Be(-); + (await source.MinAsync(x => new TestData(x), new TestComparer())).Value.Should().Be(10); + } + + [Fact] + public async Task WithSelectorError() + { + await Assert.ThrowsAsync(async () => + { + await Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }) + .MinAsync(x => x); + }); + + var error = Observable.Range(1, 10) + .Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }); + + await Assert.ThrowsAsync(async () => + { + await error.MinAsync(x => x); + }); + await Assert.ThrowsAsync(async () => + { + await error.OnErrorResumeAsFailure().MinAsync(x => x); + }); + } + + record struct TestData(int Value); + + class TestComparer : IComparer + { + public int Compare(TestData x, TestData y) => x.Value.CompareTo(y.Value); + } +}