Tsavorite: Fix CopyToTail to avoid stale record insertion (#449)
* Bring over FASTER fix for CopyToTail that accounts for new records for the same key that were added and then went to disk during the pending Read operation
Add RMWInfo.IsFromPending (for consistency with ReadInfo.IsFromPending)

* Omit the new check when doing ReadAtAddress

* trailing-whitespace 'format' fix
TedHartMS authored Jun 7, 2024
1 parent 0acff38 commit 8c24b1c
Showing 7 changed files with 188 additions and 18 deletions.
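
Before the per-file diffs, a condensed sketch of the race the first commit bullet describes. This is not code from the commit: it assumes the CompletePendingTests fixture added below (a TsavoriteKV<KeyStruct, ValueStruct> store with its log device, a session, and bContext = session.BasicContext), and the values mirror the new ReadPendingWithNewSameKey test rather than any public sample.

// Sketch only; setup (store, session, bContext) is assumed from the CompletePendingTests fixture below.
var keyStruct = new KeyStruct { kfield1 = 0, kfield2 = 0 };
var valueStruct = new ValueStruct { vfield1 = 0, vfield2 = 0 };
bContext.Upsert(ref keyStruct, ref valueStruct);

// Evict the record so the next Read must go to disk and therefore goes pending.
store.Log.FlushAndEvict(wait: true);
var (status, output) = bContext.Read(keyStruct);
Assert.IsTrue(status.IsPending);

// While the Read is pending, a newer record for the same key is inserted (and may itself be flushed to disk).
valueStruct.vfield2 = 1000;
bContext.Upsert(ref keyStruct, ref valueStruct);

// Before this fix, completing the pending Read could copy the stale on-disk record to the tail of the log,
// shadowing the newer value; with the fix, the completion path re-checks for newer records for the key.
(status, output) = bContext.GetSinglePendingResult();
Assert.AreEqual(1000, output.value.vfield2);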
@@ -28,6 +28,7 @@ internal struct PendingContext<Input, Output, Context>
internal const ushort kNoOpFlags = 0;
internal const ushort kNoKey = 0x0001;
internal const ushort kIsAsync = 0x0002;
internal const ushort kIsReadAtAddress = 0x0004;

internal ReadCopyOptions readCopyOptions; // Two byte enums
internal WriteReason writeReason; // for ConditionalCopyToTail; one byte enum
@@ -76,6 +77,12 @@ internal bool IsAsync
set => operationFlags = value ? (ushort)(operationFlags | kIsAsync) : (ushort)(operationFlags & ~kIsAsync);
}

internal bool IsReadAtAddress
{
readonly get => (operationFlags & kIsReadAtAddress) != 0;
set => operationFlags = value ? (ushort)(operationFlags | kIsReadAtAddress) : (ushort)(operationFlags & ~kIsReadAtAddress);
}

// RecordInfo is not used as such during the pending phase, so we reuse the space here.
internal long InitialEntryAddress
{
@@ -219,6 +219,11 @@ public struct RMWInfo
/// </summary>
public bool PreserveCopyUpdaterSourceRecord { get; set; }

/// <summary>
/// Whether the call is from the sync or the async (pending) path
/// </summary>
public bool IsFromPending { get; internal set; }

/// <summary>
/// What actions Tsavorite should perform on a false return from the ISessionFunctions method
/// </summary>
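
A consumer-side illustration of the new flag, not part of this commit: a session-functions class might branch on rmwInfo.IsFromPending inside an updater. The class below is hypothetical; its base type and structs come from the tests in this commit, and the InPlaceUpdater signature is approximated from the ConcurrentReader override shown further down, so it may differ slightly from the actual ISessionFunctions interface.

// Hypothetical consumer of RMWInfo.IsFromPending; signature approximated, not taken from this commit.
public class PendingAwareFunctions : SessionFunctionsBase<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty>
{
    public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value,
                                        ref OutputStruct output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo)
    {
        // True only when this updater runs while completing an RMW that previously went pending,
        // mirroring ReadInfo.IsFromPending on the read path.
        if (rmwInfo.IsFromPending)
        {
            // e.g., skip work already done on the initial (pre-pending) attempt, or emit diagnostics.
        }
        output.value = value;   // value mutation elided; the sketch only shows the flag check
        return true;
    }
}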
@@ -55,17 +55,44 @@ internal OperationStatus ContinuePendingRead<Input, Output, Context, TSessionFun

try
{
// During the pending operation, a record for the key may have been added to the log or readcache.
ref var value = ref hlog.GetContextRecordValue(ref request);
if (TryFindRecordInMemory(ref key, ref stackCtx, ref pendingContext))

// During the pending operation, a record for the key may have been added to the log or readcache. Don't look for this if we are reading at address (rather than key).
if (!pendingContext.IsReadAtAddress)
{
srcRecordInfo = ref stackCtx.recSrc.GetInfo();
if (TryFindRecordInMemory(ref key, ref stackCtx, ref pendingContext))
{
srcRecordInfo = ref stackCtx.recSrc.GetInfo();

// V threads cannot access V+1 records. Use the latest logical address rather than the traced address (logicalAddress) per comments in AcquireCPRLatchRMW.
if (sessionFunctions.Ctx.phase == Phase.PREPARE && IsEntryVersionNew(ref stackCtx.hei.entry))
return OperationStatus.CPR_SHIFT_DETECTED; // Pivot thread; retry
value = ref stackCtx.recSrc.GetValue();
// V threads cannot access V+1 records. Use the latest logical address rather than the traced address (logicalAddress) per comments in AcquireCPRLatchRMW.
if (sessionFunctions.Ctx.phase == Phase.PREPARE && IsEntryVersionNew(ref stackCtx.hei.entry))
return OperationStatus.CPR_SHIFT_DETECTED; // Pivot thread; retry
value = ref stackCtx.recSrc.GetValue();
}
else
{
// We didn't find a record for the key in memory, but if recSrc.LogicalAddress (which is the .PreviousAddress of the lowest record
// above InitialLatestLogicalAddress we could reach) is > InitialLatestLogicalAddress, then it means InitialLatestLogicalAddress is
// now below HeadAddress and there is at least one record below HeadAddress but above InitialLatestLogicalAddress. Reissue the Read(),
// using the LogicalAddress we just found as minAddress. We will either find an in-memory version of the key that was added after the
// TryFindRecordInMemory we just did, or do IO and find the record we just found or one above it. Read() updates InitialLatestLogicalAddress,
// so if we do IO, the next time we come to CompletePendingRead we will only search for a newer version of the key in any records added
// after our just-completed TryFindRecordInMemory.
if (stackCtx.recSrc.LogicalAddress > pendingContext.InitialLatestLogicalAddress
&& (!pendingContext.HasMinAddress || stackCtx.recSrc.LogicalAddress >= pendingContext.minAddress))
{
OperationStatus internalStatus;
do
{
internalStatus = InternalRead(ref key, pendingContext.keyHash, ref pendingContext.input.Get(), ref pendingContext.output,
pendingContext.userContext, ref pendingContext, sessionFunctions);
}
while (HandleImmediateRetryStatus(internalStatus, sessionFunctions, ref pendingContext));
return internalStatus;
}
}
}

if (srcRecordInfo.Tombstone)
goto NotFound;

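
The long comment in the new else branch above is the core of the fix; restated on its own, the reissue decision looks like the helper below. The helper name and parameters are invented for this sketch and only stand in for the PendingContext/RecordSource fields named in the comments.

// Sketch: after TryFindRecordInMemory fails during pending-read completion, decide whether the Read()
// must be reissued instead of serving the (possibly stale) record that came back from disk.
//   lowestPrevAddress        ~ stackCtx.recSrc.LogicalAddress (the .PreviousAddress of the lowest in-memory record reached)
//   initialLatestAddress     ~ pendingContext.InitialLatestLogicalAddress (latest logical address when the IO was issued)
//   hasMinAddress, minAddress ~ pendingContext.HasMinAddress, pendingContext.minAddress
static bool ShouldReissueRead(long lowestPrevAddress, long initialLatestAddress, bool hasMinAddress, long minAddress)
    => lowestPrevAddress > initialLatestAddress                   // a record in this hash chain was appended and already evicted
       && (!hasMinAddress || lowestPrevAddress >= minAddress);    // and it lies within the caller's minimum-address bound

When this returns true, ContinuePendingRead loops on InternalRead (retrying immediate-retry statuses) so the newer record is found either in memory or by a fresh IO, rather than copying the stale on-disk record to the tail.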
@@ -107,7 +107,8 @@ internal OperationStatus InternalRMW<Input, Output, Context, TSessionFunctionsWr
Version = sessionFunctions.Ctx.version,
SessionID = sessionFunctions.Ctx.sessionID,
Address = stackCtx.recSrc.LogicalAddress,
KeyHash = stackCtx.hei.hash
KeyHash = stackCtx.hei.hash,
IsFromPending = pendingContext.type != OperationType.NONE,
};

rmwInfo.SetRecordInfo(ref srcRecordInfo);
@@ -363,7 +364,8 @@ private OperationStatus CreateNewRecordRMW<Input, Output, Context, TSessionFunct
Version = sessionFunctions.Ctx.version,
SessionID = sessionFunctions.Ctx.sessionID,
Address = doingCU && !stackCtx.recSrc.HasReadCacheSrc ? stackCtx.recSrc.LogicalAddress : Constants.kInvalidAddress,
KeyHash = stackCtx.hei.hash
KeyHash = stackCtx.hei.hash,
IsFromPending = pendingContext.type != OperationType.NONE,
};

// Perform Need*
@@ -248,6 +248,8 @@ internal OperationStatus InternalReadAtAddress<Input, Output, Context, TSessionF
if (readAtAddress < hlog.BeginAddress)
return OperationStatus.NOTFOUND;

pendingContext.IsReadAtAddress = true;

// We do things in a different order here than in InternalRead, in part to handle NoKey (especially with Revivification).
if (readAtAddress < hlog.HeadAddress)
{
5 changes: 0 additions & 5 deletions libs/storage/Tsavorite/cs/test/BasicTests.cs
@@ -633,11 +633,6 @@ void VerifyResult()
// ReadCache is used when the record is read from disk.
store.Log.FlushAndEvict(wait: true);

// DisableReadCacheUpdates is primarily for indexing, so a read during index scan does not result in a readcache update.
// Reading at a normal logical address will not use the readcache, because the "readcache" bit is not set in that logical address.
// And we cannot get a readcache address, since reads satisfied from the readcache pass kInvalidAddress to functions.
// Therefore, we test here simply that we do not put it in the readcache when we tell it not to.

// Do not put it into the read cache.
functions.expectedReadAddress = readAtAddress;
ReadOptions readOptions = new() { CopyOptions = ReadCopyOptions.None };
140 changes: 136 additions & 4 deletions libs/storage/Tsavorite/cs/test/CompletePendingTests.cs
@@ -6,23 +6,41 @@
using System.Threading.Tasks;
using NUnit.Framework;
using Tsavorite.core;
using static Tsavorite.test.TestUtils;

namespace Tsavorite.test
{
public struct LocalKeyStructComparer : ITsavoriteEqualityComparer<KeyStruct>
{
internal long? forceCollisionHash;

public long GetHashCode64(ref KeyStruct key)
{
return forceCollisionHash.HasValue ? forceCollisionHash.Value : Utility.GetHashCode(key.kfield1);
}
public bool Equals(ref KeyStruct k1, ref KeyStruct k2)
{
return k1.kfield1 == k2.kfield1 && k1.kfield2 == k2.kfield2;
}

public override string ToString() => $"forceHashCollision: {forceCollisionHash}";
}

[TestFixture]
class CompletePendingTests
{
private TsavoriteKV<KeyStruct, ValueStruct> store;
private IDevice log;
LocalKeyStructComparer comparer = new();

[SetUp]
public void Setup()
{
// Clean up log files from previous test runs in case they weren't cleaned up
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
DeleteDirectory(MethodTestDir, wait: true);

log = Devices.CreateLogDevice(Path.Join(TestUtils.MethodTestDir, "CompletePendingTests.log"), preallocateFile: true, deleteOnClose: true);
store = new TsavoriteKV<KeyStruct, ValueStruct>(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 });
log = Devices.CreateLogDevice(Path.Join(MethodTestDir, "CompletePendingTests.log"), preallocateFile: true, deleteOnClose: true);
store = new TsavoriteKV<KeyStruct, ValueStruct>(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, comparer: comparer);
}

[TearDown]
@@ -32,7 +50,7 @@ public void TearDown()
store = null;
log?.Dispose();
log = null;
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
DeleteDirectory(MethodTestDir, wait: true);
}

const int numRecords = 1000;
@@ -216,5 +234,119 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values] bool useRMW)
Assert.AreEqual(address, recordMetadata.Address);
}
}
public class PendingReadFunctions<TContext> : SessionFunctionsBase<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty>
{
public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, Empty ctx, Status status, RecordMetadata recordMetadata)
{
Assert.IsTrue(status.Found);
Assert.AreEqual(key.kfield1, output.value.vfield1);
// Do not compare field2; that's our updated value, and the key won't be found if we change kfield2
}

// Read functions
public override bool SingleReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo)
{
Assert.IsFalse(readInfo.RecordInfo.IsNull());
dst.value = value;
return true;
}

public override bool ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo, ref RecordInfo recordInfo)
=> SingleReader(ref key, ref input, ref value, ref dst, ref readInfo);
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNewSameKey([Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
{
const int valueMult = 1000;

using var session = store.NewSession<InputStruct, OutputStruct, Empty, PendingReadFunctions<ContextStruct>>(new PendingReadFunctions<ContextStruct>());
var bContext = session.BasicContext;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
bContext.Upsert(ref keyStruct, ref valueStruct);

// Flush to make the Read() go pending.
store.Log.FlushAndEvict(wait: true);

var (status, outputStruct) = bContext.Read(keyStruct);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

// Insert next record with the same key and flush this too if requested.
var secondValue = firstValue + 1;
valueStruct.vfield2 = secondValue * valueMult;
bContext.Upsert(ref keyStruct, ref valueStruct);
if (secondRecordFlushMode == FlushMode.OnDisk)
store.Log.FlushAndEvict(wait: true);

(status, outputStruct) = bContext.GetSinglePendingResult();
Assert.AreEqual(secondValue * valueMult, outputStruct.value.vfield2, "Should have returned second value");
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNewDifferentKeyInChain([Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
{
const int valueMult = 1000;

using var session = store.NewSession<InputStruct, OutputStruct, Empty, PendingReadFunctions<ContextStruct>>(new PendingReadFunctions<ContextStruct>());
var bContext = session.BasicContext;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
bContext.Upsert(ref keyStruct, ref valueStruct);

// Force collisions to test having another key in the chain
comparer.forceCollisionHash = keyStruct.GetHashCode64(ref keyStruct);

// Flush to make the Read() go pending.
store.Log.FlushAndEvict(wait: true);

var (status, outputStruct) = bContext.Read(keyStruct);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

// Insert next record with a different key and flush this too if requested.
var secondValue = firstValue + 1;
keyStruct = new() { kfield1 = secondValue, kfield2 = secondValue * valueMult };
valueStruct = new() { vfield1 = secondValue, vfield2 = secondValue * valueMult };
bContext.Upsert(ref keyStruct, ref valueStruct);
if (secondRecordFlushMode == FlushMode.OnDisk)
store.Log.FlushAndEvict(wait: true);

(status, outputStruct) = bContext.GetSinglePendingResult();
Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNoNewKey()
{
// Basic test of pending read
const int valueMult = 1000;

using var session = store.NewSession<InputStruct, OutputStruct, Empty, PendingReadFunctions<ContextStruct>>(new PendingReadFunctions<ContextStruct>());
var bContext = session.BasicContext;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
bContext.Upsert(ref keyStruct, ref valueStruct);

// Flush to make the Read() go pending.
store.Log.FlushAndEvict(wait: true);

var (status, outputStruct) = bContext.Read(keyStruct);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

(status, outputStruct) = bContext.GetSinglePendingResult();
Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
}
}
}
