Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added LPOS command #673

Merged
merged 14 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ public static bool WriteArrayLength(int len, ref byte* curr, byte* end)
return true;
}

public static bool WriteArrayLength(int len, ref byte* curr, byte* end, out int numDigits, out int totalLen)
{
numDigits = NumUtils.NumDigits(len);
totalLen = 1 + numDigits + 2;
if (totalLen > (int)(end - curr))
return false;
*curr++ = (byte)'*';
NumUtils.IntToBytes(len, numDigits, ref curr);
WriteNewline(ref curr);
return true;
}

/// <summary>
/// Write array item
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public GarnetStatus ListLeftPush(ArgSlice key, ArgSlice element, out int count,
public GarnetStatus ListLeftPush(byte[] key, ref ObjectInput input, out ObjectOutputHeader output)
=> storageSession.ListPush(key, ref input, out output, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPosition(key, ref input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListLeftPop(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPop(key, ref input, ref outputFooter, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi

#region ListPush Methods

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="outputFooter"></param>
/// <returns></returns>
GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// ListLeftPush ArgSlice version with ObjectOutputHeader output
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Objects/List/ListObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum ListOperation : byte
LSET,
BRPOP,
BLPOP,
LPOS,
}

/// <summary>
Expand Down Expand Up @@ -179,6 +180,9 @@ public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory
case ListOperation.LSET:
ListSet(ref input, ref output);
break;
case ListOperation.LPOS:
ListPosition(ref input, ref output);
break;

default:
throw new GarnetException($"Unsupported operation {input.header.ListOp} in ListObject.Operate");
Expand Down
212 changes: 212 additions & 0 deletions libs/server/Objects/List/ListObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -418,5 +419,216 @@ private void ListSet(ref ObjectInput input, ref SpanByteAndMemory output)
output.Length = (int)(output_currptr - output_startptr);
}
}

private void ListPosition(ref ObjectInput input, ref SpanByteAndMemory output)
{
var element = input.parseState.GetArgSliceByRef(input.parseStateStartIdx).ReadOnlySpan;
input.parseStateStartIdx++;

var isMemory = false;
MemoryHandle ptrHandle = default;
var output_startptr = output.SpanByte.ToPointer();
var output_currptr = output_startptr;
var output_end = output_currptr + output.Length;
var count = 0;
var isDefaultCount = true;
ObjectOutputHeader outputHeader = default;

try
{
if (!ReadListPositionInput(ref input, out var rank, out count, out isDefaultCount, out var maxlen, out var error))
{
while (!RespWriteUtils.WriteError(error, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (count < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (maxlen < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (rank == 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

count = count == 0 ? list.Count : count;
var totalArrayHeaderLen = 0;
var lastFoundItemIndex = -1;

if (!isDefaultCount)
{
while (!RespWriteUtils.WriteArrayLength(count, ref output_currptr, output_end, out var _, out totalArrayHeaderLen))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}

var noOfFoundItem = 0;
if (rank > 0)
{
var currentNode = list.First;
var currentIndex = 0;
var maxlenIndex = maxlen == 0 ? list.Count : maxlen;
do
{
var nextNode = currentNode.Next;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == 1)
{
lastFoundItemIndex = currentIndex;
while (!RespWriteUtils.WriteInteger(currentIndex, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

noOfFoundItem++;
if (noOfFoundItem == count)
{
break;
}
}
else
{
rank--;
}
}
currentNode = nextNode;
currentIndex++;
}
while (currentNode != null && currentIndex < maxlenIndex);
}
else // (rank < 0)
{
var currentNode = list.Last;
var currentIndex = list.Count - 1;
var maxlenIndex = maxlen == 0 ? 0 : list.Count - maxlen;
do
{
var nextNode = currentNode.Previous;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == -1)
{
lastFoundItemIndex = currentIndex;
while (!RespWriteUtils.WriteInteger(currentIndex, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

noOfFoundItem++;
if (noOfFoundItem == count)
{
break;
}
}
else
{
rank++;
}
}
currentNode = nextNode;
currentIndex--;
}
while (currentNode != null && currentIndex >= maxlenIndex);
}

if (isDefaultCount && noOfFoundItem == 0)
{
output_currptr = output_startptr;
while (!RespWriteUtils.WriteNull(ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else if (!isDefaultCount && noOfFoundItem == 0)
{
output_currptr = output_startptr;
while (!RespWriteUtils.WriteNullArray(ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else if (!isDefaultCount && noOfFoundItem != count)
{
var newTotalArrayHeaderLen = 0;
var startOutputStartptr = output_startptr;
RespWriteUtils.WriteArrayLength(noOfFoundItem, ref startOutputStartptr, output_end, out var _, out newTotalArrayHeaderLen); // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length
Debug.Assert(totalArrayHeaderLen >= newTotalArrayHeaderLen, "newTotalArrayHeaderLen can't be bigger than totalArrayHeaderLen as we have already written max array lenght in the buffer");

if (totalArrayHeaderLen != newTotalArrayHeaderLen)
{
var remainingLength = (output_currptr - output_startptr) - totalArrayHeaderLen;
Buffer.MemoryCopy(output_startptr + totalArrayHeaderLen, output_startptr + newTotalArrayHeaderLen, remainingLength, remainingLength);
output_currptr = output_currptr - (totalArrayHeaderLen - newTotalArrayHeaderLen);
}
}

outputHeader.result1 = noOfFoundItem;
}
finally
{
while (!RespWriteUtils.WriteDirect(ref outputHeader, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

if (isMemory)
ptrHandle.Dispose();
output.Length = (int)(output_currptr - output_startptr);
}
}

private static unsafe bool ReadListPositionInput(ref ObjectInput input, out int rank, out int count, out bool isDefaultCount, out int maxlen, out ReadOnlySpan<byte> error)
{
var currTokenIdx = input.parseStateStartIdx;

rank = 1; // By default, LPOS takes first match element
count = 1; // By default, LPOS return 1 element
isDefaultCount = true;
maxlen = 0; // By default, iterate to all the item

error = default;

while (currTokenIdx < input.parseState.Count)
{
var sbParam = input.parseState.GetArgSliceByRef(currTokenIdx++).ReadOnlySpan;

if (sbParam.SequenceEqual(CmdStrings.RANK) || sbParam.SequenceEqual(CmdStrings.rank))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out rank))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else if (sbParam.SequenceEqual(CmdStrings.COUNT) || sbParam.SequenceEqual(CmdStrings.count))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out count))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}

isDefaultCount = false;
}
else if (sbParam.SequenceEqual(CmdStrings.MAXLEN) || sbParam.SequenceEqual(CmdStrings.maxlen))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out maxlen))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else
{
error = CmdStrings.RESP_SYNTAX_ERROR;
return false;
}
}

return true;
}
}
}
4 changes: 4 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> XX => "XX"u8;
public static ReadOnlySpan<byte> UNSAFETRUNCATELOG => "UNSAFETRUNCATELOG"u8;
public static ReadOnlySpan<byte> SAMPLES => "SAMPLES"u8;
public static ReadOnlySpan<byte> RANK => "RANK"u8;
public static ReadOnlySpan<byte> rank => "rank"u8;
public static ReadOnlySpan<byte> MAXLEN => "MAXLEN"u8;
public static ReadOnlySpan<byte> maxlen => "maxlen"u8;

/// <summary>
/// Response strings
Expand Down
59 changes: 59 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,65 @@ private unsafe bool ListPop<TGarnetApi>(RespCommand command, ref TGarnetApi stor
return true;
}

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool ListPosition<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 2)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.LPOS));
}

// Get the key for List
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;
var element = parseState.GetArgSliceByRef(1).SpanByte;
var keyBytes = sbKey.ToByteArray();

if (NetworkSingleKeySlotVerify(keyBytes, false))
{
return true;
}

// Prepare input
var input = new ObjectInput
{
header = new RespInputHeader
{
type = GarnetObjectType.List,
ListOp = ListOperation.LPOS,
},
parseState = parseState,
parseStateStartIdx = 1,
};

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };

var statusOp = storageApi.ListPosition(keyBytes, ref input, ref outputFooter);

switch (statusOp)
{
case GarnetStatus.OK:
ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// LMPOP numkeys key [key ...] LEFT | RIGHT [COUNT count]
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum RespCommand : byte
KEYS,
LINDEX,
LLEN,
LPOS,
LRANGE,
MEMORY_USAGE,
MGET,
Expand Down Expand Up @@ -770,6 +771,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.LSET;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("\r\nLPOS\r\n"u8))
{
return RespCommand.LPOS;
}
break;

case 'M':
Expand Down
Loading