From 1fd933a35f46b22672cf0b546339ef16600c3ae1 Mon Sep 17 00:00:00 2001 From: Tal Zaccai Date: Fri, 24 May 2024 17:20:50 -0600 Subject: [PATCH 1/7] wip --- libs/server/Objects/Hash/HashObject.cs | 2 ++ libs/server/Objects/List/ListObject.cs | 3 +++ libs/server/Objects/Set/SetObject.cs | 3 +++ libs/server/Objects/SortedSet/SortedSetObject.cs | 2 ++ .../Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs | 2 +- libs/server/Objects/Types/GarnetObjectBase.cs | 3 +++ libs/server/Objects/Types/IGarnetObject.cs | 5 +++++ .../Storage/Functions/ObjectStore/RMWMethods.cs | 10 +++++++++- main/GarnetServer/Extensions/MyDictObject.cs | 2 ++ test/Garnet.test/RespSetTest.cs | 13 +++++++++++++ 10 files changed, 43 insertions(+), 2 deletions(-) diff --git a/libs/server/Objects/Hash/HashObject.cs b/libs/server/Objects/Hash/HashObject.cs index 0b4e195944..02e94144ab 100644 --- a/libs/server/Objects/Hash/HashObject.cs +++ b/libs/server/Objects/Hash/HashObject.cs @@ -41,6 +41,8 @@ public enum HashOperation : byte /// public unsafe partial class HashObject : GarnetObjectBase { + public override int Count => hash.Count; + readonly Dictionary hash; /// diff --git a/libs/server/Objects/List/ListObject.cs b/libs/server/Objects/List/ListObject.cs index 439bc8430d..a0a89c9bda 100644 --- a/libs/server/Objects/List/ListObject.cs +++ b/libs/server/Objects/List/ListObject.cs @@ -57,6 +57,9 @@ public enum OperationDirection /// public partial class ListObject : GarnetObjectBase { + /// + public override int Count => list.Count; + readonly LinkedList list; /// diff --git a/libs/server/Objects/Set/SetObject.cs b/libs/server/Objects/Set/SetObject.cs index ae0e3e295d..49b8fbe61d 100644 --- a/libs/server/Objects/Set/SetObject.cs +++ b/libs/server/Objects/Set/SetObject.cs @@ -40,6 +40,9 @@ public enum SetOperation : byte /// public unsafe partial class SetObject : GarnetObjectBase { + /// + public override int Count => set.Count; + readonly HashSet set; /// diff --git a/libs/server/Objects/SortedSet/SortedSetObject.cs b/libs/server/Objects/SortedSet/SortedSetObject.cs index ba3315d054..cdd925da10 100644 --- a/libs/server/Objects/SortedSet/SortedSetObject.cs +++ b/libs/server/Objects/SortedSet/SortedSetObject.cs @@ -72,6 +72,8 @@ public enum SortedSetOrderOperation /// public partial class SortedSetObject : GarnetObjectBase { + public override int Count => sortedSetDict.Count; + private readonly SortedSet<(double, byte[])> sortedSet; private readonly Dictionary sortedSetDict; diff --git a/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs b/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs index ce4ceec7d6..34c39fd334 100644 --- a/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs +++ b/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs @@ -15,7 +15,7 @@ namespace Garnet.server /// /// Sorted Set - RESP specific operations for GEO Commands /// - public unsafe partial class SortedSetObject : GarnetObjectBase + public unsafe partial class SortedSetObject : IGarnetObject { /// /// Use this struct for the reply of GEOSEARCH command diff --git a/libs/server/Objects/Types/GarnetObjectBase.cs b/libs/server/Objects/Types/GarnetObjectBase.cs index 537405f844..c66bb5b2a2 100644 --- a/libs/server/Objects/Types/GarnetObjectBase.cs +++ b/libs/server/Objects/Types/GarnetObjectBase.cs @@ -27,6 +27,9 @@ public abstract class GarnetObjectBase : IGarnetObject /// public long Size { get; set; } + /// + public abstract int Count { get; } + protected GarnetObjectBase(long expiration, long size) { Debug.Assert(size >= 0); diff --git a/libs/server/Objects/Types/IGarnetObject.cs b/libs/server/Objects/Types/IGarnetObject.cs index 3c27b72516..e7fb01ad17 100644 --- a/libs/server/Objects/Types/IGarnetObject.cs +++ b/libs/server/Objects/Types/IGarnetObject.cs @@ -28,6 +28,11 @@ public interface IGarnetObject : IDisposable /// long Size { get; set; } + /// + /// Item count in the collection + /// + int Count { get; } + /// /// Operator on object /// diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index 3cb1eb4169..d79f02b3fc 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -87,7 +87,14 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory); return true; default: - return value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange); + var operateSuccessful = value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange); + if (value.Count == 0) + { + rmwInfo.Action = RMWAction.ExpireAndStop; + return false; + } + + return operateSuccessful; } } @@ -134,6 +141,7 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec break; default: value.Operate(ref input, ref output.spanByteAndMemory, out _); + break; } diff --git a/main/GarnetServer/Extensions/MyDictObject.cs b/main/GarnetServer/Extensions/MyDictObject.cs index bb098ed81b..302d47c81a 100644 --- a/main/GarnetServer/Extensions/MyDictObject.cs +++ b/main/GarnetServer/Extensions/MyDictObject.cs @@ -22,6 +22,8 @@ public override CustomObjectBase Deserialize(byte type, BinaryReader reader) class MyDict : CustomObjectBase { + public override int Count => dict.Count; + readonly Dictionary dict; public MyDict(byte type) diff --git a/test/Garnet.test/RespSetTest.cs b/test/Garnet.test/RespSetTest.cs index 945c2225b7..ec34ca9ba7 100644 --- a/test/Garnet.test/RespSetTest.cs +++ b/test/Garnet.test/RespSetTest.cs @@ -52,6 +52,19 @@ public void CandDoSaddBasic(string key) Assert.IsFalse(result); } + [Test] + public void CheckEmptySetKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:set"); + var db = redis.GetDatabase(0); + var result = db.SetAdd(key, ["Hello", "World"]); + Assert.AreEqual(2, result); + + var result1 = db.SetPop(key, 2); + var result2 = db.KeyExists(key); + } + [Test] public void CanAddAndListMembers() { From f7cb991fdf343bb730552dae7a467db78b04d0d6 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Fri, 24 May 2024 17:28:06 -0700 Subject: [PATCH 2/7] Fix StatusCode.RecordStatusMask comparisons in Status --- libs/storage/Tsavorite/cs/src/core/Utilities/Status.cs | 10 +++++----- .../Tsavorite/cs/src/core/Utilities/StatusCode.cs | 7 ++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/Status.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/Status.cs index b144aff11f..c5acc553fc 100644 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/Status.cs +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/Status.cs @@ -19,27 +19,27 @@ public struct RecordStatus /// /// Whether a new record for a previously non-existent key was appended to the log. /// - public bool Created => (statusCode & StatusCode.AdvancedMask) == StatusCode.CreatedRecord; + public bool Created => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CreatedRecord; /// /// Whether an existing record was updated in place. /// - public bool InPlaceUpdated => (statusCode & StatusCode.AdvancedMask) == StatusCode.InPlaceUpdatedRecord; + public bool InPlaceUpdated => (statusCode & StatusCode.RecordStatusMask) == StatusCode.InPlaceUpdatedRecord; /// /// Whether an existing record key was copied, updated, and appended to the log. /// - public bool CopyUpdated => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopyUpdatedRecord; + public bool CopyUpdated => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopyUpdatedRecord; /// /// Whether an existing record key was copied and appended to the log. /// - public bool Copied => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopiedRecord; + public bool Copied => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopiedRecord; /// /// Whether an existing record key was copied, updated, and added to the readcache. /// - public bool CopiedToReadCache => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopiedRecordToReadCache; + public bool CopiedToReadCache => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopiedRecordToReadCache; } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs index ae57657e7d..4a12b081f9 100644 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs @@ -123,7 +123,12 @@ internal enum StatusCode : byte // unused 0x70, /// - /// Indicates that an existing record key was auto-expired. This is a flag that is combined with lower Advanced values. + /// Individual record-status values that are not masked together. + /// + RecordStatusMask = 0x70, + + /// + /// Indicates that an existing record key was auto-expired. This is a flag that is combined with RecordStatusMask values. /// /// /// See basic codes for details of usage. From bb7e14c36671374ae9cad452e198ab768482b7f4 Mon Sep 17 00:00:00 2001 From: Tal Zaccai Date: Wed, 29 May 2024 18:59:25 -0600 Subject: [PATCH 3/7] wip --- libs/server/Custom/CustomObjectBase.cs | 9 ++++-- libs/server/Objects/Hash/HashObject.cs | 8 +++-- libs/server/Objects/List/ListObject.cs | 8 +++-- libs/server/Objects/Set/SetObject.cs | 6 ++-- .../Objects/SortedSet/SortedSetObject.cs | 6 ++-- libs/server/Objects/Types/GarnetObjectBase.cs | 5 +--- libs/server/Objects/Types/IGarnetObject.cs | 8 ++--- .../Functions/ObjectStore/RMWMethods.cs | 30 +++++++++++++------ .../Functions/ObjectStore/ReadMethods.cs | 4 +-- main/GarnetServer/Extensions/MyDictObject.cs | 6 ++-- test/Garnet.test/RespListTests.cs | 30 +++++++++---------- test/Garnet.test/RespSetTest.cs | 10 +++++-- test/Garnet.test/RespSortedSetTests.cs | 28 +++++++++-------- test/Garnet.test/RespTransactionProcTests.cs | 2 +- 14 files changed, 89 insertions(+), 71 deletions(-) diff --git a/libs/server/Custom/CustomObjectBase.cs b/libs/server/Custom/CustomObjectBase.cs index 44e7ef4cf3..ab5eed847d 100644 --- a/libs/server/Custom/CustomObjectBase.cs +++ b/libs/server/Custom/CustomObjectBase.cs @@ -178,13 +178,15 @@ public sealed override void DoSerialize(BinaryWriter writer) public abstract override void Dispose(); /// - public abstract void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output); + public abstract void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output, out bool removeKey); /// - public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { var header = (RespInputHeader*)input.ToPointer(); sizeChange = 0; + removeKey = false; + switch (header->cmd) { // Scan Command @@ -198,11 +200,12 @@ public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMe break; default: (IMemoryOwner Memory, int Length) outp = (output.Memory, 0); - Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp); + Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp, out removeKey); output.Memory = outp.Memory; output.Length = outp.Length; break; } + return true; } } diff --git a/libs/server/Objects/Hash/HashObject.cs b/libs/server/Objects/Hash/HashObject.cs index 02e94144ab..c41a669f41 100644 --- a/libs/server/Objects/Hash/HashObject.cs +++ b/libs/server/Objects/Hash/HashObject.cs @@ -41,8 +41,6 @@ public enum HashOperation : byte /// public unsafe partial class HashObject : GarnetObjectBase { - public override int Count => hash.Count; - readonly Dictionary hash; /// @@ -110,8 +108,10 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new HashObject(hash, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { + removeKey = false; + fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) { @@ -185,6 +185,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou sizeChange = this.Size - previousSize; } + + removeKey = hash.Count == 0; return true; } diff --git a/libs/server/Objects/List/ListObject.cs b/libs/server/Objects/List/ListObject.cs index a0a89c9bda..4c60d4cd0d 100644 --- a/libs/server/Objects/List/ListObject.cs +++ b/libs/server/Objects/List/ListObject.cs @@ -58,8 +58,6 @@ public enum OperationDirection public partial class ListObject : GarnetObjectBase { /// - public override int Count => list.Count; - readonly LinkedList list; /// @@ -129,11 +127,13 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new ListObject(list, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) { + removeKey = false; + var header = (RespInputHeader*)_input; if (header->type != GarnetObjectType.List) { @@ -188,6 +188,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou sizeChange = this.Size - previousSize; } + + removeKey = list.Count == 0; return true; } diff --git a/libs/server/Objects/Set/SetObject.cs b/libs/server/Objects/Set/SetObject.cs index 49b8fbe61d..ba70c50b64 100644 --- a/libs/server/Objects/Set/SetObject.cs +++ b/libs/server/Objects/Set/SetObject.cs @@ -41,8 +41,6 @@ public enum SetOperation : byte public unsafe partial class SetObject : GarnetObjectBase { /// - public override int Count => set.Count; - readonly HashSet set; /// @@ -107,7 +105,7 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new SetObject(set, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) @@ -150,6 +148,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou } sizeChange = this.Size - prevSize; } + + removeKey = set.Count == 0; return true; } diff --git a/libs/server/Objects/SortedSet/SortedSetObject.cs b/libs/server/Objects/SortedSet/SortedSetObject.cs index cdd925da10..561f0484b7 100644 --- a/libs/server/Objects/SortedSet/SortedSetObject.cs +++ b/libs/server/Objects/SortedSet/SortedSetObject.cs @@ -72,8 +72,6 @@ public enum SortedSetOrderOperation /// public partial class SortedSetObject : GarnetObjectBase { - public override int Count => sortedSetDict.Count; - private readonly SortedSet<(double, byte[])> sortedSet; private readonly Dictionary sortedSetDict; @@ -179,7 +177,7 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new SortedSetObject(sortedSet, sortedSetDict, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) @@ -273,6 +271,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou } sizeChange = this.Size - previouseSize; } + + removeKey = sortedSetDict.Count == 0; return true; } diff --git a/libs/server/Objects/Types/GarnetObjectBase.cs b/libs/server/Objects/Types/GarnetObjectBase.cs index c66bb5b2a2..6e86f3f8d7 100644 --- a/libs/server/Objects/Types/GarnetObjectBase.cs +++ b/libs/server/Objects/Types/GarnetObjectBase.cs @@ -27,9 +27,6 @@ public abstract class GarnetObjectBase : IGarnetObject /// public long Size { get; set; } - /// - public abstract int Count { get; } - protected GarnetObjectBase(long expiration, long size) { Debug.Assert(size >= 0); @@ -119,7 +116,7 @@ public void CopyUpdate(ref IGarnetObject oldValue, ref IGarnetObject newValue, b public abstract GarnetObjectBase Clone(); /// - public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange); + public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey); /// public abstract void Dispose(); diff --git a/libs/server/Objects/Types/IGarnetObject.cs b/libs/server/Objects/Types/IGarnetObject.cs index e7fb01ad17..cd5b03c591 100644 --- a/libs/server/Objects/Types/IGarnetObject.cs +++ b/libs/server/Objects/Types/IGarnetObject.cs @@ -28,19 +28,15 @@ public interface IGarnetObject : IDisposable /// long Size { get; set; } - /// - /// Item count in the collection - /// - int Count { get; } - /// /// Operator on object /// /// /// /// + /// /// - bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange); + bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey); /// /// Serializer diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index d79f02b3fc..c1b33cd74c 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -27,7 +27,7 @@ public bool InitialUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject byte objectId = (byte)((byte)type - CustomCommandManager.StartOffset); value = functionsState.customObjectCommands[objectId].factory.Create((byte)type); } - value.Operate(ref input, ref output.spanByteAndMemory, out _); + value.Operate(ref input, ref output.spanByteAndMemory, out _, out _); return true; } @@ -74,7 +74,7 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject switch (header->type) { case GarnetObjectType.Expire: - ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); + var optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); bool expiryExists = (value.Expiration > 0); return EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output); case GarnetObjectType.Persist: @@ -87,8 +87,9 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory); return true; default: - var operateSuccessful = value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange); - if (value.Count == 0) + var operateSuccessful = value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange, + out var removeKey); + if (removeKey) { rmwInfo.Action = RMWAction.ExpireAndStop; return false; @@ -119,16 +120,21 @@ public bool CopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject ol /// public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo) { + // We're performing the object update here (and not in CopyUpdater) so that we are guaranteed that + // the record was CASed into the hash chain before it gets modified oldValue.CopyUpdate(ref oldValue, ref value, rmwInfo.RecordInfo.IsInNewVersion); var header = (RespInputHeader*)input.ToPointer(); functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); + + var expireInPlace = false; + var expireOption = ExpireOption.None; + switch (header->type) { case GarnetObjectType.Expire: - ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); - bool expiryExists = (value.Expiration > 0); - EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output); + expireOption = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); + expireInPlace = true; break; case GarnetObjectType.Persist: if (value.Expiration > 0) @@ -140,11 +146,17 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory); break; default: - value.Operate(ref input, ref output.spanByteAndMemory, out _); - + value.Operate(ref input, ref output.spanByteAndMemory, out _, out var removeKey); + expireInPlace = removeKey; break; } + if (expireInPlace) + { + var expiryExists = (value.Expiration > 0); + EvaluateObjectExpireInPlace(expireOption, expiryExists, ref input, ref value, ref output); + } + functionsState.objectStoreSizeTracker?.AddTrackedSize(MemoryUtils.CalculateKeyValueSize(key, value)); if (functionsState.appendOnlyFile != null) diff --git a/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs b/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs index 4405441fd6..ca2cab90c9 100644 --- a/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs @@ -28,7 +28,7 @@ public bool SingleReader(ref byte[] key, ref SpanByte input, ref IGarnetObject v return true; } - return value.Operate(ref input, ref dst.spanByteAndMemory, out _); + return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _); } /// @@ -52,7 +52,7 @@ public bool ConcurrentReader(ref byte[] key, ref SpanByte input, ref IGarnetObje CopyRespNumber(ttlValue, ref dst.spanByteAndMemory); return true; } - return value.Operate(ref input, ref dst.spanByteAndMemory, out _); + return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _); } dst.garnetObject = value; diff --git a/main/GarnetServer/Extensions/MyDictObject.cs b/main/GarnetServer/Extensions/MyDictObject.cs index 302d47c81a..0a1e988f93 100644 --- a/main/GarnetServer/Extensions/MyDictObject.cs +++ b/main/GarnetServer/Extensions/MyDictObject.cs @@ -22,8 +22,6 @@ public override CustomObjectBase Deserialize(byte type, BinaryReader reader) class MyDict : CustomObjectBase { - public override int Count => dict.Count; - readonly Dictionary dict; public MyDict(byte type) @@ -68,7 +66,7 @@ public override void SerializeObject(BinaryWriter writer) } } - public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output) + public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output, out bool removeKey) { switch (subCommand) { @@ -95,6 +93,8 @@ public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMe WriteError(ref output, "Unexpected command"); break; } + + removeKey = dict.Count == 0; } public override void Dispose() { } diff --git a/test/Garnet.test/RespListTests.cs b/test/Garnet.test/RespListTests.cs index 4892a0ecf6..11c8c70d2f 100644 --- a/test/Garnet.test/RespListTests.cs +++ b/test/Garnet.test/RespListTests.cs @@ -57,10 +57,11 @@ public void BasicLPUSHAndLPOP() string popval = db.ListLeftPop(key); Assert.AreEqual(val, popval); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] @@ -395,12 +396,13 @@ public void MultiLPUSHAndLPOPV1() // list is empty, the code should return (nil) popval = db.ListLeftPop(key); - Assert.AreEqual(null, popval); + Assert.IsNull(popval); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] @@ -470,19 +472,15 @@ public void MultiRPUSHAndRPOP() break; } - result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); - // list is empty, the code should return (nil) popval = db.ListLeftPop(key); - Assert.AreEqual(null, popval); + Assert.IsNull(popval); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] diff --git a/test/Garnet.test/RespSetTest.cs b/test/Garnet.test/RespSetTest.cs index ec34ca9ba7..62836c25a2 100644 --- a/test/Garnet.test/RespSetTest.cs +++ b/test/Garnet.test/RespSetTest.cs @@ -58,11 +58,15 @@ public void CheckEmptySetKeyRemoved() using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); var key = new RedisKey("user1:set"); var db = redis.GetDatabase(0); - var result = db.SetAdd(key, ["Hello", "World"]); + var members = new[] { new RedisValue("Hello"), new RedisValue("World") }; + var result = db.SetAdd(key, members); Assert.AreEqual(2, result); - var result1 = db.SetPop(key, 2); - var result2 = db.KeyExists(key); + var actualMembers = db.SetPop(key, 2); + Assert.AreEqual(members.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); } [Test] diff --git a/test/Garnet.test/RespSortedSetTests.cs b/test/Garnet.test/RespSortedSetTests.cs index 5a0ce365c6..83f92d82f5 100644 --- a/test/Garnet.test/RespSortedSetTests.cs +++ b/test/Garnet.test/RespSortedSetTests.cs @@ -204,10 +204,11 @@ public void AddRemove() card = db.SortedSetLength(key); Assert.AreEqual(0, card); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); // 1 entry added added = db.SortedSetAdd(key, [entries[0]]); @@ -232,10 +233,11 @@ public void AddRemove() var response_keys = db.SortedSetRangeByRankWithScores(key, 0, 100); Assert.IsEmpty(response_keys); + keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); // 10 entries are added added = db.SortedSetAdd(key, entries); @@ -266,10 +268,11 @@ public void AddRemove() removed = db.SortedSetRemove(key, entries.Select(e => e.Element).ToArray()); Assert.AreEqual(entries.Length - 1, removed); + keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); } [Test] @@ -314,10 +317,11 @@ public void AddPopDesc() Assert.AreEqual(entries[6 - i], last3[i]); Assert.AreEqual(0, db.SortedSetLength(key)); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 192; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); } [Test] diff --git a/test/Garnet.test/RespTransactionProcTests.cs b/test/Garnet.test/RespTransactionProcTests.cs index b82eb69f0a..e7ad1fc8e9 100644 --- a/test/Garnet.test/RespTransactionProcTests.cs +++ b/test/Garnet.test/RespTransactionProcTests.cs @@ -212,7 +212,7 @@ public void TransactionObjectExpiryProcTest() Assert.AreEqual(0, size); - SortedSetEntry? retEntry = db.SortedSetPop(key); + var retEntry = db.SortedSetPop(key); Assert.IsNull(retEntry); } From 2cf1b252cbe3f680c2b06abf7bac89501999609b Mon Sep 17 00:00:00 2001 From: Tal Zaccai Date: Wed, 29 May 2024 21:22:14 -0600 Subject: [PATCH 4/7] wip --- libs/server/Resp/Objects/ListCommands.cs | 7 +++++++ libs/server/Resp/Objects/SortedSetCommands.cs | 5 +---- .../Storage/Functions/ObjectStore/RMWMethods.cs | 14 +++----------- libs/server/Storage/Session/ObjectStore/Common.cs | 4 ++-- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/libs/server/Resp/Objects/ListCommands.cs b/libs/server/Resp/Objects/ListCommands.cs index 6a49a463a4..4d74469480 100644 --- a/libs/server/Resp/Objects/ListCommands.cs +++ b/libs/server/Resp/Objects/ListCommands.cs @@ -846,6 +846,13 @@ public unsafe bool ListSet(int count, byte* ptr, ref TGarnetApi stor var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory); ptr += objOutputHeader.bytesDone; break; + case GarnetStatus.NOTFOUND: + var tokens = ReadLeftToken(count - 1, ref ptr); + if (tokens < count - 1) + return false; + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_NOSUCHKEY, ref dcurr, dend)) + SendAndReset(); + break; } } diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index 158ad751c8..fb7209516b 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -848,6 +848,7 @@ private unsafe bool SortedSetIncrement(int count, byte* ptr, ref TGa ReadOnlySpan errorMessage = default; switch (status) { + case GarnetStatus.NOTFOUND: case GarnetStatus.OK: //verifying length of outputFooter if (outputFooter.spanByteAndMemory.Length == 0) @@ -869,10 +870,6 @@ private unsafe bool SortedSetIncrement(int count, byte* ptr, ref TGa ptr += objOutputHeader.bytesDone; } break; - case GarnetStatus.NOTFOUND: - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend)) - SendAndReset(); - break; } if (errorMessage != default) diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index c1b33cd74c..7b625a405c 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -127,14 +127,13 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec var header = (RespInputHeader*)input.ToPointer(); functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); - var expireInPlace = false; - var expireOption = ExpireOption.None; switch (header->type) { case GarnetObjectType.Expire: - expireOption = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); - expireInPlace = true; + var expireOption = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); + var expiryExists = (value.Expiration > 0); + EvaluateObjectExpireInPlace(expireOption, expiryExists, ref input, ref value, ref output); break; case GarnetObjectType.Persist: if (value.Expiration > 0) @@ -147,16 +146,9 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec break; default: value.Operate(ref input, ref output.spanByteAndMemory, out _, out var removeKey); - expireInPlace = removeKey; break; } - if (expireInPlace) - { - var expiryExists = (value.Expiration > 0); - EvaluateObjectExpireInPlace(expireOption, expiryExists, ref input, ref value, ref output); - } - functionsState.objectStoreSizeTracker?.AddTrackedSize(MemoryUtils.CalculateKeyValueSize(key, value)); if (functionsState.appendOnlyFile != null) diff --git a/libs/server/Storage/Session/ObjectStore/Common.cs b/libs/server/Storage/Session/ObjectStore/Common.cs index ab663f4ffe..937cc88b75 100644 --- a/libs/server/Storage/Session/ObjectStore/Common.cs +++ b/libs/server/Storage/Session/ObjectStore/Common.cs @@ -30,7 +30,7 @@ unsafe GarnetStatus RMWObjectStoreOperation(byte[] key, ArgSlice Debug.Assert(_output.spanByteAndMemory.IsSpanByte); - if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated) + if (status.NotFound || (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)) return GarnetStatus.NOTFOUND; return GarnetStatus.OK; @@ -57,7 +57,7 @@ GarnetStatus RMWObjectStoreOperationWithOutput(byte[] key, ArgSl if (status.IsPending) CompletePendingForObjectStoreSession(ref status, ref outputFooter, ref objectStoreContext); - if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated) + if (status.NotFound || (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)) return GarnetStatus.NOTFOUND; return GarnetStatus.OK; From 8185c76b2585cf29dddac38e4f4ae74bb0228c81 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Tue, 4 Jun 2024 17:04:22 -0700 Subject: [PATCH 5/7] Make PostCopyUpdater return bool rather than void, so we can inspect rmwInfo.Action for RMWAction.ExpireAndStop --- .../Storage/Functions/MainStore/RMWMethods.cs | 3 +- .../Functions/ObjectStore/RMWMethods.cs | 4 +-- .../cs/benchmark/SessionFunctions.cs | 2 +- .../cs/src/core/Allocator/AllocatorScan.cs | 2 +- .../ClientSession/SessionFunctionsWrapper.cs | 4 +-- .../core/Compaction/LogCompactionFunctions.cs | 2 +- .../Index/Interfaces/ISessionFunctions.cs | 5 ++- .../Interfaces/ISessionFunctionsWrapper.cs | 2 +- .../Index/Interfaces/SessionFunctionsBase.cs | 2 +- .../Tsavorite/Implementation/InternalRMW.cs | 15 ++++++-- .../Tsavorite/cs/test/AdvancedLockTests.cs | 3 +- .../Tsavorite/cs/test/PostOperationsTests.cs | 34 ++++++++++++++++++- .../Tsavorite/cs/test/RevivificationTests.cs | 4 +-- 13 files changed, 65 insertions(+), 17 deletions(-) diff --git a/libs/server/Storage/Functions/MainStore/RMWMethods.cs b/libs/server/Storage/Functions/MainStore/RMWMethods.cs index 490ed9f9f8..6c3b5d572d 100644 --- a/libs/server/Storage/Functions/MainStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/MainStore/RMWMethods.cs @@ -717,11 +717,12 @@ public bool CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldVa } /// - public void PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) + public bool PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) { functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); if (functionsState.appendOnlyFile != null) WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID); + return true; } } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index 96b438adaf..a1c4fa7330 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -118,7 +118,7 @@ public bool CopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject ol } /// - public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo) + public bool PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo) { // We're performing the object update here (and not in CopyUpdater) so that we are guaranteed that // the record was CASed into the hash chain before it gets modified @@ -127,7 +127,6 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec var header = (RespInputHeader*)input.ToPointer(); functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); - switch (header->type) { case GarnetObjectType.Expire: @@ -153,6 +152,7 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec if (functionsState.appendOnlyFile != null) WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID); + return true; } } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs b/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs index 94267fd0bc..dab6881dc2 100644 --- a/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs @@ -72,7 +72,7 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va return true; } - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs index 3420ec4498..1dfcfcb40c 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs @@ -338,7 +338,7 @@ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Va public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true; public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs index 74fc3eb1d6..d9fac2751a 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs @@ -87,10 +87,10 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo, ref recordInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) { recordInfo.SetDirtyAndModified(); - _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); } #endregion CopyUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs index e4e4416738..4c7a88ce71 100644 --- a/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs @@ -35,7 +35,7 @@ public void PostSingleDeleter(ref Key key, ref DeleteInfo deleteInfo) { } public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo) { } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs index 949697b6e7..a28bdbba0a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs @@ -163,7 +163,10 @@ public interface ISessionFunctions /// The destination to be updated; because this is an copy to a new location, there is no previous value there. /// The location where is to be copied /// Information about this update operation and its context - void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo); + /// This is the only Post* method that returns non-void. The bool functions the same as CopyUpdater; this is because we do not want to modify + /// objects in-memory until we know the "insert at tail" is successful. Therefore, we allow a false return as a signal to inspect + /// and handle . + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo); #endregion CopyUpdater #region InPlaceUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs index ef4da5aecd..719e4a97ea 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs @@ -34,7 +34,7 @@ internal interface ISessionFunctionsWrapper #region CopyUpdater bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output, ref RMWInfo rmwInfo); bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); - void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); #endregion CopyUpdater #region InPlaceUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs index 47e975e6a3..a5a7286cbf 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs @@ -40,7 +40,7 @@ public virtual void PostInitialUpdater(ref Key key, ref Input input, ref Value v /// public virtual bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; /// - public virtual void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; /// public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs index 6a435d3a52..57d6ddba86 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs @@ -527,7 +527,7 @@ private OperationStatus CreateNewRecordRMW internal long piuAddress; internal long pcuAddress; internal long psdAddress; + internal bool returnFalseFromPCU; internal void Clear() { @@ -38,7 +40,13 @@ internal PostFunctions() : base() { } /// public override bool CopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) { newValue = oldValue; return true; } /// - public override void PostCopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) { pcuAddress = rmwInfo.Address; } + public override bool PostCopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) + { + pcuAddress = rmwInfo.Address; + if (returnFalseFromPCU) + rmwInfo.Action = RMWAction.ExpireAndStop; + return !returnFalseFromPCU; + } public override void PostSingleDeleter(ref int key, ref DeleteInfo deleteInfo) { psdAddress = deleteInfo.Address; } public override bool ConcurrentDeleter(ref int key, ref int value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo) => false; @@ -157,6 +165,30 @@ public void PostCopyUpdaterTest() Assert.AreEqual(expectedAddress, session.functions.pcuAddress); } + [Test] + [Category("TsavoriteKV")] + [Category("Smoke")] + public void PostCopyUpdaterFalseTest([Values(FlushMode.ReadOnly, FlushMode.OnDisk)] FlushMode flushMode) + { + // Verify the key exists + var (status, output) = bContext.Read(targetKey); + Assert.IsTrue(status.Found, "Expected the record to exist"); + session.functions.returnFalseFromPCU = true; + + // Make the record read-only + if (flushMode == FlushMode.OnDisk) + store.Log.ShiftReadOnlyAddress(store.Log.ReadOnlyAddress, wait: true); + else + store.Log.FlushAndEvict(wait: true); + + // Call RMW + bContext.RMW(targetKey, targetKey * 1000); + + // Verify the key no longer exists. + (status, output) = bContext.Read(targetKey); + Assert.IsFalse(status.Found, "Expected the record to no longer exist"); + } + [Test] [Category("TsavoriteKV")] [Category("Smoke")] diff --git a/libs/storage/Tsavorite/cs/test/RevivificationTests.cs b/libs/storage/Tsavorite/cs/test/RevivificationTests.cs index 1f1d5a7e36..6d43443a9c 100644 --- a/libs/storage/Tsavorite/cs/test/RevivificationTests.cs +++ b/libs/storage/Tsavorite/cs/test/RevivificationTests.cs @@ -556,10 +556,10 @@ public override bool ConcurrentDeleter(ref SpanByte key, ref SpanByte value, ref return base.ConcurrentDeleter(ref key, ref value, ref deleteInfo, ref recordInfo); } - public override void PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) + public override bool PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) { AssertInfoValid(ref rmwInfo); - base.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + return base.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); } public override void PostInitialUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) From 4e71e85db5370e6f986af1a09ef86e5e6bf09989 Mon Sep 17 00:00:00 2001 From: Tal Zaccai Date: Tue, 4 Jun 2024 17:29:17 -0700 Subject: [PATCH 6/7] Updated PostCopyUpdater to remove key if necessary + added more tests --- .../Storage/Functions/ObjectStore/RMWMethods.cs | 5 +++++ test/Garnet.test/RespHashTests.cs | 17 +++++++++++++++++ test/Garnet.test/RespListTests.cs | 17 +++++++++++++++++ test/Garnet.test/RespSortedSetTests.cs | 17 +++++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index a1c4fa7330..1fc0b3982a 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -145,6 +145,11 @@ public bool PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec break; default: value.Operate(ref input, ref output.spanByteAndMemory, out _, out var removeKey); + if (removeKey) + { + rmwInfo.Action = RMWAction.ExpireAndStop; + return false; + } break; } diff --git a/test/Garnet.test/RespHashTests.cs b/test/Garnet.test/RespHashTests.cs index d141b9e75c..45b1882b74 100644 --- a/test/Garnet.test/RespHashTests.cs +++ b/test/Garnet.test/RespHashTests.cs @@ -513,7 +513,24 @@ public async Task CanDoHashSetWithNX() Assert.AreEqual("value3", (string?)val3); Assert.False(set3); #nullable disable + } + + [Test] + public void CheckEmptyHashKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:hash"); + var db = redis.GetDatabase(0); + + db.HashSet(key, [new HashEntry("Title", "Tsavorite"), new HashEntry("Year", "2021")]); + + var result = db.HashDelete(key, new RedisValue("Title")); + Assert.IsTrue(result); + result = db.HashDelete(key, new RedisValue("Year")); + Assert.IsTrue(result); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); } #endregion diff --git a/test/Garnet.test/RespListTests.cs b/test/Garnet.test/RespListTests.cs index 11c8c70d2f..b7aabe27d6 100644 --- a/test/Garnet.test/RespListTests.cs +++ b/test/Garnet.test/RespListTests.cs @@ -1227,5 +1227,22 @@ public void CanDoLPushxRPushx() var actualValue = Encoding.ASCII.GetString(len).Substring(0, expectedResponse.Length); Assert.AreEqual(expectedResponse, actualValue); } + + [Test] + public void CheckEmptyListKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:list"); + var db = redis.GetDatabase(0); + var values = new[] { new RedisValue("Hello"), new RedisValue("World") }; + var result = db.ListRightPush(key, values); + Assert.AreEqual(2, result); + + var actualMembers = db.ListRightPop(key, 2); + Assert.AreEqual(values.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + } } } \ No newline at end of file diff --git a/test/Garnet.test/RespSortedSetTests.cs b/test/Garnet.test/RespSortedSetTests.cs index 85de2ab0bb..8b90270300 100644 --- a/test/Garnet.test/RespSortedSetTests.cs +++ b/test/Garnet.test/RespSortedSetTests.cs @@ -692,6 +692,23 @@ public async Task CanManageZRangeByScoreWhenStartHigherThanExistingMaxScoreSE() Assert.AreEqual(0, range.Length); } + [Test] + public void CheckEmptySortedSetKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:sortedset"); + var db = redis.GetDatabase(0); + + var added = db.SortedSetAdd(key, entries); + Assert.AreEqual(entries.Length, added); + + var actualMembers = db.SortedSetPop(key, entries.Length); + Assert.AreEqual(entries.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + } + #endregion #region LightClientTests From 2b88d4f8361d3bf4c85bd8e5f83da8131b36b4a0 Mon Sep 17 00:00:00 2001 From: Tal Zaccai Date: Wed, 5 Jun 2024 12:31:59 -0700 Subject: [PATCH 7/7] Fixing some comments --- libs/server/Objects/List/ListObject.cs | 1 - libs/server/Objects/Set/SetObject.cs | 1 - .../Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs | 2 +- libs/server/Storage/Session/ObjectStore/Common.cs | 10 ++-------- 4 files changed, 3 insertions(+), 11 deletions(-) diff --git a/libs/server/Objects/List/ListObject.cs b/libs/server/Objects/List/ListObject.cs index 4c60d4cd0d..d50823322a 100644 --- a/libs/server/Objects/List/ListObject.cs +++ b/libs/server/Objects/List/ListObject.cs @@ -57,7 +57,6 @@ public enum OperationDirection /// public partial class ListObject : GarnetObjectBase { - /// readonly LinkedList list; /// diff --git a/libs/server/Objects/Set/SetObject.cs b/libs/server/Objects/Set/SetObject.cs index 1e871a3193..d7db65c3d6 100644 --- a/libs/server/Objects/Set/SetObject.cs +++ b/libs/server/Objects/Set/SetObject.cs @@ -40,7 +40,6 @@ public enum SetOperation : byte /// public unsafe partial class SetObject : GarnetObjectBase { - /// readonly HashSet set; /// diff --git a/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs b/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs index f8d685bdd7..121c55280e 100644 --- a/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs +++ b/libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs @@ -15,7 +15,7 @@ namespace Garnet.server /// /// Sorted Set - RESP specific operations for GEO Commands /// - public unsafe partial class SortedSetObject : IGarnetObject + public unsafe partial class SortedSetObject : GarnetObjectBase { /// /// Use this struct for the reply of GEOSEARCH command diff --git a/libs/server/Storage/Session/ObjectStore/Common.cs b/libs/server/Storage/Session/ObjectStore/Common.cs index 839cf7e046..1c0e9fa5d5 100644 --- a/libs/server/Storage/Session/ObjectStore/Common.cs +++ b/libs/server/Storage/Session/ObjectStore/Common.cs @@ -30,10 +30,7 @@ unsafe GarnetStatus RMWObjectStoreOperation(byte[] key, ArgSlice Debug.Assert(_output.spanByteAndMemory.IsSpanByte); - if (status.NotFound || (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)) - return GarnetStatus.NOTFOUND; - - return GarnetStatus.OK; + return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND; } /// @@ -57,10 +54,7 @@ GarnetStatus RMWObjectStoreOperationWithOutput(byte[] key, ArgSl if (status.IsPending) CompletePendingForObjectStoreSession(ref status, ref outputFooter, ref objectStoreContext); - if (status.NotFound || (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)) - return GarnetStatus.NOTFOUND; - - return GarnetStatus.OK; + return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND; } ///