Skip to content

Commit

Permalink
Merge branch 'main' into hadashiA/separate-count
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc authored Jan 12, 2024
2 parents ef547af + 8093301 commit 6ab1f84
Show file tree
Hide file tree
Showing 10 changed files with 1,129 additions and 63 deletions.
40 changes: 2 additions & 38 deletions src/R3/Operators/AggregateOperators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,6 @@ public static Task<HashSet<T>> ToHashSetAsync<T>(this Observable<T> source, IEqu
}, (value) => value, cancellationToken); // ignore complete
}

public static Task<T> MinAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
return AggregateAsync(source, (default(T)!, hasValue: false),
static (min, message) =>
{
if (!min.hasValue) return (message, true); // first
return Comparer<T>.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true);
},
static (min) =>
{
if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements");
return min.Item1;
}, cancellationToken);
}


public static Task<T> MaxAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
return AggregateAsync(source, (default(T)!, hasValue: false),
static (max, message) =>
{
if (!max.hasValue) return (message, true); // first
return Comparer<T>.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true);
},
static (max) =>
{
if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements");
return max.Item1;
}, cancellationToken);
}


public static Task<(T Min, T Max)> MinMaxAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
return AggregateAsync(source,
Expand All @@ -91,12 +59,6 @@ public static Task<T> MaxAsync<T>(this Observable<T> source, CancellationToken c

#if NET8_0_OR_GREATER

public static Task<T> SumAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
where T : IAdditionOperators<T, T, T>
{
return AggregateAsync(source, default(T)!, static (sum, message) => checked(sum + message), Stubs<T>.ReturnSelf, cancellationToken); // ignore complete
}

public static Task<double> AverageAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
where T : INumberBase<T>
{
Expand All @@ -121,3 +83,5 @@ public static Task WaitAsync<T>(this Observable<T> source, CancellationToken can
return AggregateAsync(source, 0, static (_, _) => 0, Stubs<int>.ReturnSelf, cancellationToken);
}
}


121 changes: 121 additions & 0 deletions src/R3/Operators/MaxAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<T> MaxAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
var method = new MaxAsync<T>(Comparer<T>.Default, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<T> MaxAsync<T>(this Observable<T> source, IComparer<T> comparer, CancellationToken cancellationToken = default)
{
var method = new MaxAsync<T>(comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<TResult> MaxAsync<TSource, TResult>(this Observable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken = default)
{
var method = new MaxAsync<TSource, TResult>(selector, Comparer<TResult>.Default, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<TResult> MaxAsync<TSource, TResult>(this Observable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer, CancellationToken cancellationToken = default)
{
var method = new MaxAsync<TSource, TResult>(selector, comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}
}

internal sealed class MaxAsync<T>(IComparer<T> comparer, CancellationToken cancellation) : TaskObserverBase<T, T>(cancellation)
{
T current = default!;
bool hasValue;

protected override void OnNextCore(T value)
{
if (!hasValue)
{
hasValue = true;
current = value;
return;
}

if (comparer.Compare(value, current) > 0)
{
current = value;
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(current);
}
else
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
}
}

internal sealed class MaxAsync<TSource, TResult>(Func<TSource, TResult> selector, IComparer<TResult> comparer, CancellationToken cancellation) : TaskObserverBase<TSource, TResult>(cancellation)
{
TResult current = default!;
bool hasValue;

protected override void OnNextCore(TSource value)
{
var nextValue = selector(value);
if (!hasValue)
{
hasValue = true;
current = nextValue;
return;
}

if (comparer.Compare(nextValue, current) > 0)
{
current = nextValue;
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(current);
}
else
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
}
}
121 changes: 121 additions & 0 deletions src/R3/Operators/MinAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<T> MinAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
var method = new MinAsync<T>(Comparer<T>.Default, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<T> MinAsync<T>(this Observable<T> source, IComparer<T> comparer, CancellationToken cancellationToken = default)
{
var method = new MinAsync<T>(comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<TResult> MinAsync<TSource, TResult>(this Observable<TSource> source, Func<TSource, TResult> selector, CancellationToken cancellationToken = default)
{
var method = new MinAsync<TSource, TResult>(selector, Comparer<TResult>.Default, cancellationToken);
source.Subscribe(method);
return method.Task;
}

public static Task<TResult> MinAsync<TSource, TResult>(this Observable<TSource> source, Func<TSource, TResult> selector, IComparer<TResult> comparer, CancellationToken cancellationToken = default)
{
var method = new MinAsync<TSource, TResult>(selector, comparer, cancellationToken);
source.Subscribe(method);
return method.Task;
}
}

internal sealed class MinAsync<T>(IComparer<T> comparer, CancellationToken cancellation) : TaskObserverBase<T, T>(cancellation)
{
T current = default!;
bool hasValue;

protected override void OnNextCore(T value)
{
if (!hasValue)
{
hasValue = true;
current = value;
return;
}

if (comparer.Compare(value, current) < 0)
{
current = value;
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(current);
}
else
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
}
}

internal sealed class MinAsync<TSource, TResult>(Func<TSource, TResult> selector, IComparer<TResult> comparer, CancellationToken cancellation) : TaskObserverBase<TSource, TResult>(cancellation)
{
TResult current = default!;
bool hasValue;

protected override void OnNextCore(TSource value)
{
var nextValue = selector(value);
if (!hasValue)
{
hasValue = true;
current = nextValue;
return;
}

if (comparer.Compare(nextValue, current) < 0)
{
current = nextValue;
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (hasValue)
{
TrySetResult(current);
}
else
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
}
}
Loading

0 comments on commit 6ab1f84

Please sign in to comment.