diff --git a/libs/client/ClientSession/GarnetClientSession.cs b/libs/client/ClientSession/GarnetClientSession.cs index d9e81c7e74..45e35211bb 100644 --- a/libs/client/ClientSession/GarnetClientSession.cs +++ b/libs/client/ClientSession/GarnetClientSession.cs @@ -196,7 +196,7 @@ public void ExecuteForArray(params string[] command) } static ReadOnlySpan CLUSTER => "$7\r\nCLUSTER\r\n"u8; - static ReadOnlySpan appendLog => "appendlog"u8; + static ReadOnlySpan appendLog => "APPENDLOG"u8; /// /// ClusterAppendLog diff --git a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs index 1c369e97b2..405a1be816 100644 --- a/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs +++ b/libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs @@ -15,10 +15,10 @@ namespace Garnet.client /// public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer { - static ReadOnlySpan initiate_replica_sync => "initiate_replica_sync"u8; - static ReadOnlySpan send_ckpt_metadata => "send_ckpt_metadata"u8; - static ReadOnlySpan send_ckpt_file_segment => "send_ckpt_file_segment"u8; - static ReadOnlySpan begin_replica_recover => "begin_replica_recover"u8; + static ReadOnlySpan initiate_replica_sync => "INITIATE_REPLICA_SYNC"u8; + static ReadOnlySpan send_ckpt_metadata => "SEND_CKPT_METADATA"u8; + static ReadOnlySpan send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8; + static ReadOnlySpan begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8; /// /// Initiate checkpoint retrieval from replica by sending replica checkpoint information and AOF address range @@ -267,7 +267,7 @@ public Task ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); tcsQueue.Enqueue(tcs); byte* curr = offset; - int arraySize = 8; + int arraySize = 9; while (!RespWriteUtils.WriteArrayLength(arraySize, ref curr, end)) { diff --git a/libs/cluster/CmdStrings.cs b/libs/cluster/CmdStrings.cs index 848f2ac9a1..4918a31bd9 100644 --- a/libs/cluster/CmdStrings.cs +++ b/libs/cluster/CmdStrings.cs @@ -14,79 +14,53 @@ static class CmdStrings /// Request strings /// public static ReadOnlySpan INFO => "INFO"u8; - public static ReadOnlySpan info => "info"u8; public static ReadOnlySpan CLUSTER => "CLUSTER"u8; public static ReadOnlySpan NODES => "NODES"u8; - public static ReadOnlySpan nodes => "nodes"u8; public static ReadOnlySpan ADDSLOTS => "ADDSLOTS"u8; - public static ReadOnlySpan addslots => "addslots"u8; public static ReadOnlySpan ADDSLOTSRANGE => "ADDSLOTSRANGE"u8; - public static ReadOnlySpan addslotsrange => "addslotsrange"u8; - public static ReadOnlySpan aofsync => "aofsync"u8; - public static ReadOnlySpan appendlog => "appendlog"u8; - public static ReadOnlySpan initiate_replica_sync => "initiate_replica_sync"u8; - public static ReadOnlySpan send_ckpt_metadata => "send_ckpt_metadata"u8; - public static ReadOnlySpan send_ckpt_file_segment => "send_ckpt_file_segment"u8; - public static ReadOnlySpan begin_replica_recover => "begin_replica_recover"u8; public static ReadOnlySpan BUMPEPOCH => "BUMPEPOCH"u8; - public static ReadOnlySpan bumpepoch => "bumpepoch"u8; public static ReadOnlySpan BANLIST => "BANLIST"u8; - public static ReadOnlySpan banlist => "banlist"u8; public static ReadOnlySpan COUNTKEYSINSLOT => "COUNTKEYSINSLOT"u8; - public static ReadOnlySpan countkeysinslot => "countkeysinslot"u8; - public static ReadOnlySpan delkeysinslot => "delkeysinslot"u8; public static ReadOnlySpan DELKEYSINSLOT => "DELKEYSINSLOT"u8; - public static ReadOnlySpan delkeysinslotrange => "delkeysinslotrange"u8; public static ReadOnlySpan DELKEYSINSLOTRANGE => "DELKEYSINSLOTRANGE"u8; public static ReadOnlySpan DELSLOTS => "DELSLOTS"u8; - public static ReadOnlySpan delslots => "delslots"u8; public static ReadOnlySpan DELSLOTSRANGE => "DELSLOTSRANGE"u8; - public static ReadOnlySpan delslotsrange => "delslotsrange"u8; public static ReadOnlySpan FAILOVER => "FAILOVER"u8; - public static ReadOnlySpan failover => "failover"u8; - public static ReadOnlySpan REPLICAOF => "REPLICAOF"u8; - public static ReadOnlySpan SECONDARYOF => "SLAVEOF"u8; - public static ReadOnlySpan failauthreq => "failauthreq"u8; - public static ReadOnlySpan failstopwrites => "failstopwrites"u8; - public static ReadOnlySpan failreplicationoffset => "failreplicationoffset"u8; public static ReadOnlySpan FORGET => "FORGET"u8; - public static ReadOnlySpan forget => "forget"u8; public static ReadOnlySpan GETKEYSINSLOT => "GETKEYSINSLOT"u8; - public static ReadOnlySpan getkeysinslot => "getkeysinslot"u8; public static ReadOnlySpan KEYSLOT => "KEYSLOT"u8; - public static ReadOnlySpan keyslot => "keyslot"u8; public static ReadOnlySpan HELP => "HELP"u8; - public static ReadOnlySpan help => "help"u8; public static ReadOnlySpan MEET => "MEET"u8; - public static ReadOnlySpan meet => "meet"u8; public static ReadOnlySpan MIGRATE => "MIGRATE"u8; public static ReadOnlySpan MTASKS => "MTASKS"u8; public static ReadOnlySpan MYID => "MYID"u8; - public static ReadOnlySpan myid => "myid"u8; public static ReadOnlySpan MYPARENTID => "MYPARENTID"u8; - public static ReadOnlySpan myparentid => "myparentid"u8; public static ReadOnlySpan ENDPOINT => "ENDPOINT"u8; - public static ReadOnlySpan endpoint => "endpoint"u8; public static ReadOnlySpan REPLICAS => "REPLICAS"u8; - public static ReadOnlySpan replicas => "replicas"u8; public static ReadOnlySpan REPLICATE => "REPLICATE"u8; - public static ReadOnlySpan replicate => "replicate"u8; public static ReadOnlySpan SET_CONFIG_EPOCH => "SET-CONFIG-EPOCH"u8; - public static ReadOnlySpan set_config_epoch => "set-config-epoch"u8; public static ReadOnlySpan SETSLOT => "SETSLOT"u8; - public static ReadOnlySpan setslot => "setslot"u8; public static ReadOnlySpan SETSLOTSRANGE => "SETSLOTSRANGE"u8; - public static ReadOnlySpan setslotsrange => "setslotsrange"u8; public static ReadOnlySpan SHARDS => "SHARDS"u8; - public static ReadOnlySpan shards => "shards"u8; public static ReadOnlySpan SLOTS => "SLOTS"u8; - public static ReadOnlySpan slots => "slots"u8; public static ReadOnlySpan SLOTSTATE => "SLOTSTATE"u8; - public static ReadOnlySpan slotstate => "slotstate"u8; public static ReadOnlySpan GOSSIP => "GOSSIP"u8; public static ReadOnlySpan WITHMEET => "WITHMEET"u8; public static ReadOnlySpan RESET => "RESET"u8; - public static ReadOnlySpan reset => "reset"u8; + + /// + /// Internode communication cluster commands + /// + public static ReadOnlySpan aofsync => "AOFSYNC"u8; + public static ReadOnlySpan appendlog => "APPENDLOG"u8; + public static ReadOnlySpan initiate_replica_sync => "INITIATE_REPLICA_SYNC"u8; + public static ReadOnlySpan send_ckpt_metadata => "SEND_CKPT_METADATA"u8; + public static ReadOnlySpan send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8; + public static ReadOnlySpan begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8; + public static ReadOnlySpan failauthreq => "FAILAUTHREQ"u8; + public static ReadOnlySpan failstopwrites => "FAILSTOPWRITES"u8; + public static ReadOnlySpan failreplicationoffset => "FAILREPLICATIONOFFSET"u8; + /// /// Response strings diff --git a/libs/cluster/Parsing/ClusterSubCommand.cs b/libs/cluster/Parsing/ClusterSubCommand.cs new file mode 100644 index 0000000000..df6581ff98 --- /dev/null +++ b/libs/cluster/Parsing/ClusterSubCommand.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +namespace Garnet.cluster +{ + internal enum ClusterSubcommand : byte + { + NONE, + + // Basic cluster management commands + BUMPEPOCH, + FORGET, + INFO, + HELP, + MEET, + MYID, + MYPARENTID, + ENDPOINT, + NODES, + SETCONFIGEPOCH, + SHARDS, + GOSSIP, + RESET, + + // Failover management commands + FAILOVER, + FAILAUTHREQ, + FAILSTOPWRITES, + FAILREPLICATIONOFFSET, + + // Slot management commands + ADDSLOTS, + ADDSLOTSRANGE, + BANLIST, + COUNTKEYSINSLOT, + DELSLOTS, + DELSLOTSRANGE, + DELKEYSINSLOT, + DELKEYSINSLOTRANGE, + GETKEYSINSLOT, + KEYSLOT, + SETSLOT, + SETSLOTSRANGE, + SLOTS, + SLOTSTATE, + + // Migrate management commands + MIGRATE, + MTASKS, + + // Replication management commands + REPLICAS, + REPLICATE, + AOFSYNC, + APPENDLOG, + INITIATE_REPLICA_SYNC, + SEND_CKPT_METADATA, + SEND_CKPT_FILE_SEGMENT, + BEGIN_REPLICA_RECOVER + } +} \ No newline at end of file diff --git a/libs/cluster/Parsing/ClusterSubCommandParsing.cs b/libs/cluster/Parsing/ClusterSubCommandParsing.cs new file mode 100644 index 0000000000..fcfde78e09 --- /dev/null +++ b/libs/cluster/Parsing/ClusterSubCommandParsing.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using Garnet.common; +using Garnet.server; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Parse cluster subcommand and convert to ClusterSubcommand type. + /// + /// + /// + /// True if parsing succeeded without any errors, otherwise false + private bool ParseClusterSubcommand(ReadOnlySpan bufSpan, out Span param, out ClusterSubcommand subcmd) + { + subcmd = ClusterSubcommand.NONE; + param = GetNextToken(bufSpan, out var success1); + if (!success1) return false; + + subcmd = param switch + { + _ when param.SequenceEqual(CmdStrings.appendlog) => ClusterSubcommand.APPENDLOG, + _ when param.SequenceEqual(CmdStrings.GOSSIP) => ClusterSubcommand.GOSSIP, + _ when param.SequenceEqual(CmdStrings.send_ckpt_file_segment) => ClusterSubcommand.SEND_CKPT_FILE_SEGMENT, + _ when param.SequenceEqual(CmdStrings.send_ckpt_metadata) => ClusterSubcommand.SEND_CKPT_METADATA, + _ when param.SequenceEqual(CmdStrings.aofsync) => ClusterSubcommand.AOFSYNC, + _ when param.SequenceEqual(CmdStrings.begin_replica_recover) => ClusterSubcommand.BEGIN_REPLICA_RECOVER, + _ when param.SequenceEqual(CmdStrings.initiate_replica_sync) => ClusterSubcommand.INITIATE_REPLICA_SYNC, + _ when param.SequenceEqual(CmdStrings.failauthreq) => ClusterSubcommand.FAILAUTHREQ, + _ when param.SequenceEqual(CmdStrings.failstopwrites) => ClusterSubcommand.FAILSTOPWRITES, + _ when param.SequenceEqual(CmdStrings.failreplicationoffset) => ClusterSubcommand.FAILREPLICATIONOFFSET, + _ => ConvertToClusterSubcommandIgnoreCase(ref param) + }; + + return true; + } + + /// + /// Convert cluster subcommand sequence to ClusterSubcommand type by ignoring case + /// + /// + /// ClusterSubcommand type + private static ClusterSubcommand ConvertToClusterSubcommandIgnoreCase(ref Span subcommand) + { + ConvertUtils.MakeUpperCase(subcommand); + var subcmd = subcommand switch + { + _ when subcommand.SequenceEqual(CmdStrings.MEET) => ClusterSubcommand.MEET, + _ when subcommand.SequenceEqual(CmdStrings.BUMPEPOCH) => ClusterSubcommand.BUMPEPOCH, + _ when subcommand.SequenceEqual(CmdStrings.FORGET) => ClusterSubcommand.FORGET, + _ when subcommand.SequenceEqual(CmdStrings.INFO) => ClusterSubcommand.INFO, + _ when subcommand.SequenceEqual(CmdStrings.HELP) => ClusterSubcommand.HELP, + _ when subcommand.SequenceEqual(CmdStrings.MYID) => ClusterSubcommand.MYID, + _ when subcommand.SequenceEqual(CmdStrings.MYPARENTID) => ClusterSubcommand.MYPARENTID, + _ when subcommand.SequenceEqual(CmdStrings.ENDPOINT) => ClusterSubcommand.ENDPOINT, + _ when subcommand.SequenceEqual(CmdStrings.NODES) => ClusterSubcommand.NODES, + _ when subcommand.SequenceEqual(CmdStrings.SET_CONFIG_EPOCH) => ClusterSubcommand.SETCONFIGEPOCH, + _ when subcommand.SequenceEqual(CmdStrings.SHARDS) => ClusterSubcommand.SHARDS, + _ when subcommand.SequenceEqual(CmdStrings.RESET) => ClusterSubcommand.RESET, + _ when subcommand.SequenceEqual(CmdStrings.FAILOVER) => ClusterSubcommand.FAILOVER, + _ when subcommand.SequenceEqual(CmdStrings.ADDSLOTS) => ClusterSubcommand.ADDSLOTS, + _ when subcommand.SequenceEqual(CmdStrings.ADDSLOTSRANGE) => ClusterSubcommand.ADDSLOTSRANGE, + _ when subcommand.SequenceEqual(CmdStrings.BANLIST) => ClusterSubcommand.BANLIST, + _ when subcommand.SequenceEqual(CmdStrings.COUNTKEYSINSLOT) => ClusterSubcommand.COUNTKEYSINSLOT, + _ when subcommand.SequenceEqual(CmdStrings.DELSLOTS) => ClusterSubcommand.DELSLOTS, + _ when subcommand.SequenceEqual(CmdStrings.DELSLOTSRANGE) => ClusterSubcommand.DELSLOTSRANGE, + _ when subcommand.SequenceEqual(CmdStrings.DELKEYSINSLOT) => ClusterSubcommand.DELKEYSINSLOT, + _ when subcommand.SequenceEqual(CmdStrings.DELKEYSINSLOTRANGE) => ClusterSubcommand.DELKEYSINSLOTRANGE, + _ when subcommand.SequenceEqual(CmdStrings.GETKEYSINSLOT) => ClusterSubcommand.GETKEYSINSLOT, + _ when subcommand.SequenceEqual(CmdStrings.KEYSLOT) => ClusterSubcommand.KEYSLOT, + _ when subcommand.SequenceEqual(CmdStrings.SETSLOT) => ClusterSubcommand.SETSLOT, + _ when subcommand.SequenceEqual(CmdStrings.SETSLOTSRANGE) => ClusterSubcommand.SETSLOTSRANGE, + _ when subcommand.SequenceEqual(CmdStrings.SLOTSTATE) => ClusterSubcommand.SLOTSTATE, + _ when subcommand.SequenceEqual(CmdStrings.SLOTS) => ClusterSubcommand.SLOTS, + _ when subcommand.SequenceEqual(CmdStrings.MIGRATE) => ClusterSubcommand.MIGRATE, + _ when subcommand.SequenceEqual(CmdStrings.MTASKS) => ClusterSubcommand.MTASKS, + _ when subcommand.SequenceEqual(CmdStrings.REPLICAS) => ClusterSubcommand.REPLICAS, + _ when subcommand.SequenceEqual(CmdStrings.REPLICATE) => ClusterSubcommand.REPLICATE, + _ => ClusterSubcommand.NONE + }; + + return subcmd; + } + } +} \ No newline at end of file diff --git a/libs/cluster/Server/HashSlot.cs b/libs/cluster/Server/HashSlot.cs index ea69d43814..80e261c195 100644 --- a/libs/cluster/Server/HashSlot.cs +++ b/libs/cluster/Server/HashSlot.cs @@ -31,9 +31,13 @@ public enum SlotState : byte /// FAIL, /// - /// + /// Not a slot state. Used with SETSLOT /// NODE, + /// + /// Invalid slot state + /// + INVALID, } /// diff --git a/libs/cluster/Session/ClusterCommands.cs b/libs/cluster/Session/ClusterCommands.cs index db704abbec..ee96a8a282 100644 --- a/libs/cluster/Session/ClusterCommands.cs +++ b/libs/cluster/Session/ClusterCommands.cs @@ -2,15 +2,10 @@ // Licensed under the MIT license. using System; -using System.Buffers; using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; using System.Text; using Garnet.common; using Garnet.server; -using Microsoft.Extensions.Logging; -using Tsavorite.core; namespace Garnet.cluster { @@ -109,6 +104,7 @@ private bool TryParseSlots(int count, ref byte* ptr, out HashSet slots, out if (slotStart > slotEnd) { + errorMessage = Encoding.ASCII.GetBytes($"ERR Invalid range {slotStart} > {slotEnd}!"); invalidRange = true; continue; } @@ -144,1542 +140,156 @@ public void UnsafeWaitForConfigTransition() private bool ProcessClusterCommands(ReadOnlySpan bufSpan, int count) { - if (clusterProvider.clusterManager == null) - { - if (!DrainCommands(bufSpan, count)) - return false; - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CLUSTER, ref dcurr, dend)) - SendAndReset(); - return true; - } - - bool errorFlag; - string errorCmd; - if (count > 0) - { - var param = GetCommand(bufSpan, out var success1); - if (!success1) return false; - - if (ProcessClusterBasicCommands(bufSpan, param, count - 1, out errorFlag, out errorCmd)) - goto checkErrorFlags; - else if (ProcessFailoverCommands(bufSpan, param, count - 1, out errorFlag, out errorCmd)) - goto checkErrorFlags; - else if (ProcessSlotManageCommands(bufSpan, param, count - 1, out errorFlag, out errorCmd)) - goto checkErrorFlags; - else if (ProcessClusterMigrationCommands(bufSpan, param, count - 1, out errorFlag, out errorCmd)) - goto checkErrorFlags; - else if (ProcessClusterReplicationCommands(bufSpan, param, count - 1, out errorFlag, out errorCmd)) - goto checkErrorFlags; - else - { - if (!DrainCommands(bufSpan, count - 1)) - return false; - var paramStr = Encoding.ASCII.GetString(param); - while (!RespWriteUtils.WriteError($"ERR Unknown subcommand or wrong number of arguments for '{paramStr}'. Try CLUSTER HELP.", ref dcurr, dend)) - SendAndReset(); - } - } - else - { - errorFlag = true; - errorCmd = "CLUSTER"; - } - - checkErrorFlags: - if (errorFlag && !string.IsNullOrWhiteSpace(errorCmd)) - { - var errorMsg = string.Format(CmdStrings.GenericErrMissingParam, errorCmd); - var bresp_ERRMISSINGPARAM = Encoding.ASCII.GetBytes(errorMsg); - bresp_ERRMISSINGPARAM.CopyTo(new Span(dcurr, bresp_ERRMISSINGPARAM.Length)); - dcurr += bresp_ERRMISSINGPARAM.Length; - } - sessionMetrics?.incr_total_cluster_commands_processed(); - return true; - } - - private bool ProcessClusterBasicCommands(ReadOnlySpan bufSpan, ReadOnlySpan param, int count, out bool errorFlag, out string errorCmd) - { - errorFlag = false; - errorCmd = string.Empty; - if (param.SequenceEqual(CmdStrings.BUMPEPOCH) || param.SequenceEqual(CmdStrings.bumpepoch)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count > 0) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - success = clusterProvider.clusterManager.TryBumpClusterEpoch(); - readHead = (int)(ptr - recvBufferPtr); - - if (success) - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CONFIG_UPDATE, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.FORGET) || param.SequenceEqual(CmdStrings.forget)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - - var expirySeconds = 60; - if (count == 2) - { - if (!RespReadUtils.ReadIntWithLengthHeader(out expirySeconds, ref ptr, recvBufferPtr + bytesRead)) - return false; - } - readHead = (int)(ptr - recvBufferPtr); - - logger?.LogTrace("CLUSTER FORGET {nodeid} {seconds}", nodeid, expirySeconds); - if (!clusterProvider.clusterManager.TryRemoveWorker(nodeid, expirySeconds, out var errorMessage)) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.INFO) || param.SequenceEqual(CmdStrings.info)) - { - if (count > 0) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var clusterInfo = clusterProvider.clusterManager.GetInfo(); - while (!RespWriteUtils.WriteAsciiBulkString(clusterInfo, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.HELP) || param.SequenceEqual(CmdStrings.help)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var clusterCommands = ClusterCommandInfo.GetClusterCommands(); - while (!RespWriteUtils.WriteArrayLength(clusterCommands.Count, ref dcurr, dend)) - SendAndReset(); - foreach (var command in clusterCommands) - { - while (!RespWriteUtils.WriteSimpleString(command, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.MEET) || param.SequenceEqual(CmdStrings.meet)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count != 2) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var ipaddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadIntWithLengthHeader(out var port, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var ipaddressStr = Encoding.ASCII.GetString(ipaddress); - logger?.LogTrace("CLUSTER MEET {ipaddressStr} {port}", ipaddressStr, port); - clusterProvider.clusterManager.RunMeetTask(ipaddressStr, port); - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.MYID) || param.SequenceEqual(CmdStrings.myid)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - while (!RespWriteUtils.WriteAsciiBulkString(clusterProvider.clusterManager.CurrentConfig.LocalNodeId, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.MYPARENTID) || param.SequenceEqual(CmdStrings.myparentid)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - - var current = clusterProvider.clusterManager.CurrentConfig; - var parentId = current.LocalNodeRole == NodeRole.PRIMARY ? current.LocalNodeId : current.LocalNodePrimaryId; - while (!RespWriteUtils.WriteAsciiBulkString(parentId, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.ENDPOINT) || param.SequenceEqual(CmdStrings.endpoint)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - var current = clusterProvider.clusterManager.CurrentConfig; - var (host, port) = current.GetEndpointFromNodeId(nodeid); - while (!RespWriteUtils.WriteAsciiBulkString($"{host}:{port}", ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.NODES) || param.SequenceEqual(CmdStrings.nodes)) - { - if (count > 0) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo(); - while (!RespWriteUtils.WriteAsciiBulkString(nodes, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.set_config_epoch) || param.SequenceEqual(CmdStrings.SET_CONFIG_EPOCH)) - { - if (count != 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - Debug.WriteLine($"{Encoding.UTF8.GetString(new Span(recvBufferPtr, Math.Min(bytesRead, 128))).Replace("\n", "|").Replace("\r", "")}"); - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out var configEpoch, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - if (clusterProvider.clusterManager.CurrentConfig.NumWorkers > 2) - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT, ref dcurr, dend)) - SendAndReset(); - } - else - { - if (!clusterProvider.clusterManager.TrySetLocalConfigEpoch(configEpoch, out var errorMessage)) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - } - else if (param.SequenceEqual(CmdStrings.SHARDS) || param.SequenceEqual(CmdStrings.shards)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var shardsInfo = clusterProvider.clusterManager.CurrentConfig.GetShardsInfo(); - while (!RespWriteUtils.WriteAsciiDirect(shardsInfo, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.GOSSIP)) - { - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - var gossipWithMeet = false; - if (count > 1) - { - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var withMeet, ref ptr, recvBufferPtr + bytesRead)) - return false; - Debug.Assert(withMeet.SequenceEqual(CmdStrings.WITHMEET.ToArray())); - if (withMeet.SequenceEqual(CmdStrings.WITHMEET.ToArray())) - gossipWithMeet = true; - } - - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var gossipMessage, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - clusterProvider.clusterManager.gossipStats.UpdateGossipBytesRecv(gossipMessage.Length); - var current = clusterProvider.clusterManager.CurrentConfig; - - // Try merge if not just a ping message - if (gossipMessage.Length > 0) - { - var other = ClusterConfig.FromByteArray(gossipMessage); - // Accept gossip message if it is a gossipWithMeet or node from node that is already known and trusted - // GossipWithMeet messages are only send through a call to CLUSTER MEET at the remote node - if (gossipWithMeet || current.IsKnown(other.LocalNodeId)) - { - _ = clusterProvider.clusterManager.TryMerge(other); - } - else - logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId); - } - - // Respond if configuration has changed or gossipWithMeet option is specified - if (lastSentConfig != current || gossipWithMeet) - { - var configByteArray = current.ToByteArray(); - clusterProvider.clusterManager.gossipStats.UpdateGossipBytesSend(configByteArray.Length); - while (!RespWriteUtils.WriteBulkString(configByteArray, ref dcurr, dend)) - SendAndReset(); - lastSentConfig = current; - } - else - { - while (!RespWriteUtils.WriteBulkString(Array.Empty(), ref dcurr, dend)) - SendAndReset(); - } - return true; - } - } - else if (param.SequenceEqual(CmdStrings.RESET) || param.SequenceEqual(CmdStrings.reset)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - var ptr = recvBufferPtr + readHead; - var soft = true; - var expirySeconds = 60; - - if (count > 0) - { - if (!RespReadUtils.ReadStringWithLengthHeader(out var option, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (option.Equals("HARD", StringComparison.OrdinalIgnoreCase)) - soft = false; - } - - if (count > 1) - { - if (!RespReadUtils.ReadIntWithLengthHeader(out expirySeconds, ref ptr, recvBufferPtr + bytesRead)) - return false; - } - - readHead = (int)(ptr - recvBufferPtr); - - clusterProvider.clusterManager.TryReset(soft, expirySeconds); - if (!soft) clusterProvider.FlushDB(true); - - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else { return false; } - return true; - } - - public bool ProcessFailoverCommands(ReadOnlySpan bufSpan, ReadOnlySpan param, int count, out bool errorFlag, out string errorCmd) - { - errorFlag = false; - errorCmd = string.Empty; - if (param.SequenceEqual(CmdStrings.FAILOVER) || param.SequenceEqual(CmdStrings.failover)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 0) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - var failoverOption = FailoverOption.DEFAULT; - TimeSpan failoverTimeout = default; - if (count > 0) - { - if (!RespReadUtils.ReadStringWithLengthHeader(out var failoverOptionStr, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!Enum.TryParse(failoverOptionStr, ignoreCase: true, out failoverOption)) - { - while (!RespWriteUtils.WriteError($"ERR Failover option ({failoverOptionStr}) not supported", ref dcurr, dend)) - SendAndReset(); - failoverOption = FailoverOption.INVALID; - } - } - - if (count > 1) - { - if (!RespReadUtils.ReadIntWithLengthHeader(out var failoverTimeoutSeconds, ref ptr, recvBufferPtr + bytesRead)) - return false; - failoverTimeout = TimeSpan.FromSeconds(failoverTimeoutSeconds); - } - readHead = (int)(ptr - recvBufferPtr); - - if (clusterProvider.serverOptions.EnableAOF) - { - if (failoverOption == FailoverOption.ABORT) - { - clusterProvider.failoverManager.TryAbortReplicaFailover(); - } - else - { - var current = clusterProvider.clusterManager.CurrentConfig; - var nodeRole = current.LocalNodeRole; - if (nodeRole == NodeRole.REPLICA) - { - if (!clusterProvider.failoverManager.TryStartReplicaFailover(failoverOption, failoverTimeout)) - { - while (!RespWriteUtils.WriteError($"ERR failed to start failover for primary({current.GetLocalNodePrimaryAddress()})", ref dcurr, dend)) - SendAndReset(); - return true; - } - } - else - { - while (!RespWriteUtils.WriteError($"ERR Node is not a {NodeRole.REPLICA} ~{nodeRole}~", ref dcurr, dend)) - SendAndReset(); - return true; - } - } - } - else - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) - SendAndReset(); - return true; - } - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.failauthreq)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nodeIdBytes, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var requestEpochBytes, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var claimedSlots, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var resp = clusterProvider.clusterManager.AuthorizeFailover( - Encoding.ASCII.GetString(nodeIdBytes), - BitConverter.ToInt64(requestEpochBytes), - claimedSlots) ? CmdStrings.RESP_RETURN_VAL_1 : CmdStrings.RESP_RETURN_VAL_0; - while (!RespWriteUtils.WriteDirect(resp, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.failstopwrites)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nodeIdBytes, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - clusterProvider.clusterManager.TryStopWrites(Encoding.ASCII.GetString(nodeIdBytes)); - UnsafeWaitForConfigTransition(); - while (!RespWriteUtils.WriteInteger(clusterProvider.replicationManager.ReplicationOffset, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.failreplicationoffset)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadLongWithLengthHeader(out var primaryReplicationOffset, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var rOffset = clusterProvider.replicationManager.WaitForReplicationOffset(primaryReplicationOffset).GetAwaiter().GetResult(); - while (!RespWriteUtils.WriteInteger(rOffset, ref dcurr, dend)) - SendAndReset(); - } - else { return false; } - return true; - } - - public bool ProcessSlotManageCommands(ReadOnlySpan bufSpan, ReadOnlySpan param, int count, out bool errorFlag, out string errorCmd) - { - errorFlag = false; - errorCmd = string.Empty; - if (param.SequenceEqual(CmdStrings.ADDSLOTS) || param.SequenceEqual(CmdStrings.addslots)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - - //Try to parse slot ranges. - var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: false); - - readHead = (int)(ptr - recvBufferPtr); - - //The slot parsing may give errorMessage even if the methods TryParseSlots true. - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - //Try to to add slots - if (!clusterProvider.clusterManager.TryAddSlots(slots, out var slotIndex) && - slotIndex != -1) - { - while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already busy", ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.ADDSLOTSRANGE) || param.SequenceEqual(CmdStrings.addslotsrange)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 2 || (count & 0x1) == 0x1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - - // Try to parse slot ranges. - var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); - - readHead = (int)(ptr - recvBufferPtr); - - //The slot parsing may give errorMessage even if the TryParseSlots returns true. - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - // Try to to add slots - if (!clusterProvider.clusterManager.TryAddSlots(slots, out var slotIndex) && - slotIndex != -1) - { - while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already busy", ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.BANLIST) || param.SequenceEqual(CmdStrings.banlist)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var banlist = clusterProvider.clusterManager.GetBanList(); - - while (!RespWriteUtils.WriteArrayLength(banlist.Count, ref dcurr, dend)) - SendAndReset(); - foreach (var replica in banlist) - { - while (!RespWriteUtils.WriteAsciiBulkString(replica, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.COUNTKEYSINSLOT) || param.SequenceEqual(CmdStrings.countkeysinslot)) - { - var current = clusterProvider.clusterManager.CurrentConfig; - if (count != 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - if (ClusterConfig.OutOfRange(slot)) - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) - SendAndReset(); - } - else if (!current.IsLocal((ushort)slot)) - { - Redirect((ushort)slot, current); - } - else - { - try - { - var keyCount = CountKeysInSlot(slot); - while (!RespWriteUtils.WriteInteger(keyCount, ref dcurr, dend)) - SendAndReset(); - } - catch (Exception ex) - { - logger?.LogError(ex, "Critical error in count keys"); - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N1, ref dcurr, dend)) - SendAndReset(); - } - } - } - } - else if (param.SequenceEqual(CmdStrings.DELSLOTS) || param.SequenceEqual(CmdStrings.delslots)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - //Try to parse slot ranges. - var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: false); - - readHead = (int)(ptr - recvBufferPtr); - - //The slot parsing may give errorMessage even if the TryParseSlots returns true. - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - //Try remove the slots - if (!clusterProvider.clusterManager.TryRemoveSlots(slots, out var slotIndex) && - slotIndex != -1) - { - while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already not assigned", ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.DELSLOTSRANGE) || param.SequenceEqual(CmdStrings.delslotsrange)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - // CLUSTER ADDSLOTSRANGE [start-slot end-slot] // 2 + [2] even number of arguments - if (count < 2 || (count & 0x1) == 0x1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - - //Try to parse slot ranges. - var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); - - readHead = (int)(ptr - recvBufferPtr); - - //The slot parsing may give errorMessage even if the TryParseSlots returns true. - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - //Try remove the slots - if (!clusterProvider.clusterManager.TryRemoveSlots(slots, out var slotIndex) && - slotIndex != -1) - { - while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already not assigned", ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.DELKEYSINSLOT) || param.SequenceEqual(CmdStrings.delkeysinslot)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count != 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out int slot, ref ptr, recvBufferPtr + bytesRead)) - return false; - - readHead = (int)(ptr - recvBufferPtr); - - var slots = new HashSet() { slot }; - ClusterManager.DeleteKeysInSlotsFromMainStore(basicGarnetApi, slots); - if (!clusterProvider.serverOptions.DisableObjects) - ClusterManager.DeleteKeysInSlotsFromObjectStore(basicGarnetApi, slots); - - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.DELKEYSINSLOTRANGE) || param.SequenceEqual(CmdStrings.delkeysinslotrange)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count != 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - - //Try to parse slot ranges. - var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); - - readHead = (int)(ptr - recvBufferPtr); - - //The slot parsing may give errorMessage even if the TryParseSlots returns true. - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - ClusterManager.DeleteKeysInSlotsFromMainStore(basicGarnetApi, slots); - if (!clusterProvider.serverOptions.DisableObjects) - ClusterManager.DeleteKeysInSlotsFromObjectStore(basicGarnetApi, slots); - - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.GETKEYSINSLOT) || param.SequenceEqual(CmdStrings.getkeysinslot)) - { - var current = clusterProvider.clusterManager.CurrentConfig; - - if (count < 2) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out int slot, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadIntWithLengthHeader(out int keyCount, ref ptr, recvBufferPtr + bytesRead)) - return false; - - readHead = (int)(ptr - recvBufferPtr); - - if (ClusterConfig.OutOfRange(slot)) - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) - SendAndReset(); - } - else if (!current.IsLocal((ushort)slot)) - { - Redirect((ushort)slot, current); - } - else - { - var keys = GetKeysInSlot(slot, keyCount); - int keyCountRet = Math.Min(keys.Count, keyCount); - while (!RespWriteUtils.WriteArrayLength(keyCountRet, ref dcurr, dend)) - SendAndReset(); - for (int i = 0; i < keyCountRet; i++) - while (!RespWriteUtils.WriteBulkString(keys[i], ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.KEYSLOT) || param.SequenceEqual(CmdStrings.keyslot)) - { - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - byte* keyPtr = null; - int ksize = 0; - if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - int slot = NumUtils.HashSlot(keyPtr, ksize); - while (!RespWriteUtils.WriteInteger(slot, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.SETSLOT) || param.SequenceEqual(CmdStrings.setslot)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 2) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadStringWithLengthHeader(out var subcommand, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!Enum.TryParse(subcommand, ignoreCase: true, out SlotState slotState)) - slotState = SlotState.STABLE; - - string nodeid = null; - if (count > 2) - { - if (!RespReadUtils.ReadStringWithLengthHeader(out nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - } - readHead = (int)(ptr - recvBufferPtr); - - if (!ClusterConfig.OutOfRange(slot)) - { - // Try to set slot state - bool setSlotsSucceeded; - ReadOnlySpan errorMessage = default; - switch (slotState) - { - case SlotState.STABLE: - setSlotsSucceeded = true; - clusterProvider.clusterManager.ResetSlotState(slot); - break; - case SlotState.IMPORTING: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForImport(slot, nodeid, out errorMessage); - break; - case SlotState.MIGRATING: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForMigration(slot, nodeid, out errorMessage); - break; - case SlotState.NODE: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForOwnershipChange(slot, nodeid, out errorMessage); - break; - default: - setSlotsSucceeded = false; - errorMessage = Encoding.ASCII.GetBytes($"ERR Slot state {subcommand} not supported."); - break; - } - - if (setSlotsSucceeded) - { - UnsafeWaitForConfigTransition(); - - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - } - else - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.SETSLOTSRANGE) || param.SequenceEqual(CmdStrings.setslotsrange)) + var parseSuccess = true; + if (count > 0) { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - if (count < 3) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - // CLUSTER SETSLOTRANGE IMPORTING [slot-start slot-end] - // CLUSTER SETSLOTRANGE MIGRATING [slot-start slot-end] - // CLUSTER SETSLOTRANGE NODE [slot-start slot-end] - // CLUSTER SETSLOTRANGE STABLE [slot-start slot-end] - - string nodeid = default; - var _count = count - 1; - var ptr = recvBufferPtr + readHead; - // Extract subcommand - if (!RespReadUtils.ReadStringWithLengthHeader(out var subcommand, ref ptr, recvBufferPtr + bytesRead)) - return false; - - // Try parse slot state - if (!Enum.TryParse(subcommand, out SlotState slotState)) - { - // Log error for invalid slot state option - logger?.LogError("The given input '{input}' is not a valid slot state option.", subcommand); + var invalidParameters = false; + if (!ParseClusterSubcommand(bufSpan, out var subcommand, out var subcmd)) return false; + switch (subcmd) + { + case ClusterSubcommand.BUMPEPOCH: + parseSuccess = NetworkClusterBumpEpoch(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.FORGET: + parseSuccess = NetworkClusterForget(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.INFO: + parseSuccess = NetworkClusterInfo(count - 1, out invalidParameters); + break; + case ClusterSubcommand.HELP: + parseSuccess = NetworkClusterHelp(count - 1, out invalidParameters); + break; + case ClusterSubcommand.MEET: + parseSuccess = NetworkClusterMeet(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.MYID: + parseSuccess = NetworkClusterMyid(count - 1, out invalidParameters); + break; + case ClusterSubcommand.MYPARENTID: + parseSuccess = NetworkClusterMyParentId(count - 1, out invalidParameters); + break; + case ClusterSubcommand.ENDPOINT: + parseSuccess = NetworkClusterEndpoint(count - 1, out invalidParameters); + break; + case ClusterSubcommand.NODES: + parseSuccess = NetworkClusterNodes(count - 1, out invalidParameters); + break; + case ClusterSubcommand.SETCONFIGEPOCH: + parseSuccess = NetworkClusterSetConfigEpoch(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.SHARDS: + parseSuccess = NetworkClusterShards(count - 1, out invalidParameters); + break; + case ClusterSubcommand.GOSSIP: + parseSuccess = NetworkClusterGossip(count - 1, out invalidParameters); + break; + case ClusterSubcommand.RESET: + parseSuccess = NetworkClusterReset(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.FAILOVER: + parseSuccess = NetworkClusterFailover(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.FAILAUTHREQ: + parseSuccess = NetworkClusterFailAuthReq(count - 1, out invalidParameters); + break; + case ClusterSubcommand.FAILSTOPWRITES: + parseSuccess = NetworkClusterFailStopWrites(count - 1, out invalidParameters); + break; + case ClusterSubcommand.FAILREPLICATIONOFFSET: + parseSuccess = NetworkClusterFailReplicationOffset(count - 1, out invalidParameters); + break; + case ClusterSubcommand.ADDSLOTS: + parseSuccess = NetworkClusterAddSlots(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.ADDSLOTSRANGE: + parseSuccess = NetworkClusterAddSlotsRange(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.BANLIST: + parseSuccess = NetworkClusterBanList(count - 1, out invalidParameters); + break; + case ClusterSubcommand.COUNTKEYSINSLOT: + parseSuccess = NetworkClusterCountKeysInSlot(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.DELSLOTS: + parseSuccess = NetworkClusterDelSlots(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.DELSLOTSRANGE: + parseSuccess = NetworkClusterDelSlotsRange(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.DELKEYSINSLOT: + parseSuccess = NetworkClusterDelKeysInSlot(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.DELKEYSINSLOTRANGE: + parseSuccess = NetworkClusterDelKeysInSlotRange(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.GETKEYSINSLOT: + parseSuccess = NetworkClusterGetKeysInSlot(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.KEYSLOT: + parseSuccess = NetworkClusterKeySlot(count - 1, out invalidParameters); + break; + case ClusterSubcommand.SETSLOT: + parseSuccess = NetworkClusterSetSlot(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.SETSLOTSRANGE: + parseSuccess = NetworkClusterSetSlotsRange(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.SLOTS: + parseSuccess = NetworkClusterSlots(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.SLOTSTATE: + parseSuccess = NetworkClusterSlotState(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.MIGRATE: + parseSuccess = NetworkClusterMigrate(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.MTASKS: + parseSuccess = NetworkClusterMTasks(count - 1, out invalidParameters); + break; + case ClusterSubcommand.REPLICAS: + parseSuccess = NetworkClusterReplicas(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.REPLICATE: + parseSuccess = NetworkClusterReplicate(bufSpan, count - 1, out invalidParameters); + break; + case ClusterSubcommand.AOFSYNC: + parseSuccess = NetworkClusterAOFSync(count - 1, out invalidParameters); + break; + case ClusterSubcommand.APPENDLOG: + parseSuccess = NetworkClusterAppendLog(count - 1, out invalidParameters); + break; + case ClusterSubcommand.INITIATE_REPLICA_SYNC: + parseSuccess = NetworkClusterInitiateReplicaSync(count - 1, out invalidParameters); + break; + case ClusterSubcommand.SEND_CKPT_METADATA: + parseSuccess = NetworkClusterSendCheckpointMetadata(count - 1, out invalidParameters); + break; + case ClusterSubcommand.SEND_CKPT_FILE_SEGMENT: + parseSuccess = NetworkClusterSendCheckpointFileSegment(count - 1, out invalidParameters); + break; + case ClusterSubcommand.BEGIN_REPLICA_RECOVER: + parseSuccess = NetworkClusterBeginReplicaRecover(count - 1, out invalidParameters); + break; + default: if (!DrainCommands(bufSpan, count - 1)) return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - return true; - } - - // Extract nodeid for operations other than stable - if (slotState != SlotState.STABLE) - { - if (!RespReadUtils.ReadStringWithLengthHeader(out nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - _count = count - 2; - } - - // Try to parse slot ranges. The parsing may give errorMessage even if the TryParseSlots returns true. - var slotsParsed = TryParseSlots(_count, ref ptr, out var slots, out var errorMessage, range: true); - if (slotsParsed && errorMessage != default) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - return true; - } - else if (!slotsParsed) return false; - - readHead = (int)(ptr - recvBufferPtr); - - // Try to set slot states - bool setSlotsSucceeded; - switch (slotState) - { - case SlotState.STABLE: - setSlotsSucceeded = true; - clusterProvider.clusterManager.ResetSlotsState(slots); - break; - case SlotState.IMPORTING: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForImport(slots, nodeid, out errorMessage); - break; - case SlotState.MIGRATING: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForMigration(slots, nodeid, out errorMessage); - break; - case SlotState.NODE: - setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForOwnershipChange(slots, nodeid, out errorMessage); - break; - default: - setSlotsSucceeded = false; - errorMessage = Encoding.ASCII.GetBytes($"ERR Slot state {subcommand} not supported."); - break; - } - - if (setSlotsSucceeded) - { - UnsafeWaitForConfigTransition(); - - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.SLOTS) || param.SequenceEqual(CmdStrings.slots)) - { - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - var slotsInfo = clusterProvider.clusterManager.CurrentConfig.GetSlotsInfo(); - while (!RespWriteUtils.WriteAsciiDirect(slotsInfo, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.SLOTSTATE) || param.SequenceEqual(CmdStrings.slotstate)) - { - // CLUSTER SLOTSTATE - if (count < 1) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var current = clusterProvider.clusterManager.CurrentConfig; - var nodeId = current.GetNodeIdFromSlot((ushort)slot); - var state = current.GetState((ushort)slot); - var stateStr = state switch - { - SlotState.STABLE => "=", - SlotState.IMPORTING => "<", - SlotState.MIGRATING => ">", - SlotState.OFFLINE => "-", - SlotState.FAIL => "-", - _ => throw new Exception($"Invalid SlotState filetype {state}"), - }; - while (!RespWriteUtils.WriteAsciiDirect($"+{slot} {stateStr} {nodeId}\r\n", ref dcurr, dend)) - SendAndReset(); - } - } - else { return false; } - return true; - } - - public bool ProcessClusterMigrationCommands(ReadOnlySpan bufSpan, ReadOnlySpan param, int count, out bool errorFlag, out string errorCmd) - { - errorFlag = false; - errorCmd = string.Empty; - - if (param.SequenceEqual(CmdStrings.MIGRATE)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } - - // CLUSTER MIGRATE - if (count != 3) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var sourceNodeId, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadStringWithLengthHeader(out var _replace, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadStringWithLengthHeader(out var storeType, ref ptr, recvBufferPtr + bytesRead)) - return false; - - var replaceOption = _replace.Equals("T"); - - // Check if payload size has been received - if (ptr + 4 > recvBufferPtr + bytesRead) - return false; - - var headerLength = *(int*)ptr; - ptr += 4; - // Check if payload has been received - if (ptr + headerLength > recvBufferPtr + bytesRead) - return false; - - var currentConfig = clusterProvider.clusterManager.CurrentConfig; - - if (storeType.Equals("SSTORE")) - { - var keyCount = *(int*)ptr; - ptr += 4; - var i = 0; - - while (i < keyCount) - { - - byte* keyPtr = null, valPtr = null; - byte keyMetaDataSize = 0, valMetaDataSize = 0; - if (!RespReadUtils.ReadSerializedSpanByte(ref keyPtr, ref keyMetaDataSize, ref valPtr, ref valMetaDataSize, ref ptr, recvBufferPtr + bytesRead)) - return false; - - ref var key = ref SpanByte.Reinterpret(keyPtr); - if (keyMetaDataSize > 0) key.ExtraMetadata = *(long*)(keyPtr + 4); - ref var value = ref SpanByte.Reinterpret(valPtr); - if (valMetaDataSize > 0) value.ExtraMetadata = *(long*)(valPtr + 4); - - // An error has occurred - if (migrateState > 0) - { - i++; - continue; - } - - var slot = NumUtils.HashSlot(key.ToPointer(), key.LengthWithoutMetadata); - if (!currentConfig.IsImportingSlot(slot))//Slot is not in importing state - { - migrateState = 1; - i++; - continue; - } - - if (i < migrateSetCount) - continue; - - migrateSetCount++; - - // Set if key replace flag is set or key does not exist - if (replaceOption || !CheckIfKeyExists(new ArgSlice(key.ToPointer(), key.Length))) - _ = basicGarnetApi.SET(ref key, ref value); - i++; - } - } - else if (storeType.Equals("OSTORE")) - { - var keyCount = *(int*)ptr; - ptr += 4; - var i = 0; - while (i < keyCount) - { - if (!RespReadUtils.ReadSerializedData(out var key, out var data, out var expiration, ref ptr, recvBufferPtr + bytesRead)) - return false; - - // An error has occurred - if (migrateState > 0) - continue; - - var slot = NumUtils.HashSlot(key); - if (!currentConfig.IsImportingSlot(slot))//Slot is not in importing state - { - migrateState = 1; - continue; - } - - if (i < migrateSetCount) - continue; - - migrateSetCount++; - - var value = clusterProvider.storeWrapper.GarnetObjectSerializer.Deserialize(data); - value.Expiration = expiration; - - // Set if key replace flag is set or key does not exist - if (replaceOption || !CheckIfKeyExists(key)) - _ = basicGarnetApi.SET(key, value); - - i++; - } - } - else - { - throw new Exception("CLUSTER MIGRATE STORE TYPE ERROR!"); - } - - if (migrateState == 1) - { - logger?.LogError("{errorMsg}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE)); - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + while (!RespWriteUtils.WriteError($"ERR Unknown subcommand or wrong number of arguments for '{Encoding.ASCII.GetString(subcommand)}'. Try CLUSTER HELP.", ref dcurr, dend)) SendAndReset(); - } - - migrateSetCount = 0; - migrateState = 0; - readHead = (int)(ptr - recvBufferPtr); - } - } - else if (param.SequenceEqual(CmdStrings.MTASKS)) - { - if (count != 0) - { - if (!DrainCommands(bufSpan, count)) - return false; - errorFlag = true; - errorCmd = Encoding.ASCII.GetString(param); - } - else - { - var mtasks = clusterProvider.migrationManager.GetMigrationTaskCount(); - while (!RespWriteUtils.WriteInteger(mtasks, ref dcurr, dend)) - SendAndReset(); - var ptr = recvBufferPtr + readHead; - readHead = (int)(ptr - recvBufferPtr); - } - } - else { return false; } - return true; - } - - private bool ProcessClusterReplicationCommands(ReadOnlySpan bufSpan, ReadOnlySpan param, int count, out bool errorFlag, out string errorCmd) - { - errorFlag = false; - errorCmd = string.Empty; - if (param.SequenceEqual(CmdStrings.REPLICAS) || param.SequenceEqual(CmdStrings.replicas)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; - } + break; - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - var replicas = clusterProvider.clusterManager.ListReplicas(nodeid); - - while (!RespWriteUtils.WriteArrayLength(replicas.Count, ref dcurr, dend)) - SendAndReset(); - foreach (var replica in replicas) - { - while (!RespWriteUtils.WriteAsciiBulkString(replica, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.REPLICATE) || param.SequenceEqual(CmdStrings.replicate)) - { - if (!CheckACLAdminPermissions(bufSpan, count, out var success)) - { - return success; } - var ptr = recvBufferPtr + readHead; - var background = false; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (count == 2) + if (invalidParameters) { - if (!RespReadUtils.ReadStringWithLengthHeader(out var backgroundFlag, ref ptr, recvBufferPtr + bytesRead)) + if (!DrainCommands(bufSpan, count - 1)) return false; - - if (backgroundFlag.Equals("SYNC", StringComparison.OrdinalIgnoreCase)) - background = false; - else if (backgroundFlag.Equals("ASYNC", StringComparison.OrdinalIgnoreCase)) - background = true; - else - { - while (!RespWriteUtils.WriteError($"ERR Invalid CLUSTER REPLICATE FLAG ({backgroundFlag}) not valid", ref dcurr, dend)) - SendAndReset(); - readHead = (int)(ptr - recvBufferPtr); - return true; - } - } - readHead = (int)(ptr - recvBufferPtr); - - if (!clusterProvider.serverOptions.EnableAOF) - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) - SendAndReset(); - } - else - { - if (!clusterProvider.replicationManager.TryBeginReplicate(this, nodeid, background, false, out var errorMessage)) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - } - else if (param.SequenceEqual(CmdStrings.aofsync)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadLongWithLengthHeader(out long nextAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - if (clusterProvider.serverOptions.EnableAOF) - { - clusterProvider.replicationManager.TryAddReplicationTask(nodeid, nextAddress, out var aofSyncTaskInfo); - if (!clusterProvider.replicationManager.TryConnectToReplica(nodeid, nextAddress, aofSyncTaskInfo, out var errorMessage)) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) - SendAndReset(); - } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else - { - while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.appendlog)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out string nodeId, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadLongWithLengthHeader(out long previousAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadLongWithLengthHeader(out long currentAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - - if (!RespReadUtils.ReadLongWithLengthHeader(out long nextAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - - byte* record = null; - var recordLength = 0; - if (!RespReadUtils.ReadPtrWithLengthHeader(ref record, ref recordLength, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var currentConfig = clusterProvider.clusterManager.CurrentConfig; - var localRole = currentConfig.LocalNodeRole; - var primaryId = currentConfig.LocalNodePrimaryId; - if (localRole != NodeRole.REPLICA) - { - // TODO: handle this - //while (!RespWriteUtils.WriteError("ERR aofsync node not a replica"u8, ref dcurr, dend)) - // SendAndReset(); - } - else if (!primaryId.Equals(nodeId)) - { - // TODO: handle this - //while (!RespWriteUtils.WriteError($"ERR aofsync node replicating {primaryId}", ref dcurr, dend)) - // SendAndReset(); - } - else - { - clusterProvider.replicationManager.ProcessPrimaryStream(record, recordLength, previousAddress, currentAddress, nextAddress); - //while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - // SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.initiate_replica_sync)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeId, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadStringWithLengthHeader(out var primary_replid, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var cEntryByteArray, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadLongWithLengthHeader(out var replicaAofBeginAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadLongWithLengthHeader(out var replicaAofTailAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var remoteEntry = CheckpointEntry.FromByteArray(cEntryByteArray); - - if (!clusterProvider.replicationManager.TryBeginReplicaSyncSession( - nodeId, primary_replid, remoteEntry, replicaAofBeginAddress, replicaAofTailAddress, out var errorMessage)) - { - while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + var errorMsg = string.Format(CmdStrings.GenericErrMissingParam, subcmd.ToString()); + while (!RespWriteUtils.WriteError(errorMsg, ref dcurr, dend)) SendAndReset(); } - else - { - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - } - else if (param.SequenceEqual(CmdStrings.send_ckpt_metadata)) - { - var ptr = recvBufferPtr + readHead; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var fileTokenBytes, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadIntWithLengthHeader(out var fileTypeInt, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var checkpointMetadata, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var fileToken = new Guid(fileTokenBytes); - var fileType = (CheckpointFileType)fileTypeInt; - clusterProvider.replicationManager.ProcessCheckpointMetadata(fileToken, fileType, checkpointMetadata); - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.send_ckpt_file_segment)) - { - var ptr = recvBufferPtr + readHead; - Span data = default; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var fileTokenBytes, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadIntWithLengthHeader(out var ckptFileTypeInt, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadLongWithLengthHeader(out var startAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadSpanByteWithLengthHeader(ref data, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadIntWithLengthHeader(out var segmentId, ref ptr, recvBufferPtr + bytesRead)) - return false; - - readHead = (int)(ptr - recvBufferPtr); - var fileToken = new Guid(fileTokenBytes); - var ckptFileType = (CheckpointFileType)ckptFileTypeInt; - - // Commenting due to high verbosity - // logger?.LogTrace("send_ckpt_file_segment {fileToken} {ckptFileType} {startAddress} {dataLength}", fileToken, ckptFileType, startAddress, data.Length); - clusterProvider.replicationManager.recvCheckpointHandler.ProcessFileSegments(segmentId, fileToken, ckptFileType, startAddress, data); - while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) - SendAndReset(); - } - else if (param.SequenceEqual(CmdStrings.begin_replica_recover)) - { - var ptr = recvBufferPtr + readHead; - - if (!RespReadUtils.ReadBoolWithLengthHeader(out var recoverMainStoreFromToken, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadBoolWithLengthHeader(out var recoverObjectStoreFromToken, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadBoolWithLengthHeader(out var replayAOF, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadStringWithLengthHeader(out var primary_replid, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var cEntryByteArray, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadLongWithLengthHeader(out var beginAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - if (!RespReadUtils.ReadLongWithLengthHeader(out var tailAddress, ref ptr, recvBufferPtr + bytesRead)) - return false; - readHead = (int)(ptr - recvBufferPtr); - - var entry = CheckpointEntry.FromByteArray(cEntryByteArray); - var replicationOffset = clusterProvider.replicationManager.BeginReplicaRecover( - recoverMainStoreFromToken, - recoverObjectStoreFromToken, - replayAOF, - primary_replid, - entry, - beginAddress, - tailAddress); - while (!RespWriteUtils.WriteInteger(replicationOffset, ref dcurr, dend)) - SendAndReset(); } - else - return false; - return true; + return parseSuccess; } } } \ No newline at end of file diff --git a/libs/cluster/Session/ClusterSession.cs b/libs/cluster/Session/ClusterSession.cs index f66cfd7a79..f3933afada 100644 --- a/libs/cluster/Session/ClusterSession.cs +++ b/libs/cluster/Session/ClusterSession.cs @@ -159,16 +159,6 @@ void Send(byte* d) } } - bool DrainCommands(ReadOnlySpan bufSpan, int count) - { - for (int i = 0; i < count; i++) - { - GetCommand(bufSpan, out bool success1); - if (!success1) return false; - } - return true; - } - /// /// Updates the user currently authenticated in the session. /// @@ -225,7 +215,17 @@ bool CheckACLAdminPermissions() return true; } - ReadOnlySpan GetCommand(ReadOnlySpan bufSpan, out bool success) + bool DrainCommands(ReadOnlySpan bufSpan, int count) + { + for (int i = 0; i < count; i++) + { + _ = GetNextToken(bufSpan, out bool success1); + if (!success1) return false; + } + return true; + } + + Span GetNextToken(ReadOnlySpan bufSpan, out bool success) { success = false; @@ -253,7 +253,7 @@ ReadOnlySpan GetCommand(ReadOnlySpan bufSpan, out bool success) } success = true; - var result = bufSpan.Slice(readHead, length); + var result = new Span(recvBufferPtr + readHead, length); readHead += length + 2; return result; diff --git a/libs/cluster/Session/RespClusterBasicCommands.cs b/libs/cluster/Session/RespClusterBasicCommands.cs new file mode 100644 index 0000000000..713cf8d3b4 --- /dev/null +++ b/libs/cluster/Session/RespClusterBasicCommands.cs @@ -0,0 +1,502 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Diagnostics; +using System.Linq; +using System.Text; +using Garnet.common; +using Garnet.server; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Implements CLUSTER BUMPEPOCH command + /// + /// + /// + /// + private bool NetworkClusterBumpEpoch(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + // Check admin permissions for command + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + // Process BUMPEPOCH + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + if (clusterProvider.clusterManager.TryBumpClusterEpoch()) + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CONFIG_UPDATE, ref dcurr, dend)) + SendAndReset(); + } + return true; + } + + /// + /// Implements CLUSTER FORGET command + /// + /// + /// + /// + /// + private bool NetworkClusterForget(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting 1 or 2 arguments + if (count is < 1 or > 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + + // Parse Node-Id + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + + var expirySeconds = 60; + if (count == 2) + { + // [Optional] Parse expiry in seconds + if (!RespReadUtils.ReadIntWithLengthHeader(out expirySeconds, ref ptr, recvBufferPtr + bytesRead)) + return false; + } + readHead = (int)(ptr - recvBufferPtr); + + logger?.LogTrace("CLUSTER FORGET {nodeid} {seconds}", nodeid, expirySeconds); + if (!clusterProvider.clusterManager.TryRemoveWorker(nodeid, expirySeconds, out var errorMessage)) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER INFO command + /// + /// + /// + /// + private bool NetworkClusterInfo(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var clusterInfo = clusterProvider.clusterManager.GetInfo(); + while (!RespWriteUtils.WriteAsciiBulkString(clusterInfo, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER HELP command + /// + /// + /// + /// + private bool NetworkClusterHelp(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var clusterCommands = ClusterCommandInfo.GetClusterCommands(); + while (!RespWriteUtils.WriteArrayLength(clusterCommands.Count, ref dcurr, dend)) + SendAndReset(); + foreach (var command in clusterCommands) + { + while (!RespWriteUtils.WriteSimpleString(command, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER MEET command + /// + /// + /// + /// + /// + private bool NetworkClusterMeet(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 2 arguments + if (count != 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var ipaddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadIntWithLengthHeader(out var port, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var ipaddressStr = Encoding.ASCII.GetString(ipaddress); + logger?.LogTrace("CLUSTER MEET {ipaddressStr} {port}", ipaddressStr, port); + clusterProvider.clusterManager.RunMeetTask(ipaddressStr, port); + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER MYID command + /// + /// + /// + /// + private bool NetworkClusterMyid(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + while (!RespWriteUtils.WriteAsciiBulkString(clusterProvider.clusterManager.CurrentConfig.LocalNodeId, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER MYPARENTID command + /// + /// + /// + /// + private bool NetworkClusterMyParentId(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + + var current = clusterProvider.clusterManager.CurrentConfig; + var parentId = current.LocalNodeRole == NodeRole.PRIMARY ? current.LocalNodeId : current.LocalNodePrimaryId; + while (!RespWriteUtils.WriteAsciiBulkString(parentId, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER ENDPOINT command + /// + /// + /// + /// + private bool NetworkClusterEndpoint(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 1 arguments + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + var current = clusterProvider.clusterManager.CurrentConfig; + var (host, port) = current.GetEndpointFromNodeId(nodeid); + while (!RespWriteUtils.WriteAsciiBulkString($"{host}:{port}", ref dcurr, dend)) + SendAndReset(); + return true; + } + + /// + /// Implements CLUSTER NODES command + /// + /// + /// + /// + private bool NetworkClusterNodes(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo(); + while (!RespWriteUtils.WriteAsciiBulkString(nodes, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER SET-CONFIG-EPOCH command + /// + /// + /// + /// + /// + private bool NetworkClusterSetConfigEpoch(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 1 arguments + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out var configEpoch, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + if (clusterProvider.clusterManager.CurrentConfig.NumWorkers > 2) + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT, ref dcurr, dend)) + SendAndReset(); + } + else + { + if (!clusterProvider.clusterManager.TrySetLocalConfigEpoch(configEpoch, out var errorMessage)) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + } + + return true; + } + + /// + /// Implements CLUSTER SHARDS command + /// + /// + /// + /// + private bool NetworkClusterShards(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var shardsInfo = clusterProvider.clusterManager.CurrentConfig.GetShardsInfo(); + while (!RespWriteUtils.WriteAsciiDirect(shardsInfo, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER GOSSIP command + /// + /// + /// + /// + private bool NetworkClusterGossip(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting 1 or 2 arguments + if (count is < 1 or > 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + var gossipWithMeet = false; + if (count > 1) + { + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var withMeet, ref ptr, recvBufferPtr + bytesRead)) + return false; + Debug.Assert(withMeet.SequenceEqual(CmdStrings.WITHMEET.ToArray())); + if (withMeet.SequenceEqual(CmdStrings.WITHMEET.ToArray())) + gossipWithMeet = true; + } + + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var gossipMessage, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + clusterProvider.clusterManager.gossipStats.UpdateGossipBytesRecv(gossipMessage.Length); + var current = clusterProvider.clusterManager.CurrentConfig; + + // Try merge if not just a ping message + if (gossipMessage.Length > 0) + { + var other = ClusterConfig.FromByteArray(gossipMessage); + // Accept gossip message if it is a gossipWithMeet or node from node that is already known and trusted + // GossipWithMeet messages are only send through a call to CLUSTER MEET at the remote node + if (gossipWithMeet || current.IsKnown(other.LocalNodeId)) + { + _ = clusterProvider.clusterManager.TryMerge(other); + } + else + logger?.LogWarning("Received gossip from unknown node: {node-id}", other.LocalNodeId); + } + + // Respond if configuration has changed or gossipWithMeet option is specified + if (lastSentConfig != current || gossipWithMeet) + { + var configByteArray = current.ToByteArray(); + clusterProvider.clusterManager.gossipStats.UpdateGossipBytesSend(configByteArray.Length); + while (!RespWriteUtils.WriteBulkString(configByteArray, ref dcurr, dend)) + SendAndReset(); + lastSentConfig = current; + } + else + { + while (!RespWriteUtils.WriteBulkString(Array.Empty(), ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER RESET command + /// + /// + /// + /// + /// + private bool NetworkClusterReset(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting 0, 1 or 2 arguments + if (count > 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + var soft = true; + var expirySeconds = 60; + + if (count > 0) + { + if (!RespReadUtils.ReadStringWithLengthHeader(out var option, ref ptr, recvBufferPtr + bytesRead)) + return false; + soft = option.Equals("SOFT", StringComparison.OrdinalIgnoreCase); + } + + if (count > 1) + { + if (!RespReadUtils.ReadIntWithLengthHeader(out expirySeconds, ref ptr, recvBufferPtr + bytesRead)) + return false; + } + readHead = (int)(ptr - recvBufferPtr); + + var resp = clusterProvider.clusterManager.TryReset(soft, expirySeconds); + if (!soft) clusterProvider.FlushDB(true); + + while (!RespWriteUtils.WriteDirect(resp, ref dcurr, dend)) + SendAndReset(); + + return true; + } + } +} \ No newline at end of file diff --git a/libs/cluster/Session/RespClusterFailoverCommands.cs b/libs/cluster/Session/RespClusterFailoverCommands.cs new file mode 100644 index 0000000000..f333e685a2 --- /dev/null +++ b/libs/cluster/Session/RespClusterFailoverCommands.cs @@ -0,0 +1,200 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Text; +using Garnet.common; +using Garnet.server; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Implements CLUSTER FAILOVER command + /// + /// + /// + /// + /// + private bool NetworkClusterFailover(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting 1 or 2 arguments + if (count is < 0 or > 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + var failoverOption = FailoverOption.DEFAULT; + TimeSpan failoverTimeout = default; + if (count > 0) + { + if (!RespReadUtils.ReadStringWithLengthHeader(out var failoverOptionStr, ref ptr, recvBufferPtr + bytesRead)) + return false; + + // Try to parse failover option + if (!Enum.TryParse(failoverOptionStr, ignoreCase: true, out failoverOption)) + { + // On failure set the invalid flag, write error and continue parsing to drain rest of parameters if any + while (!RespWriteUtils.WriteError($"ERR Failover option ({failoverOptionStr}) not supported", ref dcurr, dend)) + SendAndReset(); + failoverOption = FailoverOption.INVALID; + } + + if (count > 1) + { + if (!RespReadUtils.ReadIntWithLengthHeader(out var failoverTimeoutSeconds, ref ptr, recvBufferPtr + bytesRead)) + return false; + failoverTimeout = TimeSpan.FromSeconds(failoverTimeoutSeconds); + } + } + readHead = (int)(ptr - recvBufferPtr); + + // If option provided is invalid return early + if (failoverOption == FailoverOption.INVALID) + return true; + + if (clusterProvider.serverOptions.EnableAOF) + { + if (failoverOption == FailoverOption.ABORT) + { + clusterProvider.failoverManager.TryAbortReplicaFailover(); + } + else + { + var current = clusterProvider.clusterManager.CurrentConfig; + var nodeRole = current.LocalNodeRole; + if (nodeRole == NodeRole.REPLICA) + { + if (!clusterProvider.failoverManager.TryStartReplicaFailover(failoverOption, failoverTimeout)) + { + while (!RespWriteUtils.WriteError($"ERR failed to start failover for primary({current.GetLocalNodePrimaryAddress()})", ref dcurr, dend)) + SendAndReset(); + return true; + } + } + else + { + while (!RespWriteUtils.WriteError($"ERR Node is not a {NodeRole.REPLICA} ~{nodeRole}~", ref dcurr, dend)) + SendAndReset(); + return true; + } + } + } + else + { + // Return error if AOF is not enabled + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) + SendAndReset(); + return true; + } + + // Finally return +OK if operation completed without any errors + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER failauthreq command (only for internode use) + /// + /// + private bool NetworkClusterFailAuthReq(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 3 arguments + if (count != 3) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nodeIdBytes, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var requestEpochBytes, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var claimedSlots, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var resp = clusterProvider.clusterManager.AuthorizeFailover( + Encoding.ASCII.GetString(nodeIdBytes), + BitConverter.ToInt64(requestEpochBytes), + claimedSlots) ? CmdStrings.RESP_RETURN_VAL_1 : CmdStrings.RESP_RETURN_VAL_0; + while (!RespWriteUtils.WriteDirect(resp, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER failstopwrites (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterFailStopWrites(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 1 argument + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nodeIdBytes, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + clusterProvider.clusterManager.TryStopWrites(Encoding.ASCII.GetString(nodeIdBytes)); + UnsafeWaitForConfigTransition(); + while (!RespWriteUtils.WriteInteger(clusterProvider.replicationManager.ReplicationOffset, ref dcurr, dend)) + SendAndReset(); + return true; + } + + /// + /// Implements CLUSTER failreplicationoffset (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterFailReplicationOffset(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expects exactly 1 argument + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadLongWithLengthHeader(out var primaryReplicationOffset, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var rOffset = clusterProvider.replicationManager.WaitForReplicationOffset(primaryReplicationOffset).GetAwaiter().GetResult(); + while (!RespWriteUtils.WriteInteger(rOffset, ref dcurr, dend)) + SendAndReset(); + + return true; + } + } +} \ No newline at end of file diff --git a/libs/cluster/Session/RespClusterMigrateCommands.cs b/libs/cluster/Session/RespClusterMigrateCommands.cs new file mode 100644 index 0000000000..3cd370c5f3 --- /dev/null +++ b/libs/cluster/Session/RespClusterMigrateCommands.cs @@ -0,0 +1,192 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Text; +using Garnet.common; +using Garnet.server; +using Microsoft.Extensions.Logging; +using Tsavorite.core; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Implements CLUSTER MIGRATE command (only for internode use) + /// + /// + /// + /// + /// + /// + private bool NetworkClusterMigrate(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 3 arguments + if (count != 3) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out var sourceNodeId, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadStringWithLengthHeader(out var _replace, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadStringWithLengthHeader(out var storeType, ref ptr, recvBufferPtr + bytesRead)) + return false; + + var replaceOption = _replace.Equals("T"); + + // Check if payload size has been received + if (ptr + 4 > recvBufferPtr + bytesRead) + return false; + + var headerLength = *(int*)ptr; + ptr += 4; + // Check if payload has been received + if (ptr + headerLength > recvBufferPtr + bytesRead) + return false; + + var currentConfig = clusterProvider.clusterManager.CurrentConfig; + + if (storeType.Equals("SSTORE")) + { + var keyCount = *(int*)ptr; + ptr += 4; + var i = 0; + + while (i < keyCount) + { + + byte* keyPtr = null, valPtr = null; + byte keyMetaDataSize = 0, valMetaDataSize = 0; + if (!RespReadUtils.ReadSerializedSpanByte(ref keyPtr, ref keyMetaDataSize, ref valPtr, ref valMetaDataSize, ref ptr, recvBufferPtr + bytesRead)) + return false; + + ref var key = ref SpanByte.Reinterpret(keyPtr); + if (keyMetaDataSize > 0) key.ExtraMetadata = *(long*)(keyPtr + 4); + ref var value = ref SpanByte.Reinterpret(valPtr); + if (valMetaDataSize > 0) value.ExtraMetadata = *(long*)(valPtr + 4); + + // An error has occurred + if (migrateState > 0) + { + i++; + continue; + } + + var slot = NumUtils.HashSlot(key.ToPointer(), key.LengthWithoutMetadata); + if (!currentConfig.IsImportingSlot(slot))//Slot is not in importing state + { + migrateState = 1; + i++; + continue; + } + + if (i < migrateSetCount) + continue; + + migrateSetCount++; + + // Set if key replace flag is set or key does not exist + if (replaceOption || !CheckIfKeyExists(new ArgSlice(key.ToPointer(), key.Length))) + _ = basicGarnetApi.SET(ref key, ref value); + i++; + } + } + else if (storeType.Equals("OSTORE")) + { + var keyCount = *(int*)ptr; + ptr += 4; + var i = 0; + while (i < keyCount) + { + if (!RespReadUtils.ReadSerializedData(out var key, out var data, out var expiration, ref ptr, recvBufferPtr + bytesRead)) + return false; + + // An error has occurred + if (migrateState > 0) + continue; + + var slot = NumUtils.HashSlot(key); + if (!currentConfig.IsImportingSlot(slot))//Slot is not in importing state + { + migrateState = 1; + continue; + } + + if (i < migrateSetCount) + continue; + + migrateSetCount++; + + var value = clusterProvider.storeWrapper.GarnetObjectSerializer.Deserialize(data); + value.Expiration = expiration; + + // Set if key replace flag is set or key does not exist + if (replaceOption || !CheckIfKeyExists(key)) + _ = basicGarnetApi.SET(key, value); + + i++; + } + } + else + { + throw new Exception("CLUSTER MIGRATE STORE TYPE ERROR!"); + } + + if (migrateState == 1) + { + logger?.LogError("{errorMsg}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE)); + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + migrateSetCount = 0; + migrateState = 0; + readHead = (int)(ptr - recvBufferPtr); + return true; + } + + /// + /// Implements CLUSTER MTASKS command + /// + /// + /// + /// + private bool NetworkClusterMTasks(int count, out bool invalidParameters) + { + invalidParameters = false; + + if (count != 0) + { + invalidParameters = true; + return true; + } + + var mtasks = clusterProvider.migrationManager.GetMigrationTaskCount(); + while (!RespWriteUtils.WriteInteger(mtasks, ref dcurr, dend)) + SendAndReset(); + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + + return true; + } + } +} \ No newline at end of file diff --git a/libs/cluster/Session/RespClusterReplicationCommands.cs b/libs/cluster/Session/RespClusterReplicationCommands.cs new file mode 100644 index 0000000000..e4576e5b54 --- /dev/null +++ b/libs/cluster/Session/RespClusterReplicationCommands.cs @@ -0,0 +1,402 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using Garnet.common; +using Garnet.server; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Implements CLUSTER REPLICAS command + /// + /// + /// + /// + /// + private bool NetworkClusterReplicas(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + var replicas = clusterProvider.clusterManager.ListReplicas(nodeid); + + while (!RespWriteUtils.WriteArrayLength(replicas.Count, ref dcurr, dend)) + SendAndReset(); + foreach (var replica in replicas) + { + while (!RespWriteUtils.WriteAsciiBulkString(replica, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER REPLICATE command + /// + /// + /// + /// + /// + private bool NetworkClusterReplicate(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting 1 or 2 arguments + if (count is < 1 or > 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + var background = false; + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (count > 1) + { + if (!RespReadUtils.ReadStringWithLengthHeader(out var backgroundFlag, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (backgroundFlag.Equals("SYNC", StringComparison.OrdinalIgnoreCase)) + background = false; + else if (backgroundFlag.Equals("ASYNC", StringComparison.OrdinalIgnoreCase)) + background = true; + else + { + while (!RespWriteUtils.WriteError($"ERR Invalid CLUSTER REPLICATE FLAG ({backgroundFlag}) not valid", ref dcurr, dend)) + SendAndReset(); + readHead = (int)(ptr - recvBufferPtr); + return true; + } + } + readHead = (int)(ptr - recvBufferPtr); + + if (!clusterProvider.serverOptions.EnableAOF) + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) + SendAndReset(); + } + else + { + if (!clusterProvider.replicationManager.TryBeginReplicate(this, nodeid, background, false, out var errorMessage)) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + } + + return true; + } + + /// + /// Implements CLUSTER aofsync command (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterAOFSync(int count, out bool invalidParameters) + { + invalidParameters = false; + + if (count != 2) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadLongWithLengthHeader(out long nextAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + if (clusterProvider.serverOptions.EnableAOF) + { + clusterProvider.replicationManager.TryAddReplicationTask(nodeid, nextAddress, out var aofSyncTaskInfo); + if (!clusterProvider.replicationManager.TryConnectToReplica(nodeid, nextAddress, aofSyncTaskInfo, out var errorMessage)) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + } + else + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_REPLICATION_AOF_TURNEDOFF, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER appendlog command (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterAppendLog(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 5 arguments (5-th argument is AOF page parsed later) + if (count != 5) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out string nodeId, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadLongWithLengthHeader(out long previousAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadLongWithLengthHeader(out long currentAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadLongWithLengthHeader(out long nextAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + + byte* record = null; + var recordLength = 0; + if (!RespReadUtils.ReadPtrWithLengthHeader(ref record, ref recordLength, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var currentConfig = clusterProvider.clusterManager.CurrentConfig; + var localRole = currentConfig.LocalNodeRole; + var primaryId = currentConfig.LocalNodePrimaryId; + if (localRole != NodeRole.REPLICA) + { + // TODO: handle this + //while (!RespWriteUtils.WriteError("ERR aofsync node not a replica"u8, ref dcurr, dend)) + // SendAndReset(); + } + else if (!primaryId.Equals(nodeId)) + { + // TODO: handle this + //while (!RespWriteUtils.WriteError($"ERR aofsync node replicating {primaryId}", ref dcurr, dend)) + // SendAndReset(); + } + else + { + clusterProvider.replicationManager.ProcessPrimaryStream(record, recordLength, previousAddress, currentAddress, nextAddress); + //while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + // SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER initiate_replica_sync command (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterInitiateReplicaSync(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 5 arguments + if (count != 5) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeId, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadStringWithLengthHeader(out var primary_replid, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var cEntryByteArray, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadLongWithLengthHeader(out var replicaAofBeginAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadLongWithLengthHeader(out var replicaAofTailAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var remoteEntry = CheckpointEntry.FromByteArray(cEntryByteArray); + + if (!clusterProvider.replicationManager.TryBeginReplicaSyncSession( + nodeId, primary_replid, remoteEntry, replicaAofBeginAddress, replicaAofTailAddress, out var errorMessage)) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implement CLUSTER send_ckpt_metadata command (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterSendCheckpointMetadata(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 3 arguments + if (count != 3) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var fileTokenBytes, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadIntWithLengthHeader(out var fileTypeInt, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var checkpointMetadata, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var fileToken = new Guid(fileTokenBytes); + var fileType = (CheckpointFileType)fileTypeInt; + clusterProvider.replicationManager.ProcessCheckpointMetadata(fileToken, fileType, checkpointMetadata); + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER send_ckpt_file_segment command (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterSendCheckpointFileSegment(int count, out bool invalidParameters) + { + invalidParameters = false; + + if (count != 5) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + Span data = default; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var fileTokenBytes, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadIntWithLengthHeader(out var ckptFileTypeInt, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadLongWithLengthHeader(out var startAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadSpanByteWithLengthHeader(ref data, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadIntWithLengthHeader(out var segmentId, ref ptr, recvBufferPtr + bytesRead)) + return false; + + readHead = (int)(ptr - recvBufferPtr); + var fileToken = new Guid(fileTokenBytes); + var ckptFileType = (CheckpointFileType)ckptFileTypeInt; + + // Commenting due to high verbosity + // logger?.LogTrace("send_ckpt_file_segment {fileToken} {ckptFileType} {startAddress} {dataLength}", fileToken, ckptFileType, startAddress, data.Length); + clusterProvider.replicationManager.recvCheckpointHandler.ProcessFileSegments(segmentId, fileToken, ckptFileType, startAddress, data); + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER begin_replica_recover (only for internode use) + /// + /// + /// + /// + private bool NetworkClusterBeginReplicaRecover(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 7 arguments + if (count != 7) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadBoolWithLengthHeader(out var recoverMainStoreFromToken, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadBoolWithLengthHeader(out var recoverObjectStoreFromToken, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadBoolWithLengthHeader(out var replayAOF, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadStringWithLengthHeader(out var primary_replid, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var cEntryByteArray, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadLongWithLengthHeader(out var beginAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + if (!RespReadUtils.ReadLongWithLengthHeader(out var tailAddress, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var entry = CheckpointEntry.FromByteArray(cEntryByteArray); + var replicationOffset = clusterProvider.replicationManager.BeginReplicaRecover( + recoverMainStoreFromToken, + recoverObjectStoreFromToken, + replayAOF, + primary_replid, + entry, + beginAddress, + tailAddress); + while (!RespWriteUtils.WriteInteger(replicationOffset, ref dcurr, dend)) + SendAndReset(); + + return true; + } + } +} \ No newline at end of file diff --git a/libs/cluster/Session/RespClusterSlotManagementCommands.cs b/libs/cluster/Session/RespClusterSlotManagementCommands.cs new file mode 100644 index 0000000000..06bcc3a4cf --- /dev/null +++ b/libs/cluster/Session/RespClusterSlotManagementCommands.cs @@ -0,0 +1,747 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Text; +using Garnet.common; +using Garnet.server; +using Microsoft.Extensions.Logging; + +namespace Garnet.cluster +{ + internal sealed unsafe partial class ClusterSession : IClusterSession + { + /// + /// Implements CLUSTER ADDSLOTS command + /// + /// + /// + /// + /// + private bool NetworkClusterAddSlots(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting at least 1 slot or at most maximum number of slots + if (count < 1 || count >= ClusterConfig.MAX_HASH_SLOT_VALUE) + { + invalidParameters = true; + return false; + } + + var ptr = recvBufferPtr + readHead; + // Try to parse slot ranges. + var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: false); + readHead = (int)(ptr - recvBufferPtr); + + // The slot parsing may give errorMessage even if the methods TryParseSlots true. + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + + // Try to to add slots + if (!clusterProvider.clusterManager.TryAddSlots(slots, out var slotIndex) && slotIndex != -1) + { + while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already busy", ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER ADDSLOTSRANGE command + /// + /// + /// + /// + /// + private bool NetworkClusterAddSlotsRange(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting even number of arguments + if (count == 0 || (count & 0x1) != 0) + { + invalidParameters = true; + return false; + } + + var ptr = recvBufferPtr + readHead; + // Try to parse slot ranges. + var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); + readHead = (int)(ptr - recvBufferPtr); + + //The slot parsing may give errorMessage even if the TryParseSlots returns true. + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + + // Try to to add slots + if (!clusterProvider.clusterManager.TryAddSlots(slots, out var slotIndex) && slotIndex != -1) + { + while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is already busy", ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER BANLIST command + /// + /// + /// + /// + private bool NetworkClusterBanList(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var banlist = clusterProvider.clusterManager.GetBanList(); + + while (!RespWriteUtils.WriteArrayLength(banlist.Count, ref dcurr, dend)) + SendAndReset(); + foreach (var replica in banlist) + { + while (!RespWriteUtils.WriteAsciiBulkString(replica, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER COUNTKEYSINSLOT command + /// + /// + /// + /// + /// + private bool NetworkClusterCountKeysInSlot(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 1 argument + if (count != 1) + { + invalidParameters = true; + return true; + } + + var current = clusterProvider.clusterManager.CurrentConfig; + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + if (ClusterConfig.OutOfRange(slot)) + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) + SendAndReset(); + } + else if (!current.IsLocal((ushort)slot)) + { + Redirect((ushort)slot, current); + } + else + { + try + { + var keyCount = CountKeysInSlot(slot); + while (!RespWriteUtils.WriteInteger(keyCount, ref dcurr, dend)) + SendAndReset(); + } + catch (Exception ex) + { + logger?.LogError(ex, "Critical error in count keys"); + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N1, ref dcurr, dend)) + SendAndReset(); + } + } + + return true; + } + + /// + /// Implements CLUSTER DELSLOTS command + /// + /// + /// + /// + /// + private bool NetworkClusterDelSlots(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting at least 1 slot or at most maximum number of slots + if (count < 1 || count >= ClusterConfig.MAX_HASH_SLOT_VALUE) + { + invalidParameters = true; + return false; + } + + var ptr = recvBufferPtr + readHead; + //Try to parse slot ranges. + var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: false); + readHead = (int)(ptr - recvBufferPtr); + + //The slot parsing may give errorMessage even if the TryParseSlots returns true. + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + + //Try remove the slots + if (!clusterProvider.clusterManager.TryRemoveSlots(slots, out var slotIndex) && slotIndex != -1) + { + while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is not assigned", ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER DELSLOTSRANGE command + /// + /// + /// + /// + /// + private bool NetworkClusterDelSlotsRange(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting even number of arguments + if (count == 0 || (count & 0x1) != 0) + { + invalidParameters = true; + return false; + } + + var ptr = recvBufferPtr + readHead; + //Try to parse slot ranges. + var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); + readHead = (int)(ptr - recvBufferPtr); + + //The slot parsing may give errorMessage even if the TryParseSlots returns true. + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + + //Try remove the slots + if (!clusterProvider.clusterManager.TryRemoveSlots(slots, out var slotIndex) && slotIndex != -1) + { + while (!RespWriteUtils.WriteError($"ERR Slot {slotIndex} is not assigned", ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER DELKEYSINSLOT command + /// + /// + /// + /// + /// + private bool NetworkClusterDelKeysInSlot(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 1 argument + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var slots = new HashSet() { slot }; + ClusterManager.DeleteKeysInSlotsFromMainStore(basicGarnetApi, slots); + if (!clusterProvider.serverOptions.DisableObjects) + ClusterManager.DeleteKeysInSlotsFromObjectStore(basicGarnetApi, slots); + + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER DELKEYSINSLOTRANGE command + /// + /// + /// + /// + /// + private bool NetworkClusterDelKeysInSlotRange(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting even number of arguments + if (count == 0 || (count & 0x1) != 0) + { + invalidParameters = true; + return false; + } + + var ptr = recvBufferPtr + readHead; + //Try to parse slot ranges. + var slotsParsed = TryParseSlots(count, ref ptr, out var slots, out var errorMessage, range: true); + readHead = (int)(ptr - recvBufferPtr); + + //The slot parsing may give errorMessage even if the TryParseSlots returns true. + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + + ClusterManager.DeleteKeysInSlotsFromMainStore(basicGarnetApi, slots); + if (!clusterProvider.serverOptions.DisableObjects) + ClusterManager.DeleteKeysInSlotsFromObjectStore(basicGarnetApi, slots); + + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER GETKEYSINSLOT command + /// + /// + private bool NetworkClusterGetKeysInSlot(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 1 argument + if (count != 2) + { + invalidParameters = true; + return true; + } + + var current = clusterProvider.clusterManager.CurrentConfig; + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out int slot, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadIntWithLengthHeader(out int keyCount, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + if (ClusterConfig.OutOfRange(slot)) + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) + SendAndReset(); + } + else if (!current.IsLocal((ushort)slot)) + { + Redirect((ushort)slot, current); + } + else + { + var keys = GetKeysInSlot(slot, keyCount); + var keyCountRet = Math.Min(keys.Count, keyCount); + while (!RespWriteUtils.WriteArrayLength(keyCountRet, ref dcurr, dend)) + SendAndReset(); + for (var i = 0; i < keyCountRet; i++) + while (!RespWriteUtils.WriteBulkString(keys[i], ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER KEYSLOT + /// + /// + /// + /// + private bool NetworkClusterKeySlot(int count, out bool invalidParameters) + { + invalidParameters = false; + + // Expecting exactly 1 argument + if (count != 1) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + byte* keyPtr = null; + var ksize = 0; + if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + int slot = NumUtils.HashSlot(keyPtr, ksize); + while (!RespWriteUtils.WriteInteger(slot, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER SETSLOT command + /// + /// + /// + /// + /// + private bool NetworkClusterSetSlot(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting 2 or 3 arguments + if (count is < 2 or > 3) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!RespReadUtils.ReadStringWithLengthHeader(out var subcommand, ref ptr, recvBufferPtr + bytesRead)) + return false; + + if (!Enum.TryParse(subcommand, ignoreCase: true, out SlotState slotState)) + slotState = SlotState.INVALID; + + string nodeid = null; + if (count > 2) + { + if (!RespReadUtils.ReadStringWithLengthHeader(out nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + } + readHead = (int)(ptr - recvBufferPtr); + + if (!ClusterConfig.OutOfRange(slot)) + { + // Try to set slot state + bool setSlotsSucceeded; + ReadOnlySpan errorMessage = default; + switch (slotState) + { + case SlotState.STABLE: + setSlotsSucceeded = true; + clusterProvider.clusterManager.ResetSlotState(slot); + break; + case SlotState.IMPORTING: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForImport(slot, nodeid, out errorMessage); + break; + case SlotState.MIGRATING: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForMigration(slot, nodeid, out errorMessage); + break; + case SlotState.NODE: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForOwnershipChange(slot, nodeid, out errorMessage); + break; + default: + setSlotsSucceeded = false; + errorMessage = Encoding.ASCII.GetBytes($"ERR Slot state {subcommand} not supported."); + break; + } + + if (setSlotsSucceeded) + { + UnsafeWaitForConfigTransition(); + + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + } + else + { + while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER SETSLOTSRANGE command + /// + /// + /// + /// + /// + private bool NetworkClusterSetSlotsRange(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting at least 3 (STABLE + range) arguments. + if (count < 3) + { + invalidParameters = true; + return true; + } + + // CLUSTER SETSLOTRANGE IMPORTING [slot-start slot-end] + // CLUSTER SETSLOTRANGE MIGRATING [slot-start slot-end] + // CLUSTER SETSLOTRANGE NODE [slot-start slot-end] + // CLUSTER SETSLOTRANGE STABLE [slot-start slot-end] + string nodeid = default; + var _count = count - 1; + var ptr = recvBufferPtr + readHead; + // Extract subcommand + if (!RespReadUtils.ReadStringWithLengthHeader(out var subcommand, ref ptr, recvBufferPtr + bytesRead)) + return false; + + // Try parse slot state + if (!Enum.TryParse(subcommand, out SlotState slotState)) + { + // Log error for invalid slot state option + logger?.LogError("The given input '{input}' is not a valid slot state option.", subcommand); + slotState = SlotState.INVALID; + } + + // Extract nodeid for operations other than stable + if (slotState != SlotState.STABLE && slotState != SlotState.INVALID) + { + if (!RespReadUtils.ReadStringWithLengthHeader(out nodeid, ref ptr, recvBufferPtr + bytesRead)) + return false; + _count = count - 2; + } + + // Try to parse slot ranges. The parsing may give errorMessage even if the TryParseSlots returns true. + var slotsParsed = TryParseSlots(_count, ref ptr, out var slots, out var errorMessage, range: true); + if (slotsParsed && errorMessage != default) + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + return true; + } + else if (!slotsParsed) return false; + readHead = (int)(ptr - recvBufferPtr); + + // Try to set slot states + bool setSlotsSucceeded; + switch (slotState) + { + case SlotState.STABLE: + setSlotsSucceeded = true; + clusterProvider.clusterManager.ResetSlotsState(slots); + break; + case SlotState.IMPORTING: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForImport(slots, nodeid, out errorMessage); + break; + case SlotState.MIGRATING: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForMigration(slots, nodeid, out errorMessage); + break; + case SlotState.NODE: + setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForOwnershipChange(slots, nodeid, out errorMessage); + break; + default: + setSlotsSucceeded = false; + errorMessage = Encoding.ASCII.GetBytes($"ERR Slot state {subcommand} not supported."); + break; + } + + if (setSlotsSucceeded) + { + UnsafeWaitForConfigTransition(); + + while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// Implements CLUSTER SLOTS command + /// + /// + /// + /// + /// + private bool NetworkClusterSlots(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 0 argument + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + readHead = (int)(ptr - recvBufferPtr); + var slotsInfo = clusterProvider.clusterManager.CurrentConfig.GetSlotsInfo(); + while (!RespWriteUtils.WriteAsciiDirect(slotsInfo, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + /// + /// Implements CLUSTER SLOTSTATE + /// + /// + /// + /// + /// + private bool NetworkClusterSlotState(ReadOnlySpan bufSpan, int count, out bool invalidParameters) + { + invalidParameters = false; + + if (!CheckACLAdminPermissions(bufSpan, count, out var success)) + { + return success; + } + + // Expecting exactly 0 arguments + if (count != 0) + { + invalidParameters = true; + return true; + } + + var ptr = recvBufferPtr + readHead; + if (!RespReadUtils.ReadIntWithLengthHeader(out var slot, ref ptr, recvBufferPtr + bytesRead)) + return false; + readHead = (int)(ptr - recvBufferPtr); + + var current = clusterProvider.clusterManager.CurrentConfig; + var nodeId = current.GetNodeIdFromSlot((ushort)slot); + var state = current.GetState((ushort)slot); + var stateStr = state switch + { + SlotState.STABLE => "=", + SlotState.IMPORTING => "<", + SlotState.MIGRATING => ">", + SlotState.OFFLINE => "-", + SlotState.FAIL => "-", + _ => throw new Exception($"Invalid SlotState filetype {state}"), + }; + while (!RespWriteUtils.WriteAsciiDirect($"+{slot} {stateStr} {nodeId}\r\n", ref dcurr, dend)) + SendAndReset(); + + return true; + } + } +} \ No newline at end of file diff --git a/libs/common/ConvertUtils.cs b/libs/common/ConvertUtils.cs index 1eea7fd755..42bb6c4f47 100644 --- a/libs/common/ConvertUtils.cs +++ b/libs/common/ConvertUtils.cs @@ -42,5 +42,16 @@ public static long MillisecondsFromDiffUtcNowTicks(long ticks) } return milliseconds; } + + /// + /// Convert ASCII Span to upper case + /// + /// + public static void MakeUpperCase(Span command) + { + foreach (ref var c in command) + if (c > 96 && c < 123) + c -= 32; + } } } \ No newline at end of file diff --git a/libs/server/Resp/AdminCommands.cs b/libs/server/Resp/AdminCommands.cs index 92ff0698b3..0bdc137d80 100644 --- a/libs/server/Resp/AdminCommands.cs +++ b/libs/server/Resp/AdminCommands.cs @@ -271,7 +271,7 @@ private bool ProcessAdminCommands(RespCommand command, ReadOnlySpan< errorCmd = "ping"; } } - else if ((command == RespCommand.CLUSTER) || (command == RespCommand.MIGRATE) || (command == RespCommand.FAILOVER) || (command == RespCommand.REPLICAOF) || (command == RespCommand.SECONDARYOF)) + else if (command is RespCommand.CLUSTER or RespCommand.MIGRATE or RespCommand.FAILOVER or RespCommand.REPLICAOF or RespCommand.SECONDARYOF) { if (clusterSession == null) { @@ -489,16 +489,6 @@ private bool ProcessAdminCommands(RespCommand command, ReadOnlySpan< return true; } - bool DrainCommands(ReadOnlySpan bufSpan, int count) - { - for (int i = 0; i < count; i++) - { - GetCommand(bufSpan, out bool success1); - if (!success1) return false; - } - return true; - } - /// /// Performs @admin command group permission checks for the current user and the given command. /// (NOTE: This function is temporary until per-command permissions are implemented) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index d40b36f77d..10193e0969 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -224,7 +224,6 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) while (!RespWriteUtils.WriteError($"ERR Protocol Error: {ex.Message}", ref dcurr, dend)) SendAndReset(); - // Send message and dispose the network sender to end the session Send(networkSender.GetResponseObjectHead()); networkSender.Dispose(); } @@ -239,6 +238,7 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) { networkSender.ReturnResponseObject(); clusterSession?.ReleaseCurrentEpoch(); + } if (txnManager.IsSkippingOperations()) @@ -363,7 +363,6 @@ private bool MakeUpperCase(byte* ptr) return false; } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ProcessBasicCommands(RespCommand cmd, byte subcmd, int count, byte* ptr, ref TGarnetApi storageApi) where TGarnetApi : IGarnetApi @@ -653,6 +652,16 @@ private bool ProcessOtherCommands(RespCommand command, byte subcmd, return true; } + bool DrainCommands(ReadOnlySpan bufSpan, int count) + { + for (int i = 0; i < count; i++) + { + GetCommand(bufSpan, out bool success1); + if (!success1) return false; + } + return true; + } + ReadOnlySpan GetCommand(ReadOnlySpan bufSpan, out bool success) { var ptr = recvBufferPtr + readHead; diff --git a/test/Garnet.test.cluster/ClusterNegativeTests.cs b/test/Garnet.test.cluster/ClusterNegativeTests.cs new file mode 100644 index 0000000000..217c896df3 --- /dev/null +++ b/test/Garnet.test.cluster/ClusterNegativeTests.cs @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using NUnit.Framework; +using StackExchange.Redis; + +namespace Garnet.test.cluster +{ + [TestFixture, NonParallelizable] + public class ClusterNegativeTests + { + ClusterTestContext context; + + readonly HashSet monitorTests = []; + + [SetUp] + public void Setup() + { + context = new ClusterTestContext(); + context.Setup(monitorTests); + } + + [TearDown] + public void TearDown() + { + context.TearDown(); + } + + [Test, Order(1)] + [Category("CLUSTER")] + [TestCase("bumpepoch", new int[] { 1, 2, 3 })] + [TestCase("failover", new int[] { 3, 4 })] + [TestCase("forget", new int[] { 0, 3, 4 })] + [TestCase("info", new int[] { 1, 2, 3 })] + [TestCase("help", new int[] { 1, 2, 3 })] + [TestCase("meet", new int[] { 0, 1, 3, 4 })] + [TestCase("myid", new int[] { 1, 2, 3 })] + [TestCase("myparentid", new int[] { 1, 2, 3 })] + [TestCase("endpoint", new int[] { 0, 2, 3 })] + [TestCase("nodes", new int[] { 1, 2, 3 })] + [TestCase("set-config-epoch", new int[] { 0, 2, 3 })] + [TestCase("shards", new int[] { 1, 2, 3 })] + [TestCase("reset", new int[] { 3, 4, 5 })] + [TestCase("addslots", new int[] { 0, 17000 })] + [TestCase("addslotsrange", new int[] { 0, 3, 5, 7 })] + [TestCase("banlist", new int[] { 1, 2, 3, 4 })] + [TestCase("countkeysinslot", new int[] { 0, 2, 3 })] + [TestCase("delslots", new int[] { 0, 1700 })] + [TestCase("delslotsrange", new int[] { 0, 3, 5, 7 })] + [TestCase("delkeysinslot", new int[] { 0, 2, 3, 4 })] + [TestCase("delkeysinslotrange", new int[] { 0, 3, 5, 7 })] + [TestCase("getkeysinslot", new int[] { 0, 1, 3, 4 })] + [TestCase("keyslot", new int[] { 0, 2, 3, 4 })] + [TestCase("setslot", new int[] { 0, 1, 4, 5 })] + [TestCase("setslotsrange", new int[] { 0, 1, 2 })] + [TestCase("slots", new int[] { 1, 2, 3 })] + [TestCase("slotstate", new int[] { 1, 2, 3 })] + [TestCase("MIGRATE", new int[] { 0, 1, 2, 4, 5 })] + [TestCase("mtasks", new int[] { 1, 2, 3, 4 })] + [TestCase("replicas", new int[] { 1, 2, 3, 4 })] + [TestCase("replicate", new int[] { 0, 3, 4 })] + [TestCase("AOFSYNC", new int[] { 0, 1, 3, 4 })] + [TestCase("APPENDLOG", new int[] { 0, 1, 2, 3, 4, 6 })] + [TestCase("INITIATE_REPLICA_SYNC", new int[] { 0, 1, 2, 3, 4, 6 })] + [TestCase("SEND_CKPT_METADATA", new int[] { 0, 1, 2, 4, 5, 6 })] + [TestCase("SEND_CKPT_FILE_SEGMENT", new int[] { 0, 1, 2, 3, 4, 6 })] + [TestCase("BEGIN_REPLICA_RECOVER", new int[] { 0, 1, 2, 3, 4, 5, 6, 8, 9 })] + [TestCase("FAILAUTHREQ", new int[] { 0, 2, 4 })] + [TestCase("FAILSTOPWRITES", new int[] { 0, 2, 3, 4 })] + [TestCase("FAILREPLICATIONOFFSET", new int[] { 0, 2, 3, 4 })] + public void ClusterCommandWrongParameters(string subcommand, params int[] invalidCount) + { + context.CreateInstances(1); + + using var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + socket.NoDelay = true; + socket.Connect(IPAddress.Loopback, 7000); + + var clusterCMD = $"$7\r\ncluster\r\n${subcommand.Length}\r\n{subcommand}\r\n"; + var errorCmd = subcommand.ToUpper(); + + if (subcommand.Equals("set-config-epoch")) + errorCmd = "SETCONFIGEPOCH"; + + var expectedResp = $"-ERR wrong number of arguments for '{errorCmd}' command\r\n"; + foreach (var count in invalidCount) + { + var packet = $"*{2 + count}\r\n" + clusterCMD; + for (var i = 0; i < count; i++) + packet += $"$3\r\nabc\r\n"; + + var buffer = new byte[1024]; + var packetBytes = Encoding.ASCII.GetBytes(packet); + var sent = socket.Send(packetBytes); + Assert.AreEqual(packetBytes.Length, sent); + int read; + if ((read = socket.Receive(buffer)) > 0) + { + var resp = Encoding.ASCII.GetString(buffer, 0, read); + Assert.AreEqual(expectedResp, resp); + break; + } + } + } + + [Test, Order(2)] + [Category("CLUSTER")] + [TestCase(1024)] + [TestCase(10240)] + public void ClusterAddSlotsPartialPackage(int chunkSize) + { + context.CreateInstances(1); + using var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + socket.NoDelay = true; + socket.Connect(IPAddress.Loopback, 7000); + + var slots = Enumerable.Range(0, 8192).ToList(); + var packet = $"*{2 + slots.Count}\r\n$7\r\ncluster\r\n$8\r\naddslots\r\n"; + + foreach (var slot in slots) + packet += $"${slot.ToString().Length}\r\n{slot}\r\n"; + + Span packetBytes = Encoding.ASCII.GetBytes(packet); + for (var i = 0; i < packetBytes.Length; i += chunkSize) + { + var size = i + chunkSize < packetBytes.Length ? chunkSize : packetBytes.Length - i; + var slicePacket = packetBytes.Slice(i, size); + var sent = socket.Send(slicePacket); + Assert.AreEqual(slicePacket.Length, sent); + Thread.Sleep(100); + } + + var buffer = new byte[1024]; + int read; + if ((read = socket.Receive(buffer)) > 0) + { + var resp = Encoding.ASCII.GetString(buffer, 0, read); + Assert.AreEqual("+OK\r\n", resp); + } + } + } +} \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index c0531be2b5..e4813db3e2 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -965,20 +965,6 @@ public static Dictionary MergeSlotPortMap(Dictionary a return a; } - public void AddSlots(IPEndPoint endPoint, ushort startSlot, ushort endSlot, ILogger logger = null) - { - try - { - var server = redis.GetServer(endPoint); - var resp = server.Execute("cluster", "addslotsrange", $"{startSlot}", $"{endSlot}"); - Assert.AreEqual((string)resp, "OK"); - } - catch (Exception ex) - { - logger?.LogError(ex, "An error has occured"); - } - } - public string AddSlotsRange(int nodeIndex, List<(int, int)> ranges, ILogger logger) => (string)AddSlotsRange((IPEndPoint)endpoints[nodeIndex], ranges, logger);