Skip to content

Commit

Permalink
[Compatibility] Added LPOS command (#673)
Browse files Browse the repository at this point in the history
* Added LPOS command

* Fixed doc

* Added RespCommand but it fails

* Final commit before review comments changes :)

* Fixed code style issues

* Review comment fix

* Fixed Buffer Copy length

---------

Co-authored-by: Yoganand Rajasekaran <60369795+yrajas@users.noreply.github.com>
  • Loading branch information
Vijay-Nirmal and yrajas authored Sep 30, 2024
1 parent bc8c7c0 commit 01a58f6
Show file tree
Hide file tree
Showing 16 changed files with 544 additions and 1 deletion.
12 changes: 12 additions & 0 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ public static bool WriteArrayLength(int len, ref byte* curr, byte* end)
return true;
}

public static bool WriteArrayLength(int len, ref byte* curr, byte* end, out int numDigits, out int totalLen)
{
numDigits = NumUtils.NumDigits(len);
totalLen = 1 + numDigits + 2;
if (totalLen > (int)(end - curr))
return false;
*curr++ = (byte)'*';
NumUtils.IntToBytes(len, numDigits, ref curr);
WriteNewline(ref curr);
return true;
}

/// <summary>
/// Write array item
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public GarnetStatus ListLeftPush(ArgSlice key, ArgSlice element, out int count,
public GarnetStatus ListLeftPush(byte[] key, ref ObjectInput input, out ObjectOutputHeader output)
=> storageSession.ListPush(key, ref input, out output, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPosition(key, ref input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListLeftPop(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPop(key, ref input, ref outputFooter, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi

#region ListPush Methods

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="outputFooter"></param>
/// <returns></returns>
GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// ListLeftPush ArgSlice version with ObjectOutputHeader output
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Objects/List/ListObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum ListOperation : byte
LSET,
BRPOP,
BLPOP,
LPOS,
}

/// <summary>
Expand Down Expand Up @@ -179,6 +180,9 @@ public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory
case ListOperation.LSET:
ListSet(ref input, ref output);
break;
case ListOperation.LPOS:
ListPosition(ref input, ref output);
break;

default:
throw new GarnetException($"Unsupported operation {input.header.ListOp} in ListObject.Operate");
Expand Down
212 changes: 212 additions & 0 deletions libs/server/Objects/List/ListObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -418,5 +419,216 @@ private void ListSet(ref ObjectInput input, ref SpanByteAndMemory output)
output.Length = (int)(output_currptr - output_startptr);
}
}

private void ListPosition(ref ObjectInput input, ref SpanByteAndMemory output)
{
var element = input.parseState.GetArgSliceByRef(input.parseStateStartIdx).ReadOnlySpan;
input.parseStateStartIdx++;

var isMemory = false;
MemoryHandle ptrHandle = default;
var output_startptr = output.SpanByte.ToPointer();
var output_currptr = output_startptr;
var output_end = output_currptr + output.Length;
var count = 0;
var isDefaultCount = true;
ObjectOutputHeader outputHeader = default;

try
{
if (!ReadListPositionInput(ref input, out var rank, out count, out isDefaultCount, out var maxlen, out var error))
{
while (!RespWriteUtils.WriteError(error, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (count < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (maxlen < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (rank == 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

count = count == 0 ? list.Count : count;
var totalArrayHeaderLen = 0;
var lastFoundItemIndex = -1;

if (!isDefaultCount)
{
while (!RespWriteUtils.WriteArrayLength(count, ref output_currptr, output_end, out var _, out totalArrayHeaderLen))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}

var noOfFoundItem = 0;
if (rank > 0)
{
var currentNode = list.First;
var currentIndex = 0;
var maxlenIndex = maxlen == 0 ? list.Count : maxlen;
do
{
var nextNode = currentNode.Next;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == 1)
{
lastFoundItemIndex = currentIndex;
while (!RespWriteUtils.WriteInteger(currentIndex, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

noOfFoundItem++;
if (noOfFoundItem == count)
{
break;
}
}
else
{
rank--;
}
}
currentNode = nextNode;
currentIndex++;
}
while (currentNode != null && currentIndex < maxlenIndex);
}
else // (rank < 0)
{
var currentNode = list.Last;
var currentIndex = list.Count - 1;
var maxlenIndex = maxlen == 0 ? 0 : list.Count - maxlen;
do
{
var nextNode = currentNode.Previous;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == -1)
{
lastFoundItemIndex = currentIndex;
while (!RespWriteUtils.WriteInteger(currentIndex, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

noOfFoundItem++;
if (noOfFoundItem == count)
{
break;
}
}
else
{
rank++;
}
}
currentNode = nextNode;
currentIndex--;
}
while (currentNode != null && currentIndex >= maxlenIndex);
}

if (isDefaultCount && noOfFoundItem == 0)
{
output_currptr = output_startptr;
while (!RespWriteUtils.WriteNull(ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else if (!isDefaultCount && noOfFoundItem == 0)
{
output_currptr = output_startptr;
while (!RespWriteUtils.WriteNullArray(ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else if (!isDefaultCount && noOfFoundItem != count)
{
var newTotalArrayHeaderLen = 0;
var startOutputStartptr = output_startptr;
RespWriteUtils.WriteArrayLength(noOfFoundItem, ref startOutputStartptr, output_end, out var _, out newTotalArrayHeaderLen); // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length
Debug.Assert(totalArrayHeaderLen >= newTotalArrayHeaderLen, "newTotalArrayHeaderLen can't be bigger than totalArrayHeaderLen as we have already written max array lenght in the buffer");

if (totalArrayHeaderLen != newTotalArrayHeaderLen)
{
var remainingLength = (output_currptr - output_startptr) - totalArrayHeaderLen;
Buffer.MemoryCopy(output_startptr + totalArrayHeaderLen, output_startptr + newTotalArrayHeaderLen, remainingLength, remainingLength);
output_currptr = output_currptr - (totalArrayHeaderLen - newTotalArrayHeaderLen);
}
}

outputHeader.result1 = noOfFoundItem;
}
finally
{
while (!RespWriteUtils.WriteDirect(ref outputHeader, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

if (isMemory)
ptrHandle.Dispose();
output.Length = (int)(output_currptr - output_startptr);
}
}

private static unsafe bool ReadListPositionInput(ref ObjectInput input, out int rank, out int count, out bool isDefaultCount, out int maxlen, out ReadOnlySpan<byte> error)
{
var currTokenIdx = input.parseStateStartIdx;

rank = 1; // By default, LPOS takes first match element
count = 1; // By default, LPOS return 1 element
isDefaultCount = true;
maxlen = 0; // By default, iterate to all the item

error = default;

while (currTokenIdx < input.parseState.Count)
{
var sbParam = input.parseState.GetArgSliceByRef(currTokenIdx++).ReadOnlySpan;

if (sbParam.SequenceEqual(CmdStrings.RANK) || sbParam.SequenceEqual(CmdStrings.rank))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out rank))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else if (sbParam.SequenceEqual(CmdStrings.COUNT) || sbParam.SequenceEqual(CmdStrings.count))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out count))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}

isDefaultCount = false;
}
else if (sbParam.SequenceEqual(CmdStrings.MAXLEN) || sbParam.SequenceEqual(CmdStrings.maxlen))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out maxlen))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else
{
error = CmdStrings.RESP_SYNTAX_ERROR;
return false;
}
}

return true;
}
}
}
4 changes: 4 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> XX => "XX"u8;
public static ReadOnlySpan<byte> UNSAFETRUNCATELOG => "UNSAFETRUNCATELOG"u8;
public static ReadOnlySpan<byte> SAMPLES => "SAMPLES"u8;
public static ReadOnlySpan<byte> RANK => "RANK"u8;
public static ReadOnlySpan<byte> rank => "rank"u8;
public static ReadOnlySpan<byte> MAXLEN => "MAXLEN"u8;
public static ReadOnlySpan<byte> maxlen => "maxlen"u8;

/// <summary>
/// Response strings
Expand Down
59 changes: 59 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,65 @@ private unsafe bool ListPop<TGarnetApi>(RespCommand command, ref TGarnetApi stor
return true;
}

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool ListPosition<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 2)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.LPOS));
}

// Get the key for List
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;
var element = parseState.GetArgSliceByRef(1).SpanByte;
var keyBytes = sbKey.ToByteArray();

if (NetworkSingleKeySlotVerify(keyBytes, false))
{
return true;
}

// Prepare input
var input = new ObjectInput
{
header = new RespInputHeader
{
type = GarnetObjectType.List,
ListOp = ListOperation.LPOS,
},
parseState = parseState,
parseStateStartIdx = 1,
};

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };

var statusOp = storageApi.ListPosition(keyBytes, ref input, ref outputFooter);

switch (statusOp)
{
case GarnetStatus.OK:
ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// LMPOP numkeys key [key ...] LEFT | RIGHT [COUNT count]
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum RespCommand : byte
KEYS,
LINDEX,
LLEN,
LPOS,
LRANGE,
MEMORY_USAGE,
MGET,
Expand Down Expand Up @@ -770,6 +771,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.LSET;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("\r\nLPOS\r\n"u8))
{
return RespCommand.LPOS;
}
break;

case 'M':
Expand Down
Loading

0 comments on commit 01a58f6

Please sign in to comment.