Skip to content

Commit

Permalink
[Cleanup] Kill bufSpan (#452)
Browse files Browse the repository at this point in the history
* [Cleanup] Kill bufSpan

Side effect: fixed a bug with how resposene was sent for a "ping <message>" call

* nit

* add comments for RespCommand boundaries
  • Loading branch information
badrishc authored Jun 10, 2024
1 parent abfbff0 commit 7c9ac99
Show file tree
Hide file tree
Showing 25 changed files with 280 additions and 394 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public bool TryAddSlots(HashSet<int> slots, out int slotAssigned)
}

/// <summary>
/// Try to remove ownernship of slots. Slot state transition to OFFLINE.
/// Try to remove ownership of slots. Slot state transition to OFFLINE.
/// </summary>
/// <param name="slots">Slot list</param>
/// <param name="notLocalSlot">The slot number that is not local.</param>
Expand Down
49 changes: 24 additions & 25 deletions libs/cluster/Session/ClusterCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,63 +142,62 @@ public void UnsafeWaitForConfigTransition()
/// Handle cluster subcommands.
/// </summary>
/// <param name="command">Subcommand to execute.</param>
/// <param name="bufSpan">Remaining parameters in the command buffer.</param>
/// <param name="count">Number of parameters in teh command buffer</param>
/// <returns>True if command is fully processed, false if more processing is needed.</returns>
private bool ProcessClusterCommands(RespCommand command, ReadOnlySpan<byte> bufSpan, int count)
private bool ProcessClusterCommands(RespCommand command, int count)
{
bool result;
bool invalidParameters;

result =
command switch
{
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(count, out invalidParameters),
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_AOFSYNC => NetworkClusterAOFSync(count, out invalidParameters),
RespCommand.CLUSTER_APPENDLOG => NetworkClusterAppendLog(count, out invalidParameters),
RespCommand.CLUSTER_BANLIST => NetworkClusterBanList(count, out invalidParameters),
RespCommand.CLUSTER_BEGIN_REPLICA_RECOVER => NetworkClusterBeginReplicaRecover(count, out invalidParameters),
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(count, out invalidParameters),
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_ENDPOINT => NetworkClusterEndpoint(count, out invalidParameters),
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(count, out invalidParameters),
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(count, out invalidParameters),
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(count, out invalidParameters),
RespCommand.CLUSTER_FORGET => NetworkClusterForget(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_FORGET => NetworkClusterForget(count, out invalidParameters),
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(count, out invalidParameters),
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_HELP => NetworkClusterHelp(count, out invalidParameters),
RespCommand.CLUSTER_INFO => NetworkClusterInfo(count, out invalidParameters),
RespCommand.CLUSTER_INITIATE_REPLICA_SYNC => NetworkClusterInitiateReplicaSync(count, out invalidParameters),
RespCommand.CLUSTER_KEYSLOT => NetworkClusterKeySlot(count, out invalidParameters),
RespCommand.CLUSTER_MEET => NetworkClusterMeet(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_MEET => NetworkClusterMeet(count, out invalidParameters),
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(count, out invalidParameters),
RespCommand.CLUSTER_MTASKS => NetworkClusterMTasks(count, out invalidParameters),
RespCommand.CLUSTER_MYID => NetworkClusterMyId(count, out invalidParameters),
RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(count, out invalidParameters),
RespCommand.CLUSTER_NODES => NetworkClusterNodes(count, out invalidParameters),
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_RESET => NetworkClusterReset(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(count, out invalidParameters),
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(count, out invalidParameters),
RespCommand.CLUSTER_RESET => NetworkClusterReset(count, out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_FILE_SEGMENT => NetworkClusterSendCheckpointFileSegment(count, out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_METADATA => NetworkClusterSendCheckpointMetadata(count, out invalidParameters),
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(count, out invalidParameters),
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(count, out invalidParameters),
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_SHARDS => NetworkClusterShards(count, out invalidParameters),
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(bufSpan, count, out invalidParameters),
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(bufSpan, count, out invalidParameters),
_ => throw new Exception($"Unexpected cluster subcommad: {command}")
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(count, out invalidParameters),
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(count, out invalidParameters),
_ => throw new Exception($"Unexpected cluster subcommand: {command}")
};

if (invalidParameters)
{
if (!DrainCommands(bufSpan, count))
if (!DrainCommands(count))
return false;

// Have to lookup the RESP name now that we're in the failure case
Expand Down
26 changes: 10 additions & 16 deletions libs/cluster/Session/ClusterSession.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Garnet.common;
Expand Down Expand Up @@ -64,7 +63,7 @@ public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnMan
public void AcquireCurrentEpoch() => _localCurrentEpoch = clusterProvider.GarnetCurrentEpoch;
public void ReleaseCurrentEpoch() => _localCurrentEpoch = 0;

public bool ProcessClusterCommands(RespCommand command, ReadOnlySpan<byte> bufSpan, int count, byte* recvBufferPtr, int bytesRead, ref int readHead, ref byte* dcurr, ref byte* dend, out bool result)
public bool ProcessClusterCommands(RespCommand command, int count, byte* recvBufferPtr, int bytesRead, ref int readHead, ref byte* dcurr, ref byte* dend, out bool result)
{
this.recvBufferPtr = recvBufferPtr;
this.bytesRead = bytesRead;
Expand All @@ -76,7 +75,7 @@ public bool ProcessClusterCommands(RespCommand command, ReadOnlySpan<byte> bufSp
{
if (command.IsClusterSubCommand())
{
result = ProcessClusterCommands(command, bufSpan, count);
result = ProcessClusterCommands(command, count);
ret = true;
}
else
Expand Down Expand Up @@ -154,27 +153,24 @@ public void SetUser(User user)
this.user = user;
}

bool DrainCommands(ReadOnlySpan<byte> bufSpan, int count)
bool DrainCommands(int count)
{
for (int i = 0; i < count; i++)
for (var i = 0; i < count; i++)
{
_ = GetNextToken(bufSpan, out bool success1);
if (!success1) return false;
if (!SkipCommand()) return false;
}
return true;
}

Span<byte> GetNextToken(ReadOnlySpan<byte> bufSpan, out bool success)
bool SkipCommand()
{
success = false;

var ptr = recvBufferPtr + readHead;
var end = recvBufferPtr + bytesRead;

// Try to read the command length
// Try the command length
if (!RespReadUtils.ReadLengthHeader(out int length, ref ptr, end))
{
return default;
return false;
}

readHead = (int)(ptr - recvBufferPtr);
Expand All @@ -183,19 +179,17 @@ Span<byte> GetNextToken(ReadOnlySpan<byte> bufSpan, out bool success)
ptr += length;
if (ptr + 2 > end)
{
return default;
return false;
}

if (*(ushort*)ptr != MemoryMarshal.Read<ushort>("\r\n"u8))
{
RespParsingException.ThrowUnexpectedToken(*ptr);
}

success = true;
var result = new Span<byte>(recvBufferPtr + readHead, length);
readHead += length + 2;

return result;
return true;
}
}
}
16 changes: 5 additions & 11 deletions libs/cluster/Session/RespClusterBasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;
Expand All @@ -16,10 +15,9 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
/// <summary>
/// Implements CLUSTER BUMPEPOCH command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <returns></returns>
private bool NetworkClusterBumpEpoch(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterBumpEpoch(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down Expand Up @@ -49,11 +47,10 @@ private bool NetworkClusterBumpEpoch(ReadOnlySpan<byte> bufSpan, int count, out
/// <summary>
/// Implements CLUSTER FORGET command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterForget(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterForget(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down Expand Up @@ -154,11 +151,10 @@ private bool NetworkClusterHelp(int count, out bool invalidParameters)
/// <summary>
/// Implements CLUSTER MEET command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterMeet(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterMeet(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down Expand Up @@ -295,11 +291,10 @@ private bool NetworkClusterNodes(int count, out bool invalidParameters)
/// <summary>
/// Implements CLUSTER SET-CONFIG-EPOCH command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterSetConfigEpoch(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterSetConfigEpoch(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down Expand Up @@ -433,11 +428,10 @@ private bool NetworkClusterGossip(int count, out bool invalidParameters)
/// <summary>
/// Implements CLUSTER RESET command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterReset(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterReset(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down
4 changes: 1 addition & 3 deletions libs/cluster/Session/RespClusterFailoverCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Text;
using Garnet.common;
using Garnet.server;

Expand All @@ -13,11 +12,10 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
/// <summary>
/// Implements CLUSTER FAILOVER command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterFailover(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterFailover(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down
3 changes: 1 addition & 2 deletions libs/cluster/Session/RespClusterMigrateCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
/// <summary>
/// Implements CLUSTER MIGRATE command (only for internode use)
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
private bool NetworkClusterMigrate(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterMigrate(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down
6 changes: 2 additions & 4 deletions libs/cluster/Session/RespClusterReplicationCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
/// <summary>
/// Implements CLUSTER REPLICAS command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterReplicas(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterReplicas(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down Expand Up @@ -47,11 +46,10 @@ private bool NetworkClusterReplicas(ReadOnlySpan<byte> bufSpan, int count, out b
/// <summary>
/// Implements CLUSTER REPLICATE command
/// </summary>
/// <param name="bufSpan"></param>
/// <param name="count"></param>
/// <param name="invalidParameters"></param>
/// <returns></returns>
private bool NetworkClusterReplicate(ReadOnlySpan<byte> bufSpan, int count, out bool invalidParameters)
private bool NetworkClusterReplicate(int count, out bool invalidParameters)
{
invalidParameters = false;

Expand Down
Loading

0 comments on commit 7c9ac99

Please sign in to comment.