Skip to content

Commit

Permalink
Fix SCAN commands to skip expired string and object types (#331)
Browse files Browse the repository at this point in the history
Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TedHartMS and badrishc authored Apr 26, 2024
1 parent 57050c4 commit 9d3b621
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 22 deletions.
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/MainStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void EvaluateExpireCopyUpdate(ExpireOption optionType, bool expiryExists, ref Sp
return (0, 0);
}

static bool CheckExpiry(ref SpanByte src) => src.ExtraMetadata < DateTimeOffset.UtcNow.Ticks;
internal static bool CheckExpiry(ref SpanByte src) => src.ExtraMetadata < DateTimeOffset.UtcNow.Ticks;

static bool InPlaceUpdateNumber(long val, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo)
{
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Storage/Functions/ObjectStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void WriteLogDelete(ref byte[] key, long version, int sessionID)
}
}

internal static bool CheckExpiry(IGarnetObject src) => src.Expiration < DateTimeOffset.UtcNow.Ticks;

static void CopyRespNumber(long number, ref SpanByteAndMemory dst)
{
byte* curr = dst.SpanByte.ToPointer();
Expand Down
53 changes: 45 additions & 8 deletions libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Garnet.server
sealed partial class StorageSession : IDisposable
{
// These are classes so instantiate once and re-initialize
private ArrayKeyIterationFunctions.GetDBSize<SpanByte, SpanByte> mainStoreDbSizeFuncs;
private ArrayKeyIterationFunctions.GetDBSize<byte[], IGarnetObject> objectStoreDbSizeFuncs;
private ArrayKeyIterationFunctions.MainStoreGetDBSize mainStoreDbSizeFuncs;
private ArrayKeyIterationFunctions.ObjectStoreGetDBSize objectStoreDbSizeFuncs;

// Iterators for SCAN command
private ArrayKeyIterationFunctions.MainStoreGetDBKeys mainStoreDbScanFuncs;
Expand Down Expand Up @@ -212,7 +212,8 @@ public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata re

public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
if (patternB != null && !GlobUtils.Match(patternB, patternLength, key.ToPointer(), key.Length, true))
if ((patternB != null && !GlobUtils.Match(patternB, patternLength, key.ToPointer(), key.Length, true))
|| (value.MetadataSize != 0 && MainStoreFunctions.CheckExpiry(ref value)))
{
cursorRecordResult = CursorRecordResult.Skip;
}
Expand Down Expand Up @@ -249,6 +250,12 @@ public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata

public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
if (value.Expiration > 0 && ObjectStoreFunctions.CheckExpiry(value))
{
cursorRecordResult = CursorRecordResult.Skip;
return true;
}

if (patternB != null)
{
fixed (byte* keyPtr = key)
Expand Down Expand Up @@ -277,20 +284,50 @@ public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal sealed class GetDBSize<TKey, TValue> : IScanIteratorFunctions<TKey, TValue>
internal sealed class MainStoreGetDBSize : IScanIteratorFunctions<SpanByte, SpanByte>
{
// This must be a class as it is passed through pending IO operations
internal int count;

internal void Initialize() => count = 0;

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
if (value.MetadataSize != 0 && MainStoreFunctions.CheckExpiry(ref value))
cursorRecordResult = CursorRecordResult.Skip;
else
{
cursorRecordResult = CursorRecordResult.Accept;
++count;
}
return true;
}
public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
public bool OnStart(long beginAddress, long endAddress) => true;
public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal sealed class ObjectStoreGetDBSize : IScanIteratorFunctions<byte[], IGarnetObject>
{
// This must be a class as it is passed through pending IO operations
internal int count;

internal void Initialize() => count = 0;

public bool SingleReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
++count;
if (value.Expiration > 0 && ObjectStoreFunctions.CheckExpiry(value))
cursorRecordResult = CursorRecordResult.Skip;
else
{
cursorRecordResult = CursorRecordResult.Accept;
++count;
}
return true;
}
public bool ConcurrentReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
public bool OnStart(long beginAddress, long endAddress) => true;
public void OnStop(bool completed, long numberOfRecords) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ internal long SnapToLogicalAddressBoundary(ref long logicalAddress, long headAdd
return logicalAddress += totalSizes - offset;
}

private bool CheckExpiry(ref SpanByte value) => value.MetadataSize > 0 && value.ExtraMetadata < DateTimeOffset.Now.UtcTicks;

/// <summary>
/// Get next record in iterator
/// </summary>
Expand Down Expand Up @@ -179,8 +177,7 @@ public unsafe bool GetNext(out RecordInfo recordInfo)
nextAddress = currentAddress + recordSize;

recordInfo = hlog.GetInfo(physicalAddress);
bool skipOnScan = (includeSealedRecords ? recordInfo.Invalid : recordInfo.SkipOnScan)
|| CheckExpiry(ref hlog.GetValue(physicalAddress));
bool skipOnScan = includeSealedRecords ? recordInfo.Invalid : recordInfo.SkipOnScan;
if (skipOnScan || recordInfo.IsNull())
{
epoch?.Suspend();
Expand Down Expand Up @@ -252,8 +249,7 @@ bool IPushScanIterator<SpanByte>.BeginGetPrevInMemory(ref SpanByte key, out Reco

recordInfo = hlog.GetInfo(physicalAddress);
nextAddress = recordInfo.PreviousAddress;
bool skipOnScan = (includeSealedRecords ? recordInfo.Invalid : recordInfo.SkipOnScan)
|| CheckExpiry(ref hlog.GetValue(physicalAddress));
bool skipOnScan = includeSealedRecords ? recordInfo.Invalid : recordInfo.SkipOnScan;
if (skipOnScan || recordInfo.IsNull() || !comparer.Equals(ref hlog.GetKey(physicalAddress), ref key))
{
epoch?.Suspend();
Expand Down
45 changes: 38 additions & 7 deletions test/Garnet.test/RespTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,31 @@ public void SetExpiry()
db.StringSet("mykey", origValue, TimeSpan.FromSeconds(1));

string retValue = db.StringGet("mykey");
Assert.AreEqual(origValue, retValue, "Expected to Get() the value");
Assert.AreEqual(origValue, retValue, "Get() before expiration");

var actualDbSize = db.Execute("DBSIZE");
Assert.AreEqual(1, (ulong)actualDbSize, "Expected DBSIZE");
Assert.AreEqual(1, (ulong)actualDbSize, "DBSIZE before expiration");

// Sleep to allow expiration
var actualKeys = db.Execute("KEYS", ["*"]);
Assert.AreEqual(1, ((RedisResult[])actualKeys).Length, "KEYS before expiration");

var actualScan = db.Execute("SCAN", "0");
Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN before expiration");

// Sleep to wait for expiration
Thread.Sleep(2000);

retValue = db.StringGet("mykey");
Assert.AreEqual(null, retValue, "Expected null value due to expiration");
Assert.AreEqual(null, retValue, "Get() after expiration");

actualDbSize = db.Execute("DBSIZE");
Assert.AreEqual(0, (ulong)actualDbSize, "Expected DBSIZE of zero due to expiration");
Assert.AreEqual(0, (ulong)actualDbSize, "DBSIZE after expiration");

actualKeys = db.Execute("KEYS", ["*"]);
Assert.AreEqual(0, ((RedisResult[])actualKeys).Length, "KEYS after expiration");

actualScan = db.Execute("SCAN", "0");
Assert.AreEqual(0, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after expiration");
}

[Test]
Expand Down Expand Up @@ -1309,15 +1321,34 @@ public void KeyExpireObjectTest(string command)
db.SortedSetAdd(key, [new SortedSetEntry("element", 1.0)]);

var value = db.SortedSetScore(key, "element");
Assert.AreEqual(1.0, value);
Assert.AreEqual(1.0, value, "Get Score before expiration");

var actualDbSize = db.Execute("DBSIZE");
Assert.AreEqual(1, (ulong)actualDbSize, "DBSIZE before expiration");

var actualKeys = db.Execute("KEYS", ["*"]);
Assert.AreEqual(1, ((RedisResult[])actualKeys).Length, "KEYS before expiration");

var actualScan = db.Execute("SCAN", "0");
Assert.AreEqual(1, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN before expiration");

var exp = db.KeyExpire(key, command.Equals("EXPIRE") ? TimeSpan.FromSeconds(1) : TimeSpan.FromMilliseconds(1000));
Assert.IsTrue(exp);

// Sleep to wait for expiration
Thread.Sleep(1500);

value = db.SortedSetScore(key, "element");
Assert.AreEqual(null, value);
Assert.AreEqual(null, value, "Get Score after expiration");

actualDbSize = db.Execute("DBSIZE");
Assert.AreEqual(0, (ulong)actualDbSize, "DBSIZE after expiration");

actualKeys = db.Execute("KEYS", ["*"]);
Assert.AreEqual(0, ((RedisResult[])actualKeys).Length, "KEYS after expiration");

actualScan = db.Execute("SCAN", "0");
Assert.AreEqual(0, ((RedisValue[])((RedisResult[])actualScan!)[1]).Length, "SCAN after expiration");
}

[Test]
Expand Down

0 comments on commit 9d3b621

Please sign in to comment.