diff --git a/CHANGELOG.md b/CHANGELOG.md index 82c160aec2..20b721ef8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,29 @@ Release Notes ==== +# 08-13-2024 +DotNext 5.12.0 +* Added `DotNext.Runtime.ValueReference` data type that allows to obtain async-friendly managed pointer to the field +* Deprecation of `ConcurrentCache` in favor of `RandomAccessCache` + +DotNext.Metaprogramming 5.12.0 +* Updated dependencies + +DotNext.Unsafe 5.12.0 +* Updated dependencies + +DotNext.Threading 5.12.0 +* Introduced async-friendly `RandomAccessCache` as a replacement for deprecated `ConcurrentCache`. It uses [SIEVE](https://cachemon.github.io/SIEVE-website/) eviction algorithm. + +DotNext.IO 5.12.0 +* Updated dependencies + +DotNext.Net.Cluster 5.12.0 +* Fixed cancellation of `PersistentState` async methods + +DotNext.AspNetCore.Cluster 5.12.0 +* Fixed cancellation of `PersistentState` async methods + # 08-01-2024 DotNext 5.11.0 * Added `DotNext.Threading.Epoch` for epoch-based reclamation diff --git a/README.md b/README.md index 5c22663ba5..e3c9342a4e 100644 --- a/README.md +++ b/README.md @@ -44,35 +44,29 @@ All these things are implemented in 100% managed code on top of existing .NET AP * [NuGet Packages](https://www.nuget.org/profiles/rvsakno) # What's new -Release Date: 08-01-2024 +Release Date: 08-14-2024 -DotNext 5.11.0 -* Added `DotNext.Threading.Epoch` for epoch-based reclamation -* Fixed one-shot FNV1a hashing method -* Fixed [248](https://github.com/dotnet/dotNext/issues/248) -* Minor performance improvements +DotNext 5.12.0 +* Added `DotNext.Runtime.ValueReference` data type that allows to obtain async-friendly managed pointer to the field +* Deprecation of `ConcurrentCache` in favor of `RandomAccessCache` -DotNext.Metaprogramming 5.11.0 -* Minor performance improvements +DotNext.Metaprogramming 5.12.0 * Updated dependencies -DotNext.Unsafe 5.11.0 -* Minor performance improvements +DotNext.Unsafe 5.12.0 * Updated dependencies -DotNext.Threading 5.11.0 -* Fixed `AsyncSharedLock.Downgrade` behavior, so it can be used to release a weak lock -* Updated dependencies +DotNext.Threading 5.12.0 +* Introduced async-friendly `RandomAccessCache` as a replacement for deprecated `ConcurrentCache`. It uses [SIEVE](https://cachemon.github.io/SIEVE-website/) eviction algorithm. -DotNext.IO 5.11.0 -* Minor performance improvements +DotNext.IO 5.12.0 * Updated dependencies -DotNext.Net.Cluster 5.11.0 -* Updated dependencies +DotNext.Net.Cluster 5.12.0 +* Fixed cancellation of `PersistentState` async methods -DotNext.AspNetCore.Cluster 5.11.0 -* Updated dependencies +DotNext.AspNetCore.Cluster 5.12.0 +* Fixed cancellation of `PersistentState` async methods Changelog for previous versions located [here](./CHANGELOG.md). diff --git a/src/DotNext.Benchmarks/Runtime/Caching/ConcurrentCacheBenchmark.cs b/src/DotNext.Benchmarks/Runtime/Caching/ConcurrentCacheBenchmark.cs deleted file mode 100644 index 6cc122cab7..0000000000 --- a/src/DotNext.Benchmarks/Runtime/Caching/ConcurrentCacheBenchmark.cs +++ /dev/null @@ -1,119 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Threading; -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Engines; -using BenchmarkDotNet.Order; - -namespace DotNext.Runtime.Caching; - -[SimpleJob(runStrategy: RunStrategy.Throughput, launchCount: 1)] -[Orderer(SummaryOrderPolicy.Declared)] -[MemoryDiagnoser] -public class ConcurrentCacheBenchmark -{ - private const int Capacity = 10; - - private Thread[] threads; - - [Params(2, 3, 4, 5)] - public int threadCount; - - [Params(CacheEvictionPolicy.LRU, CacheEvictionPolicy.LFU)] - public CacheEvictionPolicy evictionPolicy; - - private ConcurrentCache cache; - private ConcurrentDictionary dictionary; - - [IterationSetup(Target = nameof(ConcurrentCacheRead))] - public void InitializeConcurrentCacheAccess() - { - cache = new(Capacity, Environment.ProcessorCount, evictionPolicy); - - // fill cache - for (var i = 0; i < Capacity; i++) - cache[i] = i.ToString(); - - // read from cache - threads = new Thread[threadCount]; - - foreach (ref var thread in threads.AsSpan()) - thread = new Thread(Run); - - void Run() - { - var rnd = new Random(); - - for (var i = 0; i < 100; i++) - TouchCache(rnd); - } - - void TouchCache(Random random) - { - var index = random.Next(Capacity); - cache.TryGetValue(index, out _); - } - } - - [IterationCleanup(Target = nameof(ConcurrentCacheRead))] - public void CleanupCache() - { - cache.Clear(); - } - - [IterationSetup(Target = nameof(ConcurrentDictionaryRead))] - public void InitializeConcurrentDictionaryAccess() - { - dictionary = new(Environment.ProcessorCount, Capacity); - - // fill cache - for (var i = 0; i < Capacity; i++) - dictionary[i] = i.ToString(); - - // read from cache - threads = new Thread[threadCount]; - - foreach (ref var thread in threads.AsSpan()) - thread = new Thread(Run); - - void Run() - { - var rnd = new Random(); - - for (var i = 0; i < 100; i++) - TouchDictionary(rnd); - } - - void TouchDictionary(Random random) - { - var index = random.Next(Capacity); - dictionary.TryGetValue(index, out _); - } - } - - [IterationCleanup(Target = nameof(ConcurrentDictionaryRead))] - public void CleanupDictionary() - { - dictionary.Clear(); - } - - [Benchmark(Baseline = true)] - public void ConcurrentDictionaryRead() - { - foreach (var thread in threads) - thread.Start(); - - foreach (var thread in threads) - thread.Join(); - } - - [Benchmark] - public void ConcurrentCacheRead() - { - foreach (var thread in threads) - thread.Start(); - - foreach (var thread in threads) - thread.Join(); - } -} \ No newline at end of file diff --git a/src/DotNext.IO/DotNext.IO.csproj b/src/DotNext.IO/DotNext.IO.csproj index bad3c8e545..26f283b247 100644 --- a/src/DotNext.IO/DotNext.IO.csproj +++ b/src/DotNext.IO/DotNext.IO.csproj @@ -11,7 +11,7 @@ .NET Foundation and Contributors .NEXT Family of Libraries - 5.11.0 + 5.12.0 DotNext.IO MIT diff --git a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj index 45c8f7b035..30c63b4c8d 100644 --- a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj +++ b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj @@ -8,7 +8,7 @@ true false nullablePublicOnly - 5.11.0 + 5.12.0 .NET Foundation .NEXT Family of Libraries diff --git a/src/DotNext.Tests/DotNext.Tests.csproj b/src/DotNext.Tests/DotNext.Tests.csproj index efdd6cffae..c1d8c4265c 100644 --- a/src/DotNext.Tests/DotNext.Tests.csproj +++ b/src/DotNext.Tests/DotNext.Tests.csproj @@ -56,4 +56,8 @@ + + + + diff --git a/src/DotNext.Tests/Runtime/Caching/ConcurrentCacheTests.cs b/src/DotNext.Tests/Runtime/Caching/ConcurrentCacheTests.cs index 261a8b3fba..ea0d31f37f 100644 --- a/src/DotNext.Tests/Runtime/Caching/ConcurrentCacheTests.cs +++ b/src/DotNext.Tests/Runtime/Caching/ConcurrentCacheTests.cs @@ -1,5 +1,6 @@ namespace DotNext.Runtime.Caching; +[Obsolete] public sealed class ConcurrentCacheTests : Test { [Fact] diff --git a/src/DotNext.Tests/Runtime/Caching/RandomAccessCacheTests.cs b/src/DotNext.Tests/Runtime/Caching/RandomAccessCacheTests.cs new file mode 100644 index 0000000000..8e496332e7 --- /dev/null +++ b/src/DotNext.Tests/Runtime/Caching/RandomAccessCacheTests.cs @@ -0,0 +1,195 @@ +using System.Runtime.CompilerServices; + +namespace DotNext.Runtime.Caching; + +using CompilerServices; + +public sealed class RandomAccessCacheTests : Test +{ + [Fact] + public static async Task CacheOverflow() + { + var evictedItem = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var cache = new RandomAccessCache(15) + { + Eviction = (_, value) => evictedItem.TrySetResult(value), + }; + + for (long i = 0; i < 150; i++) + { + using var handle = await cache.ChangeAsync(i); + False(handle.TryGetValue(out _)); + + handle.SetValue(i.ToString()); + } + + Equal("0", await evictedItem.Task.WaitAsync(DefaultTimeout)); + } + + [Fact] + public static async Task CacheOverflow2() + { + var evictedItem = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var cache = new RandomAccessCache(15) + { + Eviction = (_, value) => evictedItem.TrySetResult(value), + }; + + const long accessCount = 150; + for (long i = 0; i < accessCount; i++) + { + var key = Random.Shared.NextInt64(accessCount); + if (cache.TryRead(key, out var readSession)) + { + using (readSession) + { + Equal(key.ToString(), readSession.Value); + } + } + else + { + using var writeSession = await cache.ChangeAsync(key); + if (writeSession.TryGetValue(out var value)) + { + Equal(key.ToString(), value); + } + else + { + writeSession.SetValue(key.ToString()); + } + } + } + + await evictedItem.Task; + } + + [Fact] + public static async Task StressTest() + { + await using var cache = new RandomAccessCache(15); + + const long accessCount = 1500; + + var task1 = RequestLoop(cache); + var task2 = RequestLoop(cache); + + await Task.WhenAll(task1, task2); + + [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))] + static async Task RequestLoop(RandomAccessCache cache) + { + for (long i = 0; i < accessCount; i++) + { + var key = Random.Shared.NextInt64(accessCount); + if (cache.TryRead(key, out var readSession)) + { + using (readSession) + { + Equal(key.ToString(), readSession.Value); + } + } + else + { + using var writeSession = await cache.ChangeAsync(key); + if (writeSession.TryGetValue(out var value)) + { + Equal(key.ToString(), value); + } + else + { + writeSession.SetValue(key.ToString()); + } + } + } + } + } + + [Fact] + public static async Task AddRemove() + { + await using var cache = new RandomAccessCache(15); + + using (var session = await cache.ChangeAsync(10L)) + { + False(session.TryGetValue(out _)); + session.SetValue("10"); + } + + Null(await cache.TryRemoveAsync(11L)); + + using (var session = (await cache.TryRemoveAsync(10L)).Value) + { + Equal("10", session.Value); + } + } + + [Fact] + public static async Task AddInvalidate() + { + await using var cache = new RandomAccessCache(15); + + using (var session = await cache.ChangeAsync(10L)) + { + False(session.TryGetValue(out _)); + session.SetValue("10"); + } + + False(await cache.InvalidateAsync(11L)); + True(await cache.InvalidateAsync(10L)); + } + + [Fact] + public static async Task AddTwice() + { + await using var cache = new RandomAccessCache(15); + + using (var session = await cache.ChangeAsync(10L)) + { + False(session.TryGetValue(out _)); + session.SetValue("10"); + + Throws(() => session.SetValue("20")); + } + } + + [Fact] + public static async Task DisposedCacheAccess() + { + var cache = new RandomAccessCache(18); + await cache.DisposeAsync(); + + await ThrowsAsync(cache.ChangeAsync(0L).AsTask); + await ThrowsAsync(cache.TryRemoveAsync(0L).AsTask); + await ThrowsAsync(cache.InvalidateAsync().AsTask); + await ThrowsAsync(cache.InvalidateAsync(10L).AsTask); + } + + [Fact] + public static async Task DisposedCacheAccess2() + { + using var cts = new CancellationTokenSource(); + var cache = new RandomAccessCache(18); + await cache.DisposeAsync(); + + await ThrowsAsync(cache.ChangeAsync(0L, cts.Token).AsTask); + await ThrowsAsync(cache.TryRemoveAsync(0L, cts.Token).AsTask); + await ThrowsAsync(cache.InvalidateAsync(cts.Token).AsTask); + await ThrowsAsync(cache.InvalidateAsync(10L, cts.Token).AsTask); + } + + [Fact] + public static async Task Invalidation() + { + await using var cache = new RandomAccessCache(15); + + for (long i = 0; i < 20; i++) + { + using var handle = await cache.ChangeAsync(i); + False(handle.TryGetValue(out _)); + + handle.SetValue(i.ToString()); + } + + await cache.InvalidateAsync(); + } +} \ No newline at end of file diff --git a/src/DotNext.Tests/Runtime/ValueReferenceTests.cs b/src/DotNext.Tests/Runtime/ValueReferenceTests.cs new file mode 100644 index 0000000000..0016f733f2 --- /dev/null +++ b/src/DotNext.Tests/Runtime/ValueReferenceTests.cs @@ -0,0 +1,107 @@ +using System.Runtime.CompilerServices; + +namespace DotNext.Runtime; + +public sealed class ValueReferenceTests : Test +{ + [Fact] + public static void MutableFieldRef() + { + var obj = new MyClass() { AnotherField = string.Empty }; + var reference = new ValueReference(obj, ref obj.Field); + + obj.Field = 20; + Equal(obj.Field, reference.Value); + + reference.Value = 42; + Equal(obj.Field, reference.Value); + Empty(obj.AnotherField); + } + + [Fact] + public static void ImmutableFieldRef() + { + var obj = new MyClass() { AnotherField = string.Empty }; + var reference = new ReadOnlyValueReference(obj, in obj.Field); + + obj.Field = 20; + Equal(obj.Field, reference.Value); + + Equal(obj.Field, reference.Value); + Empty(obj.AnotherField); + } + + [Fact] + public static void MutableToImmutableRef() + { + var obj = new MyClass() { AnotherField = string.Empty }; + var reference = new ValueReference(obj, ref obj.Field); + ReadOnlyValueReference roReference = reference; + + obj.Field = 20; + Equal(roReference.Value, reference.Value); + + reference.Value = 42; + Equal(roReference.Value, reference.Value); + } + + [Fact] + public static void MutableRefEquality() + { + var obj = new MyClass() { AnotherField = string.Empty }; + var reference1 = new ValueReference(obj, ref obj.Field); + var reference2 = new ValueReference(obj, ref obj.Field); + + Equal(reference1, reference2); + } + + [Fact] + public static void ImmutableRefEquality() + { + var obj = new MyClass() { AnotherField = string.Empty }; + var reference1 = new ReadOnlyValueReference(obj, in obj.Field); + var reference2 = new ReadOnlyValueReference(obj, in obj.Field); + + Equal(reference1, reference2); + } + + [Fact] + public static void ReferenceToArray() + { + var array = new string[1]; + var reference = new ValueReference(array, 0) + { + Value = "Hello, world!" + }; + + Same(array[0], reference.Value); + Same(array[0], reference.ToString()); + } + + [Fact] + public static void MutableEmptyRef() + { + var reference = default(ValueReference); + True(reference.IsEmpty); + Null(reference.ToString()); + } + + [Fact] + public static void ImmutableEmptyRef() + { + var reference = default(ReadOnlyValueReference); + True(reference.IsEmpty); + Null(reference.ToString()); + } + + private record class MyClass : IResettable + { + internal int Field; + internal string AnotherField; + + public virtual void Reset() + { + + } + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj index 83214d491c..d312917eea 100644 --- a/src/DotNext.Threading/DotNext.Threading.csproj +++ b/src/DotNext.Threading/DotNext.Threading.csproj @@ -7,7 +7,7 @@ true true nullablePublicOnly - 5.11.0 + 5.12.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/DotNext.Threading/Numerics/PrimeNumber.cs b/src/DotNext.Threading/Numerics/PrimeNumber.cs new file mode 100644 index 0000000000..f407e56a98 --- /dev/null +++ b/src/DotNext.Threading/Numerics/PrimeNumber.cs @@ -0,0 +1,31 @@ +using System.Diagnostics; + +namespace DotNext.Numerics; + +internal static class PrimeNumber +{ + private static ReadOnlySpan Primes => + [ + 3, 7, 11, 17, 23, 29, 37, 47, 59, 71, 89, 107, 131, 163, 197, 239, 293, 353, 431, 521, 631, 761, 919, + 1103, 1327, 1597, 1931, 2333, 2801, 3371, 4049, 4861, 5839, 7013, 8419, 10103, 12143, 14591, + 17519, 21023, 25229, 30293, 36353, 43627, 52361, 62851, 75431, 90523, 108631, 130363, 156437, + 187751, 225307, 270371, 324449, 389357, 467237, 560689, 672827, 807403, 968897, 1162687, 1395263, + 1674319, 2009191, 2411033, 2893249, 3471899, 4166287, 4999559, 5999471, 7199369 + ]; + + internal static int GetPrime(int min) => Number.GetPrime(min, Primes); + + internal static ulong GetFastModMultiplier(ulong divisor) + => ulong.MaxValue / divisor + 1UL; + + // Daniel Lemire's fastmod algorithm: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + internal static uint FastMod(uint value, uint divisor, ulong multiplier) + { + Debug.Assert(divisor <= int.MaxValue); + + var result = (uint)(((((multiplier * value) >> 32) + 1UL) * divisor) >> 32); + Debug.Assert(result == value % divisor); + + return result; + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Debug.cs b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Debug.cs new file mode 100644 index 0000000000..e312c644c5 --- /dev/null +++ b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Debug.cs @@ -0,0 +1,21 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; + +namespace DotNext.Runtime.Caching; + +[DebuggerDisplay($"EvictionListSize = {{{nameof(EvictionListSize)}}}, QueueSize = {{{nameof(QueueSize)}}}")] +public partial class RandomAccessCache +{ + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + private (int Dead, int Alive) EvictionListSize => evictionHead?.EvictionNodesCount ?? default; + + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + private int QueueSize => queueHead?.QueueLength ?? 0; + + internal partial class KeyValuePair + { + protected string ToString(TValue value) => $"Key = {Key} Value = {value}, Promoted = {IsNotified}, IsAlive = {!IsDead}"; + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Dictionary.cs b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Dictionary.cs new file mode 100644 index 0000000000..cbe314f772 --- /dev/null +++ b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Dictionary.cs @@ -0,0 +1,390 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace DotNext.Runtime.Caching; + +using Numerics; +using Threading; + +public partial class RandomAccessCache +{ + // devirtualize Value getter manually (JIT will replace this method with one of the actual branches) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static TValue GetValue(KeyValuePair pair) + { + Debug.Assert(pair is not null); + Debug.Assert(pair is not FakeKeyValuePair); + Debug.Assert(Atomic.IsAtomic() ? pair is KeyValuePairAtomicAccess : pair is KeyValuePairNonAtomicAccess); + + return Atomic.IsAtomic() + ? Unsafe.As(pair).Value + : Unsafe.As(pair).Value; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void SetValue(KeyValuePair pair, TValue value) + { + Debug.Assert(pair is not FakeKeyValuePair); + + if (Atomic.IsAtomic()) + { + Unsafe.As(pair).Value = value; + } + else + { + Unsafe.As(pair).Value = value; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void ClearValue(KeyValuePair pair) + { + Debug.Assert(pair is not FakeKeyValuePair); + + if (!RuntimeHelpers.IsReferenceOrContainsReferences()) + { + // do nothing + } + else if (Atomic.IsAtomic()) + { + Unsafe.As(pair).Value = default!; + } + else + { + Unsafe.As(pair).ClearValue(); + } + } + + private static KeyValuePair CreatePair(TKey key, TValue value, int hashCode) + { + return Atomic.IsAtomic() + ? new KeyValuePairAtomicAccess(key, hashCode) { Value = value } + : new KeyValuePairNonAtomicAccess(key, hashCode) { Value = value }; + } + + private readonly Bucket[] buckets; + private readonly ulong fastModMultiplier; + + private Bucket GetBucket(int hashCode) + { + var index = (int)(IntPtr.Size is sizeof(ulong) + ? PrimeNumber.FastMod((uint)hashCode, (uint)buckets.Length, fastModMultiplier) + : (uint)hashCode % (uint)buckets.Length); + + Debug.Assert((uint)index < (uint)buckets.Length); + + return Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(buckets), index); + } + + internal partial class KeyValuePair(TKey key, int hashCode) + { + internal readonly int KeyHashCode = hashCode; + internal readonly TKey Key = key; + internal volatile KeyValuePair? NextInBucket; // volatile, used by the dictionary subsystem only + + // Reference counting is used to establish lifetime of the stored value (not KeyValuePair instance). + // Initial value 1 means that the pair is referenced by the eviction queue. There + // are two competing threads that may decrement the counter to zero: removal thread (see TryRemove) + // and eviction thread. To synchronize the decision, 'cacheState' is used. The thread that evicts the pair + // successfully (transition from 0 => -1) is able to decrement the counter to zero. + private volatile int lifetimeCounter = 1; + + internal bool TryAcquireCounter() + { + int currentValue, tmp = lifetimeCounter; + do + { + currentValue = tmp; + if (currentValue is 0) + break; + } while ((tmp = Interlocked.CompareExchange(ref lifetimeCounter, currentValue + 1, currentValue)) != currentValue); + + return currentValue > 0U; + } + + internal bool ReleaseCounter() => Interlocked.Decrement(ref lifetimeCounter) > 0; + + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + internal (int Alive, int Dead) BucketNodesCount + { + get + { + var alive = 0; + var dead = 0; + for (var current = this; current is not null; current = current.NextInBucket) + { + ref var counterRef = ref current.IsDead ? ref dead : ref alive; + counterRef++; + } + + return (alive, dead); + } + } + } + + private sealed class KeyValuePairAtomicAccess(TKey key, int hashCode) : KeyValuePair(key, hashCode) + { + internal required TValue Value; + + public override string ToString() => ToString(Value); + } + + // non-atomic access utilizes copy-on-write semantics + private sealed class KeyValuePairNonAtomicAccess(TKey key, int hashCode) : KeyValuePair(key, hashCode) + { + private sealed class ValueHolder(TValue value) + { + internal readonly TValue Value = value; + } + + private static readonly ValueHolder DefaultHolder = new(default!); + + private ValueHolder holder; + + internal required TValue Value + { + get => holder.Value; + + [MemberNotNull(nameof(holder))] set => holder = new(value); + } + + internal void ClearValue() => holder = DefaultHolder; + + public override string ToString() => ToString(Value); + } + + [DebuggerDisplay($"NumberOfItems = {{{nameof(Count)}}}")] + internal sealed class Bucket : AsyncExclusiveLock + { + private volatile KeyValuePair? first; // volatile + + [ExcludeFromCodeCoverage] + private (int Alive, int Dead) Count => first?.BucketNodesCount ?? default; + + internal KeyValuePair? TryAdd(IEqualityComparer? keyComparer, TKey key, int hashCode, TValue value) + { + var firstCopy = first; + if (firstCopy is not null && firstCopy.KeyHashCode == hashCode + && (keyComparer?.Equals(key, firstCopy.Key) + ?? EqualityComparer.Default.Equals(key, firstCopy.Key))) + { + return null; + } + + var newPair = CreatePair(key, value, hashCode); + newPair.NextInBucket = firstCopy; + first = newPair; + return newPair; + } + + private void Remove(KeyValuePair? previous, KeyValuePair current) + { + ref var location = ref previous is null ? ref first : ref previous.NextInBucket; + Volatile.Write(ref location, current.NextInBucket); + } + + internal KeyValuePair? TryRemove(IEqualityComparer? keyComparer, TKey key, int hashCode) + { + var result = default(KeyValuePair?); + + // remove all dead nodes from the bucket + if (keyComparer is null) + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (result is null && hashCode == current.KeyHashCode + && EqualityComparer.Default.Equals(key, current.Key) + && current.MarkAsEvicted()) + { + result = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + else + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (result is null && hashCode == current.KeyHashCode + && keyComparer.Equals(key, current.Key) + && current.MarkAsEvicted()) + { + result = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + + return result; + } + + internal KeyValuePair? TryGet(IEqualityComparer? keyComparer, TKey key, int hashCode) + { + var result = default(KeyValuePair?); + + // remove all dead nodes from the bucket + if (keyComparer is null) + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (result is null && hashCode == current.KeyHashCode + && EqualityComparer.Default.Equals(key, current.Key) + && current.Visit() + && current.TryAcquireCounter()) + { + result = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + else + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (result is null && hashCode == current.KeyHashCode + && keyComparer.Equals(key, current.Key) + && current.Visit() + && current.TryAcquireCounter()) + { + result = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + + return result; + } + + internal KeyValuePair? Modify(IEqualityComparer? keyComparer, TKey key, int hashCode) + { + KeyValuePair? valueHolder = null; + if (keyComparer is null) + { + for (KeyValuePair? current = first, previous = null; current is not null; previous = current, current = current.NextInBucket) + { + if (valueHolder is null && hashCode == current.KeyHashCode + && EqualityComparer.Default.Equals(key, current.Key) + && current.Visit() + && current.TryAcquireCounter()) + { + valueHolder = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + else + { + for (KeyValuePair? current = first, previous = null; current is not null; previous = current, current = current.NextInBucket) + { + if (valueHolder is null && hashCode == current.KeyHashCode + && keyComparer.Equals(key, current.Key) + && current.Visit() + && current.TryAcquireCounter()) + { + valueHolder = current; + } + + if (current.IsDead) + { + Remove(previous, current); + } + } + } + + return valueHolder; + } + + internal void CleanUp(IEqualityComparer? keyComparer) + { + // remove all dead nodes from the bucket + if (keyComparer is null) + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (current.IsDead) + { + Remove(previous, current); + } + } + } + else + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + if (current.IsDead) + { + Remove(previous, current); + } + } + } + } + + internal void Invalidate(IEqualityComparer? keyComparer, Action cleanup) + { + // remove all dead nodes from the bucket + if (keyComparer is null) + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + Remove(previous, current); + + if (current.MarkAsEvicted() && current.ReleaseCounter() is false) + { + cleanup.Invoke(current); + } + } + } + else + { + for (KeyValuePair? current = first, previous = null; + current is not null; + previous = current, current = current.NextInBucket) + { + Remove(previous, current); + + if (current.MarkAsEvicted() && current.ReleaseCounter() is false) + { + cleanup.Invoke(current); + } + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Eviction.cs b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Eviction.cs new file mode 100644 index 0000000000..053d427316 --- /dev/null +++ b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Eviction.cs @@ -0,0 +1,278 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Threading.Tasks.Sources; + +namespace DotNext.Runtime.Caching; + +using CompilerServices; + +public partial class RandomAccessCache +{ + private readonly CancelableValueTaskCompletionSource completionSource; + + // SIEVE core + private readonly int maxCacheSize; + private int currentSize; + private KeyValuePair? evictionHead, evictionTail, sieveHand; + + [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))] + private async Task DoEvictionAsync() + { + while (!IsDisposingOrDisposed) + { + if (queueHead.NextInQueue is KeyValuePair newHead) + { + queueHead.NextInQueue = Sentinel.Instance; + queueHead = newHead; + } + else if (await completionSource.WaitAsync(queueHead).ConfigureAwait(false)) + { + continue; + } + else + { + break; + } + + Debug.Assert(queueHead is not FakeKeyValuePair); + EvictOrInsert(queueHead); + } + } + + private void EvictOrInsert(KeyValuePair dequeued) + { + if (currentSize == maxCacheSize) + Evict(); + + Debug.Assert(currentSize < maxCacheSize); + dequeued.Prepend(ref evictionHead, ref evictionTail); + sieveHand ??= evictionTail; + currentSize++; + } + + private void Evict() + { + Debug.Assert(sieveHand is not null); + Debug.Assert(evictionHead is not null); + Debug.Assert(evictionTail is not null); + + while (sieveHand is not null) + { + if (!sieveHand.Evict(out var removed)) + { + sieveHand = sieveHand.MoveBackward() ?? evictionTail; + } + else + { + var removedPair = sieveHand; + sieveHand = sieveHand.DetachAndMoveBackward(ref evictionHead, ref evictionTail) ?? evictionTail; + currentSize--; + if (!removed && removedPair.ReleaseCounter() is false) + { + Eviction?.Invoke(removedPair.Key, GetValue(removedPair)); + ClearValue(removedPair); + TryCleanUpBucket(GetBucket(removedPair.KeyHashCode)); + break; + } + } + } + } + + private void TryCleanUpBucket(Bucket bucket) + { + if (bucket.TryAcquire()) + { + try + { + bucket.CleanUp(keyComparer); + } + finally + { + bucket.Release(); + } + } + } + + internal partial class KeyValuePair + { + private const int EvictedState = -1; + private const int NotVisitedState = 0; + private const int VisitedState = 1; + + private (KeyValuePair? Previous, KeyValuePair? Next) sieveLinks; + private volatile int cacheState; + + internal KeyValuePair? MoveBackward() + => sieveLinks.Previous; + + internal void Prepend([NotNull] ref KeyValuePair? head, [NotNull] ref KeyValuePair? tail) + { + if (head is null || tail is null) + { + head = tail = this; + } + else + { + head = (sieveLinks.Next = head).sieveLinks.Previous = this; + } + } + + internal KeyValuePair? DetachAndMoveBackward(ref KeyValuePair? head, ref KeyValuePair? tail) + { + var (previous, next) = sieveLinks; + + if (previous is null) + { + head = next; + } + + if (next is null) + { + tail = previous; + } + + MakeLink(previous, next); + sieveLinks = default; + return previous; + + static void MakeLink(KeyValuePair? previous, KeyValuePair? next) + { + if (previous is not null) + { + previous.sieveLinks.Next = next; + } + + if (next is not null) + { + next.sieveLinks.Previous = previous; + } + } + } + + internal bool Evict(out bool removed) + { + var counter = Interlocked.Decrement(ref cacheState); + removed = counter < EvictedState; + return counter < NotVisitedState; + } + + internal bool Visit() + => Interlocked.CompareExchange(ref cacheState, VisitedState, NotVisitedState) >= NotVisitedState; + + internal bool MarkAsEvicted() + => Interlocked.Exchange(ref cacheState, EvictedState) >= NotVisitedState; + + internal bool IsDead => cacheState < NotVisitedState; + + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + internal (int Alive, int Dead) EvictionNodesCount + { + get + { + var alive = 0; + var dead = 0; + for (var current = this; current is not null; current = current.sieveLinks.Next) + { + ref var counterRef = ref current.IsDead ? ref dead : ref alive; + counterRef++; + } + + return (alive, dead); + } + } + } + + private sealed class CancelableValueTaskCompletionSource : Disposable, IValueTaskSource, IThreadPoolWorkItem + { + private object? continuationState; + private volatile Action? continuation; + private short version = short.MinValue; + + private void MoveTo(Action stub) + { + Debug.Assert(ValueTaskSourceHelpers.IsStub(stub)); + + // null, non-stub => stub + Action? current, tmp = continuation; + + do + { + current = tmp; + if (current is not null && ValueTaskSourceHelpers.IsStub(current)) + return; + } while (!ReferenceEquals(tmp = Interlocked.CompareExchange(ref continuation, stub, current), current)); + + current?.Invoke(continuationState); + } + + void IThreadPoolWorkItem.Execute() => MoveTo(ValueTaskSourceHelpers.CompletedStub); + + bool IValueTaskSource.GetResult(short token) + { + Debug.Assert(token == version); + + continuationState = null; + if (IsDisposingOrDisposed) + return false; + + Reset(); + return true; + } + + private void Reset() + { + version++; + continuationState = null; + Interlocked.Exchange(ref continuation, null); + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + return continuation is { } c && ValueTaskSourceHelpers.IsStub(c) || IsDisposingOrDisposed + ? ValueTaskSourceStatus.Succeeded + : ValueTaskSourceStatus.Pending; + } + + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + continuationState = state; + if (Interlocked.CompareExchange(ref this.continuation, continuation, null) is not null) + { + continuation.Invoke(state); + } + } + + internal ValueTask WaitAsync(KeyValuePair pair) + { + if (!pair.TryAttachNotificationHandler(this)) + continuation = ValueTaskSourceHelpers.CompletedStub; + + return new(this, version); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + MoveTo(ValueTaskSourceHelpers.CanceledStub); + } + + base.Dispose(disposing); + } + } +} + +file static class ValueTaskSourceHelpers +{ + internal static readonly Action CompletedStub = Stub; + internal static readonly Action CanceledStub = Stub; + + private static void Stub(object? state) + { + } + + internal static bool IsStub(Action continuation) + => continuation.Method == CompletedStub.Method; +} \ No newline at end of file diff --git a/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Queue.cs b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Queue.cs new file mode 100644 index 0000000000..ba097f8af2 --- /dev/null +++ b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.Queue.cs @@ -0,0 +1,86 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; + +namespace DotNext.Runtime.Caching; + +using Patterns; + +public partial class RandomAccessCache +{ + // Queue has multiple producers and a single consumer. Consumer doesn't require special lock-free approach to dequeue. + private KeyValuePair queueTail, queueHead; + + private void Promote(KeyValuePair newPair) + { + KeyValuePair currentTail; + do + { + currentTail = queueTail; + } while (Interlocked.CompareExchange(ref currentTail.NextInQueue, newPair, null) is not null); + + // attempt to install a new tail. Do not retry if failed, competing thread installed more recent version of it + Interlocked.CompareExchange(ref queueTail, newPair, currentTail); + + currentTail.Notify(); + } + + internal partial class KeyValuePair + { + // null, or KeyValuePair, or Sentinel.Instance + internal object? NextInQueue; + private volatile IThreadPoolWorkItem? notification; + + internal void Notify() + { + if (Interlocked.Exchange(ref notification, SentinelNotification.Instance) is { } callback + && !ReferenceEquals(callback, SentinelNotification.Instance)) + { + ThreadPool.UnsafeQueueUserWorkItem(callback, preferLocal: false); + } + } + + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + private bool IsNotified => ReferenceEquals(notification, SentinelNotification.Instance); + + // true - attached, false - the object is already notified + internal bool TryAttachNotificationHandler(IThreadPoolWorkItem continuation) + => Interlocked.CompareExchange(ref notification, continuation, null) is null; + + [ExcludeFromCodeCoverage] + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + internal int QueueLength + { + get + { + var count = 0; + for (var current = this; current is not null; current = current.NextInQueue as KeyValuePair) + { + count++; + } + + return count; + } + } + } + + // Never call GetValue on this class, it has no storage for TValue. + // It is used as a stub for the first element in the notification queue to keep task completion source + private sealed class FakeKeyValuePair() : KeyValuePair(default!, 0) + { + public override string ToString() => "Fake KV Pair"; + } +} + +file sealed class SentinelNotification : IThreadPoolWorkItem, ISingleton +{ + public static SentinelNotification Instance { get; } = new(); + + private SentinelNotification() + { + } + + void IThreadPoolWorkItem.Execute() + { + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.cs b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.cs new file mode 100644 index 0000000000..26b163c61e --- /dev/null +++ b/src/DotNext.Threading/Runtime/Caching/RandomAccessCache.cs @@ -0,0 +1,460 @@ +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace DotNext.Runtime.Caching; + +using Numerics; +using Threading; + +/// +/// Represents concurrent cache optimized for random access. +/// +/// +/// The cache evicts older records on overflow. +/// +/// The type of the keys. +/// The type of the values. +public partial class RandomAccessCache : Disposable, IAsyncDisposable + where TKey : notnull + where TValue : notnull +{ + private readonly CancellationToken lifetimeToken; + private readonly Task evictionTask; + private readonly IEqualityComparer? keyComparer; + + [SuppressMessage("Usage", "CA2213", Justification = "False positive.")] + private volatile CancellationTokenSource? lifetimeSource; + + /// + /// Initializes a new cache. + /// + /// Maximum cache size. + /// is less than or equal to zero. + public RandomAccessCache(int cacheSize) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(cacheSize); + + maxCacheSize = cacheSize; + var dictionarySize = PrimeNumber.GetPrime(cacheSize); + fastModMultiplier = IntPtr.Size is sizeof(ulong) + ? PrimeNumber.GetFastModMultiplier((uint)dictionarySize) + : default; + + Span.Initialize(buckets = new Bucket[dictionarySize]); + + lifetimeSource = new(); + lifetimeToken = lifetimeSource.Token; + queueHead = queueTail = new FakeKeyValuePair(); + + completionSource = new(); + evictionTask = DoEvictionAsync(); + } + + private string ObjectName => GetType().Name; + + /// + /// Gets or sets a callback that can be used to clean up the evicted value. + /// + public Action? Eviction { get; init; } + + /// + /// Gets or sets key comparer. + /// + public IEqualityComparer? KeyComparer + { + get => keyComparer; + init => keyComparer = ReferenceEquals(value, EqualityComparer.Default) ? null : value; + } + + /// + /// Opens a session that can be used to modify the value associated with the key. + /// + /// + /// The cache guarantees that the value cannot be evicted concurrently with the returned session. However, + /// the value can be evicted immediately after. The caller must dispose session. + /// + /// The key of the cache record. + /// The token that can be used to cancel the operation. + /// The session that can be used to read or modify the cache record. + /// The operation has been canceled. + /// The cache is disposed. + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + public async ValueTask ChangeAsync(TKey key, CancellationToken token = default) + { + var keyComparerCopy = KeyComparer; + var hashCode = keyComparerCopy?.GetHashCode(key) ?? EqualityComparer.Default.GetHashCode(key); + var bucket = GetBucket(hashCode); + + var cts = token.LinkTo(lifetimeToken); + var lockTaken = false; + try + { + await bucket.AcquireAsync(token).ConfigureAwait(false); + lockTaken = true; + + if (bucket.Modify(keyComparerCopy, key, hashCode) is { } valueHolder) + return new(this, valueHolder); + + lockTaken = false; + return new(this, bucket, key, hashCode); + } + catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token) + { + throw cts.CancellationOrigin == lifetimeToken + ? new ObjectDisposedException(ObjectName) + : new OperationCanceledException(cts.CancellationOrigin); + } + catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken) + { + throw new ObjectDisposedException(ObjectName); + } + finally + { + cts?.Dispose(); + if (lockTaken) + bucket.Release(); + } + } + + /// + /// Tries to read the cached record. + /// + /// + /// The cache guarantees that the value cannot be evicted concurrently with the session. However, + /// the value can be evicted immediately after. The caller must dispose session. + /// + /// The key of the cache record. + /// A session that can be used to read the cached record. + /// if the record is available for reading and the session is active; otherwise, . + public bool TryRead(TKey key, out ReadSession session) + { + var keyComparerCopy = KeyComparer; + var hashCode = keyComparerCopy?.GetHashCode(key) ?? EqualityComparer.Default.GetHashCode(key); + + if (GetBucket(hashCode).TryGet(keyComparerCopy, key, hashCode) is { } valueHolder) + { + session = new(Eviction, valueHolder); + return true; + } + + session = default; + return false; + } + + /// + /// Tries to invalidate cache record associated with the provided key. + /// + /// The key of the cache record to be removed. + /// The token that can be used to cancel the operation. + /// + /// The session that can be used to read the removed cache record; + /// or if there is no record associated with . + /// The operation has been canceled. + /// The cache is disposed. + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + public async ValueTask TryRemoveAsync(TKey key, CancellationToken token = default) + { + var keyComparerCopy = KeyComparer; + var hashCode = keyComparerCopy?.GetHashCode(key) ?? EqualityComparer.Default.GetHashCode(key); + var bucket = GetBucket(hashCode); + + var cts = token.LinkTo(lifetimeToken); + var lockTaken = false; + try + { + await bucket.AcquireAsync(token).ConfigureAwait(false); + lockTaken = true; + + return bucket.TryRemove(keyComparerCopy, key, hashCode) is { } removedPair + ? new ReadSession(Eviction, removedPair) + : null; + } + catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token) + { + throw cts.CancellationOrigin == lifetimeToken + ? new ObjectDisposedException(GetType().Name) + : new OperationCanceledException(cts.CancellationOrigin); + } + catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken) + { + throw new ObjectDisposedException(ObjectName); + } + finally + { + if (lockTaken) + bucket.Release(); + } + } + + /// + /// Invalidates the cache record associated with the specified key. + /// + /// The key of the cache record to be removed. + /// The token that can be used to cancel the operation. + /// if the cache record associated with is removed successfully; otherwise, . + /// The operation has been canceled. + /// The cache is disposed. + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + public async ValueTask InvalidateAsync(TKey key, CancellationToken token = default) + { + var keyComparerCopy = KeyComparer; + var hashCode = keyComparerCopy?.GetHashCode(key) ?? EqualityComparer.Default.GetHashCode(key); + var bucket = GetBucket(hashCode); + + var cts = token.LinkTo(lifetimeToken); + var lockTaken = false; + KeyValuePair? removedPair; + try + { + await bucket.AcquireAsync(token).ConfigureAwait(false); + lockTaken = true; + removedPair = bucket.TryRemove(keyComparerCopy, key, hashCode); + } + catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token) + { + throw cts.CancellationOrigin == lifetimeToken + ? new ObjectDisposedException(GetType().Name) + : new OperationCanceledException(cts.CancellationOrigin); + } + catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken) + { + throw new ObjectDisposedException(ObjectName); + } + finally + { + if (lockTaken) + bucket.Release(); + } + + if (removedPair is null) + { + return false; + } + + if (removedPair.ReleaseCounter() is false) + { + Eviction?.Invoke(key, GetValue(removedPair)); + ClearValue(removedPair); + } + + return true; + } + + /// + /// Invalidates the entire cache. + /// + /// The token that can be used to cancel the operation. + /// The operation has been canceled. + /// The cache is disposed. + public async ValueTask InvalidateAsync(CancellationToken token = default) + { + var cleanup = CreateCleanupAction(Eviction); + var cts = token.LinkTo(lifetimeToken); + + try + { + foreach (var bucket in buckets) + { + var lockTaken = false; + try + { + await bucket.AcquireAsync(token).ConfigureAwait(false); + lockTaken = true; + + bucket.Invalidate(keyComparer, cleanup); + } + catch (OperationCanceledException e) when (e.CancellationToken == cts?.Token) + { + throw cts.CancellationOrigin == lifetimeToken + ? new ObjectDisposedException(GetType().Name) + : new OperationCanceledException(cts.CancellationOrigin); + } + catch (OperationCanceledException e) when (e.CancellationToken == lifetimeToken) + { + throw new ObjectDisposedException(ObjectName); + } + finally + { + if (lockTaken) + bucket.Release(); + } + } + } + finally + { + cts?.Dispose(); + } + + static Action CreateCleanupAction(Action? eviction) + { + return eviction is not null + ? pair => CleanUp(eviction, pair) + : ClearValue; + } + + static void CleanUp(Action eviction, KeyValuePair removedPair) + { + eviction(removedPair.Key, GetValue(removedPair)); + ClearValue(removedPair); + } + } + + /// + public new ValueTask DisposeAsync() => base.DisposeAsync(); + + /// + protected override ValueTask DisposeAsyncCore() + { + Dispose(disposing: true); + return new(evictionTask); + } + + /// + protected override void Dispose(bool disposing) + { + try + { + if (disposing) + { + completionSource.Dispose(); + if (Interlocked.Exchange(ref lifetimeSource, null) is { } cts) + { + try + { + cts.Cancel(throwOnFirstException: false); + } + finally + { + cts.Dispose(); + } + } + } + } + finally + { + base.Dispose(disposing); + } + } + + /// + /// Represents a session that can be used to read the cache record value. + /// + /// + /// While session alive, the record cannot be evicted. + /// + [StructLayout(LayoutKind.Auto)] + public readonly struct ReadSession : IDisposable + { + private readonly Action? eviction; + private readonly KeyValuePair valueHolder; + + internal ReadSession(Action? eviction, KeyValuePair valueHolder) + { + this.eviction = eviction; + this.valueHolder = valueHolder; + } + + /// + /// Gets the value of the cache record. + /// + public TValue Value => GetValue(valueHolder); + + /// + /// Closes the session. + /// + void IDisposable.Dispose() + { + if (valueHolder?.ReleaseCounter() is false) + { + eviction?.Invoke(valueHolder.Key, GetValue(valueHolder)); + ClearValue(valueHolder); + } + } + } + + /// + /// Represents a session that can be used to read, modify or promote the cache record value. + /// + /// + /// While session alive, the record cannot be evicted. + /// + [StructLayout(LayoutKind.Auto)] + public readonly struct ReadOrWriteSession : IDisposable + { + private readonly RandomAccessCache cache; + private readonly object bucketOrValueHolder; // Bucket or KeyValuePair + private readonly TKey key; + private readonly int hashCode; + + internal ReadOrWriteSession(RandomAccessCache cache, Bucket bucket, TKey key, int hashCode) + { + this.cache = cache; + bucketOrValueHolder = bucket; + this.key = key; + this.hashCode = hashCode; + } + + internal ReadOrWriteSession(RandomAccessCache cache, KeyValuePair valueHolder) + { + this.cache = cache; + bucketOrValueHolder = valueHolder; + key = valueHolder.Key; + hashCode = valueHolder.KeyHashCode; + } + + /// + /// Tries to get the value of the cache record. + /// + /// The value of the cache record. + /// if value exists; otherwise, . + public bool TryGetValue([MaybeNullWhen(false)] out TValue result) + { + if (bucketOrValueHolder is KeyValuePair valueHolder) + { + result = GetValue(valueHolder); + return true; + } + + result = default; + return false; + } + + /// + /// Promotes or modifies the cache record value. + /// + /// The value to promote or replace the existing value. + /// The session is invalid; the value promotes more than once. + public void SetValue(TValue value) + { + switch (bucketOrValueHolder) + { + case Bucket bucket when bucket.TryAdd(cache.keyComparer, key, hashCode, value) is { } newPair: + cache.Promote(newPair); + break; + case KeyValuePair existingPair: + RandomAccessCache.SetValue(existingPair, value); + break; + default: + throw new InvalidOperationException(); + } + } + + /// + /// Closes the session. + /// + void IDisposable.Dispose() + { + switch (bucketOrValueHolder) + { + case Bucket bucket: + bucket.Release(); + break; + case KeyValuePair pair when pair.ReleaseCounter() is false: + cache.Eviction?.Invoke(key, GetValue(pair)); + ClearValue(pair); + break; + } + } + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/AsyncCorrelationSource.Bucket.cs b/src/DotNext.Threading/Threading/AsyncCorrelationSource.Bucket.cs index a85816ef9b..c64bb85962 100644 --- a/src/DotNext.Threading/Threading/AsyncCorrelationSource.Bucket.cs +++ b/src/DotNext.Threading/Threading/AsyncCorrelationSource.Bucket.cs @@ -5,6 +5,7 @@ namespace DotNext.Threading; +using Numerics; using Tasks; public partial class AsyncCorrelationSource @@ -156,9 +157,13 @@ internal WaitNode CreateNode(TKey eventId, object? userData) private ref Bucket? GetBucket(TKey eventId) { - var bucketIndex = unchecked((uint)(comparer?.GetHashCode(eventId) ?? EqualityComparer.Default.GetHashCode(eventId))) % buckets.LongLength; - Debug.Assert((uint)bucketIndex < (uint)buckets.LongLength); - - return ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(buckets), (nint)bucketIndex); + var hashCode = (uint)(comparer?.GetHashCode(eventId) ?? EqualityComparer.Default.GetHashCode(eventId)); + var bucketIndex = (int)(IntPtr.Size is sizeof(ulong) + ? PrimeNumber.FastMod(hashCode, (uint)buckets.Length, fastModMultiplier) + : hashCode % (uint)buckets.Length); + + Debug.Assert((uint)bucketIndex < (uint)buckets.Length); + + return ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(buckets), bucketIndex); } } \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/AsyncCorrelationSource.cs b/src/DotNext.Threading/Threading/AsyncCorrelationSource.cs index 739545cb14..92dcfd599d 100644 --- a/src/DotNext.Threading/Threading/AsyncCorrelationSource.cs +++ b/src/DotNext.Threading/Threading/AsyncCorrelationSource.cs @@ -2,6 +2,7 @@ namespace DotNext.Threading; +using Numerics; using Tasks; /// @@ -24,6 +25,7 @@ namespace DotNext.Threading; public partial class AsyncCorrelationSource where TKey : notnull { + private readonly ulong fastModMultiplier; private readonly Bucket?[] buckets; private readonly IEqualityComparer? comparer; // if null then use Default comparer @@ -37,6 +39,8 @@ public AsyncCorrelationSource(int concurrencyLevel, IEqualityComparer? com { ArgumentOutOfRangeException.ThrowIfNegativeOrZero(concurrencyLevel); + concurrencyLevel = PrimeNumber.GetPrime(concurrencyLevel); + fastModMultiplier = IntPtr.Size is sizeof(ulong) ? PrimeNumber.GetFastModMultiplier((uint)concurrencyLevel) : default; buckets = new Bucket[concurrencyLevel]; this.comparer = comparer; } diff --git a/src/DotNext.Unsafe/DotNext.Unsafe.csproj b/src/DotNext.Unsafe/DotNext.Unsafe.csproj index 1edad6eb01..4386ae5872 100644 --- a/src/DotNext.Unsafe/DotNext.Unsafe.csproj +++ b/src/DotNext.Unsafe/DotNext.Unsafe.csproj @@ -7,7 +7,7 @@ enable true true - 5.11.0 + 5.12.0 nullablePublicOnly .NET Foundation and Contributors diff --git a/src/DotNext.sln b/src/DotNext.sln index a7ff60b4d3..dce0ee1c5a 100644 --- a/src/DotNext.sln +++ b/src/DotNext.sln @@ -40,6 +40,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNext.MaintenanceServices EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CommandLineAMI", "examples\CommandLineAMI\CommandLineAMI.csproj", "{39583DDE-E579-44AD-B7AF-5BB77D979E55}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RandomAccessCacheBenchmark", "examples\RandomAccessCacheBenchmark\RandomAccessCacheBenchmark.csproj", "{73185946-8EFD-4153-8AC4-05AFD8BAC2E4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Bench|Any CPU = Bench|Any CPU @@ -125,6 +127,12 @@ Global {39583DDE-E579-44AD-B7AF-5BB77D979E55}.Debug|Any CPU.Build.0 = Debug|Any CPU {39583DDE-E579-44AD-B7AF-5BB77D979E55}.Release|Any CPU.ActiveCfg = Release|Any CPU {39583DDE-E579-44AD-B7AF-5BB77D979E55}.Release|Any CPU.Build.0 = Release|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Bench|Any CPU.ActiveCfg = Debug|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Bench|Any CPU.Build.0 = Debug|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -137,6 +145,7 @@ Global {EF588447-3DB4-4719-A9FB-210C21035C65} = {F8DCA620-40E7-411E-8970-85DC808B6BAF} {1A2FAA85-95B5-4377-B430-622DC5000C89} = {F8DCA620-40E7-411E-8970-85DC808B6BAF} {39583DDE-E579-44AD-B7AF-5BB77D979E55} = {F8DCA620-40E7-411E-8970-85DC808B6BAF} + {73185946-8EFD-4153-8AC4-05AFD8BAC2E4} = {F8DCA620-40E7-411E-8970-85DC808B6BAF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {1CE24157-AFE0-4CDF-B063-2876343F0A66} diff --git a/src/DotNext/DotNext.csproj b/src/DotNext/DotNext.csproj index d337795531..ae88b48869 100644 --- a/src/DotNext/DotNext.csproj +++ b/src/DotNext/DotNext.csproj @@ -11,7 +11,7 @@ .NET Foundation and Contributors .NEXT Family of Libraries - 5.11.0 + 5.12.0 DotNext MIT diff --git a/src/DotNext/Runtime/Caching/ConcurrentCache.cs b/src/DotNext/Runtime/Caching/ConcurrentCache.cs index 5595805873..6725951f89 100644 --- a/src/DotNext/Runtime/Caching/ConcurrentCache.cs +++ b/src/DotNext/Runtime/Caching/ConcurrentCache.cs @@ -16,6 +16,7 @@ namespace DotNext.Runtime.Caching; /// /// The type of the keys. /// The type of the cache items. +[Obsolete("Use RandomAccessCache from DotNext.Threading library instead.")] public partial class ConcurrentCache : IReadOnlyDictionary where TKey : notnull { diff --git a/src/DotNext/Runtime/Intrinsics.cs b/src/DotNext/Runtime/Intrinsics.cs index a3b27ed1d3..0ffeded598 100644 --- a/src/DotNext/Runtime/Intrinsics.cs +++ b/src/DotNext/Runtime/Intrinsics.cs @@ -34,7 +34,11 @@ public static bool IsNullable() } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static ref TTo InToRef(ref readonly TFrom source) + internal static ref TTo InToRef(scoped ref readonly TFrom source) + => ref Unsafe.As(ref Unsafe.AsRef(in source)); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ref readonly TTo ChangeType(ref readonly TFrom source) { PushInRef(in source); return ref ReturnRef(); diff --git a/src/DotNext/Runtime/ValueReference.cs b/src/DotNext/Runtime/ValueReference.cs new file mode 100644 index 0000000000..e5f540fd4f --- /dev/null +++ b/src/DotNext/Runtime/ValueReference.cs @@ -0,0 +1,162 @@ +using System.Diagnostics.CodeAnalysis; +using System.Numerics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace DotNext.Runtime; + +/// +/// Represents a mutable reference to the field. +/// +/// An object that owns the field. +/// The reference to the field. +/// The type of the field. +[StructLayout(LayoutKind.Auto)] +public readonly struct ValueReference(object owner, ref T fieldRef) : + IEquatable>, + IEqualityOperators, ValueReference, bool> +{ + private readonly nint offset = RawData.GetOffset(owner, in fieldRef); + + /// + /// Creates a reference to an array element. + /// + /// The array. + /// The index of the array element. + public ValueReference(T[] array, int index) + : this(array, ref array[index]) + { + } + + /// + /// Gets a value indicating that is reference is empty. + /// + public bool IsEmpty => owner is null; + + /// + /// Gets a reference to the field. + /// + public ref T Value => ref RawData.GetObjectData(owner, offset); + + private bool SameObject(object? other) => ReferenceEquals(owner, other); + + /// + public override string? ToString() + => owner is not null ? RawData.GetObjectData(owner, offset)?.ToString() : null; + + /// + public override bool Equals([NotNullWhen(true)] object? other) + => other is ValueReference otherRef && Equals(otherRef); + + /// + public override int GetHashCode() => RuntimeHelpers.GetHashCode(owner) ^ offset.GetHashCode(); + + /// + public bool Equals(ValueReference reference) + => reference.SameObject(owner) && offset == reference.offset; + + /// + /// Determines whether the two references point to the same field. + /// + /// The first reference to compare. + /// The second reference to compare. + /// if both references are equal; otherwise, . + public static bool operator ==(ValueReference x, ValueReference y) + => x.Equals(y); + + /// + /// Determines whether the two references point to the different fields. + /// + /// The first reference to compare. + /// The second reference to compare. + /// if both references are not equal; otherwise, . + public static bool operator !=(ValueReference x, ValueReference y) + => x.Equals(y) is false; + + /// + /// Converts mutable field reference to immutable field reference. + /// + /// The reference to convert. + /// The immutable field reference. + public static implicit operator ReadOnlyValueReference(ValueReference reference) + => Unsafe.BitCast, ReadOnlyValueReference>(reference); +} + +/// +/// Represents a mutable reference to the field. +/// +/// An object that owns the field. +/// The reference to the field. +/// The type of the field. +[StructLayout(LayoutKind.Auto)] +public readonly struct ReadOnlyValueReference(object owner, ref readonly T fieldRef) : + IEquatable>, + IEqualityOperators, ReadOnlyValueReference, bool> +{ + private readonly nint offset = RawData.GetOffset(owner, in fieldRef); + + /// + /// Gets a value indicating that is reference is empty. + /// + public bool IsEmpty => owner is null; + + /// + /// Gets a reference to the field. + /// + public ref readonly T Value => ref RawData.GetObjectData(owner, offset); + + private bool SameObject(object? other) => ReferenceEquals(owner, other); + + /// + public override string? ToString() + => owner is not null ? RawData.GetObjectData(owner, offset)?.ToString() : null; + + /// + public override bool Equals([NotNullWhen(true)] object? other) + => other is ReadOnlyValueReference otherRef && Equals(otherRef); + + /// + public override int GetHashCode() => RuntimeHelpers.GetHashCode(owner) ^ offset.GetHashCode(); + + /// + public bool Equals(ReadOnlyValueReference reference) + => reference.SameObject(owner) && offset == reference.offset; + + /// + /// Determines whether the two references point to the same field. + /// + /// The first reference to compare. + /// The second reference to compare. + /// if both references are equal; otherwise, . + public static bool operator ==(ReadOnlyValueReference x, ReadOnlyValueReference y) + => x.Equals(y); + + /// + /// Determines whether the two references point to the different fields. + /// + /// The first reference to compare. + /// The second reference to compare. + /// if both references are not equal; otherwise, . + public static bool operator !=(ReadOnlyValueReference x, ReadOnlyValueReference y) + => x.Equals(y) is false; +} + +[SuppressMessage("Performance", "CA1812", Justification = "Used for reinterpret cast")] +file sealed class RawData +{ + private byte data; + + private RawData() => throw new NotImplementedException(); + + internal static nint GetOffset(object owner, ref readonly T field) + { + ref var rawData = ref Unsafe.As(owner).data; + return Unsafe.ByteOffset(in rawData, in Intrinsics.ChangeType(in field)); + } + + internal static ref T GetObjectData(object owner, nint offset) + { + ref var rawData = ref Unsafe.As(owner).data; + return ref Unsafe.As(ref Unsafe.Add(ref rawData, offset)); + } +} \ No newline at end of file diff --git a/src/DotNext/Threading/Epoch.Utils.cs b/src/DotNext/Threading/Epoch.Utils.cs index 62cbef83a3..989005aae3 100644 --- a/src/DotNext/Threading/Epoch.Utils.cs +++ b/src/DotNext/Threading/Epoch.Utils.cs @@ -254,22 +254,6 @@ internal void Exit(uint epoch) [ExcludeFromCodeCoverage] internal readonly string GetDebugView(uint epoch) => entries[epoch].DebugView; - [Conditional("DEBUG")] - internal readonly void AssertCounters(uint epoch) - { - ref readonly var entry = ref entries[epoch]; - Debug.Assert(entry.Counter > 0U); - - var prevEpochIndex = entry.Previous; - var prevEpochThreads = entries[prevEpochIndex].Counter; - - var nextEpochIndex = entry.Next; - var nextEpochThreads = entries[nextEpochIndex].Counter; - - Debug.Assert(prevEpochThreads is 0U || nextEpochThreads is 0U, - $"Epoch #{prevEpochIndex}={prevEpochThreads}, Epoch#{nextEpochIndex}={nextEpochThreads}"); - } - [UnscopedRef] internal SafeToReclaimEpoch TryBumpEpoch(uint currentEpoch) { diff --git a/src/DotNext/Threading/Epoch.cs b/src/DotNext/Threading/Epoch.cs index 215bd0f210..c9d7479d15 100644 --- a/src/DotNext/Threading/Epoch.cs +++ b/src/DotNext/Threading/Epoch.cs @@ -32,7 +32,6 @@ public partial class Epoch public Scope Enter(bool drainGlobalCache, out RecycleBin bin) { var scope = new Scope(ref state); - state.AssertCounters(scope.Handle); Reclaim(scope.Handle, drainGlobalCache, out bin); return scope; @@ -54,7 +53,6 @@ public Scope Enter(bool drainGlobalCache, out RecycleBin bin) public Scope Enter(bool? drainGlobalCache = false) { var scope = new Scope(ref state); - state.AssertCounters(scope.Handle); if (drainGlobalCache.HasValue && Reclaim(scope.Handle, drainGlobalCache.GetValueOrDefault()) is { IsEmpty: false } exceptions) { @@ -81,7 +79,6 @@ public Scope Enter(bool? drainGlobalCache = false) public void Enter(bool drainGlobalCache, out Scope scope) { scope = new(ref state); - state.AssertCounters(scope.Handle); UnsafeReclaim(scope.Handle, drainGlobalCache); } diff --git a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj index 5a18bd67ce..47767e3afa 100644 --- a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj +++ b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj @@ -8,7 +8,7 @@ true true nullablePublicOnly - 5.11.0 + 5.12.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj index a5591771c6..86834c4e23 100644 --- a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj +++ b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj @@ -8,7 +8,7 @@ enable true nullablePublicOnly - 5.11.0 + 5.12.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Context.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Context.cs index 78328c3ade..0667895bef 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Context.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Context.cs @@ -70,34 +70,56 @@ struct Context : IDisposable 1674319, 2009191, 2411033, 2893249, 3471899, 4166287, 4999559, 5999471, 7199369]; private ContextEntry?[] entries; + private ulong fastModMultiplier; public Context(int sizeHint) { Debug.Assert(sizeHint > 0); - entries = new ContextEntry?[GetPrime(sizeHint, Primes)]; + var size = GetPrime(sizeHint, Primes); + fastModMultiplier = UIntPtr.Size is sizeof(ulong) ? GetFastModMultiplier((uint)size) : default; + entries = new ContextEntry?[size]; } - private static int Grow(int size) + private static ulong GetFastModMultiplier(ulong divisor) + => ulong.MaxValue / divisor + 1UL; + + // Daniel Lemire's fastmod algorithm: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + private static uint FastMod(uint value, uint divisor, ulong multiplier) + { + Debug.Assert(divisor <= int.MaxValue); + + var result = (uint)(((((multiplier * value) >> 32) + 1UL) * divisor) >> 32); + Debug.Assert(result == value % divisor); + + return result; + } + + private static int Grow(int size, out ulong multiplier) { // This is the maximum prime smaller than Array.MaxLength - const int maxPrimeLength = 0x7FEFFFFD; + const int maxPrimeLength = 0x7FFFFFC3; int newSize; - return size is maxPrimeLength + newSize = size is maxPrimeLength ? throw new InsufficientMemoryException() : (uint)(newSize = size << 1) > maxPrimeLength && maxPrimeLength > size - ? maxPrimeLength - : GetPrime(newSize, Primes); + ? maxPrimeLength + : GetPrime(newSize, Primes); + + multiplier = IntPtr.Size is sizeof(ulong) ? GetFastModMultiplier((uint)newSize) : default; + return newSize; } public Context() => entries = []; - private static int GetIndex(int hashCode, int boundary) - => (hashCode & int.MaxValue) % boundary; + private readonly int GetIndex(int hashCode) + => (int)(IntPtr.Size is sizeof(ulong) + ? FastMod((uint)hashCode, (uint)entries.Length, fastModMultiplier) + : (uint)hashCode % (uint)entries.Length); private readonly int GetIndex(TMember member, out int hashCode) - => GetIndex(hashCode = RuntimeHelpers.GetHashCode(member), entries.Length); + => GetIndex(hashCode = RuntimeHelpers.GetHashCode(member)); private readonly ref ContextEntry? GetEntry(TMember member, out int hashCode) => ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(entries), GetIndex(member, out hashCode)); @@ -105,7 +127,7 @@ private readonly int GetIndex(TMember member, out int hashCode) private void ResizeAndRemoveDeadEntries(CancellationToken token) { var oldEntries = entries; - entries = new ContextEntry?[Grow(oldEntries.Length)]; + entries = new ContextEntry?[Grow(oldEntries.Length, out fastModMultiplier)]; // copy elements from old array to a new one for (var i = 0; i < oldEntries.Length; i++, token.ThrowIfCancellationRequested()) @@ -138,7 +160,7 @@ private readonly bool Insert(ContextEntry entry) Debug.Assert(entry.Next is null); const int maxCollisions = 3; - ref var location = ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(entries), GetIndex(entry.HashCode, entries.Length)); + ref var location = ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(entries), GetIndex(entry.HashCode)); int collisions; for (collisions = 0; location is not null; collisions++) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs index ba3920e5b1..abe2454904 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs @@ -524,7 +524,7 @@ private async ValueTask AppendAsync(ILogEntryProducer entries, l await UnsafeAppendAsync(entries, startIndex, skipCommitted, token).ConfigureAwait(false); // flush updated state. Update index here to guarantee safe reads of recently added log entries - await state.FlushAsync(in NodeState.IndexesRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.IndexesRange, token).ConfigureAwait(false); } finally { @@ -574,7 +574,7 @@ private async ValueTask UnsafeAppendAsync(TEntry entry, long startIndex, await partition.FlushAsync(token).ConfigureAwait(false); state.LastIndex = startIndex; - await state.FlushAsync(in NodeState.IndexesRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.IndexesRange, token).ConfigureAwait(false); WriteRateMeter.Add(1L, measurementTags); } @@ -668,7 +668,7 @@ private async ValueTask AppendUncachedAsync(TEntry entry, Cancella await UnsafeAppendAsync(entry, startIndex, out var partition, token).ConfigureAwait(false); await partition.FlushAsync(token).ConfigureAwait(false); state.LastIndex = startIndex; - await state.FlushAsync(in NodeState.IndexesRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.IndexesRange, token).ConfigureAwait(false); } finally { @@ -798,7 +798,7 @@ public async ValueTask AppendAsync(ILogEntryProducer entri await UnsafeAppendAsync(entries, startIndex, false, token).ConfigureAwait(false); // flush updated state. Update index here to guarantee safe reads of recently added log entries - await state.FlushAsync(in NodeState.IndexesRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.IndexesRange, token).ConfigureAwait(false); } finally { @@ -833,7 +833,7 @@ public async ValueTask DropAsync(long startIndex, bool reuseSpace = false, throw new InvalidOperationException(ExceptionMessages.InvalidAppendIndex); count = state.LastIndex - startIndex + 1L; state.LastIndex = startIndex - 1L; - await state.FlushAsync(in NodeState.IndexesRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.IndexesRange, token).ConfigureAwait(false); if (reuseSpace) InvalidatePartitions(startIndex); @@ -973,7 +973,7 @@ async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member try { result = state.IncrementTerm(member); - await state.FlushAsync(in NodeState.TermAndLastVoteRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.TermAndLastVoteRange, token).ConfigureAwait(false); } finally { @@ -990,7 +990,7 @@ async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote, try { state.UpdateTerm(term, resetLastVote); - await state.FlushAsync(in NodeState.TermAndLastVoteFlagRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.TermAndLastVoteFlagRange, token).ConfigureAwait(false); } finally { @@ -1005,7 +1005,7 @@ async ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id, Cancell try { state.UpdateVotedFor(id); - await state.FlushAsync(in NodeState.LastVoteRange).ConfigureAwait(false); + await state.FlushAsync(in NodeState.LastVoteRange, token).ConfigureAwait(false); } finally { diff --git a/src/examples/RandomAccessCacheBenchmark/Program.cs b/src/examples/RandomAccessCacheBenchmark/Program.cs new file mode 100644 index 0000000000..195939c702 --- /dev/null +++ b/src/examples/RandomAccessCacheBenchmark/Program.cs @@ -0,0 +1,126 @@ +using System.Diagnostics.Metrics; +using System.IO.Hashing; +using DotNext; +using DotNext.Buffers; +using DotNext.Diagnostics; +using DotNext.Runtime.Caching; + +// Usage: program + +switch (args) +{ + case [var numberOfEntries, var cacheSize, var durationInSeconds, var parallelRequests]: + await RunBenchmark( + int.Parse(numberOfEntries), + int.Parse(cacheSize), + TimeSpan.FromSeconds(int.Parse(durationInSeconds)), + int.Parse(parallelRequests)).ConfigureAwait(false); + break; + default: + Console.WriteLine("Usage: program "); + break; +} + +static async Task RunBenchmark(int numberOfEntries, int cacheSize, TimeSpan duration, int parallelRequests) +{ + // setup files to be accessed by using cache + var files = MakeRandomFiles(numberOfEntries); + var cache = new RandomAccessCache>(cacheSize) { Eviction = Evict }; + var state = new BenchmarkState(); + var timeTracker = Task.Delay(duration); + + var requests = new List(parallelRequests); + + for (var i = 0; i < parallelRequests; i++) + { + requests.Add(state.ReadOrAddAsync(files, cache, timeTracker)); + } + + await Task.WhenAll(requests).ConfigureAwait(false); +} + +static void Evict(string fileName, MemoryOwner content) +{ + content.Dispose(); +} + +static IReadOnlyList MakeRandomFiles(int numberOfEntries) +{ + var result = new List(); + var directory = new DirectoryInfo(Path.Combine(Path.GetTempPath(), Path.GetRandomFileName())); + directory.Create(); + + Span buffer = stackalloc byte[BenchmarkState.CacheFileSize]; + for (var i = 0; i < numberOfEntries; i++) + { + var fileName = Path.Combine(directory.FullName, i.ToString()); + result.Add(fileName); + using var handle = File.OpenHandle(fileName, FileMode.CreateNew, FileAccess.Write, FileShare.None, FileOptions.WriteThrough, numberOfEntries); + Random.Shared.NextBytes(buffer); + RandomAccess.Write(handle, buffer, fileOffset: 0L); + } + + return result; +} + +sealed class BenchmarkState +{ + internal const int CacheFileSize = 4096; + + private readonly Histogram accessDuration; + + public BenchmarkState() + { + var meter = new Meter("RandomAccessCache"); + accessDuration = meter.CreateHistogram("AccessDuration", "ms"); + } + + internal async Task ReadOrAddAsync(IReadOnlyList files, RandomAccessCache> cache, Task timeTracker) + { + while (!timeTracker.IsCompleted) + { + var ts = new Timestamp(); + await ReadOrAddAsync(files, cache).ConfigureAwait(false); + accessDuration.Record(ts.ElapsedMilliseconds); + } + } + + private Task ReadOrAddAsync(IReadOnlyList files, RandomAccessCache> cache) + => ReadOrAddAsync(Random.Shared.Peek(files).Value, cache); + + private Task ReadOrAddAsync(string fileName, RandomAccessCache> cache) + { + Task task; + if (cache.TryRead(fileName, out var session)) + { + using (session) + { + Crc32.HashToUInt32(session.Value.Span); + } + + task = Task.CompletedTask; + } + else + { + task = AddAsync(fileName, cache); + } + + return task; + } + + private async Task AddAsync(string fileName, RandomAccessCache> cache) + { + using var session = await cache.ChangeAsync(fileName).ConfigureAwait(false); + if (session.TryGetValue(out var buffer)) + { + Crc32.HashToUInt32(buffer.Span); + } + else + { + using var fileHandle = File.OpenHandle(fileName, options: FileOptions.Asynchronous | FileOptions.SequentialScan); + buffer = Memory.AllocateExactly(CacheFileSize); + await RandomAccess.ReadAsync(fileHandle, buffer.Memory, fileOffset: 0L).ConfigureAwait(false); + session.SetValue(buffer); + } + } +} \ No newline at end of file diff --git a/src/examples/RandomAccessCacheBenchmark/RandomAccessCacheBenchmark.csproj b/src/examples/RandomAccessCacheBenchmark/RandomAccessCacheBenchmark.csproj new file mode 100644 index 0000000000..6d837cbd98 --- /dev/null +++ b/src/examples/RandomAccessCacheBenchmark/RandomAccessCacheBenchmark.csproj @@ -0,0 +1,18 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + +