Skip to content

Commit

Permalink
Tsavorite session reorganization (#436)
Browse files Browse the repository at this point in the history
* - Rename IFunctions to ISessionFunctions (in prep for StoreFunctions)
- Clean up ITsavoriteSession to SessionFunctionsWrapper, minimizing the per-Context code to just the locking portion
- remove RUMD (Read/Upsert/rMw/Delete) functionality from ClientSession; users should use one of the 4 Contexts (Basic, Lockable, Unsafe, LockableUnsafe) for all such actions. This changed a lot of tests

* Clean up more remnants of "TsavoriteSession" naming

* Replace IHeapConvertible with ISessionFunctions.ConvertOutputToHeap

* 'format' whitespace fix

* Fix YCSB build--for some reason it was being skipped

* fix another 'format' whitespace issue

* WIP remove {RUIMD}Async APIs from Tsavorite

* Fix some Tsavorite.tests to use SpanByteAndMemory.AsReadOnlySpan()
Fix some 'format' whitespace issues
Minor other Tsavorite.test cleanup, mostly related to removal of RUMDAsync

* Minor Tsavorite reorg: Break the various contexts into their own files, and move RecoveryInfo into CheckpointManagement.

* more "format" fun

* see previous message

* Fix .sln configuration for Tsavorite.benchmark

* Add ITsavoriteContext.Session and .IsNull
Remove ClientSession member variables in Garnet
  • Loading branch information
TedHartMS authored Jun 4, 2024
1 parent 2dff684 commit a7077a1
Show file tree
Hide file tree
Showing 180 changed files with 3,564 additions and 6,745 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata re
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
var s = HashSlotUtils.HashSlot(key.ToPointer(), key.Length);
if (slots.Contains(s))
session.Delete(key);
session.BasicContext.Delete(key);
return true;
}

Expand Down
55 changes: 29 additions & 26 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public sealed unsafe partial class AofProcessor
/// <summary>
/// Session for main store
/// </summary>
readonly ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session = null;
readonly BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext;

/// <summary>
/// Session for object store
/// </summary>
readonly ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreSession = null;
readonly BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreBasicContext;

readonly Dictionary<int, List<byte[]>> inflightTxns;
readonly byte[] buffer;
Expand Down Expand Up @@ -75,8 +75,11 @@ public AofProcessor(

this.respServerSession = new RespServerSession(null, replayAofStoreWrapper, null);

session = respServerSession.storageSession.session;
objectStoreSession = respServerSession.storageSession.objectStoreSession;
var session = respServerSession.storageSession.basicContext.Session;
basicContext = session.BasicContext;
var objectStoreSession = respServerSession.storageSession.objectStoreBasicContext.Session;
if (objectStoreSession is not null)
objectStoreBasicContext = objectStoreSession.BasicContext;

inflightTxns = new Dictionary<int, List<byte[]>>();
buffer = new byte[BufferSizeUtils.ServerBufferSize(new MaxSizeSettings())];
Expand All @@ -90,8 +93,8 @@ public AofProcessor(
/// </summary>
public void Dispose()
{
session?.Dispose();
objectStoreSession?.Dispose();
basicContext.Session?.Dispose();
objectStoreBasicContext.Session?.Dispose();
handle.Free();
}

Expand Down Expand Up @@ -232,22 +235,22 @@ private unsafe bool ReplayOp(byte* entryPtr)
switch (header.opType)
{
case AofEntryType.StoreUpsert:
StoreUpsert(session, entryPtr);
StoreUpsert(basicContext, entryPtr);
break;
case AofEntryType.StoreRMW:
StoreRMW(session, entryPtr);
StoreRMW(basicContext, entryPtr);
break;
case AofEntryType.StoreDelete:
StoreDelete(session, entryPtr);
StoreDelete(basicContext, entryPtr);
break;
case AofEntryType.ObjectStoreRMW:
ObjectStoreRMW(objectStoreSession, entryPtr, bufferPtr, buffer.Length);
ObjectStoreRMW(objectStoreBasicContext, entryPtr, bufferPtr, buffer.Length);
break;
case AofEntryType.ObjectStoreUpsert:
ObjectStoreUpsert(objectStoreSession, storeWrapper.GarnetObjectSerializer, entryPtr, bufferPtr, buffer.Length);
ObjectStoreUpsert(objectStoreBasicContext, storeWrapper.GarnetObjectSerializer, entryPtr, bufferPtr, buffer.Length);
break;
case AofEntryType.ObjectStoreDelete:
ObjectStoreDelete(objectStoreSession, entryPtr);
ObjectStoreDelete(objectStoreBasicContext, entryPtr);
break;
case AofEntryType.StoredProcedure:
ref var input = ref Unsafe.AsRef<SpanByte>(entryPtr + sizeof(AofHeader));
Expand All @@ -259,37 +262,37 @@ private unsafe bool ReplayOp(byte* entryPtr)
return true;
}

static unsafe void StoreUpsert(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + input.TotalSize);

SpanByteAndMemory output = default;
session.Upsert(ref key, ref input, ref value, ref output);
basicContext.Upsert(ref key, ref input, ref value, ref output);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreRMW(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
byte* pbOutput = stackalloc byte[32];
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var output = new SpanByteAndMemory(pbOutput, 32);
if (session.RMW(ref key, ref input, ref output).IsPending)
session.CompletePending(true);
if (basicContext.RMW(ref key, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreDelete(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> session, byte* ptr)
static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
session.Delete(ref key);
basicContext.Delete(ref key);
}

static unsafe void ObjectStoreUpsert(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
Expand All @@ -299,29 +302,29 @@ static unsafe void ObjectStoreUpsert(ClientSession<byte[], IGarnetObject, SpanBy
var valB = garnetObjectSerializer.Deserialize(value.ToByteArray());

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
session.Upsert(ref keyB, ref valB);
basicContext.Upsert(ref keyB, ref valB);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreRMW(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();

ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
if (session.RMW(ref keyB, ref input, ref output).IsPending)
session.CompletePending(true);
if (basicContext.RMW(ref keyB, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreDelete(ClientSession<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> session, byte* ptr)
static unsafe void ObjectStoreDelete(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
session.Delete(ref keyB);
basicContext.Delete(ref keyB);
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace Garnet.server
/// Garnet API implementation
/// </summary>
public partial struct GarnetApi<TContext, TObjectContext> : IGarnetApi, IGarnetWatchApi
where TContext : ITsavoriteContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
where TContext : ITsavoriteContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
readonly StorageSession storageSession;
TContext context;
Expand Down
4 changes: 2 additions & 2 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Garnet.server
/// Garnet API implementation
/// </summary>
public partial struct GarnetApi<TContext, TObjectContext> : IGarnetApi, IGarnetWatchApi
where TContext : ITsavoriteContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
where TContext : ITsavoriteContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
#region SortedSet Methods

Expand Down
6 changes: 5 additions & 1 deletion libs/server/Objects/Types/GarnetObjectStoreOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Garnet.server
{
/// <summary>
/// Output type used by Garnet object store.
/// Does not convert to heap when going pending, because we immediately complete pending operations for object store.
/// </summary>
public struct GarnetObjectStoreOutput
{
Expand All @@ -20,5 +19,10 @@ public struct GarnetObjectStoreOutput
/// Garnet object
/// </summary>
public IGarnetObject garnetObject;

public void ConvertToHeap()
{
// Does not convert to heap when going pending, because we immediately complete pending operations for object store.
}
}
}
2 changes: 1 addition & 1 deletion libs/server/Providers/TsavoriteKVProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Garnet.server
/// [K, V, I, O, F, P]
/// </summary>
public abstract class TsavoriteKVProviderBase<Key, Value, Input, Output, Functions, ParameterSerializer> : ISessionProvider
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : ISessionFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/CallbackMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public void ReadCompletionCallback(ref SpanByte key, ref SpanByte input, ref SpanByteAndMemory output, long ctx, Status status, RecordMetadata recordMetadata)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/DeleteMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleDeleter(ref SpanByte key, ref SpanByte value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/DisposeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public void DisposeSingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
readonly FunctionsState functionsState;

Expand All @@ -20,5 +20,12 @@ internal MainStoreFunctions(FunctionsState functionsState)
{
this.functionsState = functionsState;
}

/// <inheritdoc />
public void ConvertOutputToHeap(ref SpanByte input, ref SpanByteAndMemory output)
{
// TODO: Inspect input to determine whether we're in a context requiring ConvertToHeap.
//output.ConvertToHeap();
}
}
}
7 changes: 6 additions & 1 deletion libs/server/Storage/Functions/MainStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Diagnostics;
using Garnet.common;
using Tsavorite.core;

Expand All @@ -11,7 +12,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
static void CopyTo(ref SpanByte src, ref SpanByteAndMemory dst, MemoryPool<byte> memoryPool)
{
Expand Down Expand Up @@ -361,6 +362,8 @@ static bool InPlaceUpdateNumber(long val, ref SpanByte value, ref SpanByteAndMem
_ = value.ShrinkSerializedLength(ndigits + value.MetadataSize);
_ = NumUtils.LongToSpanByte(val, value.AsSpan());
rmwInfo.SetUsedValueLength(ref recordInfo, ref value, value.TotalSize);

Debug.Assert(output.IsSpanByte, "This code assumes it is called in-place and did not go pending");
value.AsReadOnlySpan().CopyTo(output.SpanByte.AsSpan());
output.SpanByte.Length = value.LengthWithoutMetadata;
return true;
Expand Down Expand Up @@ -502,6 +505,8 @@ void CopyRespNumber(long number, ref SpanByteAndMemory dst)
static void CopyValueLengthToOutput(ref SpanByte value, ref SpanByteAndMemory output)
{
int numDigits = NumUtils.NumDigits(value.LengthWithoutMetadata);

Debug.Assert(output.IsSpanByte, "This code assumes it is called in a non-pending context or in a pending context where dst.SpanByte's pointer remains valid");
var outputPtr = output.SpanByte.ToPointer();
NumUtils.IntToBytes(value.LengthWithoutMetadata, numDigits, ref outputPtr);
output.SpanByte.Length = numDigits;
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool NeedInitialUpdate(ref SpanByte key, ref SpanByte input, ref SpanByteAndMemory output, ref RMWInfo rmwInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/ReadMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory dst, ref ReadInfo readInfo)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/UpsertMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <inheritdoc />
public bool SingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason, ref RecordInfo recordInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Garnet.server
/// <summary>
/// Callback functions for main store
/// </summary>
public readonly unsafe partial struct MainStoreFunctions : IFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
public readonly unsafe partial struct MainStoreFunctions : ISessionFunctions<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long>
{
/// <summary>
/// Parse ASCII byte array into long and validate that only contains ASCII decimal characters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public void ReadCompletionCallback(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output, long ctx, Status status, RecordMetadata recordMetadata)
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/DeleteMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Garnet.server
/// <summary>
/// Object store functions
/// </summary>
public readonly unsafe partial struct ObjectStoreFunctions : IFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public readonly unsafe partial struct ObjectStoreFunctions : ISessionFunctions<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
/// <inheritdoc />
public bool SingleDeleter(ref byte[] key, ref IGarnetObject value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo)
Expand Down
Loading

0 comments on commit a7077a1

Please sign in to comment.