Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster Command Parsing Refactor #373

Merged
merged 11 commits into from
May 14, 2024
2 changes: 1 addition & 1 deletion libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void ExecuteForArray(params string[] command)
}

static ReadOnlySpan<byte> CLUSTER => "$7\r\nCLUSTER\r\n"u8;
static ReadOnlySpan<byte> appendLog => "appendlog"u8;
static ReadOnlySpan<byte> appendLog => "APPENDLOG"u8;

/// <summary>
/// ClusterAppendLog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ namespace Garnet.client
/// </summary>
public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
{
static ReadOnlySpan<byte> initiate_replica_sync => "initiate_replica_sync"u8;
static ReadOnlySpan<byte> send_ckpt_metadata => "send_ckpt_metadata"u8;
static ReadOnlySpan<byte> send_ckpt_file_segment => "send_ckpt_file_segment"u8;
static ReadOnlySpan<byte> begin_replica_recover => "begin_replica_recover"u8;
static ReadOnlySpan<byte> initiate_replica_sync => "INITIATE_REPLICA_SYNC"u8;
static ReadOnlySpan<byte> send_ckpt_metadata => "SEND_CKPT_METADATA"u8;
static ReadOnlySpan<byte> send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8;
static ReadOnlySpan<byte> begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8;

/// <summary>
/// Initiate checkpoint retrieval from replica by sending replica checkpoint information and AOF address range
Expand Down Expand Up @@ -267,7 +267,7 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 8;
int arraySize = 9;

while (!RespWriteUtils.WriteArrayLength(arraySize, ref curr, end))
{
Expand Down
54 changes: 14 additions & 40 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,79 +14,53 @@ static class CmdStrings
/// Request strings
/// </summary>
public static ReadOnlySpan<byte> INFO => "INFO"u8;
public static ReadOnlySpan<byte> info => "info"u8;
public static ReadOnlySpan<byte> CLUSTER => "CLUSTER"u8;
public static ReadOnlySpan<byte> NODES => "NODES"u8;
public static ReadOnlySpan<byte> nodes => "nodes"u8;
public static ReadOnlySpan<byte> ADDSLOTS => "ADDSLOTS"u8;
public static ReadOnlySpan<byte> addslots => "addslots"u8;
public static ReadOnlySpan<byte> ADDSLOTSRANGE => "ADDSLOTSRANGE"u8;
public static ReadOnlySpan<byte> addslotsrange => "addslotsrange"u8;
public static ReadOnlySpan<byte> aofsync => "aofsync"u8;
public static ReadOnlySpan<byte> appendlog => "appendlog"u8;
public static ReadOnlySpan<byte> initiate_replica_sync => "initiate_replica_sync"u8;
public static ReadOnlySpan<byte> send_ckpt_metadata => "send_ckpt_metadata"u8;
public static ReadOnlySpan<byte> send_ckpt_file_segment => "send_ckpt_file_segment"u8;
public static ReadOnlySpan<byte> begin_replica_recover => "begin_replica_recover"u8;
public static ReadOnlySpan<byte> BUMPEPOCH => "BUMPEPOCH"u8;
public static ReadOnlySpan<byte> bumpepoch => "bumpepoch"u8;
public static ReadOnlySpan<byte> BANLIST => "BANLIST"u8;
public static ReadOnlySpan<byte> banlist => "banlist"u8;
public static ReadOnlySpan<byte> COUNTKEYSINSLOT => "COUNTKEYSINSLOT"u8;
public static ReadOnlySpan<byte> countkeysinslot => "countkeysinslot"u8;
public static ReadOnlySpan<byte> delkeysinslot => "delkeysinslot"u8;
public static ReadOnlySpan<byte> DELKEYSINSLOT => "DELKEYSINSLOT"u8;
public static ReadOnlySpan<byte> delkeysinslotrange => "delkeysinslotrange"u8;
public static ReadOnlySpan<byte> DELKEYSINSLOTRANGE => "DELKEYSINSLOTRANGE"u8;
public static ReadOnlySpan<byte> DELSLOTS => "DELSLOTS"u8;
public static ReadOnlySpan<byte> delslots => "delslots"u8;
public static ReadOnlySpan<byte> DELSLOTSRANGE => "DELSLOTSRANGE"u8;
public static ReadOnlySpan<byte> delslotsrange => "delslotsrange"u8;
public static ReadOnlySpan<byte> FAILOVER => "FAILOVER"u8;
public static ReadOnlySpan<byte> failover => "failover"u8;
public static ReadOnlySpan<byte> REPLICAOF => "REPLICAOF"u8;
public static ReadOnlySpan<byte> SECONDARYOF => "SLAVEOF"u8;
public static ReadOnlySpan<byte> failauthreq => "failauthreq"u8;
public static ReadOnlySpan<byte> failstopwrites => "failstopwrites"u8;
public static ReadOnlySpan<byte> failreplicationoffset => "failreplicationoffset"u8;
public static ReadOnlySpan<byte> FORGET => "FORGET"u8;
public static ReadOnlySpan<byte> forget => "forget"u8;
public static ReadOnlySpan<byte> GETKEYSINSLOT => "GETKEYSINSLOT"u8;
public static ReadOnlySpan<byte> getkeysinslot => "getkeysinslot"u8;
public static ReadOnlySpan<byte> KEYSLOT => "KEYSLOT"u8;
public static ReadOnlySpan<byte> keyslot => "keyslot"u8;
public static ReadOnlySpan<byte> HELP => "HELP"u8;
public static ReadOnlySpan<byte> help => "help"u8;
public static ReadOnlySpan<byte> MEET => "MEET"u8;
public static ReadOnlySpan<byte> meet => "meet"u8;
public static ReadOnlySpan<byte> MIGRATE => "MIGRATE"u8;
public static ReadOnlySpan<byte> MTASKS => "MTASKS"u8;
public static ReadOnlySpan<byte> MYID => "MYID"u8;
public static ReadOnlySpan<byte> myid => "myid"u8;
public static ReadOnlySpan<byte> MYPARENTID => "MYPARENTID"u8;
public static ReadOnlySpan<byte> myparentid => "myparentid"u8;
public static ReadOnlySpan<byte> ENDPOINT => "ENDPOINT"u8;
public static ReadOnlySpan<byte> endpoint => "endpoint"u8;
public static ReadOnlySpan<byte> REPLICAS => "REPLICAS"u8;
public static ReadOnlySpan<byte> replicas => "replicas"u8;
public static ReadOnlySpan<byte> REPLICATE => "REPLICATE"u8;
public static ReadOnlySpan<byte> replicate => "replicate"u8;
public static ReadOnlySpan<byte> SET_CONFIG_EPOCH => "SET-CONFIG-EPOCH"u8;
public static ReadOnlySpan<byte> set_config_epoch => "set-config-epoch"u8;
public static ReadOnlySpan<byte> SETSLOT => "SETSLOT"u8;
public static ReadOnlySpan<byte> setslot => "setslot"u8;
public static ReadOnlySpan<byte> SETSLOTSRANGE => "SETSLOTSRANGE"u8;
public static ReadOnlySpan<byte> setslotsrange => "setslotsrange"u8;
public static ReadOnlySpan<byte> SHARDS => "SHARDS"u8;
public static ReadOnlySpan<byte> shards => "shards"u8;
public static ReadOnlySpan<byte> SLOTS => "SLOTS"u8;
public static ReadOnlySpan<byte> slots => "slots"u8;
public static ReadOnlySpan<byte> SLOTSTATE => "SLOTSTATE"u8;
public static ReadOnlySpan<byte> slotstate => "slotstate"u8;
public static ReadOnlySpan<byte> GOSSIP => "GOSSIP"u8;
public static ReadOnlySpan<byte> WITHMEET => "WITHMEET"u8;
public static ReadOnlySpan<byte> RESET => "RESET"u8;
public static ReadOnlySpan<byte> reset => "reset"u8;

/// <summary>
/// Internode communication cluster commands
/// </summary>
public static ReadOnlySpan<byte> aofsync => "AOFSYNC"u8;
public static ReadOnlySpan<byte> appendlog => "APPENDLOG"u8;
public static ReadOnlySpan<byte> initiate_replica_sync => "INITIATE_REPLICA_SYNC"u8;
public static ReadOnlySpan<byte> send_ckpt_metadata => "SEND_CKPT_METADATA"u8;
public static ReadOnlySpan<byte> send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8;
public static ReadOnlySpan<byte> begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8;
public static ReadOnlySpan<byte> failauthreq => "FAILAUTHREQ"u8;
public static ReadOnlySpan<byte> failstopwrites => "FAILSTOPWRITES"u8;
public static ReadOnlySpan<byte> failreplicationoffset => "FAILREPLICATIONOFFSET"u8;


/// <summary>
/// Response strings
Expand Down
61 changes: 61 additions & 0 deletions libs/cluster/Parsing/ClusterSubCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.cluster
{
internal enum ClusterSubcommand : byte
{
NONE,

// Basic cluster management commands
BUMPEPOCH,
FORGET,
INFO,
HELP,
MEET,
MYID,
MYPARENTID,
ENDPOINT,
NODES,
SETCONFIGEPOCH,
SHARDS,
GOSSIP,
RESET,

// Failover management commands
FAILOVER,
FAILAUTHREQ,
FAILSTOPWRITES,
FAILREPLICATIONOFFSET,

// Slot management commands
ADDSLOTS,
ADDSLOTSRANGE,
BANLIST,
COUNTKEYSINSLOT,
DELSLOTS,
DELSLOTSRANGE,
DELKEYSINSLOT,
DELKEYSINSLOTRANGE,
GETKEYSINSLOT,
KEYSLOT,
SETSLOT,
SETSLOTSRANGE,
SLOTS,
SLOTSTATE,

// Migrate management commands
MIGRATE,
MTASKS,

// Replication management commands
REPLICAS,
REPLICATE,
AOFSYNC,
APPENDLOG,
INITIATE_REPLICA_SYNC,
SEND_CKPT_METADATA,
SEND_CKPT_FILE_SEGMENT,
BEGIN_REPLICA_RECOVER
}
}
89 changes: 89 additions & 0 deletions libs/cluster/Parsing/ClusterSubCommandParsing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using Garnet.common;
using Garnet.server;

namespace Garnet.cluster
{
internal sealed unsafe partial class ClusterSession : IClusterSession
{
/// <summary>
/// Parse cluster subcommand and convert to ClusterSubcommand type.
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="subcmd"></param>
/// <returns>True if parsing succeeded without any errors, otherwise false</returns>
private bool ParseClusterSubcommand(ReadOnlySpan<byte> bufSpan, out Span<byte> param, out ClusterSubcommand subcmd)
{
subcmd = ClusterSubcommand.NONE;
param = GetNextToken(bufSpan, out var success1);
if (!success1) return false;

subcmd = param switch
{
_ when param.SequenceEqual(CmdStrings.appendlog) => ClusterSubcommand.APPENDLOG,
_ when param.SequenceEqual(CmdStrings.GOSSIP) => ClusterSubcommand.GOSSIP,
_ when param.SequenceEqual(CmdStrings.send_ckpt_file_segment) => ClusterSubcommand.SEND_CKPT_FILE_SEGMENT,
_ when param.SequenceEqual(CmdStrings.send_ckpt_metadata) => ClusterSubcommand.SEND_CKPT_METADATA,
_ when param.SequenceEqual(CmdStrings.aofsync) => ClusterSubcommand.AOFSYNC,
_ when param.SequenceEqual(CmdStrings.begin_replica_recover) => ClusterSubcommand.BEGIN_REPLICA_RECOVER,
_ when param.SequenceEqual(CmdStrings.initiate_replica_sync) => ClusterSubcommand.INITIATE_REPLICA_SYNC,
_ when param.SequenceEqual(CmdStrings.failauthreq) => ClusterSubcommand.FAILAUTHREQ,
_ when param.SequenceEqual(CmdStrings.failstopwrites) => ClusterSubcommand.FAILSTOPWRITES,
_ when param.SequenceEqual(CmdStrings.failreplicationoffset) => ClusterSubcommand.FAILREPLICATIONOFFSET,
_ => ConvertToClusterSubcommandIgnoreCase(ref param)
};

return true;
}

/// <summary>
/// Convert cluster subcommand sequence to ClusterSubcommand type by ignoring case
/// </summary>
/// <param name="subcommand"></param>
/// <returns>ClusterSubcommand type</returns>
private static ClusterSubcommand ConvertToClusterSubcommandIgnoreCase(ref Span<byte> subcommand)
{
ConvertUtils.MakeUpperCase(subcommand);
var subcmd = subcommand switch
{
_ when subcommand.SequenceEqual(CmdStrings.MEET) => ClusterSubcommand.MEET,
_ when subcommand.SequenceEqual(CmdStrings.BUMPEPOCH) => ClusterSubcommand.BUMPEPOCH,
_ when subcommand.SequenceEqual(CmdStrings.FORGET) => ClusterSubcommand.FORGET,
_ when subcommand.SequenceEqual(CmdStrings.INFO) => ClusterSubcommand.INFO,
_ when subcommand.SequenceEqual(CmdStrings.HELP) => ClusterSubcommand.HELP,
_ when subcommand.SequenceEqual(CmdStrings.MYID) => ClusterSubcommand.MYID,
_ when subcommand.SequenceEqual(CmdStrings.MYPARENTID) => ClusterSubcommand.MYPARENTID,
_ when subcommand.SequenceEqual(CmdStrings.ENDPOINT) => ClusterSubcommand.ENDPOINT,
_ when subcommand.SequenceEqual(CmdStrings.NODES) => ClusterSubcommand.NODES,
_ when subcommand.SequenceEqual(CmdStrings.SET_CONFIG_EPOCH) => ClusterSubcommand.SETCONFIGEPOCH,
_ when subcommand.SequenceEqual(CmdStrings.SHARDS) => ClusterSubcommand.SHARDS,
_ when subcommand.SequenceEqual(CmdStrings.RESET) => ClusterSubcommand.RESET,
_ when subcommand.SequenceEqual(CmdStrings.FAILOVER) => ClusterSubcommand.FAILOVER,
_ when subcommand.SequenceEqual(CmdStrings.ADDSLOTS) => ClusterSubcommand.ADDSLOTS,
_ when subcommand.SequenceEqual(CmdStrings.ADDSLOTSRANGE) => ClusterSubcommand.ADDSLOTSRANGE,
_ when subcommand.SequenceEqual(CmdStrings.BANLIST) => ClusterSubcommand.BANLIST,
_ when subcommand.SequenceEqual(CmdStrings.COUNTKEYSINSLOT) => ClusterSubcommand.COUNTKEYSINSLOT,
_ when subcommand.SequenceEqual(CmdStrings.DELSLOTS) => ClusterSubcommand.DELSLOTS,
_ when subcommand.SequenceEqual(CmdStrings.DELSLOTSRANGE) => ClusterSubcommand.DELSLOTSRANGE,
_ when subcommand.SequenceEqual(CmdStrings.DELKEYSINSLOT) => ClusterSubcommand.DELKEYSINSLOT,
_ when subcommand.SequenceEqual(CmdStrings.DELKEYSINSLOTRANGE) => ClusterSubcommand.DELKEYSINSLOTRANGE,
_ when subcommand.SequenceEqual(CmdStrings.GETKEYSINSLOT) => ClusterSubcommand.GETKEYSINSLOT,
_ when subcommand.SequenceEqual(CmdStrings.KEYSLOT) => ClusterSubcommand.KEYSLOT,
_ when subcommand.SequenceEqual(CmdStrings.SETSLOT) => ClusterSubcommand.SETSLOT,
_ when subcommand.SequenceEqual(CmdStrings.SETSLOTSRANGE) => ClusterSubcommand.SETSLOTSRANGE,
_ when subcommand.SequenceEqual(CmdStrings.SLOTSTATE) => ClusterSubcommand.SLOTSTATE,
_ when subcommand.SequenceEqual(CmdStrings.SLOTS) => ClusterSubcommand.SLOTS,
_ when subcommand.SequenceEqual(CmdStrings.MIGRATE) => ClusterSubcommand.MIGRATE,
_ when subcommand.SequenceEqual(CmdStrings.MTASKS) => ClusterSubcommand.MTASKS,
_ when subcommand.SequenceEqual(CmdStrings.REPLICAS) => ClusterSubcommand.REPLICAS,
_ when subcommand.SequenceEqual(CmdStrings.REPLICATE) => ClusterSubcommand.REPLICATE,
_ => ClusterSubcommand.NONE
};

return subcmd;
}
}
}
6 changes: 5 additions & 1 deletion libs/cluster/Server/HashSlot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ public enum SlotState : byte
/// </summary>
FAIL,
/// <summary>
///
/// Not a slot state. Used with SETSLOT
/// </summary>
NODE,
/// <summary>
/// Invalid slot state
/// </summary>
INVALID,
}

/// <summary>
Expand Down
Loading