Skip to content

Commit

Permalink
feat: add SRANDMEMBER command (#240)
Browse files Browse the repository at this point in the history
* feat: add SRANDMEMBER command

* fix comment

* fix format

* feat: add SRANDMEMBER command

* Optimize: Improved set search algorithm performance

* fix format

* optimize logic

* optimize logic

---------

Co-authored-by: Tal Zaccai <talzacc@microsoft.com>
  • Loading branch information
Zzhiter and TalZaccai authored Apr 12, 2024
1 parent c0393dd commit c745364
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 1 deletion.
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ public GarnetStatus SetPop(ArgSlice key, int count, out ArgSlice[] members)
public GarnetStatus SetPop(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.SetPop(key, input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus SetRandomMember(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.SetRandomMember(key, input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count, out ArgSlice[] items)
=> storageSession.SetScan(key, cursor, match, count, out items, ref objectContext);
Expand Down
13 changes: 13 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,19 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetPop(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// When called with just the key argument, return a random element from the set value stored at key.
/// If the provided count argument is positive, return an array of distinct elements.
/// The array's length is either count or the set's cardinality (SCARD), whichever is lower.
/// If called with a negative count, the behavior changes and the command is allowed to return the same element multiple times.
/// In this case, the number of returned elements is the absolute value of the specified count.
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="outputFooter"></param>
/// <returns></returns>
GarnetStatus SetRandomMember(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter);

#endregion

#region List Methods
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum SetOperation : byte
SMEMBERS,
SCARD,
SSCAN,
SRANDMEMBER,
SISMEMBER,
}

Expand Down Expand Up @@ -124,6 +125,9 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
case SetOperation.SPOP:
SetPop(_input, input.Length, ref output);
break;
case SetOperation.SRANDMEMBER:
SetRandomMember(_input, input.Length, ref output);
break;
case SetOperation.SSCAN:
if (ObjectUtils.ReadScanInput(_input, input.Length, ref output, out var cursorInput, out var pattern, out var patternLength, out int limitCount, out int bytesDone))
{
Expand Down
109 changes: 109 additions & 0 deletions libs/server/Objects/Set/SetObjectImpl.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using Garnet.common;
Expand Down Expand Up @@ -258,5 +260,112 @@ private void SetPop(byte* input, int length, ref SpanByteAndMemory output)
output.Length = (int)(curr - ptr);
}
}

private void SetRandomMember(byte* input, int length, ref SpanByteAndMemory output)
{
var _input = (ObjectInputHeader*)input;
int count = _input->count;

byte* input_startptr = input + sizeof(ObjectInputHeader);
byte* input_currptr = input_startptr;

int countDone = 0;
bool isMemory = false;
MemoryHandle ptrHandle = default;
byte* ptr = output.SpanByte.ToPointer();

var curr = ptr;
var end = curr + output.Length;

ObjectOutputHeader _output = default;

try
{
int[] indexes = default;

if (count > 0)
{
// Return an array of distinct elements
var countParameter = count > set.Count ? set.Count : count;

// The order of fields in the reply is not truly random
indexes = Enumerable.Range(0, set.Count).OrderBy(x => Guid.NewGuid()).Take(countParameter).ToArray();

// Write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(countParameter, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var index in indexes)
{
var element = set.ElementAt(index);
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
countDone++;
}
countDone += count - countParameter;
}
else if (count == int.MinValue) // no count parameter is present
{
// Return a single random element from the set
if (set.Count > 0)
{
int index = RandomNumberGenerator.GetInt32(0, set.Count);
var item = set.ElementAt(index);
while (!RespWriteUtils.WriteBulkString(item, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
else
{
// If set is empty, return nil
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
countDone++;
}
else // count < 0
{
// Return an array with potentially duplicate elements
int countParameter = Math.Abs(count);

indexes = new int[countParameter];
for (int i = 0; i < countParameter; i++)
{
indexes[i] = RandomNumberGenerator.GetInt32(0, set.Count);
}

if (set.Count > 0)
{
// Write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(countParameter, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var index in indexes)
{
var element = set.ElementAt(index);
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
countDone++;
}
}
else
{
// If set is empty, return nil
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
// Write bytes parsed from input and count done, into output footer
_output.bytesDone = (int)(input_currptr - input_startptr);
_output.countDone = countDone;
}
finally
{
while (!RespWriteUtils.WriteDirect(ref _output, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

if (isMemory) ptrHandle.Dispose();
output.Length = (int)(curr - ptr);
}
}
}
}
117 changes: 117 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -505,5 +505,122 @@ private unsafe bool SetPop<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
return true;
}

/// <summary>
/// When called with just the key argument, return a random element from the set value stored at key.
/// If the provided count argument is positive, return an array of distinct elements.
/// The array's length is either count or the set's cardinality (SCARD), whichever is lower.
/// If called with a negative count, the behavior changes and the command is allowed to return the same element multiple times.
/// In this case, the number of returned elements is the absolute value of the specified count.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool SetRandomMember<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 1 || count > 2)
{
setItemsDoneCount = setOpsCount = 0;
return AbortWithWrongNumberOfArguments("SRANDMEMBER", count);
}

// Get the key
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(key, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

// Prepare input
var inputPtr = (ObjectInputHeader*)(ptr - sizeof(ObjectInputHeader));

// Save old values on buffer for possible revert
var save = *inputPtr;

// Prepare length of header in input buffer
var inputLength = sizeof(ObjectInputHeader);

// Prepare header in input buffer
inputPtr->header.type = GarnetObjectType.Set;
inputPtr->header.SetOp = SetOperation.SRANDMEMBER;
inputPtr->count = Int32.MinValue;

int countParameter = 0;
if (count == 2)
{
// Get the value for the count parameter
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var countParameterByteArray, ref ptr, recvBufferPtr + bytesRead))
return false;

// Prepare response
var canParse = Int32.TryParse(Encoding.ASCII.GetString(countParameterByteArray), out countParameter);
if (!canParse)
{
ReadOnlySpan<byte> errorMessage = "-ERR value is not an integer or out of range\r\n"u8;
while (!RespWriteUtils.WriteDirect(errorMessage, ref dcurr, dend))
SendAndReset();

// Restore input buffer
*inputPtr = save;

// Move input head
readHead = (int)(ptr - recvBufferPtr);
return true;
}
else if (countParameter == 0)
{
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();

// Restore input buffer
*inputPtr = save;

// Move input head
readHead = (int)(ptr - recvBufferPtr);
return true;
}
inputPtr->count = countParameter;
}

inputPtr->done = 0;

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };

var status = storageApi.SetRandomMember(key, new ArgSlice((byte*)inputPtr, inputLength), ref outputFooter);

// Reset input buffer
*inputPtr = save;

switch (status)
{
case GarnetStatus.OK:
// Process output
var objOutputHeader = ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
ptr += objOutputHeader.bytesDone;
setItemsDoneCount += objOutputHeader.countDone;
if (count == 2 && setItemsDoneCount < countParameter)
return false;
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
}

// Reset session counters
setItemsDoneCount = setOpsCount = 0;

// Move input head
readHead = (int)(ptr - recvBufferPtr);
return true;
}
}
}
4 changes: 4 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.BITFIELD_RO, 0);
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nSRAND"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("MEMBER\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SRANDMEMBER);
}
break;

case 12:
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespCommandsInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public static RespCommandsInfo findCommand(RespCommand cmd, byte subCmd = 0)
{(byte)SetOperation.SMEMBERS, new RespCommandsInfo("SMEMBERS", RespCommand.Set, 1, null, (byte)SetOperation.SMEMBERS)},
{(byte)SetOperation.SREM, new RespCommandsInfo("SREM", RespCommand.Set, -2, null, (byte)SetOperation.SREM)},
{(byte)SetOperation.SCARD, new RespCommandsInfo("SCARD", RespCommand.Set, 1, null, (byte)SetOperation.SCARD)},
{(byte)SetOperation.SRANDMEMBER,new RespCommandsInfo("SRANDMEMBER", RespCommand.Set, -2, null, (byte)SetOperation.SRANDMEMBER)},
{(byte)SetOperation.SPOP, new RespCommandsInfo("SPOP", RespCommand.Set, -1, null, (byte)SetOperation.SPOP) },
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER",RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static HashSet<string> GetCommands()
// Pub/sub
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Set
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SISMEMBER",
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER",
//Scan ops
"DBSIZE", "KEYS","SCAN",
// Geospatial commands
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SREM) => SetRemove(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SCARD) => SetLength(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SPOP) => SetPop(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SRANDMEMBER) => SetRandomMember(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SSCAN) => ObjectScan(count, ptr, GarnetObjectType.Set, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
};
Expand Down
17 changes: 17 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -442,5 +442,22 @@ public GarnetStatus SetIsMember<TObjectContext>(byte[] key, ArgSlice input, ref
public GarnetStatus SetPop<TObjectContext>(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter, ref TObjectContext objectContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
=> RMWObjectStoreOperationWithOutput(key, input, ref objectContext, ref outputFooter);

/// <summary>
/// When called with just the key argument, return a random element from the set value stored at key.
/// If the provided count argument is positive, return an array of distinct elements.
/// The array's length is either count or the set's cardinality (SCARD), whichever is lower.
/// If called with a negative count, the behavior changes and the command is allowed to return the same element multiple times.
/// In this case, the number of returned elements is the absolute value of the specified count.
/// </summary>
/// <typeparam name="TObjectContext"></typeparam>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="outputFooter"></param>
/// <param name="objectContext"></param>
/// <returns></returns>
public GarnetStatus SetRandomMember<TObjectContext>(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter, ref TObjectContext objectContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
=> RMWObjectStoreOperationWithOutput(key, input, ref objectContext, ref outputFooter);
}
}
1 change: 1 addition & 0 deletions libs/server/Transaction/TxnKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ private int SetObjectKeys(byte subCommand)
(byte)SetOperation.SMEMBERS => SingleKey(1, true, LockType.Shared),
(byte)SetOperation.SREM => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SCARD => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SRANDMEMBER => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SPOP => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SISMEMBER => SingleKey(1, true, LockType.Shared),
_ => -1
Expand Down
Loading

0 comments on commit c745364

Please sign in to comment.