diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index e85cb7d..8ff62ce 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -23,6 +23,7 @@ class Node ROLE_FLAGS = %w[master slave].freeze EMPTY_ARRAY = [].freeze EMPTY_HASH = {}.freeze + STATE_REFRESH_INTERVAL = (3..10).freeze private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT, :DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH @@ -103,6 +104,8 @@ def initialize(concurrent_worker, config:, pool: nil, **kwargs) @config = config @mutex = Mutex.new @last_reloaded_at = nil + @reload_times = 0 + @random = Random.new end def inspect @@ -419,15 +422,27 @@ def with_reload_lock # performed the reload. # Probably in the future we should add a circuit breaker to #reload itself, and stop trying if the cluster is # obviously not working. - wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + wait_start = obtain_current_time @mutex.synchronize do return if @last_reloaded_at && @last_reloaded_at > wait_start + if @last_reloaded_at && @reload_times > 1 + # Mitigate load of servers by naive logic. Don't sleep with exponential backoff. + now = obtain_current_time + elapsed = @last_reloaded_at + @random.rand(STATE_REFRESH_INTERVAL) * 1_000_000 + return if now < elapsed + end + r = yield - @last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @last_reloaded_at = obtain_current_time + @reload_times += 1 r end end + + def obtain_current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) + end end end end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index e29fc80..3e32127 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -107,32 +107,29 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me rescue ::RedisClient::CommandError => e raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node) + retry_count -= 1 if e.message.start_with?('MOVED') node = assign_redirection_node(e.message) - retry_count -= 1 retry if retry_count >= 0 elsif e.message.start_with?('ASK') node = assign_asking_node(e.message) - retry_count -= 1 if retry_count >= 0 node.call('ASKING') retry end elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') @node.reload! - retry_count -= 1 retry if retry_count >= 0 end + raise rescue ::RedisClient::ConnectionError => e raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node) - @node.reload! - - raise if retry_count <= 0 - retry_count -= 1 - retry + @node.reload! + retry if retry_count >= 0 + raise end def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize @@ -200,13 +197,13 @@ def find_slot_by_key(key) ::RedisClient::Cluster::KeySlotConverter.convert(key) end - def find_node(node_key, retry_count: 3) + def find_node(node_key, retry_count: 1) @node.find_by(node_key) rescue ::RedisClient::Cluster::Node::ReloadNeeded raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0 - @node.reload! retry_count -= 1 + @node.reload! retry end @@ -236,17 +233,15 @@ def close private - def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize + def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize @node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block)) rescue ::RedisClient::Cluster::ErrorCollection => e raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError) raise if retry_count <= 0 - raise if e.errors.values.none? do |err| - err.message.include?('WAIT cannot be used with replica instances') - end + raise if e.errors.values.none? { |err| err.message.include?('WAIT cannot be used with replica instances') } - @node.reload! retry_count -= 1 + @node.reload! retry end diff --git a/test/middlewares/redirection_count.rb b/test/middlewares/redirect_count.rb similarity index 76% rename from test/middlewares/redirection_count.rb rename to test/middlewares/redirect_count.rb index 98126fa..09a2d8b 100644 --- a/test/middlewares/redirection_count.rb +++ b/test/middlewares/redirect_count.rb @@ -1,9 +1,9 @@ # frozen_string_literal: true module Middlewares - module RedirectionCount + module RedirectCount class Counter - Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true) + Result = Struct.new('RedirectCountResult', :moved, :ask, keyword_init: true) def initialize @moved = 0 @@ -39,9 +39,9 @@ def call(cmd, cfg) super rescue ::RedisClient::CommandError => e if e.message.start_with?('MOVED') - cfg.custom.fetch(:redirection_count).moved + cfg.custom.fetch(:redirect_count).moved elsif e.message.start_with?('ASK') - cfg.custom.fetch(:redirection_count).ask + cfg.custom.fetch(:redirect_count).ask end raise @@ -51,9 +51,9 @@ def call_pipelined(cmd, cfg) super rescue ::RedisClient::CommandError => e if e.message.start_with?('MOVED') - cfg.custom.fetch(:redirection_count).moved + cfg.custom.fetch(:redirect_count).moved elsif e.message.start_with?('ASK') - cfg.custom.fetch(:redirection_count).ask + cfg.custom.fetch(:redirect_count).ask end raise diff --git a/test/middlewares/redirection_emulation.rb b/test/middlewares/redirect_fake.rb similarity index 72% rename from test/middlewares/redirection_emulation.rb rename to test/middlewares/redirect_fake.rb index 3965a71..146f3ce 100644 --- a/test/middlewares/redirection_emulation.rb +++ b/test/middlewares/redirect_fake.rb @@ -1,21 +1,21 @@ # frozen_string_literal: true module Middlewares - module RedirectionEmulation + module RedirectFake Setting = Struct.new( - 'RedirectionEmulationMiddlewareSetting', + 'RedirectFakeSetting', :slot, :to, :command, keyword_init: true ) def call(cmd, cfg) - s = cfg.custom.fetch(:redirect) + s = cfg.custom.fetch(:redirect_fake) raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command super end def call_pipelined(cmd, cfg) - s = cfg.custom.fetch(:redirect) + s = cfg.custom.fetch(:redirect_fake) raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command super diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index b5d3902..408b54e 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -7,19 +7,19 @@ class TestCluster module Mixin def setup @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new - @redirection_count = ::Middlewares::RedirectionCount::Counter.new + @redirect_count = ::Middlewares::RedirectCount::Counter.new @client = new_test_client @client.call('FLUSHDB') wait_for_replication @captured_commands.clear - @redirection_count.clear + @redirect_count.clear end def teardown @client&.call('FLUSHDB') wait_for_replication @client&.close - flunk(@redirection_count.get) unless @redirection_count.zero? + flunk(@redirect_count.get) unless @redirect_count.zero? end def test_config @@ -850,10 +850,10 @@ def test_only_reshards_own_errors client2 = new_test_client( middlewares: [ ::RedisClient::Cluster::ErrorIdentification::Middleware, - ::Middlewares::RedirectionEmulation + ::Middlewares::RedirectFake ], custom: { - redirect: ::Middlewares::RedirectionEmulation::Setting.new( + redirect_fake: ::Middlewares::RedirectFake::Setting.new( slot: slot, to: broken_primary_key, command: %w[SET testkey client2] ) } @@ -925,8 +925,8 @@ class PrimaryOnly < TestingWrapper include Mixin def new_test_client( - custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) config = ::RedisClient::ClusterConfig.new( @@ -946,8 +946,8 @@ class ScaleReadRandom < TestingWrapper include Mixin def new_test_client( - custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) config = ::RedisClient::ClusterConfig.new( @@ -969,8 +969,8 @@ class ScaleReadRandomWithPrimary < TestingWrapper include Mixin def new_test_client( - custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) config = ::RedisClient::ClusterConfig.new( @@ -992,8 +992,8 @@ class ScaleReadLatency < TestingWrapper include Mixin def new_test_client( - custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) config = ::RedisClient::ClusterConfig.new( @@ -1015,8 +1015,8 @@ class Pooled < TestingWrapper include Mixin def new_test_client( - custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) config = ::RedisClient::ClusterConfig.new( diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb index 62ca402..a8cff3b 100644 --- a/test/test_against_cluster_broken.rb +++ b/test/test_against_cluster_broken.rb @@ -7,12 +7,13 @@ class TestAgainstClusterBroken < TestingWrapper def setup @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new + @redirect_count = ::Middlewares::RedirectCount::Counter.new @client = ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, fixed_hostname: TEST_FIXED_HOSTNAME, - custom: { captured_commands: @captured_commands }, - middlewares: [::Middlewares::CommandCapture], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **TEST_GENERIC_OPTIONS ).new_client @client.call('echo', 'init') @@ -22,11 +23,13 @@ def setup **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) ) @captured_commands.clear + @redirect_count.clear end def teardown @client&.close @controller&.close + print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = " end def test_a_replica_is_down diff --git a/test/test_against_cluster_scale.rb b/test/test_against_cluster_scale.rb index 7bc3980..6f32bfc 100644 --- a/test/test_against_cluster_scale.rb +++ b/test/test_against_cluster_scale.rb @@ -11,21 +11,24 @@ def self.test_order def setup @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new + @redirect_count = ::Middlewares::RedirectCount::Counter.new @client = ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, fixed_hostname: TEST_FIXED_HOSTNAME, - custom: { captured_commands: @captured_commands }, - middlewares: [::Middlewares::CommandCapture], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **TEST_GENERIC_OPTIONS ).new_client @client.call('echo', 'init') @captured_commands.clear + @redirect_count.clear end def teardown @client&.close @controller&.close + print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = " end def test_01_scale_out diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index fc1bcf5..b3bf603 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -14,15 +14,18 @@ def setup **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) ) @controller.rebuild - @redirection_count = ::Middlewares::RedirectionCount::Counter.new + @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new + @redirect_count = ::Middlewares::RedirectCount::Counter.new @client = new_test_client @client.call('echo', 'init') - @redirection_count.clear + @captured_commands.clear + @redirect_count.clear end def teardown @controller&.close @client&.close + print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = " end def test_the_state_of_cluster_down @@ -37,11 +40,13 @@ def test_the_state_of_cluster_failover wait_for_replication 1000.times { |i| assert_equal(i.to_s, @client.call('GET', "key#{i}")) } assert_equal('ok', fetch_cluster_info('cluster_state')) - refute(@redirection_count.zero?, @redirection_count.get) + refute(@redirect_count.zero?, @redirect_count.get) end def test_the_state_of_cluster_resharding + resharded_keys = nil do_resharding_test do |keys| + resharded_keys = keys keys.each do |key| want = key got = @client.call('GET', key) @@ -49,11 +54,18 @@ def test_the_state_of_cluster_resharding end end - refute(@redirection_count.zero?, @redirection_count.get) + refute(@redirect_count.zero?, @redirect_count.get) + resharded_keys.each do |key| + want = key + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end end def test_the_state_of_cluster_resharding_with_pipelining + resharded_keys = nil do_resharding_test do |keys| + resharded_keys = keys values = @client.pipelined do |pipeline| keys.each { |key| pipeline.call('GET', key) } end @@ -65,16 +77,28 @@ def test_the_state_of_cluster_resharding_with_pipelining end end + values = @client.pipelined do |pipeline| + resharded_keys.each { |key| pipeline.call('GET', key) } + end + + resharded_keys.each_with_index do |key, i| + want = key + got = values[i] + assert_equal(want, got, "Case: GET: #{key}") + end + # Since redirections are handled by #call_pipelined_aware_of_redirection, # we can't trace them in pipelining processes. # - # refute(@redirection_count.zero?, @redirection_count.get) + # refute(@redirect_count.zero?, @redirect_count.get) end def test_the_state_of_cluster_resharding_with_transaction call_cnt = 0 + resharded_keys = nil do_resharding_test do |keys| + resharded_keys = keys @client.multi do |tx| call_cnt += 1 keys.each do |key| @@ -90,14 +114,31 @@ def test_the_state_of_cluster_resharding_with_transaction end end - assert_equal(1, call_cnt) - refute(@redirection_count.zero?, @redirection_count.get) + refute(@redirect_count.zero?, @redirect_count.get) + + @client.multi do |tx| + call_cnt += 1 + resharded_keys.each do |key| + tx.call('SET', key, '2') + tx.call('INCR', key) + end + end + + resharded_keys.each do |key| + want = '3' + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end + + assert_equal(2, call_cnt) end def test_the_state_of_cluster_resharding_with_transaction_and_watch call_cnt = 0 + resharded_keys = nil do_resharding_test do |keys| + resharded_keys = keys @client.multi(watch: keys) do |tx| call_cnt += 1 keys.each do |key| @@ -113,8 +154,23 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch end end - assert_equal(1, call_cnt) - refute(@redirection_count.zero?, @redirection_count.get) + refute(@redirect_count.zero?, @redirect_count.get) + + @client.multi(watch: resharded_keys) do |tx| + call_cnt += 1 + resharded_keys.each do |key| + tx.call('SET', key, '2') + tx.call('INCR', key) + end + end + + resharded_keys.each do |key| + want = '3' + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end + + assert_equal(2, call_cnt) end def test_the_state_of_cluster_resharding_with_reexecuted_watch @@ -142,7 +198,7 @@ def test_the_state_of_cluster_resharding_with_reexecuted_watch assert_equal(2, call_cnt) # The second call succeeded assert_equal('@client_value_2', @client.call('GET', 'watch_key')) - refute(@redirection_count.zero?, @redirection_count.get) + refute(@redirect_count.zero?, @redirect_count.get) ensure client2&.close end @@ -210,8 +266,8 @@ class PrimaryOnly < TestingWrapper private def new_test_client( - custom: { redirection_count: @redirection_count }, - middlewares: [::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) ::RedisClient.cluster( @@ -233,8 +289,8 @@ class Pooled < TestingWrapper private def new_test_client( - custom: { redirection_count: @redirection_count }, - middlewares: [::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) ::RedisClient.cluster( @@ -256,8 +312,8 @@ class ScaleReadRandom < TestingWrapper private def new_test_client( - custom: { redirection_count: @redirection_count }, - middlewares: [::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) ::RedisClient.cluster( @@ -281,8 +337,8 @@ class ScaleReadRandomWithPrimary < TestingWrapper private def new_test_client( - custom: { redirection_count: @redirection_count }, - middlewares: [::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) ::RedisClient.cluster( @@ -306,8 +362,8 @@ class ScaleReadLatency < TestingWrapper private def new_test_client( - custom: { redirection_count: @redirection_count }, - middlewares: [::Middlewares::RedirectionCount], + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], **opts ) ::RedisClient.cluster( diff --git a/test/testing_helper.rb b/test/testing_helper.rb index d297571..7f57bd7 100644 --- a/test/testing_helper.rb +++ b/test/testing_helper.rb @@ -7,8 +7,8 @@ require 'testing_constants' require 'cluster_controller' require 'middlewares/command_capture' -require 'middlewares/redirection_emulation' -require 'middlewares/redirection_count' +require 'middlewares/redirect_count' +require 'middlewares/redirect_fake' case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby') when 'hiredis' then require 'hiredis-client'