
fix: ensure recoverability from cluster state changes #379

Merged · 23 commits · Sep 26, 2024
19 changes: 10 additions & 9 deletions .github/workflows/test.yaml
@@ -33,21 +33,22 @@ jobs:
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {task: test_cluster_broken, redis: '7.2', restart: 'no', startup: '6'}
- {task: test_cluster_broken, redis: '6.2', restart: 'no', startup: '6'}
- {task: test_cluster_scale, redis: '7.2', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, redis: '6.2', compose: compose.scale.yaml, startup: '8'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '7.0', ruby: '3.1'}
- {redis: '6.2', ruby: '3.0'}
- {redis: '5.0', ruby: '2.7'}
- {task: test_cluster_state, redis: '8', pattern: 'PrimaryOnly', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'Pooled', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Pipeline', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Transaction', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'PubSub', compose: compose.scale.yaml, startup: '8'}
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
env:
REDIS_VERSION: ${{ matrix.redis || '7.2' }}
DOCKER_COMPOSE_FILE: ${{ matrix.compose || 'compose.yaml' }}
8 changes: 8 additions & 0 deletions .rubocop.yml
@@ -14,6 +14,14 @@ Metrics/AbcSize:
Exclude:
- 'test/**/*'

Metrics/CyclomaticComplexity:
Exclude:
- 'test/**/*'

Metrics/PerceivedComplexity:
Exclude:
- 'test/**/*'

Metrics/ClassLength:
Max: 500

2 changes: 1 addition & 1 deletion lib/redis_client/cluster/errors.rb
@@ -32,7 +32,7 @@ class ErrorCollection < ::RedisClient::Error
def initialize(errors)
@errors = {}
if !errors.is_a?(Hash) || errors.empty?
super('')
super(errors.to_s)
return
end

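Note: the `ErrorCollection` tweak above preserves whatever was passed in when it is not a usable Hash, instead of discarding it as an empty message. A minimal sketch of the difference, assuming the gem is installed (the sample input is made up):

```ruby
require 'redis_client/cluster/errors'

# Before this change, any non-Hash (or empty) argument produced an
# exception with a blank message, hiding the original cause:
#   RedisClient::Cluster::ErrorCollection.new(nil).message #=> ""
#
# After the change, the argument is stringified and kept:
err = ::RedisClient::Cluster::ErrorCollection.new('command timed out')
puts err.message #=> "command timed out"
```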
8 changes: 7 additions & 1 deletion lib/redis_client/cluster/optimistic_locking.rb
@@ -11,7 +11,7 @@ def initialize(router)
@asking = false
end

def watch(keys)
def watch(keys) # rubocop:disable Metrics/AbcSize
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

@@ -32,7 +32,13 @@ def watch(keys)
c.call('UNWATCH')
raise
end
rescue ::RedisClient::CommandError => e
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise
end
rescue ::RedisClient::ConnectionError
@router.renew_cluster_state
raise
end
end
end
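Note: the new rescue clauses in `watch` do not swallow the error; they refresh the router's view of the slot map and re-raise, which makes a plain bounded retry on the caller's side worthwhile. A rough caller-side sketch, not code from this PR, assuming `watch` yields the node's client to its block as the surrounding code suggests:

```ruby
# Illustrative wrapper: after renew_cluster_state has run inside watch,
# a simple bounded retry can succeed once the cluster has settled.
def watch_with_retry(locking, keys, attempts: 3)
  locking.watch(keys) { |client| yield client }
rescue ::RedisClient::CommandError => e
  raise unless e.message.start_with?('CLUSTERDOWN')

  attempts -= 1
  retry if attempts.positive?
  raise
rescue ::RedisClient::ConnectionError
  attempts -= 1
  retry if attempts.positive?
  raise
end
```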
50 changes: 40 additions & 10 deletions lib/redis_client/cluster/pipeline.rb
@@ -55,8 +55,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
results = Array.new(commands.size)
@pending_reads += size
write_multi(commands)
redirection_indices = nil
first_exception = nil
redirection_indices = stale_cluster_state = first_exception = nil

size.times do |index|
timeout = timeouts && timeouts[index]
@@ -73,18 +72,31 @@
elsif exception
first_exception ||= result
end

stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
end

results[index] = result
end

raise first_exception if exception && first_exception
return results if redirection_indices.nil?
if redirection_indices
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
err.replies = results
err.indices = redirection_indices
err.first_exception = first_exception
raise err
end

if stale_cluster_state
err = ::RedisClient::Cluster::Pipeline::StaleClusterState.new
err.replies = results
err.first_exception = first_exception
raise err
end

raise first_exception if first_exception

err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
err.replies = results
err.indices = redirection_indices
raise err
results
end
end

@@ -98,8 +110,12 @@ def ensure_connected_cluster_scoped(retryable: true, &block)

ReplySizeError = Class.new(::RedisClient::Error)

class StaleClusterState < ::RedisClient::Error
attr_accessor :replies, :first_exception
end

class RedirectionNeeded < ::RedisClient::Error
attr_accessor :replies, :indices
attr_accessor :replies, :indices, :first_exception
end

def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed)
@@ -166,14 +182,18 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end
end

all_replies = errors = required_redirections = nil
all_replies = errors = required_redirections = cluster_state_errors = nil

work_group.each do |node_key, v|
case v
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
required_redirections ||= {}
required_redirections[node_key] = v
when ::RedisClient::Cluster::Pipeline::StaleClusterState
cluster_state_errors ||= {}
cluster_state_errors[node_key] = v
when StandardError
cluster_state_errors ||= {} if v.is_a?(::RedisClient::ConnectionError)
errors ||= {}
errors[node_key] = v
else
Expand All @@ -183,15 +203,25 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end

work_group.close
@router.renew_cluster_state if cluster_state_errors
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?

required_redirections&.each do |node_key, v|
raise v.first_exception if v.first_exception

all_replies ||= Array.new(@size)
pipeline = @pipelines[node_key]
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
end

cluster_state_errors&.each do |node_key, v|
raise v.first_exception if v.first_exception

all_replies ||= Array.new(@size)
@pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
end

all_replies
end

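Note: `StaleClusterState` follows the same carrier-exception pattern as `RedirectionNeeded`: it hauls the per-node partial replies (and the first real command error) out of the read loop so `execute` can renew the cluster state exactly once and still assemble the full reply array. A standalone sketch of that pattern, with simplified names rather than this library's internals:

```ruby
# Carrier exception: transports partial results out of a worker so the
# coordinator can decide whether to fail or stitch replies together.
class StaleState < StandardError
  attr_accessor :replies, :first_exception
end

def read_replies(commands)
  stale = false
  replies = commands.map do |cmd|
    reply = cmd == :bad ? RuntimeError.new('CLUSTERDOWN Hash slot not served') : :ok
    stale ||= reply.is_a?(RuntimeError) && reply.message.start_with?('CLUSTERDOWN')
    reply
  end
  return replies unless stale

  err = StaleState.new
  err.replies = replies
  raise err
end

begin
  read_replies(%i[good bad good])
rescue StaleState => e
  # execute would call renew_cluster_state here; the partial replies survive:
  p e.replies #=> [:ok, #<RuntimeError: CLUSTERDOWN Hash slot not served>, :ok]
end
```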
65 changes: 41 additions & 24 deletions lib/redis_client/cluster/pub_sub.rb
Expand Up @@ -24,6 +24,8 @@ def ensure_worker
def close
@worker.exit if @worker&.alive?
@client.close
rescue ::RedisClient::ConnectionError
# ignore
end

private
@@ -51,27 +53,33 @@ def initialize(router, command_builder)
@command_builder = command_builder
@queue = SizedQueue.new(BUF_SIZE)
@state_dict = {}
@commands = []
end

def call(*args, **kwargs)
_call(@command_builder.generate(args, kwargs))
command = @command_builder.generate(args, kwargs)
_call(command)
@commands << command
nil
end

def call_v(command)
_call(@command_builder.generate(command))
command = @command_builder.generate(command)
_call(command)
@commands << command
nil
end

def close
@state_dict.each_value(&:close)
@state_dict.clear
@commands.clear
@queue.clear
@queue.close
nil
end

def next_event(timeout = nil)
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
@state_dict.each_value(&:ensure_worker)
max_duration = calc_max_duration(timeout)
starting = obtain_current_time
@@ -80,6 +88,13 @@ def next_event(timeout = nil)
break if max_duration > 0 && obtain_current_time - starting > max_duration

case event = @queue.pop(true)
when ::RedisClient::CommandError
if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
@router.renew_cluster_state
break start_over
end

raise event
when StandardError then raise event
when Array then break event
end
Expand All @@ -99,13 +114,26 @@ def _call(command)
end
end

def call_to_single_state(command)
def call_to_single_state(command, retry_count: 1)
node_key = @router.find_node_key(command)
try_call(node_key, command)
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key].call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
retry_count -= 1
retry_count >= 0 ? retry : raise
end

def call_to_all_states(command)
@state_dict.each_value { |s| s.call(command) }
@state_dict.each do |node_key, state|
state.call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
end
end

def call_for_sharded_states(command)
@@ -116,31 +144,20 @@ def call_for_sharded_states(command)
end
end

def try_call(node_key, command, retry_count: 1)
add_state(node_key).call(command)
rescue ::RedisClient::CommandError => e
raise if !e.message.start_with?('MOVED') || retry_count <= 0

# for sharded pub/sub
node_key = e.message.split[2]
retry_count -= 1
retry
end

def add_state(node_key)
return @state_dict[node_key] if @state_dict.key?(node_key)

state = State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key] = state
end

def obtain_current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
end

def calc_max_duration(timeout)
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
end

def start_over
@state_dict.each_value(&:close)
@state_dict.clear
@commands.each { |command| _call(command) }
nil
end
end
end
end
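Note: the new `@commands` buffer is what makes `start_over` possible: every generated subscribe command is recorded, so when a MOVED or CLUSTERDOWN error surfaces through the event queue, `next_event` tears down every per-node state, renews the cluster state, and replays the subscriptions against the new topology. A rough consumer-side sketch (assuming `client` is this gem's cluster client, and that `next_event` returns nil on the re-subscribe path, as the code above implies):

```ruby
pubsub = client.pubsub
pubsub.call('SSUBSCRIBE', 'chan1')

loop do
  # Returns an event array, or nil on timeout and on the transparent
  # re-subscribe triggered by MOVED / CLUSTERDOWN.
  event = pubsub.next_event(0.1)
  next if event.nil?

  kind, channel, message = event
  puts "#{kind} #{channel}: #{message}"
end
```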