Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Debounce, ThrottleFirstLast, ThrottleLast async overload does not handle correctly when func return immediately #146

Merged
merged 5 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Select**(this `Observable<T>` source, `Func<T, Int32, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, TState, TResult>` selector) | `Observable<TResult>` |
| **Select**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, TResult>` selector) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable<TResult>` |
| **SelectAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<TResult>>` selector, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TResult>>` selector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Observable<TCollection>>` collectionSelector, `Func<TSource, TCollection, TResult>` resultSelector) | `Observable<TResult>` |
| **SelectMany**(this `Observable<TSource>` source, `Func<TSource, Int32, Observable<TResult>>` selector) | `Observable<TResult>` |
Expand All @@ -1762,9 +1762,9 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **SkipUntil**(this `Observable<T>` source, `Task` task) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Boolean>` predicate) | `Observable<T>` |
| **SkipWhile**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Result>` onCompleted, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask>` onNextAsync, `Action<Exception>` onErrorResume, `Action<Result>` onCompleted, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` |
| **SubscribeOn**(this `Observable<T>` source, `SynchronizationContext` synchronizationContext) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `TimeProvider` timeProvider) | `Observable<T>` |
| **SubscribeOn**(this `Observable<T>` source, `FrameProvider` frameProvider) | `Observable<T>` |
Expand Down Expand Up @@ -1847,7 +1847,7 @@ Operator methods are defined as extension methods to `Observable<T>` in the stat
| **Where**(this `Observable<T>` source, `Func<T, Int32, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, TState, Boolean>` predicate) | `Observable<T>` |
| **Where**(this `Observable<T>` source, `TState` state, `Func<T, Int32, TState, Boolean>` predicate) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable<T>` |
| **WhereAwait**(this `Observable<T>` source, `Func<T, CancellationToken, ValueTask<Boolean>>` predicate, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable<T>` |
| **WithLatestFrom**(this `Observable<TFirst>` first, `Observable<TSecond>` second, `Func<TFirst, TSecond, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Func<T1, T2, TResult>` resultSelector) | `Observable<TResult>` |
| **Zip**(this `Observable<T1>` source1, `Observable<T2>` source2, `Observable<T3>` source3, `Func<T1, T2, T3, TResult>` resultSelector) | `Observable<TResult>` |
Expand Down
31 changes: 10 additions & 21 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Time.Testing;
using R3;
using System.ComponentModel.DataAnnotations;
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
Expand All @@ -9,10 +10,6 @@
using System.Threading.Channels;


SynchronizationContext.SetSynchronizationContext(new MySyncContext());


var channel = ChannelUtility.CreateSingleReadeWriterUnbounded<int>();

//var t = Foo();

Expand All @@ -23,31 +20,23 @@


//t.Wait();
var timeProvider = new FakeTimeProvider();

var subject = new Subject<int>();

subject
.Do(x => Console.WriteLine($"Do:{Thread.CurrentThread.ManagedThreadId}"))
.SubscribeAwait(async (_, ct) =>
Observable.Interval(TimeSpan.FromSeconds(1))
.Index()
.Chunk(async (_, ct) =>
{
Console.WriteLine($"Before Await:{Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(TimeSpan.FromSeconds(1), timeProvider, ct);
Console.WriteLine($"After Yield:{Thread.CurrentThread.ManagedThreadId}");
}, AwaitOperation.Sequential/*, configureAwait: false*/);

await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct);
})
.Subscribe(x =>
{
Console.WriteLine(string.Join(", ", x));
});

subject.OnNext(10);
subject.OnNext(20);
subject.OnNext(30);

timeProvider.Advance(TimeSpan.FromSeconds(1));
Console.ReadLine();





internal static class ChannelUtility
{
static readonly UnboundedChannelOptions options = new UnboundedChannelOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using UnityEngine;
using System.Reflection;
using System.Text.RegularExpressions;

#if UNITY_EDITOR
using UnityEditor;
Expand Down Expand Up @@ -46,6 +48,9 @@ internal class SerializableReactivePropertyDrawer : PropertyDrawer
public override void OnGUI(Rect position, SerializedProperty property, GUIContent label)
{
var p = property.FindPropertyRelative("value");

EditorGUI.BeginChangeCheck();

if (p.propertyType == SerializedPropertyType.Quaternion)
{
label.text += "(EulerAngles)";
Expand All @@ -55,6 +60,24 @@ public override void OnGUI(Rect position, SerializedProperty property, GUIConten
{
EditorGUI.PropertyField(position, p, label, true);
}

if (EditorGUI.EndChangeCheck())
{
var paths = property.propertyPath.Split('.'); // X.Y.Z...
var attachedComponent = property.serializedObject.targetObject;

var targetProp = (paths.Length == 1)
? fieldInfo.GetValue(attachedComponent)
: GetValueRecursive(attachedComponent, 0, paths);
if (targetProp == null) return;

property.serializedObject.ApplyModifiedProperties(); // deserialize to field
var methodInfo = targetProp.GetType().GetMethod("ForceNotify", BindingFlags.IgnoreCase | BindingFlags.InvokeMethod | BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
if (methodInfo != null)
{
methodInfo.Invoke(targetProp, Array.Empty<object>());
}
}
}

public override float GetPropertyHeight(SerializedProperty property, GUIContent label)
Expand All @@ -70,6 +93,63 @@ public override float GetPropertyHeight(SerializedProperty property, GUIContent
return EditorGUI.GetPropertyHeight(p);
}
}

object GetValueRecursive(object obj, int index, string[] paths)
{
var path = paths[index];

FieldInfo fldInfo = null;
var type = obj.GetType();
while (fldInfo == null)
{
// attempt to get information about the field
fldInfo = type.GetField(path, BindingFlags.IgnoreCase | BindingFlags.GetField | BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);

if (fldInfo != null ||
type.BaseType == null ||
type.BaseType.IsSubclassOf(typeof(ReactiveProperty<>))) break;

// if the field information is missing, it may be in the base class
type = type.BaseType;
}

// If array, path = Array.data[index]
if (fldInfo == null && path == "Array")
{
try
{
path = paths[++index];
var m = Regex.Match(path, @"(.+)\[([0-9]+)*\]");
var arrayIndex = int.Parse(m.Groups[2].Value);
var arrayValue = (obj as System.Collections.IList)[arrayIndex];
if (index < paths.Length - 1)
{
return GetValueRecursive(arrayValue, ++index, paths);
}
else
{
return arrayValue;
}
}
catch
{
Debug.Log("SerializableReactivePropertyDrawer Exception, objType:" + obj.GetType().Name + " path:" + string.Join(", ", paths));
throw;
}
}
else if (fldInfo == null)
{
throw new Exception("Can't decode path:" + string.Join(", ", paths));
}

var v = fldInfo.GetValue(obj);
if (index < paths.Length - 1)
{
return GetValueRecursive(v, ++index, paths);
}

return v;
}
}

#endif
Expand Down
33 changes: 5 additions & 28 deletions src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,12 @@ public class NewBehaviourScript : MonoBehaviour

public NoAwakeTest noAwake;

async void Start()
void Start()
{
Observer<Unit> dis = (Observer<Unit>)button1
.OnClickAsObservable()
.SubscribeAwait(async (_, ct) =>
{
await UniTask.Delay(1000, cancellationToken: ct);
Debug.Log("Clicked!");
}, AwaitOperation.Drop);

Observer<Unit> dis2 = (Observer<Unit>)button2
.OnClickAsObservable()
.Subscribe(_ => Debug.Log("Clicked!"));

await UniTask.Yield();

Destroy(button1.gameObject);

await UniTask.Yield();

Debug.Log(dis.IsDisposed); // True

await UniTask.Yield();

Destroy(button2.gameObject);

await UniTask.Yield();

Debug.Log(dis2.IsDisposed); // True
rpInt.Subscribe(x =>
{
Debug.Log(x);
});



Expand Down
85 changes: 85 additions & 0 deletions src/R3/Operators/Chunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public static Observable<TSource[]> Chunk<TSource, TWindowBoundary>(this Observa
{
return new ChunkWindow<TSource, TWindowBoundary>(source, windowBoundaries);
}

public static Observable<T[]> Chunk<T>(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait = true)
{
return new ChunkAsync<T>(source, asyncWindow, configureAwait);
}
}

// Count
Expand Down Expand Up @@ -341,3 +346,83 @@ protected override void OnCompletedCore(Result result)
}
}
}

// Async
internal sealed class ChunkAsync<T>(Observable<T> source, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait) : Observable<T[]>
{
protected override IDisposable SubscribeCore(Observer<T[]> observer)
{
return source.Subscribe(new _Chunk(observer, asyncWindow, configureAwait));
}

sealed class _Chunk(Observer<T[]> observer, Func<T, CancellationToken, ValueTask> asyncWindow, bool configureAwait) : Observer<T>
{
readonly List<T> list = new List<T>();
CancellationTokenSource cancellationTokenSource = new();
bool isRunning;

protected override void OnNextCore(T value)
{
lock (list)
{
list.Add(value);
if (!isRunning)
{
isRunning = true;
StartWindow(value);
}
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
cancellationTokenSource.Cancel();

lock (list)
{
if (list.Count > 0)
{
observer.OnNext(list.ToArray());
list.Clear();
}
}

observer.OnCompleted(result);
}

protected override void DisposeCore()
{
cancellationTokenSource.Cancel();
}

async void StartWindow(T value)
{
try
{
await asyncWindow(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait);
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token)
{
return;
}
OnErrorResume(ex);
}
finally
{
lock (list)
{
observer.OnNext(list.ToArray());
list.Clear();
isRunning = false;
}
}
}
}
}
12 changes: 7 additions & 5 deletions src/R3/Operators/Debounce.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ sealed class _Debounce(Observer<T> observer, Func<T, CancellationToken, ValueTas
readonly object gate = new object();
T? latestValue;
bool hasValue;
Task? runningTask;
bool isRunning;
int taskId;
CancellationTokenSource cancellationTokenSource = new();

Expand All @@ -123,15 +123,17 @@ protected override void OnNextCore(T value)
latestValue = value;
hasValue = true;

if (runningTask != null)
if (isRunning)
{
cancellationTokenSource.Cancel();
cancellationTokenSource = new CancellationTokenSource();
}

var newId = unchecked(taskId + 1);
Volatile.Write(ref taskId, newId);
runningTask = PublishOnNextAfterAsync(value, newId, cancellationTokenSource.Token);

isRunning = true;
PublishOnNextAfterAsync(value, newId, cancellationTokenSource.Token);
}
}

Expand Down Expand Up @@ -164,7 +166,7 @@ protected override void DisposeCore()
cancellationTokenSource.Cancel();
}

async Task PublishOnNextAfterAsync(T value, int id, CancellationToken cancellationToken)
async void PublishOnNextAfterAsync(T value, int id, CancellationToken cancellationToken)
{
try
{
Expand All @@ -188,7 +190,7 @@ async Task PublishOnNextAfterAsync(T value, int id, CancellationToken cancellati
observer.OnNext(latestValue!);
hasValue = false;
latestValue = default;
runningTask = null;
isRunning = false;

END:
{ }
Expand Down
Loading
Loading