Skip to content

Commit

Permalink
Various API bugfixes (#417)
Browse files Browse the repository at this point in the history
* tmp - GeoAdd broken

* reverting changes to GeoAdd

* more bugfixes

* broken test fix

* small comment fix

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TalZaccai and badrishc authored May 29, 2024
1 parent ade2991 commit 72e7504
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 156 deletions.
1 change: 1 addition & 0 deletions libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private void GeoAdd(byte* input, int length, byte* output)
_output->opsDone++;

this.UpdateSize(member);
elementsChanged++;
}
}
else if (!nx && scoreStored != score)
Expand Down
9 changes: 1 addition & 8 deletions libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -861,15 +861,8 @@ private bool NetworkTYPE<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
private bool NetworkMODULE<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count != 1)
return AbortWithWrongNumberOfArguments("MODULE", count);

// MODULE nameofmodule
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nameofmodule, ref ptr, recvBufferPtr + bytesRead))
return false;

// TODO: pending implementation for module support.
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERR_GENERIC_UNK_CMD, ref dcurr, dend))
SendAndReset();

// Advance pointers
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private bool NetworkGetRange<TGarnetApi>(byte* ptr, ref TGarnetApi storageApi)
{
sessionMetrics?.incr_total_notfound();
Debug.Assert(o.IsSpanByte);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_EMPTY, ref dcurr, dend))
SendAndReset();
}

Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_RETURN_VAL_1 => ":1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_0 => ":0\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N1 => ":-1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N2 => ":-2\r\n"u8;
public static ReadOnlySpan<byte> SUSCRIBE_PONG => "*2\r\n$4\r\npong\r\n$0\r\n\r\n"u8;
public static ReadOnlySpan<byte> RESP_PONG => "+PONG\r\n"u8;
public static ReadOnlySpan<byte> RESP_EMPTY => "$0\r\n\r\n"u8;
Expand Down Expand Up @@ -123,6 +124,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;
public static ReadOnlySpan<byte> RESP_ERR_UNSUPPORTED_PROTOCOL_VERSION => "ERR Unsupported protocol version"u8;
public static ReadOnlySpan<byte> RESP_ERR_NOT_VALID_FLOAT => "ERR value is not a valid float"u8;
public static ReadOnlySpan<byte> RESP_WRONGPASS_INVALID_PASSWORD => "WRONGPASS Invalid password"u8;
public static ReadOnlySpan<byte> RESP_WRONGPASS_INVALID_USERNAME_PASSWORD => "WRONGPASS Invalid username/password combination"u8;
public static ReadOnlySpan<byte> RESP_SYNTAX_ERROR => "syntax error"u8;
Expand All @@ -135,6 +137,8 @@ static partial class CmdStrings
public const string GenericErrWrongNumArgs = "ERR wrong number of arguments for '{0}' command";
public const string GenericErrUnknownOption = "ERR Unknown option or number of arguments for CONFIG SET - '{0}'";
public const string GenericErrUnknownSubCommand = "ERR unknown subcommand '{0}'. Try {1} HELP";
public const string GenericErrWrongNumArgsTxn =
"ERR Invalid number of parameters to stored proc {0}, expected {1}, actual {2}";

/// <summary>
/// Object types
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/KeyAdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private bool NetworkTTL<TGarnetApi>(byte* ptr, RespCommand command, ref TGarnetA
}
else
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N1, ref dcurr, dend))
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N2, ref dcurr, dend))
SendAndReset();
}
return true;
Expand Down
8 changes: 8 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,16 @@ private unsafe bool SetRandomMember<TGarnetApi>(int count, byte* ptr, ref TGarne
return false;
break;
case GarnetStatus.NOTFOUND:
if (count == 2)
{
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();
break;
}

while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();

break;
}

Expand Down
187 changes: 96 additions & 91 deletions libs/server/Resp/Objects/SharedObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,109 +25,114 @@ private unsafe bool ObjectScan<TGarnetApi>(int count, byte* ptr, GarnetObjectTyp
// Check number of required parameters
if (count < 2)
{
// Forward tokens in the input
ReadLeftToken(count, ref ptr);
var cmdName = objectType switch
{
GarnetObjectType.Hash => nameof(HashOperation.HSCAN),
GarnetObjectType.Set => nameof(SetOperation.SSCAN),
GarnetObjectType.SortedSet => nameof(SortedSetOperation.ZSCAN),
GarnetObjectType.All => nameof(RespCommand.COSCAN),
_ => nameof(RespCommand.NONE)
};

return AbortWithWrongNumberOfArguments(cmdName, count);
}
else

// Read key for the scan
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

// Get cursor value
if (!RespReadUtils.ReadStringWithLengthHeader(out var cursor, ref ptr, recvBufferPtr + bytesRead))
return false;

if (!Int32.TryParse(cursor, out int cursorValue) || cursorValue < 0)
{
// Read key for the scan
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CURSORVALUE, ref dcurr, dend))
SendAndReset();
ReadLeftToken(count - 1, ref ptr);
return true;
}

// Get cursor value
if (!RespReadUtils.ReadStringWithLengthHeader(out var cursor, ref ptr, recvBufferPtr + bytesRead))
if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

if (!Int32.TryParse(cursor, out int cursorValue) || cursorValue < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CURSORVALUE, ref dcurr, dend))
SendAndReset();
ReadLeftToken(count - 1, ref ptr);
return true;
}
// Prepare input
// Header + size of int for the limitCountInOutput
var inputPtr = (ObjectInputHeader*)(ptr - ObjectInputHeader.Size - sizeof(int));
var ptrToInt = (int*)(ptr - sizeof(int));

if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}
// Save old values on buffer for possible revert
var save = *inputPtr;
var savePtrToInt = *ptrToInt;

// Prepare input
// Header + size of int for the limitCountInOutput
var inputPtr = (ObjectInputHeader*)(ptr - ObjectInputHeader.Size - sizeof(int));
var ptrToInt = (int*)(ptr - sizeof(int));
// Build the input
byte* pcurr = (byte*)inputPtr;

// Save old values on buffer for possible revert
var save = *inputPtr;
var savePtrToInt = *ptrToInt;
// ObjectInputHeader
(*(ObjectInputHeader*)(pcurr)).header.type = objectType;
(*(ObjectInputHeader*)(pcurr)).header.flags = 0;

// Build the input
byte* pcurr = (byte*)inputPtr;
switch (objectType)
{
case GarnetObjectType.Hash:
(*(ObjectInputHeader*)(pcurr)).header.HashOp = HashOperation.HSCAN;
break;
case GarnetObjectType.Set:
(*(ObjectInputHeader*)(pcurr)).header.SetOp = SetOperation.SSCAN;
break;
case GarnetObjectType.SortedSet:
(*(ObjectInputHeader*)(pcurr)).header.SortedSetOp = SortedSetOperation.ZSCAN;
break;
case GarnetObjectType.All:
(*(ObjectInputHeader*)(pcurr)).header.cmd = RespCommand.COSCAN;
break;
}

// ObjectInputHeader
(*(ObjectInputHeader*)(pcurr)).header.type = objectType;
(*(ObjectInputHeader*)(pcurr)).header.flags = 0;
// Tokens already processed: 3, command, key and cursor
(*(ObjectInputHeader*)(pcurr)).count = count - 2;

switch (objectType)
{
case GarnetObjectType.Hash:
(*(ObjectInputHeader*)(pcurr)).header.HashOp = HashOperation.HSCAN;
break;
case GarnetObjectType.Set:
(*(ObjectInputHeader*)(pcurr)).header.SetOp = SetOperation.SSCAN;
break;
case GarnetObjectType.SortedSet:
(*(ObjectInputHeader*)(pcurr)).header.SortedSetOp = SortedSetOperation.ZSCAN;
break;
case GarnetObjectType.All:
(*(ObjectInputHeader*)(pcurr)).header.cmd = RespCommand.COSCAN;
break;
}

// Tokens already processed: 3, command, key and cursor
(*(ObjectInputHeader*)(pcurr)).count = count - 2;

// Cursor value
(*(ObjectInputHeader*)(pcurr)).done = cursorValue;
pcurr += ObjectInputHeader.Size;

// Object Input Limit
*(int*)pcurr = storeWrapper.serverOptions.ObjectScanCountLimit;
pcurr += sizeof(int);

// Prepare length of header in input buffer
var inputLength = (int)(recvBufferPtr + bytesRead - (byte*)inputPtr);

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };
var status = storageApi.ObjectScan(key, new ArgSlice((byte*)inputPtr, inputLength), ref outputFooter);

//restore input buffer
*inputPtr = save;
*ptrToInt = savePtrToInt;

switch (status)
{
case GarnetStatus.OK:
// Process output
var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
// Validation for partial input reading or error
if (objOutputHeader.countDone == Int32.MinValue)
return false;
ptr += objOutputHeader.bytesDone;
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteScanOutputHeader(0, ref dcurr, dend))
SendAndReset();
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();
// Fast forward left of the input
ReadLeftToken(count - 2, ref ptr);
break;
}
// Cursor value
(*(ObjectInputHeader*)(pcurr)).done = cursorValue;
pcurr += ObjectInputHeader.Size;

// Object Input Limit
*(int*)pcurr = storeWrapper.serverOptions.ObjectScanCountLimit;
pcurr += sizeof(int);

// Prepare length of header in input buffer
var inputLength = (int)(recvBufferPtr + bytesRead - (byte*)inputPtr);

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };
var status = storageApi.ObjectScan(key, new ArgSlice((byte*)inputPtr, inputLength), ref outputFooter);

// Restore input buffer
*inputPtr = save;
*ptrToInt = savePtrToInt;

switch (status)
{
case GarnetStatus.OK:
// Process output
var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
// Validation for partial input reading or error
if (objOutputHeader.countDone == Int32.MinValue)
return false;
ptr += objOutputHeader.bytesDone;
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteScanOutputHeader(0, ref dcurr, dend))
SendAndReset();
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();
// Fast forward left of the input
ReadLeftToken(count - 2, ref ptr);
break;
}

// Update read pointer
Expand Down
25 changes: 21 additions & 4 deletions libs/server/Resp/Objects/SortedSetGeoCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,11 @@ private unsafe bool GeoCommands<TGarnetApi>(int count, byte* ptr, SortedSetOpera
{
int paramsRequiredInCommand = 0;
string cmd = string.Empty;
var responseWhenNotFound = CmdStrings.RESP_EMPTYLIST;
switch (op)
{
case SortedSetOperation.GEODIST:
paramsRequiredInCommand = 3;
cmd = "GEODIST";
responseWhenNotFound = CmdStrings.RESP_ERRNOTFOUND;
break;
case SortedSetOperation.GEOHASH:
paramsRequiredInCommand = 1;
Expand Down Expand Up @@ -181,8 +179,27 @@ private unsafe bool GeoCommands<TGarnetApi>(int count, byte* ptr, SortedSetOpera
ptr += objOutputHeader.bytesDone;
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(responseWhenNotFound, ref dcurr, dend))
SendAndReset();
var tokens = ReadLeftToken(inputCount, ref ptr);
if (tokens < inputCount)
return false;

switch (op)
{
case SortedSetOperation.GEODIST:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
default:
while (!RespWriteUtils.WriteArrayLength(inputCount, ref dcurr, dend))
SendAndReset();
for (var i = 0; i < inputCount; i++)
{
while (!RespWriteUtils.WriteNullArray(ref dcurr, dend))
SendAndReset();
}
break;
}

break;
}
}
Expand Down
7 changes: 6 additions & 1 deletion libs/server/Transaction/TxnRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ private bool NetworkRUNTXPFast(byte* ptr)

private bool NetworkRUNTXP(int count, byte* ptr)
{
if (count < 1)
return AbortWithWrongNumberOfArguments(nameof(RespCommand.RUNTXP), count);

if (!RespReadUtils.ReadIntWithLengthHeader(out int txid, ref ptr, recvBufferPtr + bytesRead))
return false;

Expand All @@ -275,7 +278,9 @@ private bool NetworkRUNTXP(int count, byte* ptr)
}
else
{
while (!RespWriteUtils.WriteError($"ERR Invalid number of parameters to stored proc {txid}, expected {numParams}, actual {count - 1}", ref dcurr, dend))
while (!RespWriteUtils.WriteError(
string.Format(CmdStrings.GenericErrWrongNumArgsTxn, txid, numParams, count - 1), ref dcurr,
dend))
SendAndReset();
return true;
}
Expand Down
12 changes: 12 additions & 0 deletions test/Garnet.test/RespHashTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ public void CanDoHashScan()

db.HashSet("user:user1", [new HashEntry("name", "Alice"), new HashEntry("email", "email@example.com"), new HashEntry("age", "30")]);

// HSCAN without key
try
{
db.Execute("HSCAN");
Assert.Fail();
}
catch (RedisServerException e)
{
var expectedErrorMessage = string.Format(CmdStrings.GenericErrWrongNumArgs, nameof(HashOperation.HSCAN));
Assert.AreEqual(expectedErrorMessage, e.Message);
}

// HSCAN without parameters
members = db.HashScan("user:user1");
Assert.IsTrue(((IScanningCursor)members).Cursor == 0);
Expand Down
Loading

0 comments on commit 72e7504

Please sign in to comment.