Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TakeUntil(async), SkipUntil(async) #155

Merged
merged 2 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 178 additions & 1 deletion src/R3/Operators/AppendPrepend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,41 @@ public static Observable<T> Append<T>(this Observable<T> source, T value)
return new AppendPrepend<T>(source, value, append: true);
}

public static Observable<T> Append<T>(this Observable<T> source, IEnumerable<T> values)
{
return new AppendPrependEnumerable<T>(source, values, append: true);
}

public static Observable<T> Append<T>(this Observable<T> source, Func<T> valueFactory)
{
return new AppendPrependFactory<T>(source, valueFactory, append: true);
}

public static Observable<T> Append<T, TState>(this Observable<T> source, TState state, Func<TState, T> valueFactory)
{
return new AppendPrependFactory<T, TState>(source, state, valueFactory, append: true);
}

public static Observable<T> Prepend<T>(this Observable<T> source, T value)
{
return new AppendPrepend<T>(source, value, append: false);
}
}

public static Observable<T> Prepend<T>(this Observable<T> source, IEnumerable<T> values)
{
return new AppendPrependEnumerable<T>(source, values, append: false);
}

public static Observable<T> Prepend<T>(this Observable<T> source, Func<T> valueFactory)
{
return new AppendPrependFactory<T>(source, valueFactory, append: false);
}

public static Observable<T> Prepend<T, TState>(this Observable<T> source, TState state, Func<TState, T> valueFactory)
{
return new AppendPrependFactory<T, TState>(source, state, valueFactory, append: false);
}
}

internal sealed class AppendPrepend<T>(Observable<T> source, T value, bool append) : Observable<T>
{
Expand Down Expand Up @@ -53,3 +82,151 @@ protected override void OnCompletedCore(Result result)
}
}
}

internal sealed class AppendPrependEnumerable<T>(Observable<T> source, IEnumerable<T> values, bool append) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
if (!append) // prepend
{
if (values is T[] array)
{
foreach (var value in array)
{
observer.OnNext(value);
}
}
else
{
foreach (var value in values)
{
observer.OnNext(value);
}
}

return source.Subscribe(observer.Wrap());
}

return source.Subscribe(new _Append(observer, values));
}

sealed class _Append(Observer<T> observer, IEnumerable<T> values) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
}
else
{
if (values is T[] array)
{
foreach (var value in array)
{
observer.OnNext(value);
}
}
else
{
foreach (var value in values)
{
observer.OnNext(value);
}
}

observer.OnCompleted();
}
}
}
}

internal sealed class AppendPrependFactory<T>(Observable<T> source, Func<T> valueFactory, bool append) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
if (!append) // prepend
{
observer.OnNext(valueFactory());
return source.Subscribe(observer.Wrap());
}

return source.Subscribe(new _Append(observer, valueFactory));
}

sealed class _Append(Observer<T> observer, Func<T> valueFactory) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
}
else
{
observer.OnNext(valueFactory());
observer.OnCompleted();
}
}
}
}

internal sealed class AppendPrependFactory<T, TState>(Observable<T> source, TState state, Func<TState, T> valueFactory, bool append) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
if (!append) // prepend
{
observer.OnNext(valueFactory(state));
return source.Subscribe(observer.Wrap());
}

return source.Subscribe(new _Append(observer, state, valueFactory));
}

sealed class _Append(Observer<T> observer, TState state, Func<TState, T> valueFactory) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
observer.OnCompleted(result);
}
else
{
observer.OnNext(valueFactory(state));
observer.OnCompleted();
}
}
}
}
4 changes: 2 additions & 2 deletions src/R3/Operators/Skip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static Observable<T> Skip<T>(this Observable<T> source, TimeSpan duration

public static Observable<T> Skip<T>(this Observable<T> source, TimeSpan duration, TimeProvider timeProvider)
{
return new SkipTime<T>(source, duration, timeProvider);
return new SkipTime<T>(source, duration.Normalize(), timeProvider);
}

// SkipFrame
Expand All @@ -30,7 +30,7 @@ public static Observable<T> SkipFrame<T>(this Observable<T> source, int frameCou

public static Observable<T> SkipFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new SkipFrame<T>(source, frameCount, frameProvider);
return new SkipFrame<T>(source, frameCount.NormalizeFrame(), frameProvider);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/R3/Operators/SkipLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Observable<T> SkipLast<T>(this Observable<T> source, TimeSpan dura

public static Observable<T> SkipLast<T>(this Observable<T> source, TimeSpan duration, TimeProvider timeProvider)
{
return new SkipLastTime<T>(source, duration, timeProvider);
return new SkipLastTime<T>(source, duration.Normalize(), timeProvider);
}

// SkipLastFrame
Expand All @@ -29,7 +29,7 @@ public static Observable<T> SkipLastFrame<T>(this Observable<T> source, int fram

public static Observable<T> SkipLastFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new SkipLastFrame<T>(source, frameCount, frameProvider);
return new SkipLastFrame<T>(source, frameCount.NormalizeFrame(), frameProvider);
}
}

Expand Down
72 changes: 72 additions & 0 deletions src/R3/Operators/SkipUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public static Observable<T> SkipUntil<T>(this Observable<T> source, Task task)
{
return new SkipUntilT<T>(source, task);
}

public static Observable<T> SkipUntil<T>(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, bool configureAwait = true)
{
return new SkipUntilAsync<T>(source, asyncFunc, configureAwait);
}
}

internal sealed class SkipUntil<T, TOther>(Observable<T> source, Observable<TOther> other) : Observable<T>
Expand Down Expand Up @@ -195,3 +200,70 @@ async void TaskAwait(Task task)
}
}
}

internal sealed class SkipUntilAsync<T>(Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, bool configureAwait) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SkipUntil(observer, asyncFunc, configureAwait));
}

sealed class _SkipUntil(Observer<T> observer, Func<T, CancellationToken, ValueTask> asyncFunc, bool configureAwait) : Observer<T>, IDisposable
{
readonly CancellationTokenSource cancellationTokenSource = new();
int isTaskRunning;
bool open;

protected override void OnNextCore(T value)
{
var isFirstValue = (Interlocked.Exchange(ref isTaskRunning, 1) == 0);
if (isFirstValue)
{
TaskStart(value);
}

if (Volatile.Read(ref open))
{
observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
cancellationTokenSource.Cancel(); // cancel executing async process first
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
cancellationTokenSource.Cancel();
}

async void TaskStart(T value)
{
try
{
await asyncFunc(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait);
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token)
{
return;
}

// error is Stop
observer.OnCompleted(Result.Failure(ex));
return;
}

Volatile.Write(ref open, true);
}
}
}

4 changes: 2 additions & 2 deletions src/R3/Operators/Take.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static Observable<T> Take<T>(this Observable<T> source, TimeSpan duration

public static Observable<T> Take<T>(this Observable<T> source, TimeSpan duration, TimeProvider timeProvider)
{
return new TakeTime<T>(source, duration, timeProvider);
return new TakeTime<T>(source, duration.Normalize(), timeProvider);
}

// TakeFrame
Expand All @@ -35,7 +35,7 @@ public static Observable<T> TakeFrame<T>(this Observable<T> source, int frameCou

public static Observable<T> TakeFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new TakeFrame<T>(source, frameCount, frameProvider);
return new TakeFrame<T>(source, frameCount.NormalizeFrame(), frameProvider);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/R3/Operators/TakeLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Observable<T> TakeLast<T>(this Observable<T> source, TimeSpan dura

public static Observable<T> TakeLast<T>(this Observable<T> source, TimeSpan duration, TimeProvider timeProvider)
{
return new TakeLastTime<T>(source, duration, timeProvider);
return new TakeLastTime<T>(source, duration.Normalize(), timeProvider);
}

// TakeLastFrame
Expand All @@ -29,7 +29,7 @@ public static Observable<T> TakeLastFrame<T>(this Observable<T> source, int fram

public static Observable<T> TakeLastFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new TakeLastFrame<T>(source, frameCount, frameProvider);
return new TakeLastFrame<T>(source, frameCount.NormalizeFrame(), frameProvider);
}
}

Expand Down
Loading
Loading