diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index f0b38dd..2406405 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -15,10 +15,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize slot = find_slot(keys) raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil? - # We have not yet selected a node for this transaction, initially, which means we can handle - # redirections freely initially (i.e. for the first WATCH call) - node = @router.find_primary_node_by_slot(slot) - handle_redirection(node, retry_count: 1) do |nd| + handle_redirection(slot, retry_count: 1) do |nd| nd.with do |c| c.ensure_connected_cluster_scoped(retryable: false) do c.call('ASKING') if @asking @@ -45,10 +42,22 @@ def watch(keys) # rubocop:disable Metrics/AbcSize private - def handle_redirection(node, retry_count: 1, &blk) - @router.handle_redirection(node, retry_count: retry_count) do |nd| + def handle_redirection(slot, retry_count: 1, &blk) + # We have not yet selected a node for this transaction, initially, which means we can handle + # redirections freely initially (i.e. for the first WATCH call) + node = @router.find_primary_node_by_slot(slot) + times_block_executed = 0 + @router.handle_redirection(node, nil, retry_count: retry_count) do |nd| + times_block_executed += 1 handle_asking_once(nd, &blk) end + rescue ::RedisClient::ConnectionError + # Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining + # _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block. + retry_count -= [times_block_executed, 1].min + raise if retry_count < 0 + + retry end def handle_asking_once(node) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index f3981a1..71082a0 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -90,7 +90,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi # @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding def try_send(node, method, command, args, retry_count: 3, &block) - handle_redirection(node, retry_count: retry_count) do |on_node| + handle_redirection(node, command, retry_count: retry_count) do |on_node| if args.empty? # prevent memory allocation for variable-length args on_node.public_send(method, command, &block) @@ -101,12 +101,12 @@ def try_send(node, method, command, args, retry_count: 3, &block) end def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) - handle_redirection(node, retry_count: retry_count) do |on_node| + handle_redirection(node, nil, retry_count: retry_count) do |on_node| on_node.public_send(method, *args, **kwargs, &block) end end - def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity yield node rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise @@ -134,6 +134,17 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me retry_count -= 1 renew_cluster_state + + if retry_count >= 0 + # Find the node to use for this command - if this fails for some reason, though, re-use + # the old node. + begin + node = find_node(find_node_key(command)) if command + rescue StandardError # rubocop:disable Lint/SuppressedException + end + retry + end + retry if retry_count >= 0 raise end diff --git a/test/cluster_controller.rb b/test/cluster_controller.rb index 69e3697..860efbd 100644 --- a/test/cluster_controller.rb +++ b/test/cluster_controller.rb @@ -58,6 +58,8 @@ def initialize(node_addrs, @debug = ENV.fetch('DEBUG', '0') end + attr_reader :clients + def wait_for_cluster_to_be_ready(skip_clients: []) print_debug('wait for nodes to be recognized...') wait_meeting(@clients, max_attempts: @max_attempts) diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb index 757dee9..fd76bc2 100644 --- a/test/test_against_cluster_broken.rb +++ b/test/test_against_cluster_broken.rb @@ -3,6 +3,7 @@ require 'logger' require 'json' require 'testing_helper' +require 'securerandom' class TestAgainstClusterBroken < TestingWrapper WAIT_SEC = 0.1 @@ -54,6 +55,41 @@ def test_client_patience do_assertions(offset: 3) end + def test_reloading_on_connection_error + sacrifice = @controller.select_sacrifice_of_primary + # Find a key which lives on the sacrifice node + test_key = generate_key_for_node(sacrifice) + @clients[0].call('SET', test_key, 'foobar1') + + # Shut the node down. + kill_a_node_and_wait_for_failover(sacrifice) + + # When we try and fetch the key, it'll attempt to connect to the broken node, and + # thus trigger a reload of the cluster topology. + assert_equal 'OK', @clients[0].call('SET', test_key, 'foobar2') + end + + def test_transaction_retry_on_connection_error + sacrifice = @controller.select_sacrifice_of_primary + # Find a key which lives on the sacrifice node + test_key = generate_key_for_node(sacrifice) + @clients[0].call('SET', test_key, 'foobar1') + + call_count = 0 + # Begin a transaction, but shut the node down after the WATCH is issued + res = @clients[0].multi(watch: [test_key]) do |tx| + kill_a_node_and_wait_for_failover(sacrifice) if call_count == 0 + call_count += 1 + tx.call('SET', test_key, 'foobar2') + end + + # The transaction should have retried once and successfully completed + # the second time. + assert_equal ['OK'], res + assert_equal 'foobar2', @clients[0].call('GET', test_key) + assert_equal 2, call_count + end + private def prepare_test_data @@ -129,6 +165,18 @@ def do_assertions(offset:) end end + def generate_key_for_node(conn) + # Figure out a slot on the the sacrifice node, and a key in that slot. + conn_id = conn.call('CLUSTER', 'MYID') + conn_slots = conn.call('CLUSTER', 'SLOTS') + .select { |res| res[2][2] == conn_id } + .flat_map { |res| (res[0]..res[1]).to_a } + loop do + test_key = SecureRandom.hex + return test_key if conn_slots.include?(conn.call('CLUSTER', 'KEYSLOT', test_key)) + end + end + def wait_for_replication(client) client_side_timeout = TEST_TIMEOUT_SEC + 1.0 server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i @@ -211,6 +259,23 @@ def retryable(attempts: MAX_ATTEMPTS, wait_sec: WAIT_SEC) end end + def kill_a_node_and_wait_for_failover(sacrifice) + other_client = @controller.clients.reject { _1 == sacrifice }.first + sacrifice_id = sacrifice.call('CLUSTER', 'MYID') + kill_a_node(sacrifice) + failover_checks = 0 + loop do + raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > 30 + + # Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS. + cluster_slots = other_client.call('CLUSTER', 'SLOTS') + break unless cluster_slots.any? { _1[2][2] == sacrifice_id } + + sleep 1 + failover_checks += 1 + end + end + def build_client( custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],