diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c34cca5..c5b7aac 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -33,21 +33,22 @@ jobs: - {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml} - {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'} - {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'} - - {task: test_cluster_broken, redis: '7.2', restart: 'no', startup: '6'} - - {task: test_cluster_broken, redis: '6.2', restart: 'no', startup: '6'} - - {task: test_cluster_scale, redis: '7.2', compose: compose.scale.yaml, startup: '8'} - - {task: test_cluster_scale, redis: '6.2', compose: compose.scale.yaml, startup: '8'} - {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml} + - {task: test_cluster_broken, restart: 'no', startup: '6'} - {redis: '7.0', ruby: '3.1'} - {redis: '6.2', ruby: '3.0'} - {redis: '5.0', ruby: '2.7'} - - {task: test_cluster_state, redis: '8', pattern: 'PrimaryOnly', compose: compose.valkey.yaml, replica: '2', startup: '9'} - - {task: test_cluster_state, redis: '8', pattern: 'Pooled', compose: compose.valkey.yaml, replica: '2', startup: '9'} - - {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, replica: '2', startup: '9'} - - {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, replica: '2', startup: '9'} - - {task: test_cluster_state, redis: '8', pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, replica: '2', startup: '9'} + - {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'} + - {task: test_cluster_scale, pattern: 'Pipeline', compose: compose.scale.yaml, startup: '8'} + - {task: test_cluster_scale, pattern: 'Transaction', compose: compose.scale.yaml, startup: '8'} + - {task: test_cluster_scale, pattern: 'PubSub', compose: compose.scale.yaml, startup: '8'} - {ruby: 'jruby'} - {ruby: 'truffleruby'} + - {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} env: REDIS_VERSION: ${{ matrix.redis || '7.2' }} DOCKER_COMPOSE_FILE: ${{ matrix.compose || 'compose.yaml' }} diff --git a/.rubocop.yml b/.rubocop.yml index a81474c..488493f 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -14,6 +14,14 @@ Metrics/AbcSize: Exclude: - 'test/**/*' +Metrics/CyclomaticComplexity: + Exclude: + - 'test/**/*' + +Metrics/PerceivedComplexity: + Exclude: + - 'test/**/*' + Metrics/ClassLength: Max: 500 diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb index 77a66d7..79239dc 100644 --- a/lib/redis_client/cluster/errors.rb +++ b/lib/redis_client/cluster/errors.rb @@ -32,7 +32,7 @@ class ErrorCollection < ::RedisClient::Error def initialize(errors) @errors = {} if !errors.is_a?(Hash) || errors.empty? - super('') + super(errors.to_s) return end diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index 8a7651e..bfc978b 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -11,7 +11,7 @@ def initialize(router) @asking = false end - def watch(keys) + def watch(keys) # rubocop:disable Metrics/AbcSize slot = find_slot(keys) raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil? @@ -32,7 +32,13 @@ def watch(keys) c.call('UNWATCH') raise end + rescue ::RedisClient::CommandError => e + @router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served') + raise end + rescue ::RedisClient::ConnectionError + @router.renew_cluster_state + raise end end end diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index 6cef810..29c7267 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -55,8 +55,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco results = Array.new(commands.size) @pending_reads += size write_multi(commands) - redirection_indices = nil - first_exception = nil + redirection_indices = stale_cluster_state = first_exception = nil size.times do |index| timeout = timeouts && timeouts[index] @@ -73,18 +72,31 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco elsif exception first_exception ||= result end + + stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served') end results[index] = result end - raise first_exception if exception && first_exception - return results if redirection_indices.nil? + if redirection_indices + err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new + err.replies = results + err.indices = redirection_indices + err.first_exception = first_exception + raise err + end + + if stale_cluster_state + err = ::RedisClient::Cluster::Pipeline::StaleClusterState.new + err.replies = results + err.first_exception = first_exception + raise err + end + + raise first_exception if first_exception - err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new - err.replies = results - err.indices = redirection_indices - raise err + results end end @@ -98,8 +110,12 @@ def ensure_connected_cluster_scoped(retryable: true, &block) ReplySizeError = Class.new(::RedisClient::Error) + class StaleClusterState < ::RedisClient::Error + attr_accessor :replies, :first_exception + end + class RedirectionNeeded < ::RedisClient::Error - attr_accessor :replies, :indices + attr_accessor :replies, :indices, :first_exception end def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed) @@ -166,14 +182,18 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met end end - all_replies = errors = required_redirections = nil + all_replies = errors = required_redirections = cluster_state_errors = nil work_group.each do |node_key, v| case v when ::RedisClient::Cluster::Pipeline::RedirectionNeeded required_redirections ||= {} required_redirections[node_key] = v + when ::RedisClient::Cluster::Pipeline::StaleClusterState + cluster_state_errors ||= {} + cluster_state_errors[node_key] = v when StandardError + cluster_state_errors ||= {} if v.is_a?(::RedisClient::ConnectionError) errors ||= {} errors[node_key] = v else @@ -183,15 +203,25 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met end work_group.close + @router.renew_cluster_state if cluster_state_errors raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil? required_redirections&.each do |node_key, v| + raise v.first_exception if v.first_exception + all_replies ||= Array.new(@size) pipeline = @pipelines[node_key] v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) } pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] } end + cluster_state_errors&.each do |node_key, v| + raise v.first_exception if v.first_exception + + all_replies ||= Array.new(@size) + @pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] } + end + all_replies end diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb index 3b1ba45..534854f 100644 --- a/lib/redis_client/cluster/pub_sub.rb +++ b/lib/redis_client/cluster/pub_sub.rb @@ -24,6 +24,8 @@ def ensure_worker def close @worker.exit if @worker&.alive? @client.close + rescue ::RedisClient::ConnectionError + # ignore end private @@ -51,27 +53,33 @@ def initialize(router, command_builder) @command_builder = command_builder @queue = SizedQueue.new(BUF_SIZE) @state_dict = {} + @commands = [] end def call(*args, **kwargs) - _call(@command_builder.generate(args, kwargs)) + command = @command_builder.generate(args, kwargs) + _call(command) + @commands << command nil end def call_v(command) - _call(@command_builder.generate(command)) + command = @command_builder.generate(command) + _call(command) + @commands << command nil end def close @state_dict.each_value(&:close) @state_dict.clear + @commands.clear @queue.clear @queue.close nil end - def next_event(timeout = nil) + def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity @state_dict.each_value(&:ensure_worker) max_duration = calc_max_duration(timeout) starting = obtain_current_time @@ -80,6 +88,13 @@ def next_event(timeout = nil) break if max_duration > 0 && obtain_current_time - starting > max_duration case event = @queue.pop(true) + when ::RedisClient::CommandError + if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served') + @router.renew_cluster_state + break start_over + end + + raise event when StandardError then raise event when Array then break event end @@ -99,13 +114,26 @@ def _call(command) end end - def call_to_single_state(command) + def call_to_single_state(command, retry_count: 1) node_key = @router.find_node_key(command) - try_call(node_key, command) + @state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue) + @state_dict[node_key].call(command) + rescue ::RedisClient::ConnectionError + @state_dict[node_key].close + @state_dict.delete(node_key) + @router.renew_cluster_state + retry_count -= 1 + retry_count >= 0 ? retry : raise end def call_to_all_states(command) - @state_dict.each_value { |s| s.call(command) } + @state_dict.each do |node_key, state| + state.call(command) + rescue ::RedisClient::ConnectionError + @state_dict[node_key].close + @state_dict.delete(node_key) + @router.renew_cluster_state + end end def call_for_sharded_states(command) @@ -116,24 +144,6 @@ def call_for_sharded_states(command) end end - def try_call(node_key, command, retry_count: 1) - add_state(node_key).call(command) - rescue ::RedisClient::CommandError => e - raise if !e.message.start_with?('MOVED') || retry_count <= 0 - - # for sharded pub/sub - node_key = e.message.split[2] - retry_count -= 1 - retry - end - - def add_state(node_key) - return @state_dict[node_key] if @state_dict.key?(node_key) - - state = State.new(@router.find_node(node_key).pubsub, @queue) - @state_dict[node_key] = state - end - def obtain_current_time Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) end @@ -141,6 +151,13 @@ def obtain_current_time def calc_max_duration(timeout) timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000 end + + def start_over + @state_dict.each_value(&:close) + @state_dict.clear + @commands.each { |command| _call(command) } + nil + end end end end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 3e32127..9657ad5 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -29,7 +29,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @pool = pool @client_kwargs = kwargs @node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs) - @node.reload! + renew_cluster_state @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder end @@ -68,15 +68,21 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::Cluster::Node::ReloadNeeded - @node.reload! + renew_cluster_state raise ::RedisClient::Cluster::NodeMightBeDown + rescue ::RedisClient::ConnectionError + renew_cluster_state + raise + rescue ::RedisClient::CommandError => e + renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served') + raise rescue ::RedisClient::Cluster::ErrorCollection => e raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError) - @node.reload! if e.errors.values.any? do |err| + renew_cluster_state if e.errors.values.any? do |err| next false if ::RedisClient::Cluster::ErrorIdentification.identifiable?(err) && @node.none? { |c| ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(err, c) } - err.message.start_with?('CLUSTERDOWN Hash slot not served') + err.message.start_with?('CLUSTERDOWN Hash slot not served') || err.is_a?(::RedisClient::ConnectionError) end raise @@ -118,7 +124,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me retry end elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') - @node.reload! + renew_cluster_state retry if retry_count >= 0 end @@ -127,7 +133,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node) retry_count -= 1 - @node.reload! + renew_cluster_state retry if retry_count >= 0 raise end @@ -154,11 +160,16 @@ def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize client_index += 1 if result_cursor == 0 [((result_cursor << 8) + client_index).to_s, result_keys] + rescue ::RedisClient::ConnectionError + renew_cluster_state + raise end def assign_node(command) - node_key = find_node_key(command) - find_node(node_key) + handle_node_reload_error do + node_key = find_node_key(command) + @node.find_by(node_key) + end end def find_node_key_by_key(key, seed: nil, primary: false) @@ -171,8 +182,10 @@ def find_node_key_by_key(key, seed: nil, primary: false) end def find_primary_node_by_slot(slot) - node_key = @node.find_node_key_of_primary(slot) - find_node(node_key) + handle_node_reload_error do + node_key = @node.find_node_key_of_primary(slot) + @node.find_by(node_key) + end end def find_node_key(command, seed: nil) @@ -197,14 +210,8 @@ def find_slot_by_key(key) ::RedisClient::Cluster::KeySlotConverter.convert(key) end - def find_node(node_key, retry_count: 1) - @node.find_by(node_key) - rescue ::RedisClient::Cluster::Node::ReloadNeeded - raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0 - - retry_count -= 1 - @node.reload! - retry + def find_node(node_key) + handle_node_reload_error { @node.find_by(node_key) } end def command_exists?(name) @@ -215,18 +222,22 @@ def assign_redirection_node(err_msg) _, slot, node_key = err_msg.split slot = slot.to_i @node.update_slot(slot, node_key) - find_node(node_key) + handle_node_reload_error { @node.find_by(node_key) } end def assign_asking_node(err_msg) _, _, node_key = err_msg.split - find_node(node_key) + handle_node_reload_error { @node.find_by(node_key) } end def node_keys @node.node_keys end + def renew_cluster_state + @node.reload! + end + def close @node.each(&:close) end @@ -241,7 +252,7 @@ def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:d raise if e.errors.values.none? { |err| err.message.include?('WAIT cannot be used with replica instances') } retry_count -= 1 - @node.reload! + renew_cluster_state retry end @@ -270,7 +281,7 @@ def send_client_command(method, command, args, &block) end end - def send_cluster_command(method, command, args, &block) + def send_cluster_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize case subcommand = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command) when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate', 'reset', 'set-config-epoch', 'setslot' @@ -279,7 +290,10 @@ def send_cluster_command(method, command, args, &block) when 'getkeysinslot' raise ArgumentError, command.join(' ') if command.size != 4 - find_node(@node.find_node_key_of_replica(command[2])).public_send(method, *args, command, &block) + handle_node_reload_error do + node_key = @node.find_node_key_of_replica(command[2]) + @node.find_by(node_key).public_send(method, *args, command, &block) + end else assign_node(command).public_send(method, *args, command, &block) end end @@ -365,6 +379,16 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis end block_given? ? yield(result) : result end + + def handle_node_reload_error(retry_count: 1) + yield + rescue ::RedisClient::Cluster::Node::ReloadNeeded + raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0 + + retry_count -= 1 + renew_cluster_state + retry + end end end end diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index a7f7413..7312829 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -122,7 +122,7 @@ def send_transaction(client, redirect:) end end - def send_pipeline(client, redirect:) + def send_pipeline(client, redirect:) # rubocop:disable Metrics/AbcSize replies = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection| commands = @pipeline._commands client.middlewares.call_pipelined(commands, client.config) do @@ -138,6 +138,9 @@ def send_pipeline(client, redirect:) return if replies.last.nil? coerce_results!(replies.last) + rescue ::RedisClient::ConnectionError + @router.renew_cluster_state if @watching_slot.nil? + raise end def coerce_results!(results, offset: 1) @@ -167,6 +170,9 @@ def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize elsif err.message.start_with?('ASK') node = @router.assign_asking_node(err.message) try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err + elsif err.message.start_with?('CLUSTERDOWN Hash slot not served') + @router.renew_cluster_state if @watching_slot.nil? + raise err else raise err end diff --git a/test/cluster_controller.rb b/test/cluster_controller.rb index ebc8777..1aacc6e 100644 --- a/test/cluster_controller.rb +++ b/test/cluster_controller.rb @@ -6,12 +6,15 @@ class ClusterController SLOT_SIZE = 16_384 DEFAULT_SHARD_SIZE = 3 DEFAULT_REPLICA_SIZE = 1 - DEFAULT_MAX_ATTEMPTS = 600 + DEFAULT_MAX_ATTEMPTS = 300 DEFAULT_TIMEOUT_SEC = 5.0 + SLEEP_SEC = 1.0 private_constant :SLOT_SIZE, :DEFAULT_SHARD_SIZE, :DEFAULT_REPLICA_SIZE, :DEFAULT_MAX_ATTEMPTS, :DEFAULT_TIMEOUT_SEC + MaxRetryExceeded = Class.new(StandardError) + RedisNodeInfo = Struct.new( 'RedisClusterNodeInfo', :id, :node_key, :flags, :role, :myself?, :primary_id, :ping_sent, :pong_recv, @@ -55,7 +58,7 @@ def initialize(node_addrs, @debug = ENV.fetch('DEBUG', '0') end - def wait_for_cluster_to_be_ready + def wait_for_cluster_to_be_ready(skip_clients: []) print_debug('wait for nodes to be recognized...') wait_meeting(@clients, max_attempts: @max_attempts) print_debug('wait for the cluster state to be ok...') @@ -63,7 +66,7 @@ def wait_for_cluster_to_be_ready print_debug('wait for the replication to be established...') wait_replication(@clients, number_of_replicas: @number_of_replicas, max_attempts: @max_attempts) print_debug('wait for commands to be accepted...') - wait_cluster_recovering(@clients, max_attempts: @max_attempts) + wait_cluster_recovering(@clients, max_attempts: @max_attempts, skip_clients: skip_clients) end def rebuild @@ -102,7 +105,7 @@ def failover wait_cluster_recovering(@clients, max_attempts: @max_attempts) end - def start_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Metrics/CyclomaticComplexity + def start_resharding(slot:, src_node_key:, dest_node_key:) rows = associate_with_clients_and_nodes(@clients) src_info = rows.find { |r| r.node_key == src_node_key || r.client_node_key == src_node_key } dest_info = rows.find { |r| r.node_key == dest_node_key || r.client_node_key == dest_node_key } @@ -122,6 +125,7 @@ def start_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Met number_of_keys = src_client.call('CLUSTER', 'COUNTKEYSINSLOT', slot) keys = src_client.call('CLUSTER', 'GETKEYSINSLOT', slot, number_of_keys) + print_debug("#{src_client.config.host}:#{src_client.config.port} => #{dest_client.config.host}:#{dest_client.config.port} ... #{keys}") return if keys.empty? begin @@ -136,7 +140,7 @@ def start_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Met wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout) end - def finish_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def finish_resharding(slot:, src_node_key:, dest_node_key:) rows = associate_with_clients_and_nodes(@clients) src_info = rows.find { |r| r.node_key == src_node_key || r.client_node_key == src_node_key } dest_info = rows.find { |r| r.node_key == dest_node_key || r.client_node_key == dest_node_key } @@ -148,6 +152,7 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Me ([dest, src] + rest).each do |cli| cli.call('CLUSTER', 'SETSLOT', slot, 'NODE', id) + print_debug("#{cli.config.host}:#{cli.config.port} ... CLUSTER SETSLOT #{slot} NODE #{id}") rescue ::RedisClient::CommandError => e raise unless e.message.start_with?('ERR Please use SETSLOT only with masters.') # how weird, ignore @@ -156,7 +161,7 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Me wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout) end - def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def scale_out(primary_url:, replica_url:) # @see https://redis.io/docs/manual/scaling/ rows = associate_with_clients_and_nodes(@clients) target_host, target_port = rows.find(&:primary?)&.node_key&.split(':') @@ -176,7 +181,7 @@ def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticCo primary_id = primary.call('CLUSTER', 'MYID') replica.call('CLUSTER', 'REPLICATE', primary_id) save_config(@clients) - wait_for_cluster_to_be_ready + wait_for_cluster_to_be_ready(skip_clients: [primary, replica]) rows = associate_with_clients_and_nodes(@clients) @@ -188,7 +193,7 @@ def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticCo end end - def scale_in # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def scale_in rows = associate_with_clients_and_nodes(@clients) primary_info = rows.reject(&:empty_slots?).min_by(&:slot_size) @@ -198,6 +203,7 @@ def scale_in # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedCo primary_info.slots.each do |slot| src = primary_info.node_key dest = rest_primary_node_keys.sample + print_debug("Resharding slot #{slot}: #{src} => #{dest}") start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) end @@ -340,7 +346,7 @@ def replicate(clients, shard_size:, replica_size:) rescue ::RedisClient::CommandError => e print_debug(e.message) # ERR Unknown node [node-id] - sleep 1.0 + sleep SLEEP_SEC primary_id = primaries[i].call('CLUSTER', 'MYID') next end @@ -402,11 +408,11 @@ def wait_replication_delay(clients, replica_size:, timeout:) end end - def wait_cluster_recovering(clients, max_attempts:) + def wait_cluster_recovering(clients, max_attempts:, skip_clients: []) key = 0 wait_for_state(clients, max_attempts: max_attempts) do |client| print_debug("#{client.config.host}:#{client.config.port} ... GET #{key}") - client.call('GET', key) if primary_client?(client) + client.call('GET', key) if primary_client?(client) && !skip_clients.include?(client) true rescue ::RedisClient::CommandError => e if e.message.start_with?('CLUSTERDOWN') @@ -426,12 +432,12 @@ def wait_for_state(clients, max_attempts:) attempt_count = 1 clients.each do |client| attempt_count.step(max_attempts) do |i| - break if i >= max_attempts + raise MaxRetryExceeded if i >= max_attempts attempt_count += 1 break if yield(client) - sleep 0.1 + sleep SLEEP_SEC end end end @@ -459,7 +465,7 @@ def associate_with_clients_and_nodes(clients) end end - def parse_cluster_nodes(rows) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def parse_cluster_nodes(rows) rows.map do |row| flags = row[2].split(',') slots = if row[8].nil? diff --git a/test/redis_client/cluster/test_errors.rb b/test/redis_client/cluster/test_errors.rb index 760e33c..75ac6db 100644 --- a/test/redis_client/cluster/test_errors.rb +++ b/test/redis_client/cluster/test_errors.rb @@ -50,7 +50,8 @@ def test_error_collection_error errors: { '127.0.0.1:6379' => DummyError.new('foo'), '127.0.0.1:6380' => DummyError.new('bar') }, want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo, 127.0.0.1:6380: bar', size: 2 } }, - { errors: {}, want: { msg: '', size: 0 } }, + { errors: {}, want: { msg: '{}', size: 0 } }, + { errors: [], want: { msg: '[]', size: 0 } }, { errors: '', want: { msg: '', size: 0 } }, { errors: nil, want: { msg: '', size: 0 } } ].each_with_index do |c, idx| diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index b0dfb00..48b3283 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -352,7 +352,7 @@ def test_send_ping assert_equal(want, got, 'Case: scale read') end - def test_clients_for_scanning # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def test_clients_for_scanning test_config = @test_node.instance_variable_get(:@config) want = @test_node_info_list.select(&:primary?) .map(&:node_key) diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 408b54e..bfa0c0a 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -737,7 +737,7 @@ def test_dedicated_multiple_keys_command end end - def test_dedicated_commands # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def test_dedicated_commands 10.times { |i| @client.call('SET', "key#{i}", i) } wait_for_replication [ diff --git a/test/test_against_cluster_scale.rb b/test/test_against_cluster_scale.rb index 6fd2dd8..eac371a 100644 --- a/test/test_against_cluster_scale.rb +++ b/test/test_against_cluster_scale.rb @@ -2,115 +2,240 @@ require 'testing_helper' -class TestAgainstClusterScale < TestingWrapper - WAIT_SEC = 1 - MAX_ATTEMPTS = 20 - NUMBER_OF_KEYS = 20_000 +module TestAgainstClusterScale + PATTERN = ENV.fetch('TEST_CLASS_PATTERN', '') + + module Mixin + WAIT_SEC = 1 + MAX_ATTEMPTS = 20 + NUMBER_OF_KEYS = 20_000 + MAX_PIPELINE_SIZE = 40 + HASH_TAG_GRAIN = 5 + SLICED_NUMBERS = (0...NUMBER_OF_KEYS).each_slice(MAX_PIPELINE_SIZE).freeze + + def setup + @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new + @redirect_count = ::Middlewares::RedirectCount::Counter.new + @client = ::RedisClient.cluster( + nodes: TEST_NODE_URIS, + replica: true, + fixed_hostname: TEST_FIXED_HOSTNAME, + custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], + **TEST_GENERIC_OPTIONS + ).new_client + @client.call('echo', 'init') + @captured_commands.clear + @redirect_count.clear + @cluster_down_error_count = 0 + end - def self.test_order - :alpha - end + def teardown + @client&.close + @controller&.close + print "#{@redirect_count.get}, "\ + "ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')}, "\ + "ClusterDownError: #{@cluster_down_error_count} = " + end - def setup - @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new - @redirect_count = ::Middlewares::RedirectCount::Counter.new - @client = ::RedisClient.cluster( - nodes: TEST_NODE_URIS, - replica: true, - fixed_hostname: TEST_FIXED_HOSTNAME, - custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, - middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], - **TEST_GENERIC_OPTIONS - ).new_client - @client.call('echo', 'init') - @captured_commands.clear - @redirect_count.clear - @cluster_down_error_count = 0 - end + def test_01_scale_out + SLICED_NUMBERS.each do |numbers| + @client.pipelined do |pi| + numbers.each do |i| + pi.call('SET', "key#{i}", i) + pi.call('SET', "{group#{i / HASH_TAG_GRAIN}}:key#{i}", i) + end + end + end + + wait_for_replication + + primary_url, replica_url = build_additional_node_urls + @controller = build_cluster_controller(TEST_NODE_URIS, shard_size: 3) + @controller.scale_out(primary_url: primary_url, replica_url: replica_url) + + do_test_after_scaled_out + + want = (TEST_NODE_URIS + build_additional_node_urls).size + got = @client.instance_variable_get(:@router) + .instance_variable_get(:@node) + .instance_variable_get(:@topology) + .instance_variable_get(:@clients) + .size + assert_equal(want, got, 'Case: number of nodes') + + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) + end - def teardown - @client&.close - @controller&.close - print "#{@redirect_count.get}, "\ - "ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')}, "\ - "ClusterDownError: #{@cluster_down_error_count} = " - end + def test_02_scale_in + @controller = build_cluster_controller(TEST_NODE_URIS + build_additional_node_urls, shard_size: 4) + @controller.scale_in - def test_01_scale_out - @controller = build_cluster_controller(TEST_NODE_URIS, shard_size: 3) + do_test_after_scaled_in - @client.pipelined { |pi| NUMBER_OF_KEYS.times { |i| pi.call('SET', "key#{i}", i) } } - wait_for_replication + want = TEST_NODE_URIS.size + got = @client.instance_variable_get(:@router) + .instance_variable_get(:@node) + .instance_variable_get(:@topology) + .instance_variable_get(:@clients) + .size + assert_equal(want, got, 'Case: number of nodes') - primary_url, replica_url = build_additional_node_urls - @controller.scale_out(primary_url: primary_url, replica_url: replica_url) + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) + end - NUMBER_OF_KEYS.times { |i| assert_equal(i.to_s, @client.call('GET', "key#{i}"), "Case: key#{i}") } + private - want = (TEST_NODE_URIS + build_additional_node_urls).size - got = @client.instance_variable_get(:@router) - .instance_variable_get(:@node) - .instance_variable_get(:@topology) - .instance_variable_get(:@clients) - .size - assert_equal(want, got, 'Case: number of nodes') - refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) - end + def wait_for_replication + client_side_timeout = TEST_TIMEOUT_SEC + 1.0 + server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i + swap_timeout(@client, timeout: 0.1) do |client| + client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout) + end + end - def test_02_scale_in - @controller = build_cluster_controller(TEST_NODE_URIS + build_additional_node_urls, shard_size: 4) - @controller.scale_in + def build_cluster_controller(nodes, shard_size:) + ClusterController.new( + nodes, + shard_size: shard_size, + replica_size: TEST_REPLICA_SIZE, + **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) + ) + end - NUMBER_OF_KEYS.times do |i| - got = retry_call(attempts: MAX_ATTEMPTS) { @client.call('GET', "key#{i}") } - assert_equal(i.to_s, got, "Case: key#{i}") + def build_additional_node_urls + max = TEST_REDIS_PORTS.max + (max + 1..max + 2).map { |port| "#{TEST_REDIS_SCHEME}://#{TEST_REDIS_HOST}:#{port}" } end - want = TEST_NODE_URIS.size - got = @client.instance_variable_get(:@router) - .instance_variable_get(:@node) - .instance_variable_get(:@topology) - .instance_variable_get(:@clients) - .size - assert_equal(want, got, 'Case: number of nodes') - refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) - end + def retryable(attempts:) + loop do + raise MaxRetryExceeded if attempts <= 0 - private + attempts -= 1 + break yield + rescue ::RedisClient::CommandError => e + raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served') - def wait_for_replication - client_side_timeout = TEST_TIMEOUT_SEC + 1.0 - server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i - swap_timeout(@client, timeout: 0.1) do |client| - client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout) + @cluster_down_error_count += 1 + sleep WAIT_SEC + end end end - def build_cluster_controller(nodes, shard_size:) - ClusterController.new( - nodes, - shard_size: shard_size, - replica_size: TEST_REPLICA_SIZE, - **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) - ) + if PATTERN == 'Single' || PATTERN.empty? + class Single < TestingWrapper + include Mixin + + def self.test_order + :alpha + end + + def do_test_after_scaled_out + NUMBER_OF_KEYS.times do |i| + assert_equal(i.to_s, @client.call('GET', "key#{i}"), "Case: key#{i}") + end + end + + def do_test_after_scaled_in + NUMBER_OF_KEYS.times do |i| + got = retryable(attempts: MAX_ATTEMPTS) { @client.call('GET', "key#{i}") } + assert_equal(i.to_s, got, "Case: key#{i}") + end + end + end end - def build_additional_node_urls - max = TEST_REDIS_PORTS.max - (max + 1..max + 2).map { |port| "#{TEST_REDIS_SCHEME}://#{TEST_REDIS_HOST}:#{port}" } + if PATTERN == 'Pipeline' || PATTERN.empty? + class Pipeline < TestingWrapper + include Mixin + + def self.test_order + :alpha + end + + def do_test_after_scaled_out + SLICED_NUMBERS.each do |numbers| + got = @client.pipelined do |pi| + numbers.each { |i| pi.call('GET', "key#{i}") } + end + + assert_equal(numbers.map(&:to_s), got, 'Case: GET') + end + end + + def do_test_after_scaled_in + SLICED_NUMBERS.each do |numbers| + got = retryable(attempts: MAX_ATTEMPTS) do + @client.pipelined do |pi| + numbers.each { |i| pi.call('GET', "key#{i}") } + end + end + + assert_equal(numbers.map(&:to_s), got, 'Case: GET') + end + end + end end - def retry_call(attempts:) - loop do - raise MaxRetryExceeded if attempts <= 0 - - attempts -= 1 - break yield - rescue ::RedisClient::CommandError => e - raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served') + if PATTERN == 'Transaction' || PATTERN.empty? + class Transaction < TestingWrapper + include Mixin + + def self.test_order + :alpha + end + + def do_test_after_scaled_out + NUMBER_OF_KEYS.times.group_by { |i| i / HASH_TAG_GRAIN }.each do |group, numbers| + keys = numbers.map { |i| "{group#{group}}:key#{i}" } + got = @client.multi(watch: group.odd? ? nil : keys) do |tx| + keys.each { |key| tx.call('INCR', key) } + end + + want = numbers.map { |i| (i + 1) } + assert_equal(want, got, 'Case: INCR') + end + end + + def do_test_after_scaled_in + NUMBER_OF_KEYS.times.group_by { |i| i / HASH_TAG_GRAIN }.each do |group, numbers| + keys = numbers.map { |i| "{group#{group}}:key#{i}" } + got = retryable(attempts: MAX_ATTEMPTS) do + @client.multi(watch: group.odd? ? nil : keys) do |tx| + keys.each { |key| tx.call('INCR', key) } + end + end + + want = numbers.map { |i| (i + 2) } + assert_equal(want, got, 'Case: INCR') + end + end + end + end - @cluster_down_error_count += 1 - sleep WAIT_SEC + if PATTERN == 'PubSub' || PATTERN.empty? + class PubSub < TestingWrapper + include Mixin + + def self.test_order + :alpha + end + + def do_test_after_scaled_out + 1000.times do |i| + pubsub = @client.pubsub + pubsub.call('SSUBSCRIBE', "chan#{i}") + event = pubsub.next_event(0.01) + event = pubsub.next_event(0.01) if event.nil? # state changed + assert_equal(['ssubscribe', "chan#{i}", 1], event) + assert_nil(pubsub.next_event(0.01)) + ensure + pubsub&.close + end + end + + alias do_test_after_scaled_in do_test_after_scaled_out end end end diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 29df288..0eb606b 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -3,10 +3,11 @@ require 'testing_helper' module TestAgainstClusterState - SLOT_SIZE = 16_384 PATTERN = ENV.fetch('TEST_CLASS_PATTERN', '') module Mixin + SLOT_SIZE = 16_384 + def setup @controller = ClusterController.new( TEST_NODE_URIS,