diff --git a/libs/server/Custom/CustomObjectBase.cs b/libs/server/Custom/CustomObjectBase.cs index 0ad3a8495a..ab934d7139 100644 --- a/libs/server/Custom/CustomObjectBase.cs +++ b/libs/server/Custom/CustomObjectBase.cs @@ -187,13 +187,15 @@ public sealed override void DoSerialize(BinaryWriter writer) public abstract override void Dispose(); /// - public abstract void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output); + public abstract void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output, out bool removeKey); /// - public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { var header = (RespInputHeader*)input.ToPointer(); sizeChange = 0; + removeKey = false; + switch (header->cmd) { // Scan Command @@ -207,11 +209,12 @@ public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMe break; default: (IMemoryOwner Memory, int Length) outp = (output.Memory, 0); - Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp); + Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp, out removeKey); output.Memory = outp.Memory; output.Length = outp.Length; break; } + return true; } } diff --git a/libs/server/Objects/Hash/HashObject.cs b/libs/server/Objects/Hash/HashObject.cs index 20a6b9a7ef..609db8ae05 100644 --- a/libs/server/Objects/Hash/HashObject.cs +++ b/libs/server/Objects/Hash/HashObject.cs @@ -108,8 +108,10 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new HashObject(hash, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { + removeKey = false; + fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) { @@ -183,6 +185,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou sizeChange = this.Size - previousSize; } + + removeKey = hash.Count == 0; return true; } diff --git a/libs/server/Objects/List/ListObject.cs b/libs/server/Objects/List/ListObject.cs index 439bc8430d..d50823322a 100644 --- a/libs/server/Objects/List/ListObject.cs +++ b/libs/server/Objects/List/ListObject.cs @@ -126,11 +126,13 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new ListObject(list, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) { + removeKey = false; + var header = (RespInputHeader*)_input; if (header->type != GarnetObjectType.List) { @@ -185,6 +187,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou sizeChange = this.Size - previousSize; } + + removeKey = list.Count == 0; return true; } diff --git a/libs/server/Objects/Set/SetObject.cs b/libs/server/Objects/Set/SetObject.cs index a481d235c2..d7db65c3d6 100644 --- a/libs/server/Objects/Set/SetObject.cs +++ b/libs/server/Objects/Set/SetObject.cs @@ -104,7 +104,7 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new SetObject(set, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) @@ -147,6 +147,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou } sizeChange = this.Size - prevSize; } + + removeKey = set.Count == 0; return true; } diff --git a/libs/server/Objects/SortedSet/SortedSetObject.cs b/libs/server/Objects/SortedSet/SortedSetObject.cs index 95d49587d1..49c3aef50d 100644 --- a/libs/server/Objects/SortedSet/SortedSetObject.cs +++ b/libs/server/Objects/SortedSet/SortedSetObject.cs @@ -177,7 +177,7 @@ public override void Dispose() { } public override GarnetObjectBase Clone() => new SortedSetObject(sortedSet, sortedSetDict, Expiration, Size); /// - public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange) + public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey) { fixed (byte* _input = input.AsSpan()) fixed (byte* _output = output.SpanByte.AsSpan()) @@ -271,6 +271,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou } sizeChange = this.Size - previouseSize; } + + removeKey = sortedSetDict.Count == 0; return true; } diff --git a/libs/server/Objects/Types/GarnetObjectBase.cs b/libs/server/Objects/Types/GarnetObjectBase.cs index 537405f844..6e86f3f8d7 100644 --- a/libs/server/Objects/Types/GarnetObjectBase.cs +++ b/libs/server/Objects/Types/GarnetObjectBase.cs @@ -116,7 +116,7 @@ public void CopyUpdate(ref IGarnetObject oldValue, ref IGarnetObject newValue, b public abstract GarnetObjectBase Clone(); /// - public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange); + public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey); /// public abstract void Dispose(); diff --git a/libs/server/Objects/Types/IGarnetObject.cs b/libs/server/Objects/Types/IGarnetObject.cs index 3c27b72516..cd5b03c591 100644 --- a/libs/server/Objects/Types/IGarnetObject.cs +++ b/libs/server/Objects/Types/IGarnetObject.cs @@ -34,8 +34,9 @@ public interface IGarnetObject : IDisposable /// /// /// + /// /// - bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange); + bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey); /// /// Serializer diff --git a/libs/server/Resp/Objects/ListCommands.cs b/libs/server/Resp/Objects/ListCommands.cs index 6a49a463a4..4d74469480 100644 --- a/libs/server/Resp/Objects/ListCommands.cs +++ b/libs/server/Resp/Objects/ListCommands.cs @@ -846,6 +846,13 @@ public unsafe bool ListSet(int count, byte* ptr, ref TGarnetApi stor var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory); ptr += objOutputHeader.bytesDone; break; + case GarnetStatus.NOTFOUND: + var tokens = ReadLeftToken(count - 1, ref ptr); + if (tokens < count - 1) + return false; + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_NOSUCHKEY, ref dcurr, dend)) + SendAndReset(); + break; } } diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index 918fc39bb6..b4d135a91b 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -848,6 +848,7 @@ private unsafe bool SortedSetIncrement(int count, byte* ptr, ref TGa ReadOnlySpan errorMessage = default; switch (status) { + case GarnetStatus.NOTFOUND: case GarnetStatus.OK: //verifying length of outputFooter if (outputFooter.spanByteAndMemory.Length == 0) @@ -869,10 +870,6 @@ private unsafe bool SortedSetIncrement(int count, byte* ptr, ref TGa ptr += objOutputHeader.bytesDone; } break; - case GarnetStatus.NOTFOUND: - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend)) - SendAndReset(); - break; } if (errorMessage != default) diff --git a/libs/server/Storage/Functions/MainStore/RMWMethods.cs b/libs/server/Storage/Functions/MainStore/RMWMethods.cs index 490ed9f9f8..6c3b5d572d 100644 --- a/libs/server/Storage/Functions/MainStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/MainStore/RMWMethods.cs @@ -717,11 +717,12 @@ public bool CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldVa } /// - public void PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) + public bool PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) { functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); if (functionsState.appendOnlyFile != null) WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID); + return true; } } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index d9319a5e83..1fc0b3982a 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -27,7 +27,7 @@ public bool InitialUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject byte objectId = (byte)((byte)type - CustomCommandManager.StartOffset); value = functionsState.customObjectCommands[objectId].factory.Create((byte)type); } - value.Operate(ref input, ref output.spanByteAndMemory, out _); + value.Operate(ref input, ref output.spanByteAndMemory, out _, out _); return true; } @@ -74,7 +74,7 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject switch (header->type) { case GarnetObjectType.Expire: - ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); + var optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); bool expiryExists = (value.Expiration > 0); return EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output); case GarnetObjectType.Persist: @@ -87,7 +87,15 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory); return true; default: - return value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange); + var operateSuccessful = value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange, + out var removeKey); + if (removeKey) + { + rmwInfo.Action = RMWAction.ExpireAndStop; + return false; + } + + return operateSuccessful; } } @@ -110,18 +118,21 @@ public bool CopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject ol } /// - public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo) + public bool PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo) { + // We're performing the object update here (and not in CopyUpdater) so that we are guaranteed that + // the record was CASed into the hash chain before it gets modified oldValue.CopyUpdate(ref oldValue, ref value, rmwInfo.RecordInfo.IsInNewVersion); var header = (RespInputHeader*)input.ToPointer(); functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); + switch (header->type) { case GarnetObjectType.Expire: - ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); - bool expiryExists = (value.Expiration > 0); - EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output); + var expireOption = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size)); + var expiryExists = (value.Expiration > 0); + EvaluateObjectExpireInPlace(expireOption, expiryExists, ref input, ref value, ref output); break; case GarnetObjectType.Persist: if (value.Expiration > 0) @@ -133,7 +144,12 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory); break; default: - value.Operate(ref input, ref output.spanByteAndMemory, out _); + value.Operate(ref input, ref output.spanByteAndMemory, out _, out var removeKey); + if (removeKey) + { + rmwInfo.Action = RMWAction.ExpireAndStop; + return false; + } break; } @@ -141,6 +157,7 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec if (functionsState.appendOnlyFile != null) WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID); + return true; } } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs b/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs index 8fdf2b53f0..f35f7d7165 100644 --- a/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/ReadMethods.cs @@ -28,7 +28,7 @@ public bool SingleReader(ref byte[] key, ref SpanByte input, ref IGarnetObject v return true; } - return value.Operate(ref input, ref dst.spanByteAndMemory, out _); + return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _); } /// @@ -52,7 +52,7 @@ public bool ConcurrentReader(ref byte[] key, ref SpanByte input, ref IGarnetObje CopyRespNumber(ttlValue, ref dst.spanByteAndMemory); return true; } - return value.Operate(ref input, ref dst.spanByteAndMemory, out _); + return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _); } dst.garnetObject = value; diff --git a/libs/server/Storage/Session/ObjectStore/Common.cs b/libs/server/Storage/Session/ObjectStore/Common.cs index 04904c2107..1c0e9fa5d5 100644 --- a/libs/server/Storage/Session/ObjectStore/Common.cs +++ b/libs/server/Storage/Session/ObjectStore/Common.cs @@ -30,10 +30,7 @@ unsafe GarnetStatus RMWObjectStoreOperation(byte[] key, ArgSlice Debug.Assert(_output.spanByteAndMemory.IsSpanByte); - if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated) - return GarnetStatus.NOTFOUND; - - return GarnetStatus.OK; + return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND; } /// @@ -57,10 +54,7 @@ GarnetStatus RMWObjectStoreOperationWithOutput(byte[] key, ArgSl if (status.IsPending) CompletePendingForObjectStoreSession(ref status, ref outputFooter, ref objectStoreContext); - if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated) - return GarnetStatus.NOTFOUND; - - return GarnetStatus.OK; + return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND; } /// diff --git a/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs b/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs index 94267fd0bc..dab6881dc2 100644 --- a/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs @@ -72,7 +72,7 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va return true; } - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs index 3420ec4498..1dfcfcb40c 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs @@ -338,7 +338,7 @@ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Va public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true; public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs index 74fc3eb1d6..d9fac2751a 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/SessionFunctionsWrapper.cs @@ -87,10 +87,10 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo, ref recordInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) { recordInfo.SetDirtyAndModified(); - _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); } #endregion CopyUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs index e4e4416738..4c7a88ce71 100644 --- a/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Compaction/LogCompactionFunctions.cs @@ -35,7 +35,7 @@ public void PostSingleDeleter(ref Key key, ref DeleteInfo deleteInfo) { } public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo) { } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs index 949697b6e7..a28bdbba0a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs @@ -163,7 +163,10 @@ public interface ISessionFunctions /// The destination to be updated; because this is an copy to a new location, there is no previous value there. /// The location where is to be copied /// Information about this update operation and its context - void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo); + /// This is the only Post* method that returns non-void. The bool functions the same as CopyUpdater; this is because we do not want to modify + /// objects in-memory until we know the "insert at tail" is successful. Therefore, we allow a false return as a signal to inspect + /// and handle . + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo); #endregion CopyUpdater #region InPlaceUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs index ef4da5aecd..719e4a97ea 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctionsWrapper.cs @@ -34,7 +34,7 @@ internal interface ISessionFunctionsWrapper #region CopyUpdater bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output, ref RMWInfo rmwInfo); bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); - void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo); #endregion CopyUpdater #region InPlaceUpdater diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs index 47e975e6a3..a5a7286cbf 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs @@ -40,7 +40,7 @@ public virtual void PostInitialUpdater(ref Key key, ref Input input, ref Value v /// public virtual bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; /// - public virtual void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { } + public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true; /// public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs index 6a435d3a52..57d6ddba86 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs @@ -527,7 +527,7 @@ private OperationStatus CreateNewRecordRMW /// Whether a new record for a previously non-existent key was appended to the log. /// - public bool Created => (statusCode & StatusCode.AdvancedMask) == StatusCode.CreatedRecord; + public bool Created => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CreatedRecord; /// /// Whether an existing record was updated in place. /// - public bool InPlaceUpdated => (statusCode & StatusCode.AdvancedMask) == StatusCode.InPlaceUpdatedRecord; + public bool InPlaceUpdated => (statusCode & StatusCode.RecordStatusMask) == StatusCode.InPlaceUpdatedRecord; /// /// Whether an existing record key was copied, updated, and appended to the log. /// - public bool CopyUpdated => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopyUpdatedRecord; + public bool CopyUpdated => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopyUpdatedRecord; /// /// Whether an existing record key was copied and appended to the log. /// - public bool Copied => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopiedRecord; + public bool Copied => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopiedRecord; /// /// Whether an existing record key was copied, updated, and added to the readcache. /// - public bool CopiedToReadCache => (statusCode & StatusCode.AdvancedMask) == StatusCode.CopiedRecordToReadCache; + public bool CopiedToReadCache => (statusCode & StatusCode.RecordStatusMask) == StatusCode.CopiedRecordToReadCache; } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs index aed7964c74..45d584f4d0 100644 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/StatusCode.cs @@ -123,7 +123,12 @@ internal enum StatusCode : byte // unused 0x70, /// - /// Indicates that an existing record key was auto-expired. This is a flag that is combined with lower Advanced values. + /// Individual record-status values that are not masked together. + /// + RecordStatusMask = 0x70, + + /// + /// Indicates that an existing record key was auto-expired. This is a flag that is combined with RecordStatusMask values. /// /// /// See basic codes for details of usage. diff --git a/libs/storage/Tsavorite/cs/test/AdvancedLockTests.cs b/libs/storage/Tsavorite/cs/test/AdvancedLockTests.cs index 02c01787f7..c3bd56c203 100644 --- a/libs/storage/Tsavorite/cs/test/AdvancedLockTests.cs +++ b/libs/storage/Tsavorite/cs/test/AdvancedLockTests.cs @@ -79,11 +79,12 @@ public override bool CopyUpdater(ref int key, ref Input input, ref int oldValue, return true; } - public override void PostCopyUpdater(ref int key, ref Input input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) + public override bool PostCopyUpdater(ref int key, ref Input input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) { base.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); if (input.doTest) readEvent.Set(); + return true; } public override bool InPlaceUpdater(ref int key, ref Input input, ref int value, ref int output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) diff --git a/libs/storage/Tsavorite/cs/test/PostOperationsTests.cs b/libs/storage/Tsavorite/cs/test/PostOperationsTests.cs index 668ff7713a..67767bc145 100644 --- a/libs/storage/Tsavorite/cs/test/PostOperationsTests.cs +++ b/libs/storage/Tsavorite/cs/test/PostOperationsTests.cs @@ -4,6 +4,7 @@ using System.IO; using NUnit.Framework; using Tsavorite.core; +using static Tsavorite.test.TestUtils; namespace Tsavorite.test { @@ -16,6 +17,7 @@ class PostFunctions : SimpleSimpleFunctions internal long piuAddress; internal long pcuAddress; internal long psdAddress; + internal bool returnFalseFromPCU; internal void Clear() { @@ -38,7 +40,13 @@ internal PostFunctions() : base() { } /// public override bool CopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) { newValue = oldValue; return true; } /// - public override void PostCopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) { pcuAddress = rmwInfo.Address; } + public override bool PostCopyUpdater(ref int key, ref int input, ref int oldValue, ref int newValue, ref int output, ref RMWInfo rmwInfo) + { + pcuAddress = rmwInfo.Address; + if (returnFalseFromPCU) + rmwInfo.Action = RMWAction.ExpireAndStop; + return !returnFalseFromPCU; + } public override void PostSingleDeleter(ref int key, ref DeleteInfo deleteInfo) { psdAddress = deleteInfo.Address; } public override bool ConcurrentDeleter(ref int key, ref int value, ref DeleteInfo deleteInfo, ref RecordInfo recordInfo) => false; @@ -157,6 +165,30 @@ public void PostCopyUpdaterTest() Assert.AreEqual(expectedAddress, session.functions.pcuAddress); } + [Test] + [Category("TsavoriteKV")] + [Category("Smoke")] + public void PostCopyUpdaterFalseTest([Values(FlushMode.ReadOnly, FlushMode.OnDisk)] FlushMode flushMode) + { + // Verify the key exists + var (status, output) = bContext.Read(targetKey); + Assert.IsTrue(status.Found, "Expected the record to exist"); + session.functions.returnFalseFromPCU = true; + + // Make the record read-only + if (flushMode == FlushMode.OnDisk) + store.Log.ShiftReadOnlyAddress(store.Log.ReadOnlyAddress, wait: true); + else + store.Log.FlushAndEvict(wait: true); + + // Call RMW + bContext.RMW(targetKey, targetKey * 1000); + + // Verify the key no longer exists. + (status, output) = bContext.Read(targetKey); + Assert.IsFalse(status.Found, "Expected the record to no longer exist"); + } + [Test] [Category("TsavoriteKV")] [Category("Smoke")] diff --git a/libs/storage/Tsavorite/cs/test/RevivificationTests.cs b/libs/storage/Tsavorite/cs/test/RevivificationTests.cs index 1f1d5a7e36..6d43443a9c 100644 --- a/libs/storage/Tsavorite/cs/test/RevivificationTests.cs +++ b/libs/storage/Tsavorite/cs/test/RevivificationTests.cs @@ -556,10 +556,10 @@ public override bool ConcurrentDeleter(ref SpanByte key, ref SpanByte value, ref return base.ConcurrentDeleter(ref key, ref value, ref deleteInfo, ref recordInfo); } - public override void PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) + public override bool PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) { AssertInfoValid(ref rmwInfo); - base.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + return base.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); } public override void PostInitialUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo) diff --git a/main/GarnetServer/Extensions/MyDictObject.cs b/main/GarnetServer/Extensions/MyDictObject.cs index bb098ed81b..0a1e988f93 100644 --- a/main/GarnetServer/Extensions/MyDictObject.cs +++ b/main/GarnetServer/Extensions/MyDictObject.cs @@ -66,7 +66,7 @@ public override void SerializeObject(BinaryWriter writer) } } - public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output) + public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMemoryOwner, int) output, out bool removeKey) { switch (subCommand) { @@ -93,6 +93,8 @@ public override void Operate(byte subCommand, ReadOnlySpan input, ref (IMe WriteError(ref output, "Unexpected command"); break; } + + removeKey = dict.Count == 0; } public override void Dispose() { } diff --git a/test/Garnet.test/RespHashTests.cs b/test/Garnet.test/RespHashTests.cs index d141b9e75c..45b1882b74 100644 --- a/test/Garnet.test/RespHashTests.cs +++ b/test/Garnet.test/RespHashTests.cs @@ -513,7 +513,24 @@ public async Task CanDoHashSetWithNX() Assert.AreEqual("value3", (string?)val3); Assert.False(set3); #nullable disable + } + + [Test] + public void CheckEmptyHashKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:hash"); + var db = redis.GetDatabase(0); + + db.HashSet(key, [new HashEntry("Title", "Tsavorite"), new HashEntry("Year", "2021")]); + + var result = db.HashDelete(key, new RedisValue("Title")); + Assert.IsTrue(result); + result = db.HashDelete(key, new RedisValue("Year")); + Assert.IsTrue(result); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); } #endregion diff --git a/test/Garnet.test/RespListTests.cs b/test/Garnet.test/RespListTests.cs index 4892a0ecf6..b7aabe27d6 100644 --- a/test/Garnet.test/RespListTests.cs +++ b/test/Garnet.test/RespListTests.cs @@ -57,10 +57,11 @@ public void BasicLPUSHAndLPOP() string popval = db.ListLeftPop(key); Assert.AreEqual(val, popval); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] @@ -395,12 +396,13 @@ public void MultiLPUSHAndLPOPV1() // list is empty, the code should return (nil) popval = db.ListLeftPop(key); - Assert.AreEqual(null, popval); + Assert.IsNull(popval); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] @@ -470,19 +472,15 @@ public void MultiRPUSHAndRPOP() break; } - result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); - // list is empty, the code should return (nil) popval = db.ListLeftPop(key); - Assert.AreEqual(null, popval); + Assert.IsNull(popval); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); result = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == result.Resp2Type ? Int32.Parse(result.ToString()) : -1; - expectedResponse = 104; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(result.IsNull); } [Test] @@ -1229,5 +1227,22 @@ public void CanDoLPushxRPushx() var actualValue = Encoding.ASCII.GetString(len).Substring(0, expectedResponse.Length); Assert.AreEqual(expectedResponse, actualValue); } + + [Test] + public void CheckEmptyListKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:list"); + var db = redis.GetDatabase(0); + var values = new[] { new RedisValue("Hello"), new RedisValue("World") }; + var result = db.ListRightPush(key, values); + Assert.AreEqual(2, result); + + var actualMembers = db.ListRightPop(key, 2); + Assert.AreEqual(values.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + } } } \ No newline at end of file diff --git a/test/Garnet.test/RespSetTest.cs b/test/Garnet.test/RespSetTest.cs index cca9b728e5..2e3351b6ac 100644 --- a/test/Garnet.test/RespSetTest.cs +++ b/test/Garnet.test/RespSetTest.cs @@ -52,6 +52,23 @@ public void CandDoSaddBasic(string key) Assert.IsFalse(result); } + [Test] + public void CheckEmptySetKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:set"); + var db = redis.GetDatabase(0); + var members = new[] { new RedisValue("Hello"), new RedisValue("World") }; + var result = db.SetAdd(key, members); + Assert.AreEqual(2, result); + + var actualMembers = db.SetPop(key, 2); + Assert.AreEqual(members.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + } + [Test] public void CanAddAndListMembers() { diff --git a/test/Garnet.test/RespSortedSetTests.cs b/test/Garnet.test/RespSortedSetTests.cs index 0bd9af070c..8b90270300 100644 --- a/test/Garnet.test/RespSortedSetTests.cs +++ b/test/Garnet.test/RespSortedSetTests.cs @@ -204,10 +204,11 @@ public void AddRemove() card = db.SortedSetLength(key); Assert.AreEqual(0, card); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); // 1 entry added added = db.SortedSetAdd(key, [entries[0]]); @@ -232,10 +233,11 @@ public void AddRemove() var response_keys = db.SortedSetRangeByRankWithScores(key, 0, 100); Assert.IsEmpty(response_keys); + keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); // 10 entries are added added = db.SortedSetAdd(key, entries); @@ -266,10 +268,11 @@ public void AddRemove() removed = db.SortedSetRemove(key, entries.Select(e => e.Element).ToArray()); Assert.AreEqual(entries.Length - 1, removed); + keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 200; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); } [Test] @@ -314,10 +317,11 @@ public void AddPopDesc() Assert.AreEqual(entries[6 - i], last3[i]); Assert.AreEqual(0, db.SortedSetLength(key)); + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + response = db.Execute("MEMORY", "USAGE", key); - actualValue = ResultType.Integer == response.Resp2Type ? Int32.Parse(response.ToString()) : -1; - expectedResponse = 192; - Assert.AreEqual(expectedResponse, actualValue); + Assert.IsTrue(response.IsNull); } [Test] @@ -688,6 +692,23 @@ public async Task CanManageZRangeByScoreWhenStartHigherThanExistingMaxScoreSE() Assert.AreEqual(0, range.Length); } + [Test] + public void CheckEmptySortedSetKeyRemoved() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var key = new RedisKey("user1:sortedset"); + var db = redis.GetDatabase(0); + + var added = db.SortedSetAdd(key, entries); + Assert.AreEqual(entries.Length, added); + + var actualMembers = db.SortedSetPop(key, entries.Length); + Assert.AreEqual(entries.Length, actualMembers.Length); + + var keyExists = db.KeyExists(key); + Assert.IsFalse(keyExists); + } + #endregion #region LightClientTests diff --git a/test/Garnet.test/RespTransactionProcTests.cs b/test/Garnet.test/RespTransactionProcTests.cs index 6d5798bac1..23bbed53d1 100644 --- a/test/Garnet.test/RespTransactionProcTests.cs +++ b/test/Garnet.test/RespTransactionProcTests.cs @@ -237,7 +237,7 @@ public void TransactionObjectExpiryProcTest() Assert.AreEqual(0, size); - SortedSetEntry? retEntry = db.SortedSetPop(key); + var retEntry = db.SortedSetPop(key); Assert.IsNull(retEntry); }