Skip to content

Commit

Permalink
Assign a new node after calling renew_cluster_state
Browse files Browse the repository at this point in the history
If we catch a connection error and refresh the cluster topology, we need
to re-calculate what node to send the command to in the router; the node
we're using might not even be a valid node any longer.
  • Loading branch information
KJTsanaktsidis committed Oct 17, 2024
1 parent 5a5f84c commit 11c6a2c
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
21 changes: 15 additions & 6 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
handle_redirection(node, retry_count: 1) do |nd|
handle_redirection(slot, retry_count: 1) do |nd|
nd.with do |c|
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
Expand All @@ -45,10 +42,22 @@ def watch(keys) # rubocop:disable Metrics/AbcSize

private

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
def handle_redirection(slot, retry_count: 1, &blk)
# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
times_block_executed = 0
@router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
times_block_executed += 1
handle_asking_once(nd, &blk)
end
rescue ::RedisClient::ConnectionError
# Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
# _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
retry_count -= [times_block_executed, 1].min
raise if retry_count < 0

retry
end

def handle_asking_once(node)
Expand Down
17 changes: 14 additions & 3 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi

# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, command, retry_count: retry_count) do |on_node|
if args.empty?
# prevent memory allocation for variable-length args
on_node.public_send(method, command, &block)
Expand All @@ -101,12 +101,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
Expand Down Expand Up @@ -134,6 +134,17 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me

retry_count -= 1
renew_cluster_state

if retry_count >= 0
# Find the node to use for this command - if this fails for some reason, though, re-use
# the old node.
begin
node = find_node(find_node_key(command)) if command
rescue StandardError # rubocop:disable Lint/SuppressedException
end
retry
end

retry if retry_count >= 0
raise
end
Expand Down
2 changes: 2 additions & 0 deletions test/cluster_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def initialize(node_addrs,
@debug = ENV.fetch('DEBUG', '0')
end

attr_reader :clients

def wait_for_cluster_to_be_ready(skip_clients: [])
print_debug('wait for nodes to be recognized...')
wait_meeting(@clients, max_attempts: @max_attempts)
Expand Down
65 changes: 65 additions & 0 deletions test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'logger'
require 'json'
require 'testing_helper'
require 'securerandom'

class TestAgainstClusterBroken < TestingWrapper
WAIT_SEC = 0.1
Expand Down Expand Up @@ -54,6 +55,41 @@ def test_client_patience
do_assertions(offset: 3)
end

def test_reloading_on_connection_error
sacrifice = @controller.select_sacrifice_of_primary
# Find a key which lives on the sacrifice node
test_key = generate_key_for_node(sacrifice)
@clients[0].call('SET', test_key, 'foobar1')

# Shut the node down.
kill_a_node_and_wait_for_failover(sacrifice)

# When we try and fetch the key, it'll attempt to connect to the broken node, and
# thus trigger a reload of the cluster topology.
assert_equal 'OK', @clients[0].call('SET', test_key, 'foobar2')
end

def test_transaction_retry_on_connection_error
sacrifice = @controller.select_sacrifice_of_primary
# Find a key which lives on the sacrifice node
test_key = generate_key_for_node(sacrifice)
@clients[0].call('SET', test_key, 'foobar1')

call_count = 0
# Begin a transaction, but shut the node down after the WATCH is issued
res = @clients[0].multi(watch: [test_key]) do |tx|
kill_a_node_and_wait_for_failover(sacrifice) if call_count == 0
call_count += 1
tx.call('SET', test_key, 'foobar2')
end

# The transaction should have retried once and successfully completed
# the second time.
assert_equal ['OK'], res
assert_equal 'foobar2', @clients[0].call('GET', test_key)
assert_equal 2, call_count
end

private

def prepare_test_data
Expand Down Expand Up @@ -129,6 +165,18 @@ def do_assertions(offset:)
end
end

def generate_key_for_node(conn)
# Figure out a slot on the the sacrifice node, and a key in that slot.
conn_id = conn.call('CLUSTER', 'MYID')
conn_slots = conn.call('CLUSTER', 'SLOTS')
.select { |res| res[2][2] == conn_id }
.flat_map { |res| (res[0]..res[1]).to_a }
loop do
test_key = SecureRandom.hex
return test_key if conn_slots.include?(conn.call('CLUSTER', 'KEYSLOT', test_key))
end
end

def wait_for_replication(client)
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
Expand Down Expand Up @@ -211,6 +259,23 @@ def retryable(attempts: MAX_ATTEMPTS, wait_sec: WAIT_SEC)
end
end

def kill_a_node_and_wait_for_failover(sacrifice)
other_client = @controller.clients.reject { _1 == sacrifice }.first
sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
kill_a_node(sacrifice)
failover_checks = 0
loop do
raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > 30

# Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS.
cluster_slots = other_client.call('CLUSTER', 'SLOTS')
break unless cluster_slots.any? { _1[2][2] == sacrifice_id }

sleep 1
failover_checks += 1
end
end

def build_client(
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
Expand Down

0 comments on commit 11c6a2c

Please sign in to comment.