Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: lessen reload frequency to mitigate load of servers #377

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Node
ROLE_FLAGS = %w[master slave].freeze
EMPTY_ARRAY = [].freeze
EMPTY_HASH = {}.freeze
STATE_REFRESH_INTERVAL = (3..10).freeze

private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT,
:DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH
Expand Down Expand Up @@ -103,6 +104,8 @@ def initialize(concurrent_worker, config:, pool: nil, **kwargs)
@config = config
@mutex = Mutex.new
@last_reloaded_at = nil
@reload_times = 0
@random = Random.new
end

def inspect
Expand Down Expand Up @@ -419,15 +422,27 @@ def with_reload_lock
# performed the reload.
# Probably in the future we should add a circuit breaker to #reload itself, and stop trying if the cluster is
# obviously not working.
wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
wait_start = obtain_current_time
@mutex.synchronize do
return if @last_reloaded_at && @last_reloaded_at > wait_start

if @last_reloaded_at && @reload_times > 1
# Mitigate load of servers by naive logic. Don't sleep with exponential backoff.
now = obtain_current_time
elapsed = @last_reloaded_at + @random.rand(STATE_REFRESH_INTERVAL) * 1_000_000
return if now < elapsed
end

r = yield
@last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@last_reloaded_at = obtain_current_time
@reload_times += 1
r
end
end

def obtain_current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
end
end
end
end
25 changes: 10 additions & 15 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,29 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
rescue ::RedisClient::CommandError => e
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

retry_count -= 1
if e.message.start_with?('MOVED')
node = assign_redirection_node(e.message)
retry_count -= 1
retry if retry_count >= 0
elsif e.message.start_with?('ASK')
node = assign_asking_node(e.message)
retry_count -= 1
if retry_count >= 0
node.call('ASKING')
retry
end
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
@node.reload!
retry_count -= 1
retry if retry_count >= 0
end

raise
rescue ::RedisClient::ConnectionError => e
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

@node.reload!

raise if retry_count <= 0

retry_count -= 1
retry
@node.reload!
retry if retry_count >= 0
raise
end

def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
Expand Down Expand Up @@ -200,13 +197,13 @@ def find_slot_by_key(key)
::RedisClient::Cluster::KeySlotConverter.convert(key)
end

def find_node(node_key, retry_count: 3)
def find_node(node_key, retry_count: 1)
@node.find_by(node_key)
rescue ::RedisClient::Cluster::Node::ReloadNeeded
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0

@node.reload!
retry_count -= 1
@node.reload!
retry
end

Expand Down Expand Up @@ -236,17 +233,15 @@ def close

private

def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize
def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize
@node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block))
rescue ::RedisClient::Cluster::ErrorCollection => e
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
raise if retry_count <= 0
raise if e.errors.values.none? do |err|
err.message.include?('WAIT cannot be used with replica instances')
end
raise if e.errors.values.none? { |err| err.message.include?('WAIT cannot be used with replica instances') }

@node.reload!
retry_count -= 1
@node.reload!
retry
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# frozen_string_literal: true

module Middlewares
module RedirectionCount
module RedirectCount
class Counter
Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true)
Result = Struct.new('RedirectCountResult', :moved, :ask, keyword_init: true)

def initialize
@moved = 0
Expand Down Expand Up @@ -39,9 +39,9 @@ def call(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
cfg.custom.fetch(:redirect_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
cfg.custom.fetch(:redirect_count).ask
end

raise
Expand All @@ -51,9 +51,9 @@ def call_pipelined(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
cfg.custom.fetch(:redirect_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
cfg.custom.fetch(:redirect_count).ask
end

raise
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# frozen_string_literal: true

module Middlewares
module RedirectionEmulation
module RedirectFake
Setting = Struct.new(
'RedirectionEmulationMiddlewareSetting',
'RedirectFakeSetting',
:slot, :to, :command, keyword_init: true
)

def call(cmd, cfg)
s = cfg.custom.fetch(:redirect)
s = cfg.custom.fetch(:redirect_fake)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end

def call_pipelined(cmd, cfg)
s = cfg.custom.fetch(:redirect)
s = cfg.custom.fetch(:redirect_fake)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
Expand Down
30 changes: 15 additions & 15 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ class TestCluster
module Mixin
def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirection_count = ::Middlewares::RedirectionCount::Counter.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = new_test_client
@client.call('FLUSHDB')
wait_for_replication
@captured_commands.clear
@redirection_count.clear
@redirect_count.clear
end

def teardown
@client&.call('FLUSHDB')
wait_for_replication
@client&.close
flunk(@redirection_count.get) unless @redirection_count.zero?
flunk(@redirect_count.get) unless @redirect_count.zero?
end

def test_config
Expand Down Expand Up @@ -850,10 +850,10 @@ def test_only_reshards_own_errors
client2 = new_test_client(
middlewares: [
::RedisClient::Cluster::ErrorIdentification::Middleware,
::Middlewares::RedirectionEmulation
::Middlewares::RedirectFake
],
custom: {
redirect: ::Middlewares::RedirectionEmulation::Setting.new(
redirect_fake: ::Middlewares::RedirectFake::Setting.new(
slot: slot, to: broken_primary_key, command: %w[SET testkey client2]
)
}
Expand Down Expand Up @@ -925,8 +925,8 @@ class PrimaryOnly < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -946,8 +946,8 @@ class ScaleReadRandom < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -969,8 +969,8 @@ class ScaleReadRandomWithPrimary < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -992,8 +992,8 @@ class ScaleReadLatency < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -1015,8 +1015,8 @@ class Pooled < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand Down
7 changes: 5 additions & 2 deletions test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ class TestAgainstClusterBroken < TestingWrapper

def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = ::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
fixed_hostname: TEST_FIXED_HOSTNAME,
custom: { captured_commands: @captured_commands },
middlewares: [::Middlewares::CommandCapture],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
Expand All @@ -22,11 +23,13 @@ def setup
**TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
)
@captured_commands.clear
@redirect_count.clear
end

def teardown
@client&.close
@controller&.close
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
end

def test_a_replica_is_down
Expand Down
7 changes: 5 additions & 2 deletions test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ def self.test_order

def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = ::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
fixed_hostname: TEST_FIXED_HOSTNAME,
custom: { captured_commands: @captured_commands },
middlewares: [::Middlewares::CommandCapture],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
@captured_commands.clear
@redirect_count.clear
end

def teardown
@client&.close
@controller&.close
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
end

def test_01_scale_out
Expand Down
Loading