From 72e4ab2be7e025a5e7267cb051d511f2f44da0ae Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 28 Feb 2024 15:14:56 +0900 Subject: [PATCH 1/5] Call ForceNotify on SerializedReactiveProperty edtiro inspector change on Unity #133 --- .../Runtime/SerializableReactiveProperty.cs | 80 +++++++++++++++++++ .../Assets/Scenes/NewBehaviourScript.cs | 33 ++------ 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/src/R3.Unity/Assets/R3.Unity/Runtime/SerializableReactiveProperty.cs b/src/R3.Unity/Assets/R3.Unity/Runtime/SerializableReactiveProperty.cs index e79b8cb3..86b1097c 100644 --- a/src/R3.Unity/Assets/R3.Unity/Runtime/SerializableReactiveProperty.cs +++ b/src/R3.Unity/Assets/R3.Unity/Runtime/SerializableReactiveProperty.cs @@ -1,5 +1,7 @@ using System; using UnityEngine; +using System.Reflection; +using System.Text.RegularExpressions; #if UNITY_EDITOR using UnityEditor; @@ -46,6 +48,9 @@ internal class SerializableReactivePropertyDrawer : PropertyDrawer public override void OnGUI(Rect position, SerializedProperty property, GUIContent label) { var p = property.FindPropertyRelative("value"); + + EditorGUI.BeginChangeCheck(); + if (p.propertyType == SerializedPropertyType.Quaternion) { label.text += "(EulerAngles)"; @@ -55,6 +60,24 @@ public override void OnGUI(Rect position, SerializedProperty property, GUIConten { EditorGUI.PropertyField(position, p, label, true); } + + if (EditorGUI.EndChangeCheck()) + { + var paths = property.propertyPath.Split('.'); // X.Y.Z... + var attachedComponent = property.serializedObject.targetObject; + + var targetProp = (paths.Length == 1) + ? fieldInfo.GetValue(attachedComponent) + : GetValueRecursive(attachedComponent, 0, paths); + if (targetProp == null) return; + + property.serializedObject.ApplyModifiedProperties(); // deserialize to field + var methodInfo = targetProp.GetType().GetMethod("ForceNotify", BindingFlags.IgnoreCase | BindingFlags.InvokeMethod | BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + if (methodInfo != null) + { + methodInfo.Invoke(targetProp, Array.Empty()); + } + } } public override float GetPropertyHeight(SerializedProperty property, GUIContent label) @@ -70,6 +93,63 @@ public override float GetPropertyHeight(SerializedProperty property, GUIContent return EditorGUI.GetPropertyHeight(p); } } + + object GetValueRecursive(object obj, int index, string[] paths) + { + var path = paths[index]; + + FieldInfo fldInfo = null; + var type = obj.GetType(); + while (fldInfo == null) + { + // attempt to get information about the field + fldInfo = type.GetField(path, BindingFlags.IgnoreCase | BindingFlags.GetField | BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + + if (fldInfo != null || + type.BaseType == null || + type.BaseType.IsSubclassOf(typeof(ReactiveProperty<>))) break; + + // if the field information is missing, it may be in the base class + type = type.BaseType; + } + + // If array, path = Array.data[index] + if (fldInfo == null && path == "Array") + { + try + { + path = paths[++index]; + var m = Regex.Match(path, @"(.+)\[([0-9]+)*\]"); + var arrayIndex = int.Parse(m.Groups[2].Value); + var arrayValue = (obj as System.Collections.IList)[arrayIndex]; + if (index < paths.Length - 1) + { + return GetValueRecursive(arrayValue, ++index, paths); + } + else + { + return arrayValue; + } + } + catch + { + Debug.Log("SerializableReactivePropertyDrawer Exception, objType:" + obj.GetType().Name + " path:" + string.Join(", ", paths)); + throw; + } + } + else if (fldInfo == null) + { + throw new Exception("Can't decode path:" + string.Join(", ", paths)); + } + + var v = fldInfo.GetValue(obj); + if (index < paths.Length - 1) + { + return GetValueRecursive(v, ++index, paths); + } + + return v; + } } #endif diff --git a/src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs b/src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs index 947e30ab..f3a9c95b 100644 --- a/src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs +++ b/src/R3.Unity/Assets/Scenes/NewBehaviourScript.cs @@ -40,35 +40,12 @@ public class NewBehaviourScript : MonoBehaviour public NoAwakeTest noAwake; - async void Start() + void Start() { - Observer dis = (Observer)button1 - .OnClickAsObservable() - .SubscribeAwait(async (_, ct) => - { - await UniTask.Delay(1000, cancellationToken: ct); - Debug.Log("Clicked!"); - }, AwaitOperation.Drop); - - Observer dis2 = (Observer)button2 - .OnClickAsObservable() - .Subscribe(_ => Debug.Log("Clicked!")); - - await UniTask.Yield(); - - Destroy(button1.gameObject); - - await UniTask.Yield(); - - Debug.Log(dis.IsDisposed); // True - - await UniTask.Yield(); - - Destroy(button2.gameObject); - - await UniTask.Yield(); - - Debug.Log(dis2.IsDisposed); // True + rpInt.Subscribe(x => + { + Debug.Log(x); + }); From 9bed913a6e42a248c8dc12de82f3898be412f5bc Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 28 Feb 2024 15:22:00 +0900 Subject: [PATCH 2/5] awaitOperations -> awaitOperation --- README.md | 10 +++++----- src/R3/Operators/SelectAwait.cs | 8 ++++---- src/R3/Operators/SubscribeAwait.cs | 12 ++++++------ src/R3/Operators/WhereAwait.cs | 8 ++++---- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index d5f56bce..48f7b246 100644 --- a/README.md +++ b/README.md @@ -1735,7 +1735,7 @@ Operator methods are defined as extension methods to `Observable` in the stat | **Select**(this `Observable` source, `Func` selector) | `Observable` | | **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | | **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | -| **SelectAwait**(this `Observable` source, `Func>` selector, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable` | +| **SelectAwait**(this `Observable` source, `Func>` selector, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable` | | **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | | **SelectMany**(this `Observable` source, `Func>` collectionSelector, `Func` resultSelector) | `Observable` | | **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | @@ -1762,9 +1762,9 @@ Operator methods are defined as extension methods to `Observable` in the stat | **SkipUntil**(this `Observable` source, `Task` task) | `Observable` | | **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | | **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onErrorResume, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onCompleted, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onErrorResume, `Action` onCompleted, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `IDisposable` | | **SubscribeOn**(this `Observable` source, `SynchronizationContext` synchronizationContext) | `Observable` | | **SubscribeOn**(this `Observable` source, `TimeProvider` timeProvider) | `Observable` | | **SubscribeOn**(this `Observable` source, `FrameProvider` frameProvider) | `Observable` | @@ -1847,7 +1847,7 @@ Operator methods are defined as extension methods to `Observable` in the stat | **Where**(this `Observable` source, `Func` predicate) | `Observable` | | **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | | **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | -| **WhereAwait**(this `Observable` source, `Func>` predicate, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable` | +| **WhereAwait**(this `Observable` source, `Func>` predicate, `AwaitOperation` awaitOperation = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Boolean` cancelOnCompleted = true, `Int32` maxConcurrent = -1) | `Observable` | | **WithLatestFrom**(this `Observable` first, `Observable` second, `Func` resultSelector) | `Observable` | | **Zip**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | | **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | diff --git a/src/R3/Operators/SelectAwait.cs b/src/R3/Operators/SelectAwait.cs index 39a56126..122e3b8d 100644 --- a/src/R3/Operators/SelectAwait.cs +++ b/src/R3/Operators/SelectAwait.cs @@ -5,17 +5,17 @@ namespace R3; public static partial class ObservableExtensions { /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. - public static Observable SelectAwait(this Observable source, Func> selector, AwaitOperation awaitOperations = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) + public static Observable SelectAwait(this Observable source, Func> selector, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) { - return new SelectAwait(source, selector, awaitOperations, configureAwait, cancelOnCompleted, maxConcurrent); + return new SelectAwait(source, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } } -internal sealed class SelectAwait(Observable source, Func> selector, AwaitOperation awaitOperations, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable +internal sealed class SelectAwait(Observable source, Func> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable { protected override IDisposable SubscribeCore(Observer observer) { - switch (awaitOperations) + switch (awaitOperation) { case AwaitOperation.Sequential: return source.Subscribe(new SelectAwaitSequential(observer, selector, configureAwait, cancelOnCompleted)); diff --git a/src/R3/Operators/SubscribeAwait.cs b/src/R3/Operators/SubscribeAwait.cs index 06d003ee..71439ccc 100644 --- a/src/R3/Operators/SubscribeAwait.cs +++ b/src/R3/Operators/SubscribeAwait.cs @@ -6,21 +6,21 @@ namespace R3; public static partial class ObservableExtensions { /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. - public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, AwaitOperation awaitOperations = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) + public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) { - return SubscribeAwait(source, onNextAsync, ObservableSystem.GetUnhandledExceptionHandler(), Stubs.HandleResult, awaitOperations, configureAwait, cancelOnCompleted, maxConcurrent); + return SubscribeAwait(source, onNextAsync, ObservableSystem.GetUnhandledExceptionHandler(), Stubs.HandleResult, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. - public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, Action onCompleted, AwaitOperation awaitOperations = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) + public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) { - return SubscribeAwait(source, onNextAsync, ObservableSystem.GetUnhandledExceptionHandler(), onCompleted, awaitOperations, configureAwait, cancelOnCompleted, maxConcurrent); + return SubscribeAwait(source, onNextAsync, ObservableSystem.GetUnhandledExceptionHandler(), onCompleted, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. - public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, Action onErrorResume, Action onCompleted, AwaitOperation awaitOperations = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) + public static IDisposable SubscribeAwait(this Observable source, Func onNextAsync, Action onErrorResume, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) { - switch (awaitOperations) + switch (awaitOperation) { case AwaitOperation.Sequential: return source.Subscribe(new SubscribeAwaitSequential(onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); diff --git a/src/R3/Operators/WhereAwait.cs b/src/R3/Operators/WhereAwait.cs index 2d26b76b..7eb883a5 100644 --- a/src/R3/Operators/WhereAwait.cs +++ b/src/R3/Operators/WhereAwait.cs @@ -5,18 +5,18 @@ namespace R3; public static partial class ObservableExtensions { /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. - public static Observable WhereAwait(this Observable source, Func> predicate, AwaitOperation awaitOperations = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) + public static Observable WhereAwait(this Observable source, Func> predicate, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = true, int maxConcurrent = -1) { - return new WhereAwait(source, predicate, awaitOperations, configureAwait, cancelOnCompleted, maxConcurrent); + return new WhereAwait(source, predicate, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } } -internal sealed class WhereAwait(Observable source, Func> predicate, AwaitOperation awaitOperations, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) +internal sealed class WhereAwait(Observable source, Func> predicate, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable { protected override IDisposable SubscribeCore(Observer observer) { - switch (awaitOperations) + switch (awaitOperation) { case AwaitOperation.Sequential: return source.Subscribe(new WhereAwaitSequential(observer, predicate, configureAwait, cancelOnCompleted)); From 6850c4ca2e8dad83802491538e2c9d112df2f32a Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 02:42:43 +0900 Subject: [PATCH 3/5] WIP async chunk --- sandbox/ConsoleApp1/Program.cs | 31 +++------ src/R3/Operators/Chunk.cs | 84 +++++++++++++++++++++++ tests/R3.Tests/OperatorTests/ChunkTest.cs | 50 ++++++++++++++ 3 files changed, 144 insertions(+), 21 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 07023b95..2b1d35ac 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Time.Testing; using R3; using System.ComponentModel.DataAnnotations; +using System.Reactive.Concurrency; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text.Json; @@ -9,10 +10,6 @@ using System.Threading.Channels; -SynchronizationContext.SetSynchronizationContext(new MySyncContext()); - - -var channel = ChannelUtility.CreateSingleReadeWriterUnbounded(); //var t = Foo(); @@ -23,31 +20,23 @@ //t.Wait(); -var timeProvider = new FakeTimeProvider(); -var subject = new Subject(); -subject - .Do(x => Console.WriteLine($"Do:{Thread.CurrentThread.ManagedThreadId}")) - .SubscribeAwait(async (_, ct) => +Observable.Interval(TimeSpan.FromSeconds(1)) + .Index() + .Chunk(async (_, ct) => { - Console.WriteLine($"Before Await:{Thread.CurrentThread.ManagedThreadId}"); - await Task.Delay(TimeSpan.FromSeconds(1), timeProvider, ct); - Console.WriteLine($"After Yield:{Thread.CurrentThread.ManagedThreadId}"); - }, AwaitOperation.Sequential/*, configureAwait: false*/); - + await Task.Delay(TimeSpan.FromSeconds(0)); + }) + .Subscribe(x => + { + Console.WriteLine(string.Join(", ", x)); + }); -subject.OnNext(10); -subject.OnNext(20); -subject.OnNext(30); -timeProvider.Advance(TimeSpan.FromSeconds(1)); Console.ReadLine(); - - - internal static class ChannelUtility { static readonly UnboundedChannelOptions options = new UnboundedChannelOptions diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index 5f972e9f..f53142eb 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -32,6 +32,11 @@ public static Observable Chunk(this Observa { return new ChunkWindow(source, windowBoundaries); } + + public static Observable Chunk(this Observable source, Func asyncWindow, bool configureAwait = true) + { + return new ChunkAsync(source, asyncWindow, configureAwait); + } } // Count @@ -341,3 +346,82 @@ protected override void OnCompletedCore(Result result) } } } + +// Async +internal sealed class ChunkAsync(Observable source, Func asyncWindow, bool configureAwait) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Chunk(observer, asyncWindow, configureAwait)); + } + + sealed class _Chunk(Observer observer, Func asyncWindow, bool configureAwait) : Observer + { + readonly List list = new List(); + CancellationTokenSource cancellationTokenSource = new(); + Task? runningTask; + + protected override void OnNextCore(T value) + { + lock (list) + { + list.Add(value); + if (runningTask == null) + { + runningTask = StartWindow(value); + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + cancellationTokenSource.Cancel(); + + lock (list) + { + if (list.Count > 0) + { + observer.OnNext(list.ToArray()); + list.Clear(); + } + } + + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + cancellationTokenSource.Cancel(); + } + + async Task StartWindow(T value) + { + try + { + await asyncWindow(value, cancellationTokenSource.Token).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationTokenSource.Token) + { + return; + } + OnErrorResume(ex); + } + finally + { + lock (list) + { + observer.OnNext(list.ToArray()); + list.Clear(); + runningTask = null; + } + } + } + } +} diff --git a/tests/R3.Tests/OperatorTests/ChunkTest.cs b/tests/R3.Tests/OperatorTests/ChunkTest.cs index e0926cb1..7862c6c8 100644 --- a/tests/R3.Tests/OperatorTests/ChunkTest.cs +++ b/tests/R3.Tests/OperatorTests/ChunkTest.cs @@ -252,4 +252,54 @@ public void ChunkFrameAndCount() list.AssertIsCompleted(); } + + // Async + [Fact] + public void ChunkAsync() + { + var publisher = new Subject(); + var tp = new FakeTimeProvider(); + var list = publisher.Chunk(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(3), tp); + + }).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + tp.Advance(3); + + list.AssertEqual([[1, 10, 100]]); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([[1, 10, 100]]); + + tp.Advance(3); + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + tp.Advance(1); + list.AssertEqual([[1, 10, 100], [1000, 10000]]); + + tp.Advance(2); + list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200]]); + + publisher.OnNext(500); + + publisher.OnCompleted(); + + list.AssertEqual([[1, 10, 100], [1000, 10000], [2, 20, 200], [500]]); + list.AssertIsCompleted(); + } } + + From 98407204d83cea98dcb51b545c319e523d8a4844 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 14:15:00 +0900 Subject: [PATCH 4/5] done --- sandbox/ConsoleApp1/Program.cs | 2 +- src/R3/Operators/Chunk.cs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 2b1d35ac..5b1ceaca 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -26,7 +26,7 @@ .Index() .Chunk(async (_, ct) => { - await Task.Delay(TimeSpan.FromSeconds(0)); + await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct); }) .Subscribe(x => { diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index f53142eb..6c572035 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -359,16 +359,17 @@ sealed class _Chunk(Observer observer, Func list = new List(); CancellationTokenSource cancellationTokenSource = new(); - Task? runningTask; + bool isRunning; protected override void OnNextCore(T value) { lock (list) { list.Add(value); - if (runningTask == null) + if (!isRunning) { - runningTask = StartWindow(value); + isRunning = true; + StartWindow(value); } } } @@ -399,7 +400,7 @@ protected override void DisposeCore() cancellationTokenSource.Cancel(); } - async Task StartWindow(T value) + async void StartWindow(T value) { try { @@ -419,7 +420,7 @@ async Task StartWindow(T value) { observer.OnNext(list.ToArray()); list.Clear(); - runningTask = null; + isRunning = false; } } } From c6cdd835355a9d7a763fca61d0adfbdffab879e9 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 14:45:18 +0900 Subject: [PATCH 5/5] Fix debounce, throttlefirstlast, throttlelast async overload does not handle correctly when func return immediately --- src/R3/Operators/Debounce.cs | 12 +++++++----- src/R3/Operators/ThrottleFirstLast.cs | 11 ++++++----- src/R3/Operators/ThrottleLast.cs | 11 ++++++----- tests/R3.Tests/OperatorTests/ChunkTest.cs | 2 ++ 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/R3/Operators/Debounce.cs b/src/R3/Operators/Debounce.cs index e0e8d341..70007ab5 100644 --- a/src/R3/Operators/Debounce.cs +++ b/src/R3/Operators/Debounce.cs @@ -112,7 +112,7 @@ sealed class _Debounce(Observer observer, Func observer, Func observer, Func(); var tp = new FakeTimeProvider(); var list = publisher.Chunk(async (x, ct) =>