Skip to content

Commit

Permalink
Invoke custom commands from custom proc/txn (#597)
Browse files Browse the repository at this point in the history
Adds ability to invoke custom raw string commands and object commands from within custom procedures and transactions.

 New APIs in CustomFunctions to invoke custom commands from custom procedures and transactions
 CustomProcedure registration updated to take in a delegate similar to custom transaction registration
 RespServerSession reference added to CustomFunctions to enable parsing of custom commands and invoking underlying API
 Sample custom procedure and transaction that invoke custom raw string and object commands
 Corresponding tests.

* Invoke custom raw string cmd from custom proc/txn

* Support cmd name matching

* API changes.

* Added custom object command support.

* Cleanup.

* Updated tests.

* Format fix.

* Added tests.

* Unified API. Added test for invalid command.

* Fixed formatting.

* Merged latest changes.
Moved custom procs, txn to test project.

* Separate parsing to a separate API to avoid parsing in the hot path.

* Renamed to CustomProcedureFactory

* Made session maps as private.

* Adding an additional parse state for custom commands use

---------

Co-authored-by: Tal Zaccai <talzacc@microsoft.com>
  • Loading branch information
yrajas and TalZaccai authored Nov 22, 2024
1 parent ebebe65 commit 08f3b68
Show file tree
Hide file tree
Showing 21 changed files with 417 additions and 33 deletions.
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Operations/CustomOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CreateExtensions()
new RespCommandsInfo { Arity = CustomProcs.CustomTxnSet.Arity });

// Register custom procedure
server.Register.NewProcedure(CustomProcs.CustomProcSet.CommandName, new CustomProcSet(),
server.Register.NewProcedure(CustomProcs.CustomProcSet.CommandName, () => new CustomProcSet(),
new RespCommandsInfo { Arity = CustomProcs.CustomProcSet.Arity });
}

Expand Down
4 changes: 2 additions & 2 deletions libs/server/Custom/CustomCommandManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ internal void RegisterType(int objectTypeId, CustomObjectFactory factory)
/// <param name="commandDocs"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
internal int Register(string name, CustomProcedure customProcedure, RespCommandsInfo commandInfo = null, RespCommandDocs commandDocs = null)
internal int Register(string name, Func<CustomProcedure> customProcedure, RespCommandsInfo commandInfo = null, RespCommandDocs commandDocs = null)
{
int id = Interlocked.Increment(ref CustomProcedureId) - 1;
if (id >= MaxRegistrations)
throw new Exception("Out of registration space");

customProcedureMap[id] = new CustomProcedureWrapper(name, (byte)id, customProcedure);
customProcedureMap[id] = new CustomProcedureWrapper(name, (byte)id, customProcedure, this);
if (commandInfo != null) CustomCommandsInfo.Add(name, commandInfo);
if (commandDocs != null) CustomCommandsDocs.Add(name, commandDocs);
return id;
Expand Down
26 changes: 22 additions & 4 deletions libs/server/Custom/CustomCommandManagerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,43 @@ namespace Garnet.server
internal sealed class CustomCommandManagerSession
{
readonly CustomCommandManager customCommandManager;
public readonly (CustomTransactionProcedure, int)[] sessionTransactionProcMap;

// These session specific arrays are indexed by the same ID as the arrays in CustomCommandManager
readonly (CustomTransactionProcedure, int)[] sessionTransactionProcMap;
readonly CustomProcedure[] sessionCustomProcMap;


public CustomCommandManagerSession(CustomCommandManager customCommandManager)
{
this.customCommandManager = customCommandManager;
sessionTransactionProcMap = new (CustomTransactionProcedure, int)[CustomCommandManager.MaxRegistrations];
sessionCustomProcMap = new CustomProcedure[CustomCommandManager.MaxRegistrations];
}

public CustomProcedure GetCustomProcedure(int id, RespServerSession respServerSession)
{
if (sessionCustomProcMap[id] == null)
{
var entry = customCommandManager.customProcedureMap[id] ?? throw new GarnetException($"Custom procedure {id} not found");
sessionCustomProcMap[id] = entry.CustomProcedureFactory();
sessionCustomProcMap[id].respServerSession = respServerSession;
}

return sessionCustomProcMap[id];
}

public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(int id, TransactionManager txnManager, ScratchBufferManager scratchBufferManager)
public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(int id, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager)
{
if (sessionTransactionProcMap[id].Item1 == null)
{
var entry = customCommandManager.transactionProcMap[id] ?? throw new GarnetException($"Transaction procedure {id} not found");
_ = customCommandManager.CustomCommandsInfo.TryGetValue(entry.NameStr, out var cmdInfo);
return GetCustomTransactionProcedure(entry, txnManager, scratchBufferManager, cmdInfo?.Arity ?? 0);
return GetCustomTransactionProcedure(entry, respServerSession, txnManager, scratchBufferManager, cmdInfo?.Arity ?? 0);
}
return sessionTransactionProcMap[id];
}

public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(CustomTransaction entry, TransactionManager txnManager, ScratchBufferManager scratchBufferManager, int arity)
public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(CustomTransaction entry, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager, int arity)
{
int id = entry.id;
if (sessionTransactionProcMap[id].Item1 == null)
Expand All @@ -40,6 +57,7 @@ public CustomCommandManagerSession(CustomCommandManager customCommandManager)

sessionTransactionProcMap[id].Item1.txnManager = txnManager;
sessionTransactionProcMap[id].Item1.scratchBufferManager = scratchBufferManager;
sessionTransactionProcMap[id].Item1.respServerSession = respServerSession;
}
return sessionTransactionProcMap[id];
}
Expand Down
44 changes: 44 additions & 0 deletions libs/server/Custom/CustomFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public abstract class CustomFunctions
/// </summary>
protected static MemoryPool<byte> MemoryPool => MemoryPool<byte>.Shared;

internal RespServerSession respServerSession;

/// <summary>
/// Create output as simple string, from given string
/// </summary>
Expand Down Expand Up @@ -207,5 +209,47 @@ protected static unsafe ArgSlice GetNextArg(ref CustomProcedureInput procInput,
{
return GetNextArg(ref procInput.parseState, ref idx);
}

/// <summary>Parse custom raw string command</summary>
/// <param name="cmd">Command name</param>
/// <param name="rawStringCommand">Parsed raw string command</param>
/// <returns>True if command found, false otherwise</returns>
protected bool ParseCustomRawStringCommand(string cmd, out CustomRawStringCommand rawStringCommand) =>
respServerSession.ParseCustomRawStringCommand(cmd, out rawStringCommand);

/// <summary>Parse custom object command</summary>
/// <param name="cmd">Command name</param>
/// <param name="objectCommand">Parsed object command</param>
/// <returns>True if command found, false othrewise</returns>
protected bool ParseCustomObjectCommand(string cmd, out CustomObjectCommand objectCommand) =>
respServerSession.ParseCustomObjectCommand(cmd, out objectCommand);

/// <summary>Execute a specific custom raw string command</summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="garnetApi"></param>
/// <param name="rawStringCommand">Custom raw string command to execute</param>
/// <param name="key">Key param</param>
/// <param name="input">Args to the command</param>
/// <param name="output">Output from the command</param>
/// <returns>True if successful</returns>
protected bool ExecuteCustomRawStringCommand<TGarnetApi>(TGarnetApi garnetApi, CustomRawStringCommand rawStringCommand, ArgSlice key, ArgSlice[] input, out ArgSlice output)
where TGarnetApi : IGarnetApi
{
return respServerSession.InvokeCustomRawStringCommand(ref garnetApi, rawStringCommand, key, input, out output);
}

/// <summary>Execute a specific custom object command</summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="garnetApi"></param>
/// <param name="objectCommand">Custom object command to execute</param>
/// <param name="key">Key parameter</param>
/// <param name="input">Args to the command</param>
/// <param name="output">Output from the command</param>
/// <returns>True if successful</returns>
protected bool ExecuteCustomObjectCommand<TGarnetApi>(TGarnetApi garnetApi, CustomObjectCommand objectCommand, ArgSlice key, ArgSlice[] input, out ArgSlice output)
where TGarnetApi : IGarnetApi
{
return respServerSession.InvokeCustomObjectCommand(ref garnetApi, objectCommand, key, input, out output);
}
}
}
2 changes: 1 addition & 1 deletion libs/server/Custom/CustomObjectCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Garnet.server
{
class CustomObjectCommand
public class CustomObjectCommand
{
public readonly string NameStr;
public readonly byte[] name;
Expand Down
13 changes: 8 additions & 5 deletions libs/server/Custom/CustomProcedureWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using Garnet.common;

namespace Garnet.server
Expand All @@ -26,20 +27,22 @@ class CustomProcedureWrapper
public readonly string NameStr;
public readonly byte[] Name;
public readonly byte Id;
public readonly CustomProcedure CustomProcedureImpl;
public readonly Func<CustomProcedure> CustomProcedureFactory;

internal CustomProcedureWrapper(string name, byte id, CustomProcedure customScriptProc)
internal CustomProcedureWrapper(string name, byte id, Func<CustomProcedure> customProcedureFactory, CustomCommandManager customCommandManager)
{
if (string.IsNullOrEmpty(name))
throw new ArgumentNullException(nameof(name));

if (customScriptProc == null)
throw new ArgumentNullException(nameof(customScriptProc));
if (customProcedureFactory == null)
throw new ArgumentNullException(nameof(customProcedureFactory));

Debug.Assert(customCommandManager != null);

NameStr = name.ToUpperInvariant();
Name = System.Text.Encoding.ASCII.GetBytes(NameStr);
Id = id;
CustomProcedureImpl = customScriptProc;
CustomProcedureFactory = customProcedureFactory;
}
}
}
2 changes: 1 addition & 1 deletion libs/server/Custom/CustomRawStringCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Garnet.server
{
class CustomRawStringCommand
public class CustomRawStringCommand
{
public readonly string NameStr;
public readonly byte[] name;
Expand Down
147 changes: 146 additions & 1 deletion libs/server/Custom/CustomRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Text;
using Garnet.common;
using Tsavorite.core;

Expand Down Expand Up @@ -51,7 +52,7 @@ private bool TryTransactionProc(byte id, CustomTransactionProcedure proc, int st
public bool RunTransactionProc(byte id, ref CustomProcedureInput procInput, ref MemoryResult<byte> output)
{
var proc = customCommandManagerSession
.GetCustomTransactionProcedure(id, txnManager, scratchBufferManager).Item1;
.GetCustomTransactionProcedure(id, this, txnManager, scratchBufferManager).Item1;
return txnManager.RunTransactionProc(id, ref procInput, proc, ref output);

}
Expand Down Expand Up @@ -194,5 +195,149 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s

return true;
}

/// <summary>Parse custom raw string command</summary>
/// <param name="cmd">Command name</param>
/// <param name="customCommand">Parsed raw string command</param>
/// <returns>True if command found, false otherwise</returns>
public bool ParseCustomRawStringCommand(string cmd, out CustomRawStringCommand customCommand) =>
storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out customCommand);

/// <summary>Parse custom object command</summary>
/// <param name="cmd">Command name</param>
/// <param name="customObjCommand">Parsed object command</param>
/// <returns>True if command found, false othrewise</returns>
public bool ParseCustomObjectCommand(string cmd, out CustomObjectCommand customObjCommand) =>
storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out customObjCommand);

/// <summary>Execute a specific custom raw string command</summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <param name="customCommand">Custom raw string command to execute</param>
/// <param name="key">Key param</param>
/// <param name="args">Args to the command</param>
/// <param name="output">Output from the command</param>
/// <returns>True if successful</returns>
public bool InvokeCustomRawStringCommand<TGarnetApi>(ref TGarnetApi storageApi, CustomRawStringCommand customCommand, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
ArgumentNullException.ThrowIfNull(customCommand);

var sbKey = key.SpanByte;
var inputArg = customCommand.expirationTicks > 0 ? DateTimeOffset.UtcNow.Ticks + customCommand.expirationTicks : customCommand.expirationTicks;
customCommandParseState.InitializeWithArguments(args);
var rawStringInput = new RawStringInput(customCommand.GetRespCommand(), ref customCommandParseState, arg1: inputArg);

var _output = new SpanByteAndMemory(null);
GarnetStatus status;
if (customCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
status = storageApi.Read_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (status == GarnetStatus.OK)
{
if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
Debug.Assert(_output.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
}
}

return true;
}

/// <summary>Execute a specific custom object command</summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <param name="customObjCommand">Custom object command to execute</param>
/// <param name="key">Key parameter</param>
/// <param name="args">Args to the command</param>
/// <param name="output">Output from the command</param>
/// <returns>True if successful</returns>
public bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, CustomObjectCommand customObjCommand, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
ArgumentNullException.ThrowIfNull(customObjCommand);

output = default;

var keyBytes = key.ToArray();

// Prepare input
var header = new RespInputHeader(customObjCommand.GetObjectType()) { SubId = customObjCommand.subid };
customCommandParseState.InitializeWithArguments(args);
var input = new ObjectInput(header, ref customCommandParseState);

var _output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
GarnetStatus status;
if (customObjCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
default:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
}
}
else
{
status = storageApi.Read_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.OK:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
case GarnetStatus.NOTFOUND:
Debug.Assert(_output.spanByteAndMemory.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
break;
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
}
}

return true;
}
}
}
2 changes: 1 addition & 1 deletion libs/server/Module/ModuleRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public ModuleActionStatus RegisterCommand(string name, CustomObjectFactory facto
/// <param name="commandInfo">Command info</param>
/// <param name="commandDocs">Command docs</param>
/// <returns>Registration status</returns>
public ModuleActionStatus RegisterProcedure(string name, CustomProcedure customScriptProc, RespCommandsInfo commandInfo = null, RespCommandDocs commandDocs = null)
public ModuleActionStatus RegisterProcedure(string name, Func<CustomProcedure> customScriptProc, RespCommandsInfo commandInfo = null, RespCommandDocs commandDocs = null)
{
if (string.IsNullOrEmpty(name) || customScriptProc == null)
return ModuleActionStatus.InvalidRegistrationInfo;
Expand Down
6 changes: 4 additions & 2 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
internal readonly ScratchBufferManager scratchBufferManager;

internal SessionParseState parseState;
internal SessionParseState customCommandParseState;

ClusterSlotVerificationInput csvi;
GCHandle recvHandle;

Expand Down Expand Up @@ -756,7 +758,7 @@ bool NetworkCustomTxn()
// Perform the operation
TryTransactionProc(currentCustomTransaction.id,
customCommandManagerSession
.GetCustomTransactionProcedure(currentCustomTransaction.id, txnManager, scratchBufferManager)
.GetCustomTransactionProcedure(currentCustomTransaction.id, this, txnManager, scratchBufferManager)
.Item1);
currentCustomTransaction = null;
return true;
Expand All @@ -770,7 +772,7 @@ bool NetworkCustomProcedure()
return true;
}

TryCustomProcedure(currentCustomProcedure.CustomProcedureImpl);
TryCustomProcedure(customCommandManagerSession.GetCustomProcedure(currentCustomProcedure.Id, this));

currentCustomProcedure = null;
return true;
Expand Down
Loading

0 comments on commit 08f3b68

Please sign in to comment.