Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing key from object store when collection object it empty #443

Merged
merged 10 commits into from
Jun 5, 2024
9 changes: 6 additions & 3 deletions libs/server/Custom/CustomObjectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,15 @@ public sealed override void DoSerialize(BinaryWriter writer)
public abstract override void Dispose();

/// <inheritdoc />
public abstract void Operate(byte subCommand, ReadOnlySpan<byte> input, ref (IMemoryOwner<byte>, int) output);
public abstract void Operate(byte subCommand, ReadOnlySpan<byte> input, ref (IMemoryOwner<byte>, int) output, out bool removeKey);

/// <inheritdoc />
public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange)
public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
{
var header = (RespInputHeader*)input.ToPointer();
sizeChange = 0;
removeKey = false;

switch (header->cmd)
{
// Scan Command
Expand All @@ -207,11 +209,12 @@ public sealed override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMe
break;
default:
(IMemoryOwner<byte> Memory, int Length) outp = (output.Memory, 0);
Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp);
Operate(header->SubId, input.AsReadOnlySpan().Slice(RespInputHeader.Size), ref outp, out removeKey);
output.Memory = outp.Memory;
output.Length = outp.Length;
break;
}

return true;
}
}
Expand Down
6 changes: 5 additions & 1 deletion libs/server/Objects/Hash/HashObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new HashObject(hash, Expiration, Size);

/// <inheritdoc />
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange)
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
{
removeKey = false;

fixed (byte* _input = input.AsSpan())
fixed (byte* _output = output.SpanByte.AsSpan())
{
Expand Down Expand Up @@ -183,6 +185,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou

sizeChange = this.Size - previousSize;
}

removeKey = hash.Count == 0;
return true;
}

Expand Down
6 changes: 5 additions & 1 deletion libs/server/Objects/List/ListObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new ListObject(list, Expiration, Size);

/// <inheritdoc />
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange)
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
{
fixed (byte* _input = input.AsSpan())
fixed (byte* _output = output.SpanByte.AsSpan())
{
removeKey = false;

var header = (RespInputHeader*)_input;
if (header->type != GarnetObjectType.List)
{
Expand Down Expand Up @@ -185,6 +187,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou

sizeChange = this.Size - previousSize;
}

removeKey = list.Count == 0;
return true;
}

Expand Down
4 changes: 3 additions & 1 deletion libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new SetObject(set, Expiration, Size);

/// <inheritdoc />
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange)
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
{
fixed (byte* _input = input.AsSpan())
fixed (byte* _output = output.SpanByte.AsSpan())
Expand Down Expand Up @@ -147,6 +147,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
}
sizeChange = this.Size - prevSize;
}

removeKey = set.Count == 0;
return true;
}

Expand Down
4 changes: 3 additions & 1 deletion libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new SortedSetObject(sortedSet, sortedSetDict, Expiration, Size);

/// <inheritdoc />
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange)
public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey)
{
fixed (byte* _input = input.AsSpan())
fixed (byte* _output = output.SpanByte.AsSpan())
Expand Down Expand Up @@ -271,6 +271,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
}
sizeChange = this.Size - previouseSize;
}

removeKey = sortedSetDict.Count == 0;
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion libs/server/Objects/Types/GarnetObjectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void CopyUpdate(ref IGarnetObject oldValue, ref IGarnetObject newValue, b
public abstract GarnetObjectBase Clone();

/// <inheritdoc />
public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange);
public abstract bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey);

/// <inheritdoc />
public abstract void Dispose();
Expand Down
3 changes: 2 additions & 1 deletion libs/server/Objects/Types/IGarnetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public interface IGarnetObject : IDisposable
/// <param name="input"></param>
/// <param name="output"></param>
/// <param name="sizeChange"></param>
/// <param name="removeKey"></param>
/// <returns></returns>
bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange);
bool Operate(ref SpanByte input, ref SpanByteAndMemory output, out long sizeChange, out bool removeKey);

/// <summary>
/// Serializer
Expand Down
7 changes: 7 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,13 @@ public unsafe bool ListSet<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
ptr += objOutputHeader.bytesDone;
break;
case GarnetStatus.NOTFOUND:
var tokens = ReadLeftToken(count - 1, ref ptr);
if (tokens < count - 1)
return false;
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_NOSUCHKEY, ref dcurr, dend))
SendAndReset();
break;
}
}

Expand Down
5 changes: 1 addition & 4 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ private unsafe bool SortedSetIncrement<TGarnetApi>(int count, byte* ptr, ref TGa
ReadOnlySpan<byte> errorMessage = default;
switch (status)
{
case GarnetStatus.NOTFOUND:
case GarnetStatus.OK:
//verifying length of outputFooter
if (outputFooter.spanByteAndMemory.Length == 0)
Expand All @@ -869,10 +870,6 @@ private unsafe bool SortedSetIncrement<TGarnetApi>(int count, byte* ptr, ref TGa
ptr += objOutputHeader.bytesDone;
}
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
}

if (errorMessage != default)
Expand Down
3 changes: 2 additions & 1 deletion libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -717,11 +717,12 @@ public bool CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldVa
}

/// <inheritdoc />
public void PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo)
public bool PostCopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo)
{
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
if (functionsState.appendOnlyFile != null)
WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID);
return true;
}
}
}
33 changes: 25 additions & 8 deletions libs/server/Storage/Functions/ObjectStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public bool InitialUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject
byte objectId = (byte)((byte)type - CustomCommandManager.StartOffset);
value = functionsState.customObjectCommands[objectId].factory.Create((byte)type);
}
value.Operate(ref input, ref output.spanByteAndMemory, out _);
value.Operate(ref input, ref output.spanByteAndMemory, out _, out _);
return true;
}

Expand Down Expand Up @@ -74,7 +74,7 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject
switch (header->type)
{
case GarnetObjectType.Expire:
ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size));
var optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size));
bool expiryExists = (value.Expiration > 0);
return EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output);
case GarnetObjectType.Persist:
Expand All @@ -87,7 +87,15 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref SpanByte input, ref IGarnetObject
CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory);
return true;
default:
return value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange);
var operateSuccessful = value.Operate(ref input, ref output.spanByteAndMemory, out sizeChange,
out var removeKey);
if (removeKey)
{
rmwInfo.Action = RMWAction.ExpireAndStop;
return false;
}

return operateSuccessful;
}
}

Expand All @@ -110,18 +118,21 @@ public bool CopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject ol
}

/// <inheritdoc />
public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo)
public bool PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObject oldValue, ref IGarnetObject value, ref GarnetObjectStoreOutput output, ref RMWInfo rmwInfo)
{
// We're performing the object update here (and not in CopyUpdater) so that we are guaranteed that
// the record was CASed into the hash chain before it gets modified
oldValue.CopyUpdate(ref oldValue, ref value, rmwInfo.RecordInfo.IsInNewVersion);

var header = (RespInputHeader*)input.ToPointer();
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);

switch (header->type)
{
case GarnetObjectType.Expire:
ExpireOption optionType = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size));
bool expiryExists = (value.Expiration > 0);
EvaluateObjectExpireInPlace(optionType, expiryExists, ref input, ref value, ref output);
var expireOption = (ExpireOption)(*(input.ToPointer() + RespInputHeader.Size));
var expiryExists = (value.Expiration > 0);
EvaluateObjectExpireInPlace(expireOption, expiryExists, ref input, ref value, ref output);
break;
case GarnetObjectType.Persist:
if (value.Expiration > 0)
Expand All @@ -133,14 +144,20 @@ public void PostCopyUpdater(ref byte[] key, ref SpanByte input, ref IGarnetObjec
CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output.spanByteAndMemory);
break;
default:
value.Operate(ref input, ref output.spanByteAndMemory, out _);
value.Operate(ref input, ref output.spanByteAndMemory, out _, out var removeKey);
if (removeKey)
{
rmwInfo.Action = RMWAction.ExpireAndStop;
return false;
}
break;
}

functionsState.objectStoreSizeTracker?.AddTrackedSize(MemoryUtils.CalculateKeyValueSize(key, value));

if (functionsState.appendOnlyFile != null)
WriteLogRMW(ref key, ref input, ref oldValue, rmwInfo.Version, rmwInfo.SessionID);
return true;
}
}
}
4 changes: 2 additions & 2 deletions libs/server/Storage/Functions/ObjectStore/ReadMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public bool SingleReader(ref byte[] key, ref SpanByte input, ref IGarnetObject v
return true;
}

return value.Operate(ref input, ref dst.spanByteAndMemory, out _);
return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _);
}

/// <inheritdoc />
Expand All @@ -52,7 +52,7 @@ public bool ConcurrentReader(ref byte[] key, ref SpanByte input, ref IGarnetObje
CopyRespNumber(ttlValue, ref dst.spanByteAndMemory);
return true;
}
return value.Operate(ref input, ref dst.spanByteAndMemory, out _);
return value.Operate(ref input, ref dst.spanByteAndMemory, out _, out _);
}

dst.garnetObject = value;
Expand Down
10 changes: 2 additions & 8 deletions libs/server/Storage/Session/ObjectStore/Common.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ unsafe GarnetStatus RMWObjectStoreOperation<TObjectContext>(byte[] key, ArgSlice
Debug.Assert(_output.spanByteAndMemory.IsSpanByte);


if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)
return GarnetStatus.NOTFOUND;

return GarnetStatus.OK;
return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND;
}

/// <summary>
Expand All @@ -57,10 +54,7 @@ GarnetStatus RMWObjectStoreOperationWithOutput<TObjectContext>(byte[] key, ArgSl
if (status.IsPending)
CompletePendingForObjectStoreSession(ref status, ref outputFooter, ref objectStoreContext);

if (!status.Record.Created && !status.Record.CopyUpdated && !status.Record.InPlaceUpdated)
return GarnetStatus.NOTFOUND;

return GarnetStatus.OK;
return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND;
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/storage/Tsavorite/cs/benchmark/SessionFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
return true;
}

public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { }
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true;

public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Va

public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output, ref RMWInfo rmwInfo) => true;
public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true;
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { }
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true;

public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output, ref RMWInfo rmwInfo) => true;
public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo, ref recordInfo);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo)
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
}
#endregion CopyUpdater

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void PostSingleDeleter(ref Key key, ref DeleteInfo deleteInfo) { }

public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true;

public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) { }
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo) => true;

public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) => true;
public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RMWInfo rmwInfo) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ public interface ISessionFunctions<Key, Value, Input, Output, Context>
/// <param name="newValue">The destination to be updated; because this is an copy to a new location, there is no previous value there.</param>
/// <param name="output">The location where <paramref name="newValue"/> is to be copied</param>
/// <param name="rmwInfo">Information about this update operation and its context</param>
void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo);
/// <returns>This is the only Post* method that returns non-void. The bool functions the same as CopyUpdater; this is because we do not want to modify
/// objects in-memory until we know the "insert at tail" is successful. Therefore, we allow a false return as a signal to inspect <paramref name="rmwInfo.Action"/>
/// and handle <see cref="RMWAction.ExpireAndStop"/>.</returns>
bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RMWInfo rmwInfo);
#endregion CopyUpdater

#region InPlaceUpdater
Expand Down
Loading
Loading