Skip to content

Commit

Permalink
Add a quick check for tail address match during iteration (#677)
Browse files Browse the repository at this point in the history
* Add a quick check for tail address match during iteration

* Add struct wrapper around cursor scan functions
Add a BDN test (not yet working)

* Fix the iterator BDN

* fix formatting

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TedHartMS and badrishc authored Sep 20, 2024
1 parent 96b75ec commit d7d52fd
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 78 deletions.
4 changes: 2 additions & 2 deletions libs/cluster/Session/ClusterCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ private int CountKeysInSessionStore(int slot)
{
ClusterKeyIterationFunctions.MainStoreCountKeys iterFuncs = new(slot);
_ = basicGarnetApi.IterateMainStore(ref iterFuncs);
return iterFuncs.keyCount;
return iterFuncs.KeyCount;
}

private int CountKeysInObjectStore(int slot)
Expand All @@ -29,7 +29,7 @@ private int CountKeysInObjectStore(int slot)
{
ClusterKeyIterationFunctions.ObjectStoreCountKeys iterFuncs = new(slot);
_ = basicGarnetApi.IterateObjectStore(ref iterFuncs);
return iterFuncs.keyCount;
return iterFuncs.KeyCount;
}
return 0;
}
Expand Down
33 changes: 23 additions & 10 deletions libs/cluster/Session/ClusterKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,30 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
{
internal static class ClusterKeyIterationFunctions
{
internal class KeyIterationInfo
{
// This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg.
internal int keyCount;
internal readonly int slot;

internal KeyIterationInfo(int slot) => this.slot = slot;
}

internal sealed class MainStoreCountKeys : IScanIteratorFunctions<SpanByte, SpanByte>
{
private readonly KeyIterationInfo info;
// This must be a class as it is passed through pending IO operations
internal int keyCount;
readonly int slot;

internal MainStoreCountKeys(int slot) => this.slot = slot;
internal int KeyCount { get => info.keyCount; set => info.keyCount = value; }
internal int Slot => info.slot;

internal MainStoreCountKeys(int slot) => info = new(slot);

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
if (HashSlotUtils.HashSlot(ref key) == slot && !Expired(ref value))
keyCount++;
if (HashSlotUtils.HashSlot(ref key) == Slot && !Expired(ref value))
KeyCount++;
return true;
}
public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
Expand All @@ -37,19 +48,21 @@ public void OnException(Exception exception, long numberOfRecords) { }

internal sealed class ObjectStoreCountKeys : IScanIteratorFunctions<byte[], IGarnetObject>
{
private readonly KeyIterationInfo info;
// This must be a class as it is passed through pending IO operations
internal int keyCount;
readonly int slot;

internal ObjectStoreCountKeys(int slot) => this.slot = slot;
internal int KeyCount { get => info.keyCount; set => info.keyCount = value; }
internal int Slot => info.slot;

internal ObjectStoreCountKeys(int slot) => info = new(slot);

public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
cursorRecordResult = CursorRecordResult.Accept; // default; not used here , out CursorRecordResult cursorRecordResult
fixed (byte* keyPtr = key)
{
if (HashSlotUtils.HashSlot(keyPtr, key.Length) == slot && !Expired(ref value))
keyCount++;
if (HashSlotUtils.HashSlot(keyPtr, key.Length) == Slot && !Expired(ref value))
KeyCount++;
}
return true;
}
Expand Down
83 changes: 52 additions & 31 deletions libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Garnet.server
{
sealed partial class StorageSession : IDisposable
{
// These are classes so instantiate once and re-initialize
// These contain classes so instantiate once and re-initialize
private ArrayKeyIterationFunctions.MainStoreGetDBSize mainStoreDbSizeFuncs;
private ArrayKeyIterationFunctions.ObjectStoreGetDBSize objectStoreDbSizeFuncs;

Expand Down Expand Up @@ -180,48 +180,61 @@ internal int DbSize()
mainStoreDbSizeFuncs.Initialize();
long cursor = 0;
basicContext.Session.ScanCursor(ref cursor, long.MaxValue, mainStoreDbSizeFuncs);
int count = mainStoreDbSizeFuncs.count;
int count = mainStoreDbSizeFuncs.Count;
if (objectStoreBasicContext.Session != null)
{
objectStoreDbSizeFuncs ??= new();
objectStoreDbSizeFuncs.Initialize();
cursor = 0;
objectStoreBasicContext.Session.ScanCursor(ref cursor, long.MaxValue, objectStoreDbSizeFuncs);
count += objectStoreDbSizeFuncs.count;
count += objectStoreDbSizeFuncs.Count;
}

return count;
}

internal static unsafe class ArrayKeyIterationFunctions
{
internal sealed class MainStoreGetDBKeys : IScanIteratorFunctions<SpanByte, SpanByte>
internal class GetDBKeysInfo
{
List<byte[]> keys;
byte* patternB;
int patternLength;
// This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg.
internal List<byte[]> keys;
internal byte* patternB;
internal int patternLength;
internal Type matchType;

internal void Initialize(List<byte[]> keys, byte* patternB, int length)
internal void Initialize(List<byte[]> keys, byte* patternB, int length, Type matchType = null)
{
this.keys = keys;
this.patternB = patternB;
this.patternLength = length;
this.matchType = matchType;
}
}

internal sealed class MainStoreGetDBKeys : IScanIteratorFunctions<SpanByte, SpanByte>
{
private readonly GetDBKeysInfo info;

internal MainStoreGetDBKeys() => info = new();

internal void Initialize(List<byte[]> keys, byte* patternB, int length)
=> info.Initialize(keys, patternB, length);

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> ConcurrentReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);

public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
if ((patternB != null && !GlobUtils.Match(patternB, patternLength, key.ToPointer(), key.Length, true))
if ((info.patternB != null && !GlobUtils.Match(info.patternB, info.patternLength, key.ToPointer(), key.Length, true))
|| (value.MetadataSize != 0 && MainSessionFunctions.CheckExpiry(ref value)))
{
cursorRecordResult = CursorRecordResult.Skip;
}
else
{
cursorRecordResult = CursorRecordResult.Accept;
keys.Add(key.ToByteArray());
info.keys.Add(key.ToByteArray());
}
return true;
}
Expand All @@ -233,18 +246,12 @@ public void OnException(Exception exception, long numberOfRecords) { }

internal sealed class ObjectStoreGetDBKeys : IScanIteratorFunctions<byte[], IGarnetObject>
{
List<byte[]> keys;
byte* patternB;
int patternLength;
private Type matchType;
private readonly GetDBKeysInfo info;

internal ObjectStoreGetDBKeys() => info = new();

internal void Initialize(List<byte[]> keys, byte* patternB, int length, Type matchType = null)
{
this.keys = keys;
this.patternB = patternB;
this.patternLength = length;
this.matchType = matchType;
}
=> info.Initialize(keys, patternB, length, matchType);

public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> ConcurrentReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
Expand All @@ -257,25 +264,25 @@ public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMeta
return true;
}

if (patternB != null)
if (info.patternB != null)
{
fixed (byte* keyPtr = key)
{
if (!GlobUtils.Match(patternB, patternLength, keyPtr, key.Length, true))
if (!GlobUtils.Match(info.patternB, info.patternLength, keyPtr, key.Length, true))
{
cursorRecordResult = CursorRecordResult.Skip;
return true;
}
}
}

if (matchType != null && value.GetType() != matchType)
if (info.matchType != null && value.GetType() != info.matchType)
{
cursorRecordResult = CursorRecordResult.Skip;
return true;
}

keys.Add(key);
info.keys.Add(key);
cursorRecordResult = CursorRecordResult.Accept;
return true;
}
Expand All @@ -285,12 +292,23 @@ public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal sealed class MainStoreGetDBSize : IScanIteratorFunctions<SpanByte, SpanByte>
internal class GetDBSizeInfo
{
// This must be a class as it is passed through pending IO operations
// This must be a class as it is passed through pending IO operations, so it is wrapped by higher structures for inlining as a generic type arg.
internal int count;

internal void Initialize() => count = 0;
}

internal sealed class MainStoreGetDBSize : IScanIteratorFunctions<SpanByte, SpanByte>
{
private readonly GetDBSizeInfo info;

internal int Count => info.count;

internal MainStoreGetDBSize() => info = new();

internal void Initialize() => info.Initialize();

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
Expand All @@ -299,7 +317,7 @@ public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata re
else
{
cursorRecordResult = CursorRecordResult.Accept;
++count;
++info.count;
}
return true;
}
Expand All @@ -312,10 +330,13 @@ public void OnException(Exception exception, long numberOfRecords) { }

internal sealed class ObjectStoreGetDBSize : IScanIteratorFunctions<byte[], IGarnetObject>
{
// This must be a class as it is passed through pending IO operations
internal int count;
private readonly GetDBSizeInfo info;

internal void Initialize() => count = 0;
internal int Count => info.count;

internal ObjectStoreGetDBSize() => info = new();

internal void Initialize() => info.Initialize();

public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
Expand All @@ -324,7 +345,7 @@ public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata
else
{
cursorRecordResult = CursorRecordResult.Accept;
++count;
++info.count;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ public class BenchmarkDotNetTestsApp

public static void Main(string[] args)
{
// Check for debugging a test
if (args[0].ToLower() == "cursor")
{
var test = new IterationTests
{
FlushAndEvict = true
};
test.SetupPopulatedStore();
test.Cursor();
test.TearDown();
return;
}

// Do regular invocation.
BenchmarkSwitcher.FromAssembly(typeof(BenchmarkDotNetTestsApp).Assembly).Run(args);
}
}
Expand Down
Loading

0 comments on commit d7d52fd

Please sign in to comment.