diff --git a/libs/cluster/Session/ClusterCommands.cs b/libs/cluster/Session/ClusterCommands.cs index fb8dc065c1..33469ce790 100644 --- a/libs/cluster/Session/ClusterCommands.cs +++ b/libs/cluster/Session/ClusterCommands.cs @@ -20,7 +20,7 @@ private int CountKeysInSessionStore(int slot) { ClusterKeyIterationFunctions.MainStoreCountKeys iterFuncs = new(slot); _ = basicGarnetApi.IterateMainStore(ref iterFuncs); - return iterFuncs.keyCount; + return iterFuncs.KeyCount; } private int CountKeysInObjectStore(int slot) @@ -29,7 +29,7 @@ private int CountKeysInObjectStore(int slot) { ClusterKeyIterationFunctions.ObjectStoreCountKeys iterFuncs = new(slot); _ = basicGarnetApi.IterateObjectStore(ref iterFuncs); - return iterFuncs.keyCount; + return iterFuncs.KeyCount; } return 0; } diff --git a/libs/cluster/Session/ClusterKeyIterationFunctions.cs b/libs/cluster/Session/ClusterKeyIterationFunctions.cs index 174e20a700..54d91d6cd3 100644 --- a/libs/cluster/Session/ClusterKeyIterationFunctions.cs +++ b/libs/cluster/Session/ClusterKeyIterationFunctions.cs @@ -13,19 +13,30 @@ internal sealed unsafe partial class ClusterSession : IClusterSession { internal static class ClusterKeyIterationFunctions { + internal class KeyIterationInfo + { + // This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg. + internal int keyCount; + internal readonly int slot; + + internal KeyIterationInfo(int slot) => this.slot = slot; + } + internal sealed class MainStoreCountKeys : IScanIteratorFunctions { + private readonly KeyIterationInfo info; // This must be a class as it is passed through pending IO operations - internal int keyCount; - readonly int slot; - internal MainStoreCountKeys(int slot) => this.slot = slot; + internal int KeyCount { get => info.keyCount; set => info.keyCount = value; } + internal int Slot => info.slot; + + internal MainStoreCountKeys(int slot) => info = new(slot); public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { cursorRecordResult = CursorRecordResult.Accept; // default; not used here - if (HashSlotUtils.HashSlot(ref key) == slot && !Expired(ref value)) - keyCount++; + if (HashSlotUtils.HashSlot(ref key) == Slot && !Expired(ref value)) + KeyCount++; return true; } public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) @@ -37,19 +48,21 @@ public void OnException(Exception exception, long numberOfRecords) { } internal sealed class ObjectStoreCountKeys : IScanIteratorFunctions { + private readonly KeyIterationInfo info; // This must be a class as it is passed through pending IO operations - internal int keyCount; - readonly int slot; - internal ObjectStoreCountKeys(int slot) => this.slot = slot; + internal int KeyCount { get => info.keyCount; set => info.keyCount = value; } + internal int Slot => info.slot; + + internal ObjectStoreCountKeys(int slot) => info = new(slot); public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { cursorRecordResult = CursorRecordResult.Accept; // default; not used here , out CursorRecordResult cursorRecordResult fixed (byte* keyPtr = key) { - if (HashSlotUtils.HashSlot(keyPtr, key.Length) == slot && !Expired(ref value)) - keyCount++; + if (HashSlotUtils.HashSlot(keyPtr, key.Length) == Slot && !Expired(ref value)) + KeyCount++; } return true; } diff --git a/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs b/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs index f4a3c749ab..25aedc1ec4 100644 --- a/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs +++ b/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs @@ -10,7 +10,7 @@ namespace Garnet.server { sealed partial class StorageSession : IDisposable { - // These are classes so instantiate once and re-initialize + // These contain classes so instantiate once and re-initialize private ArrayKeyIterationFunctions.MainStoreGetDBSize mainStoreDbSizeFuncs; private ArrayKeyIterationFunctions.ObjectStoreGetDBSize objectStoreDbSizeFuncs; @@ -180,14 +180,14 @@ internal int DbSize() mainStoreDbSizeFuncs.Initialize(); long cursor = 0; basicContext.Session.ScanCursor(ref cursor, long.MaxValue, mainStoreDbSizeFuncs); - int count = mainStoreDbSizeFuncs.count; + int count = mainStoreDbSizeFuncs.Count; if (objectStoreBasicContext.Session != null) { objectStoreDbSizeFuncs ??= new(); objectStoreDbSizeFuncs.Initialize(); cursor = 0; objectStoreBasicContext.Session.ScanCursor(ref cursor, long.MaxValue, objectStoreDbSizeFuncs); - count += objectStoreDbSizeFuncs.count; + count += objectStoreDbSizeFuncs.Count; } return count; @@ -195,25 +195,38 @@ internal int DbSize() internal static unsafe class ArrayKeyIterationFunctions { - internal sealed class MainStoreGetDBKeys : IScanIteratorFunctions + internal class GetDBKeysInfo { - List keys; - byte* patternB; - int patternLength; + // This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg. + internal List keys; + internal byte* patternB; + internal int patternLength; + internal Type matchType; - internal void Initialize(List keys, byte* patternB, int length) + internal void Initialize(List keys, byte* patternB, int length, Type matchType = null) { this.keys = keys; this.patternB = patternB; this.patternLength = length; + this.matchType = matchType; } + } + + internal sealed class MainStoreGetDBKeys : IScanIteratorFunctions + { + private readonly GetDBKeysInfo info; + + internal MainStoreGetDBKeys() => info = new(); + + internal void Initialize(List keys, byte* patternB, int length) + => info.Initialize(keys, patternB, length); public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) => ConcurrentReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult); public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { - if ((patternB != null && !GlobUtils.Match(patternB, patternLength, key.ToPointer(), key.Length, true)) + if ((info.patternB != null && !GlobUtils.Match(info.patternB, info.patternLength, key.ToPointer(), key.Length, true)) || (value.MetadataSize != 0 && MainSessionFunctions.CheckExpiry(ref value))) { cursorRecordResult = CursorRecordResult.Skip; @@ -221,7 +234,7 @@ public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadat else { cursorRecordResult = CursorRecordResult.Accept; - keys.Add(key.ToByteArray()); + info.keys.Add(key.ToByteArray()); } return true; } @@ -233,18 +246,12 @@ public void OnException(Exception exception, long numberOfRecords) { } internal sealed class ObjectStoreGetDBKeys : IScanIteratorFunctions { - List keys; - byte* patternB; - int patternLength; - private Type matchType; + private readonly GetDBKeysInfo info; + + internal ObjectStoreGetDBKeys() => info = new(); internal void Initialize(List keys, byte* patternB, int length, Type matchType = null) - { - this.keys = keys; - this.patternB = patternB; - this.patternLength = length; - this.matchType = matchType; - } + => info.Initialize(keys, patternB, length, matchType); public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) => ConcurrentReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult); @@ -257,11 +264,11 @@ public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMeta return true; } - if (patternB != null) + if (info.patternB != null) { fixed (byte* keyPtr = key) { - if (!GlobUtils.Match(patternB, patternLength, keyPtr, key.Length, true)) + if (!GlobUtils.Match(info.patternB, info.patternLength, keyPtr, key.Length, true)) { cursorRecordResult = CursorRecordResult.Skip; return true; @@ -269,13 +276,13 @@ public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMeta } } - if (matchType != null && value.GetType() != matchType) + if (info.matchType != null && value.GetType() != info.matchType) { cursorRecordResult = CursorRecordResult.Skip; return true; } - keys.Add(key); + info.keys.Add(key); cursorRecordResult = CursorRecordResult.Accept; return true; } @@ -285,12 +292,23 @@ public void OnStop(bool completed, long numberOfRecords) { } public void OnException(Exception exception, long numberOfRecords) { } } - internal sealed class MainStoreGetDBSize : IScanIteratorFunctions + internal class GetDBSizeInfo { - // This must be a class as it is passed through pending IO operations + // This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg. internal int count; internal void Initialize() => count = 0; + } + + internal sealed class MainStoreGetDBSize : IScanIteratorFunctions + { + private readonly GetDBSizeInfo info; + + internal int Count => info.count; + + internal MainStoreGetDBSize() => info = new(); + + internal void Initialize() => info.Initialize(); public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { @@ -299,7 +317,7 @@ public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata re else { cursorRecordResult = CursorRecordResult.Accept; - ++count; + ++info.count; } return true; } @@ -312,10 +330,13 @@ public void OnException(Exception exception, long numberOfRecords) { } internal sealed class ObjectStoreGetDBSize : IScanIteratorFunctions { - // This must be a class as it is passed through pending IO operations - internal int count; + private readonly GetDBSizeInfo info; - internal void Initialize() => count = 0; + internal int Count => info.count; + + internal ObjectStoreGetDBSize() => info = new(); + + internal void Initialize() => info.Initialize(); public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { @@ -324,7 +345,7 @@ public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata else { cursorRecordResult = CursorRecordResult.Accept; - ++count; + ++info.count; } return true; } diff --git a/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/BenchmarkDotNetTestsApp.cs b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/BenchmarkDotNetTestsApp.cs index 85baa0ed05..ef5777c186 100644 --- a/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/BenchmarkDotNetTestsApp.cs +++ b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/BenchmarkDotNetTestsApp.cs @@ -11,6 +11,20 @@ public class BenchmarkDotNetTestsApp public static void Main(string[] args) { + // Check for debugging a test + if (args[0].ToLower() == "cursor") + { + var test = new IterationTests + { + FlushAndEvict = true + }; + test.SetupPopulatedStore(); + test.Cursor(); + test.TearDown(); + return; + } + + // Do regular invocation. BenchmarkSwitcher.FromAssembly(typeof(BenchmarkDotNetTestsApp).Assembly).Run(args); } } diff --git a/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs new file mode 100644 index 0000000000..519c32c55c --- /dev/null +++ b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs @@ -0,0 +1,135 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using Tsavorite.core; + +#pragma warning disable 0649 // Field 'field' is never assigned to, and will always have its default value 'value'; happens due to [Params(..)] +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member +#pragma warning disable IDE0048 // Add parentheses for clarity +#pragma warning disable IDE0130 // Namespace does not match folder structure + +namespace BenchmarkDotNetTests +{ +#pragma warning disable IDE0065 // Misplaced using directive + using SpanByteStoreFunctions = StoreFunctions; + + [GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory, BenchmarkLogicalGroupRule.ByParams)] + public class IterationTests + { + const int NumRecords = 1_000_000; + + [Params(true, false)] + public bool FlushAndEvict; + + TsavoriteKV> store; + IDevice logDevice; + string logDirectory; + + void SetupStore() + { + logDirectory = BenchmarkDotNetTestsApp.TestDirectory; + var logFilename = Path.Combine(logDirectory, $"{nameof(IterationTests)}_{Guid.NewGuid()}.log"); + logDevice = Devices.CreateLogDevice(logFilename, preallocateFile: true, deleteOnClose: true, useIoCompletionPort: true); + + store = new(new() + { + IndexSize = 1L << 26, + LogDevice = logDevice + }, StoreFunctions.Create() + , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions) + ); + } + + unsafe void PopulateStore() + { + using var session = store.NewSession>(new()); + var bContext = session.BasicContext; + + Span keyVec = stackalloc byte[sizeof(long)]; + var keySpanByte = SpanByte.FromPinnedSpan(keyVec); + + Span valueVec = stackalloc byte[sizeof(long)]; + var valueSpanByte = SpanByte.FromPinnedSpan(valueVec); + + for (long ii = 0; ii < NumRecords; ++ii) + { + *(long*)keySpanByte.ToPointer() = ii; + *(long*)valueSpanByte.ToPointer() = ii + NumRecords; + _ = bContext.Upsert(keySpanByte, valueSpanByte); + } + + if (FlushAndEvict) + store.Log.FlushAndEvict(wait: true); + } + + [GlobalSetup] + public void SetupPopulatedStore() + { + SetupStore(); + PopulateStore(); + } + + [GlobalCleanup] + public void TearDown() + { + store?.Dispose(); + store = null; + logDevice?.Dispose(); + logDevice = null; + try + { + Directory.Delete(logDirectory); + } + catch { } + } + + [BenchmarkCategory("Cursor"), Benchmark] + public void Cursor() + { + using var session = store.NewSession>(new()); + + var scanFunctions = new ScanFunctions(); + var cursor = 0L; + session.ScanCursor(ref cursor, long.MaxValue, scanFunctions); + if (scanFunctions.Count < NumRecords) + throw new ApplicationException($"Incomplete iteration; {scanFunctions.Count} of {NumRecords} records returned"); + } + + class ScanCounter + { + internal int count; + } + + internal struct ScanFunctions : IScanIteratorFunctions + { + private readonly ScanCounter counter; + + internal readonly int Count => counter.count; + + public ScanFunctions() => counter = new(); + + /// + public bool OnStart(long beginAddress, long endAddress) => true; + + /// + public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) + { + ++counter.count; + cursorRecordResult = CursorRecordResult.Accept; + return true; + } + + /// + public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) + => SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult); + + /// + public void OnStop(bool completed, long numberOfRecords) { } + + /// + public void OnException(Exception exception, long numberOfRecords) { } + } + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs index 411c8034be..4ab5247f20 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs @@ -214,7 +214,7 @@ private protected bool ScanLookup(TSessionFunctionsWrapper sessionFunctions, ScanCursorState scanCursorState, RecordInfo recordInfo, ref TKey key, ref TValue value, long minAddress) + internal Status ConditionalScanPush(TSessionFunctionsWrapper sessionFunctions, ScanCursorState scanCursorState, RecordInfo recordInfo, + ref TKey key, ref TValue value, long currentAddress, long minAddress) where TSessionFunctionsWrapper : ISessionFunctionsWrapper { Debug.Assert(epoch.ThisInstanceProtected(), "This is called only from ScanLookup so the epoch should be protected"); @@ -258,7 +259,7 @@ internal Status ConditionalScanPush(sessionFunctions, ref key, ref stackCtx, minAddress, out internalStatus, out needIO)) + if (sessionFunctions.Store.TryFindRecordInMainLogForConditionalOperation(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, out internalStatus, out needIO)) return Status.CreateFound(); } while (sessionFunctions.Store.HandleImmediateNonPendingRetryStatus(internalStatus, sessionFunctions)); diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/BasicContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/BasicContext.cs index 8a8f546424..8f610740dd 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/BasicContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/BasicContext.cs @@ -429,15 +429,16 @@ public void Refresh() /// /// /// + /// LogicalAddress of the record to be copied /// Lower-bound address (addresses are searched from tail (high) to head (low); do not search for "future records" earlier than this) [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status CompactionCopyToTail(ref TKey key, ref TInput input, ref TValue value, ref TOutput output, long untilAddress) + internal Status CompactionCopyToTail(ref TKey key, ref TInput input, ref TValue value, ref TOutput output, long currentAddress, long untilAddress) { UnsafeResumeThread(); try { return store.CompactionConditionalCopyToTail, TStoreFunctions, TAllocator>>( - sessionFunctions, ref key, ref input, ref value, ref output, untilAddress); + sessionFunctions, ref key, ref input, ref value, ref output, currentAddress, untilAddress); } finally { @@ -452,15 +453,16 @@ internal Status CompactionCopyToTail(ref TKey key, ref TInput input, ref TValue /// /// /// + /// LogicalAddress of the record to be copied /// Lower-bound address (addresses are searched from tail (high) to head (low); do not search for "future records" earlier than this) [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ConditionalScanPush(ScanCursorState scanCursorState, RecordInfo recordInfo, ref TKey key, ref TValue value, long untilAddress) + internal Status ConditionalScanPush(ScanCursorState scanCursorState, RecordInfo recordInfo, ref TKey key, ref TValue value, long currentAddress, long untilAddress) { UnsafeResumeThread(); try { return store.hlogBase.ConditionalScanPush, TStoreFunctions, TAllocator>>( - sessionFunctions, scanCursorState, recordInfo, ref key, ref value, untilAddress); + sessionFunctions, scanCursorState, recordInfo, ref key, ref value, currentAddress, untilAddress); } finally { diff --git a/libs/storage/Tsavorite/cs/src/core/Compaction/TsavoriteCompaction.cs b/libs/storage/Tsavorite/cs/src/core/Compaction/TsavoriteCompaction.cs index c2da08be48..4bef9caa87 100644 --- a/libs/storage/Tsavorite/cs/src/core/Compaction/TsavoriteCompaction.cs +++ b/libs/storage/Tsavorite/cs/src/core/Compaction/TsavoriteCompaction.cs @@ -54,7 +54,7 @@ private long CompactLookup 256) { storebContext.CompletePending(wait: true); @@ -135,7 +135,7 @@ private long CompactScan= untilAddress (scan boundary), we are safe to copy the old record // to the tail. We don't know the actualAddress of the key in the main kv, but we it will not be below untilAddress. - var status = storebContext.CompactionCopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), ref output, untilAddress - 1); + var status = storebContext.CompactionCopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), ref output, iter3.CurrentAddress, untilAddress - 1); if (status.IsPending && ++numPending > 256) { storebContext.CompletePending(wait: true); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ConditionalCopyToTail.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ConditionalCopyToTail.cs index 525f6dc6f7..04bf1aeff3 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ConditionalCopyToTail.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ConditionalCopyToTail.cs @@ -70,7 +70,7 @@ private OperationStatus ConditionalCopyToTail(sessionFunctions, ref key, ref stackCtx2, minAddress, out status, out needIO)) + if (TryFindRecordInMainLogForConditionalOperation(sessionFunctions, ref key, ref stackCtx2, stackCtx.recSrc.LogicalAddress, minAddress, out status, out needIO)) return OperationStatus.SUCCESS; } while (HandleImmediateNonPendingRetryStatus(status, sessionFunctions)); @@ -90,7 +90,7 @@ private OperationStatus ConditionalCopyToTail(TSessionFunctionsWrapper sessionFunctions, ref TKey key, ref TInput input, ref TValue value, - ref TOutput output, long minAddress) + ref TOutput output, long currentAddress, long minAddress) where TSessionFunctionsWrapper : ISessionFunctionsWrapper { Debug.Assert(epoch.ThisInstanceProtected(), "This is called only from Compaction so the epoch should be protected"); @@ -101,7 +101,7 @@ internal Status CompactionConditionalCopyToTail(sessionFunctions, ref key, ref stackCtx, minAddress, out status, out needIO)) + if (TryFindRecordInMainLogForConditionalOperation(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, out status, out needIO)) return Status.CreateFound(); } while (sessionFunctions.Store.HandleImmediateNonPendingRetryStatus(status, sessionFunctions)); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs index 5af24c5a87..a1c2a547d8 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs @@ -293,7 +293,7 @@ internal OperationStatus ContinuePendingConditionalCopyToTail(sessionFunctions, ref key, ref stackCtx, minAddress, out internalStatus, out bool needIO)) + if (TryFindRecordInMainLogForConditionalOperation(sessionFunctions, ref key, ref stackCtx, currentAddress: request.logicalAddress, minAddress, out internalStatus, out bool needIO)) return OperationStatus.SUCCESS; if (!OperationStatusUtils.IsRetry(internalStatus)) { @@ -344,7 +344,7 @@ internal OperationStatus ContinuePendingConditionalScanPush(sessionFunctions, pendingContext.scanCursorState, pendingContext.recordInfo, ref pendingContext.key.Get(), ref pendingContext.value.Get(), - minAddress: pendingContext.InitialLatestLogicalAddress + 1); + currentAddress: request.logicalAddress, minAddress: pendingContext.InitialLatestLogicalAddress + 1); // ConditionalScanPush has already called HandleOperationStatus, so return SUCCESS here. return OperationStatus.SUCCESS; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/FindRecord.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/FindRecord.cs index fbe6c7fec6..335197d956 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/FindRecord.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/FindRecord.cs @@ -51,40 +51,44 @@ internal bool TryFindRecordInMainLog(ref TKey key, ref OperationStackContext(TSessionFunctionsWrapper sessionFunctions, - ref TKey key, ref OperationStackContext stackCtx, long minAddress, out OperationStatus internalStatus, out bool needIO) + ref TKey key, ref OperationStackContext stackCtx, long currentAddress, long minAddress, out OperationStatus internalStatus, out bool needIO) where TSessionFunctionsWrapper : ISessionFunctionsWrapper { + if (!FindTag(ref stackCtx.hei)) + { + internalStatus = OperationStatus.NOTFOUND; + return needIO = false; + } + internalStatus = OperationStatus.SUCCESS; + if (!stackCtx.hei.IsReadCache) + { + // If the address in the HashBucketEntry is the current address, there'll be no record above it, so return false (not found). + // If there are no valid records in the HashBucketEntry (minAddress is inclusive), return false (not found). + if (stackCtx.hei.Address == currentAddress || stackCtx.hei.Address < minAddress || stackCtx.hei.Address < hlogBase.BeginAddress) + { + stackCtx.SetRecordSourceToHashEntry(hlogBase); + return needIO = false; + } + if (stackCtx.hei.Address < hlogBase.HeadAddress) + { + stackCtx.SetRecordSourceToHashEntry(hlogBase); + needIO = true; + return false; + } + } + if (RevivificationManager.UseFreeRecordPool) { // The TransientSLock here is necessary only for the tag chain to avoid record elision/revivification during traceback. - if (!FindTagAndTryTransientSLock(sessionFunctions, ref key, ref stackCtx, out internalStatus)) + if (!TryTransientSLock(sessionFunctions, ref key, ref stackCtx, out internalStatus)) return needIO = false; } else - { - if (!FindTag(ref stackCtx.hei)) - { - internalStatus = OperationStatus.NOTFOUND; - return needIO = false; - } stackCtx.SetRecordSourceToHashEntry(hlogBase); - } try { - // minAddress is inclusive - if (!stackCtx.hei.IsReadCache) - { - if (stackCtx.hei.Address < minAddress) - return needIO = false; - if (stackCtx.hei.Address < hlogBase.HeadAddress) - { - needIO = stackCtx.hei.Address >= hlogBase.BeginAddress; - return false; - } - } - if (UseReadCache) SkipReadCache(ref stackCtx, out _); // Where this is called, we have no dependency on source addresses so we don't care if it Refreshed